Re: [External Email] Re: [Spark Core]: What's difference among spark.shuffle.io.threads

2023-08-19 Thread Nebi Aydin
Here's the executor logs
```

java.io.IOException: Connection from
ip-172-31-16-143.ec2.internal/172.31.16.143:7337 closed
at 
org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)
at 
org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at 
io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at 
io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at 
io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at 
org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:225)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at 
io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:750)
2023-08-19 04:33:53,429 INFO shuffle.RetryingBlockFetcher: Retrying
fetch (1/10) for 757 outstanding blocks after 6 ms
```
And within node manager logs from the failing host I got these logs below

```

2023-08-19 07:38:38,498 ERROR
org.apache.spark.network.server.ChunkFetchRequestHandler
(shuffle-server-4-59): Error sending result
ChunkFetchSuccess[streamChunkId=StreamChunkId[streamId=279757106070,chunkIndex=642],buffer=FileSegmentManagedBuffer[file=/mnt2/yarn/usercache/zeppelin/appcache/application_1691862880080_0016/blockmgr-36010488-99a9-4780-b65f-40e0f2f8f150/37/shuffle_6_784261_0.data,offset=2856408,length=338]]
to /172.31.23.144:35102; closing connection
java.nio.channels.ClosedChannelException

```


Also here's my configurations

[image: Screenshot 2023-08-19 at 8.47.08 AM.png]


On Sat, Aug 19, 2023 at 4:36 AM Mich Talebzadeh 
wrote:

> That error message *FetchFailedException: Failed to connect to
>  on port 7337 *happens when a task running on one executor
> node tries to fetch data from another executor node but fails to establish
> a connection to the specified port (7337 in this case). In a nutshell it is
> performing network IO among your executors.
>
> Check the following:
>
> - Any network issue or connectivity problems among nodes that your
> executors are running on
> - any executor failure causing this error. Check the executor logs
> - Concurrency and Thread Issues: If there are too many concurrent
> connections or thread limitations,
>   it could result in failed connections. *Adjust
> spark.shuffle.io.clientThreads*
> - It might be prudent to do the same to *spark.shuffle.io.serverThreads*
> - Check how stable your environment is. Observe any issues reported in
> Spark UI
>
> HTH
>
>
> Mich Talebzadeh,
> Solutions Architect/Engineering Lead
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  

Re: [External Email] Re: [Spark Core]: What's difference among spark.shuffle.io.threads

2023-08-19 Thread Mich Talebzadeh
That error message *FetchFailedException: Failed to connect to
 on port 7337 *happens when a task running on one executor
node tries to fetch data from another executor node but fails to establish
a connection to the specified port (7337 in this case). In a nutshell it is
performing network IO among your executors.

Check the following:

- Any network issue or connectivity problems among nodes that your
executors are running on
- any executor failure causing this error. Check the executor logs
- Concurrency and Thread Issues: If there are too many concurrent
connections or thread limitations,
  it could result in failed connections. *Adjust
spark.shuffle.io.clientThreads*
- It might be prudent to do the same to *spark.shuffle.io.serverThreads* (a
  config sketch follows this list)
- Check how stable your environment is. Observe any issues reported in
Spark UI

HTH


Mich Talebzadeh,
Solutions Architect/Engineering Lead
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 18 Aug 2023 at 23:30, Nebi Aydin  wrote:

>
> Hi, sorry for duplicates. First time user :)
> I keep getting FetchFailedException (port 7337 closed), which is the
> external shuffle service port.
> I was trying to tune these parameters.
> I have around 1000 executors and 5000 cores.
> I tried to set spark.shuffle.io.serverThreads to 2k. Should I also set
> spark.shuffle.io.clientThreads to 2000?
> Do shuffle client threads allow one executor to fetch from multiple nodes'
> shuffle services?
>
> Thanks
> On Fri, Aug 18, 2023 at 17:42 Mich Talebzadeh 
> wrote:
>
>> Hi,
>>
>> These two threads that you sent seem to be duplicates of each other?
>>
>> Anyhow I trust that you are familiar with the concept of shuffle in
>> Spark. Spark Shuffle is an expensive operation since it involves the
>> following
>>
>>- Disk I/O
>>- Involves data serialization and deserialization
>>- Network I/O
>>
>> Basically these are based on the concept of map/reduce in Spark and these
>> parameters you posted relate to various aspects of threading and
>> concurrency.
>>
>> HTH
>>
>>
>> Mich Talebzadeh,
>> Solutions Architect/Engineering Lead
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Fri, 18 Aug 2023 at 20:39, Nebi Aydin 
>> wrote:
>>
>>>
>>> I want to learn differences among below thread configurations.
>>>
>>> spark.shuffle.io.serverThreads
>>> spark.shuffle.io.clientThreads
>>> spark.shuffle.io.threads
>>> spark.rpc.io.serverThreads
>>> spark.rpc.io.clientThreads
>>> spark.rpc.io.threads
>>>
>>> Thanks.
>>>
>>


Re: [External Email] Re: [Spark Core]: What's difference among spark.shuffle.io.threads

2023-08-18 Thread Nebi Aydin
Hi, sorry for duplicates. First time user :)
I keep getting FetchFailedException (port 7337 closed), which is the external
shuffle service port.
I was trying to tune these parameters.
I have around 1000 executors and 5000 cores.
I tried to set spark.shuffle.io.serverThreads to 2k. Should I also set
spark.shuffle.io.clientThreads to 2000?
Do shuffle client threads allow one executor to fetch from multiple nodes'
shuffle services?

Thanks
On Fri, Aug 18, 2023 at 17:42 Mich Talebzadeh 
wrote:

> Hi,
>
> These two threads that you sent seem to be duplicates of each other?
>
> Anyhow I trust that you are familiar with the concept of shuffle in Spark.
> Spark Shuffle is an expensive operation since it involves the following
>
>- Disk I/O
>- Involves data serialization and deserialization
>- Network I/O
>
> Basically these are based on the concept of map/reduce in Spark and these
> parameters you posted relate to various aspects of threading and
> concurrency.
>
> HTH
>
>
> Mich Talebzadeh,
> Solutions Architect/Engineering Lead
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 18 Aug 2023 at 20:39, Nebi Aydin 
> wrote:
>
>>
>> I want to learn differences among below thread configurations.
>>
>> spark.shuffle.io.serverThreads
>> spark.shuffle.io.clientThreads
>> spark.shuffle.io.threads
>> spark.rpc.io.serverThreads
>> spark.rpc.io.clientThreads
>> spark.rpc.io.threads
>>
>> Thanks.
>>
>


Re: [Spark Core]: What's difference among spark.shuffle.io.threads

2023-08-18 Thread Mich Talebzadeh
Hi,

These two threads that you sent seem to be duplicates of each other?

Anyhow I trust that you are familiar with the concept of shuffle in Spark.
Spark Shuffle is an expensive operation since it involves the following

   - Disk I/O
   - Involves data serialization and deserialization
   - Network I/O

Basically these are based on the concept of map/reduce in Spark and these
parameters you posted relate to various aspects of threading and
concurrency.
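
As a minimal sketch of the scoping (values are placeholders): the
spark.shuffle.io.* keys size the Netty transport used for shuffle block
transfers, while the spark.rpc.io.* keys size the transport used for the RPC
control plane between driver and executors; within each module,
serverThreads sizes the serving event-loop pool and clientThreads the
requesting side.

```
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // data plane: shuffle block transfer service
  .set("spark.shuffle.io.serverThreads", "32")
  .set("spark.shuffle.io.clientThreads", "32")
  // control plane: RPC between driver and executors
  .set("spark.rpc.io.serverThreads", "16")
  .set("spark.rpc.io.clientThreads", "16")
```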

HTH


Mich Talebzadeh,
Solutions Architect/Engineering Lead
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 18 Aug 2023 at 20:39, Nebi Aydin 
wrote:

>
> I want to learn differences among below thread configurations.
>
> spark.shuffle.io.serverThreads
> spark.shuffle.io.clientThreads
> spark.shuffle.io.threads
> spark.rpc.io.serverThreads
> spark.rpc.io.clientThreads
> spark.rpc.io.threads
>
> Thanks.
>


[Spark Core]: What's difference among spark.shuffle.io.threads

2023-08-18 Thread Nebi Aydin
I want to learn differences among below thread configurations.

spark.shuffle.io.serverThreads
spark.shuffle.io.clientThreads
spark.shuffle.io.threads
spark.rpc.io.serverThreads
spark.rpc.io.clientThreads
spark.rpc.io.threads

Thanks.


Re: The performance difference when running Apache Spark on K8s and traditional server

2023-07-27 Thread Mich Talebzadeh
Spark on tin boxes like Google Dataproc or AWS EC2 often utilise YARN
resource manager. YARN  is the most widely used resource manager not just
for Spark but for other artefacts as well. On-premise YARN is used
extensively. In Cloud it is also used widely in Infrastructure as a Service
such as Google Dataproc which I mentioned.

With regard to your questions:

Q1: What are the causes and reasons for Spark on K8s to be slower than
Serverful?
--> It should be noted that Spark on Kubernetes is still a work in progress
and, as of now, there is outstanding future work. It is not yet at parity
with Spark on YARN.

Q2: How or is there a scenario to show the most apparent difference in
performance and cost of these two environments (Serverless (K8S) and
Serverful (Traditional server)?
--> Simple. One experiment is worth ten hypotheses. Install Spark on a
serverful cluster and on K8s, run the same workload on both, and observe the
performance for that workload through the Spark GUI.
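
As a minimal benchmark sketch (assuming a spark-shell style session on each
cluster; sizes and column names are arbitrary placeholders), the same join
can be timed on both and the stage/task metrics compared in the Spark UI:

```
import org.apache.spark.sql.functions._
import spark.implicits._

val left  = spark.range(0, 50000000L).withColumnRenamed("id", "k")
val right = spark.range(0, 50000000L).select($"id" as "k", ($"id" % 100) as "v")

// spark.time prints the wall-clock time of the enclosed action.
spark.time {
  left.join(right, "k").agg(sum($"v")).show()
}
```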

See this article of mine to help you with some features. A bit dated but
still covers concepts

Spark on Kubernetes, A Practitioner’s Guide
<https://www.linkedin.com/pulse/spark-kubernetes-practitioners-guide-mich-talebzadeh-ph-d-/?trackingId=01Fj2t28THWpLEldU0Q9ow%3D%3D>

HTH

Mich Talebzadeh,
Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 27 Jul 2023 at 18:20, Trường Trần Phan An 
wrote:

> Hi all,
>
> I am learning about the performance difference of Spark when performing a
> JOIN problem on Serverless (K8S) and Serverful (Traditional server)
> environments.
>
> Through experiment, Spark on K8s tends to run slower than Serverful.
> Through understanding the architecture, I know that Spark runs on K8s as
> Containers (Pods) so it takes a certain time to initialize, but when I look
> at each job, stage, and task, Spark on K8s still tends to be slower than Serverful.
>
> *I have some questions:*
> Q1: What are the causes and reasons for Spark on K8s to be slower than
> Serverful?
> Q2: How or is there a scenario to show the most apparent difference in
> performance and cost of these two environments (Serverless (K8S) and
> Serverful (Traditional server)?
>
> Thank you so much!
>
> Best regards,
> Truong
>
>
>


The performance difference when running Apache Spark on K8s and traditional server

2023-07-27 Thread Trường Trần Phan An
Hi all,

I am learning about the performance difference of Spark when performing a
JOIN problem on Serverless (K8S) and Serverful (Traditional server)
environments.

Through experiment, Spark on K8s tends to run slower than Serverful.
Through understanding the architecture, I know that Spark runs on K8s as
Containers (Pods) so it takes a certain time to initialize, but when I look
at each job, stage, and task, Spark on K8s still tends to be slower than Serverful.

*I have some questions:*
Q1: What are the causes and reasons for Spark on K8s to be slower than
Serverful?
Q2: How or is there a scenario to show the most apparent difference in
performance and cost of these two environments (Serverless (K8S) and
Serverful (Traditional server)?

Thank you so much!

Best regards,
Truong


Re: to find Difference of locations in Spark Dataframe rows

2022-06-09 Thread Bjørn Jørgensen
If KM is kilometre then you must replace
val distance = atan2(sqrt(a), sqrt(-a + 1)) * 2 * 6371
with
val distance = atan2(sqrt(a), sqrt(-a + 1)) * 2 * 12742

Have a look at this gist: Spherical distance calculation based on latitude
and longitude with Apache Spark


On Tue, 7 Jun 2022 at 19:39, Chetan Khatri  wrote:

> Hi Dear Spark Users,
>
> It has been many years that I have worked on Spark, Please help me. Thanks
> much
>
> I have different cities and their co-ordinates in DataFrame[Row], I want
> to find distance in KMs and then show only those records /cities which are
> 10 KMs far.
>
> I have a function created that can find the distance in KMs given two
> co-coordinates. But I don't know how to apply it to rows, like one to many
> and calculate the distance.
>
> Some code that I wrote, Sorry for the basic code.
>
> class HouseMatching {
>   def main(args: Array[String]): Unit = {
>
> val search_property_id = args(0)
>
> // list of columns where the condition should be exact match
> val groupOneCriteria = List(
>   "occupied_by_tenant",
>   "water_index",
>   "electricity_index",
>   "elevator_index",
>   "heating_index",
>   "nb_bathtubs",
>   "nb_showers",
>   "nb_wc",
>   "nb_rooms",
>   "nb_kitchens"
> )
> // list of columns where the condition should be matching 80%
> val groupTwoCriteria = List(
>   "area",
>   "home_condition",
>   "building_age"
> )
> // list of columns where the condition should be found using Euclidean distance
> val groupThreeCriteria = List(
>   "postal_code"
> )
>
> val region_or_city = "region"
>
> def haversineDistance(destination_latitude: Column, 
> destination_longitude: Column, origin_latitude: Column,
>   origin_longitude: Column): Column = {
>   val a = pow(sin(radians(destination_latitude - origin_latitude) / 2), 
> 2) +
> cos(radians(origin_latitude)) * cos(radians(destination_latitude)) *
>   pow(sin(radians(destination_longitude - origin_longitude) / 2), 2)
>   val distance = atan2(sqrt(a), sqrt(-a + 1)) * 2 * 6371
>   distance
> }
>
> val spark = SparkSession.builder().appName("real-estate-property-matcher")
>   .getOrCreate()
>
> val housingDataDF = 
> spark.read.csv("~/Downloads/real-estate-sample-data.csv")
>
> // searching for the property by `ref_id`
> val searchPropertyDF = housingDataDF.filter(col("ref_id") === 
> search_property_id)
>
> // Similar house in the same city (same postal code) and group one 
> condition
> val similarHouseAndSameCity = housingDataDF.join(searchPropertyDF, 
> groupThreeCriteria ++ groupOneCriteria,
>   "inner")
>
> // Similar house not in the same city but 10km range
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


to find Difference of locations in Spark Dataframe rows

2022-06-07 Thread Chetan Khatri
Hi Dear Spark Users,

It has been many years that I have worked on Spark, Please help me. Thanks
much

I have different cities and their co-ordinates in DataFrame[Row], I want to
find distance in KMs and then show only those records /cities which are 10
KMs far.

I have a function created that can find the distance in KMs given two
co-coordinates. But I don't know how to apply it to rows, like one to many
and calculate the distance.

Some code that I wrote, Sorry for the basic code.

import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions._

class HouseMatching {
  def main(args: Array[String]): Unit = {

val search_property_id = args(0)

// list of columns where the condition should be exact match
val groupOneCriteria = List(
  "occupied_by_tenant",
  "water_index",
  "electricity_index",
  "elevator_index",
  "heating_index",
  "nb_bathtubs",
  "nb_showers",
  "nb_wc",
  "nb_rooms",
  "nb_kitchens"
)
// list of columns where the condition should be matching 80%
val groupTwoCriteria = List(
  "area",
  "home_condition",
  "building_age"
)
// list of columns where the condition should be found using Euclidean distance
val groupThreeCriteria = List(
  "postal_code"
)

val region_or_city = "region"

def haversineDistance(destination_latitude: Column,
destination_longitude: Column, origin_latitude: Column,
  origin_longitude: Column): Column = {
  val a = pow(sin(radians(destination_latitude - origin_latitude) / 2), 2) +
cos(radians(origin_latitude)) * cos(radians(destination_latitude)) *
  pow(sin(radians(destination_longitude - origin_longitude) / 2), 2)
  val distance = atan2(sqrt(a), sqrt(-a + 1)) * 2 * 6371
  distance
}

val spark = SparkSession.builder().appName("real-estate-property-matcher")
  .getOrCreate()

val housingDataDF =
spark.read.csv("~/Downloads/real-estate-sample-data.csv")

// searching for the property by `ref_id`
val searchPropertyDF = housingDataDF.filter(col("ref_id") ===
search_property_id)

// Similar house in the same city (same postal code) and group one condition
val similarHouseAndSameCity = housingDataDF.join(searchPropertyDF,
groupThreeCriteria ++ groupOneCriteria,
  "inner")

// Similar house not in the same city but 10km range
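
As a minimal sketch of the one-to-many part (the haversine column function
is the one defined above; the sample rows and the "within 10 km" direction
are assumptions, flip the comparison for "at least 10 km apart"), the
pairwise comparison can be expressed as a self cross join with the distance
computed as a column:

```
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("city-distance-sketch").getOrCreate()
import spark.implicits._

// Column-based haversine distance in km (R = 6371 km), as defined above.
def haversineDistance(destLat: Column, destLon: Column,
                      origLat: Column, origLon: Column): Column = {
  val a = pow(sin(radians(destLat - origLat) / 2), 2) +
    cos(radians(origLat)) * cos(radians(destLat)) *
      pow(sin(radians(destLon - origLon) / 2), 2)
  atan2(sqrt(a), sqrt(-a + 1)) * 2 * 6371
}

// Hypothetical sample data; the real rows would come from the CSV above.
val cities = Seq(
  ("Paris", 48.8566, 2.3522),
  ("Saint-Denis", 48.9362, 2.3574),
  ("Lyon", 45.7640, 4.8357)
).toDF("city", "lat", "lon")

// One-to-many: compare every city with every other city, keep close pairs.
val pairs = cities.as("a").crossJoin(cities.as("b"))
  .filter($"a.city" =!= $"b.city")
  .withColumn("distance_km",
    haversineDistance($"b.lat", $"b.lon", $"a.lat", $"a.lon"))

pairs.filter($"distance_km" <= 10).show()
```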


Re: Difference between windowing functions and aggregation functions on big data

2022-02-27 Thread Raghavendra Ganesh
What is optimal depends on the context of the problem.
Is the intent here to find the best solution for top n values with a group
by ?

Both the solutions look sub-optimal to me. Window function would be
expensive as it needs an order by (which a top n solution shouldn't need).
It would be best to just group by department and use an aggregate function
which stores the top n values in a heap.
--
Raghavendra
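
As a minimal sketch of that idea (column names and sample values are
assumptions; for simplicity it keeps a small sorted buffer per group instead
of an explicit heap, and it ignores dense_rank-style tie handling):

```
import org.apache.spark.sql.SparkSession

case class Emp(department: String, employee: String, salary: Double)

val spark = SparkSession.builder().appName("top-n-per-group-sketch").getOrCreate()
import spark.implicits._

val employees = Seq(
  Emp("IT", "Max", 90000), Emp("IT", "Joe", 85000),
  Emp("IT", "Randy", 83000), Emp("IT", "Will", 70000),
  Emp("Sales", "Henry", 80000), Emp("Sales", "Sam", 60000)
).toDS()

val n = 3
// Group by department and fold each group into a buffer capped at n rows,
// so no department-wide ORDER BY (as a window would need) is required.
val top3PerDept = employees
  .groupByKey(_.department)
  .flatMapGroups { (_, rows) =>
    rows.foldLeft(Vector.empty[Emp]) { (top, e) =>
      (top :+ e).sortBy(-_.salary).take(n)
    }
  }

top3PerDept.show()
```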


On Mon, Feb 28, 2022 at 12:01 AM Sid  wrote:

> My bad.
>
> Aggregation Query:
>
> # Write your MySQL query statement below
>
>SELECT D.Name AS Department, E.Name AS Employee, E.Salary AS Salary
> FROM Employee E INNER JOIN Department D ON E.DepartmentId = D.Id
> WHERE (SELECT COUNT(DISTINCT(Salary)) FROM Employee
>WHERE DepartmentId = E.DepartmentId AND Salary > E.Salary) < 3
> ORDER by E.DepartmentId, E.Salary DESC
>
> Time Taken: 1212 ms
>
> Windowing Query:
>
> select Department,Employee,Salary from (
> select d.name as Department, e.name as Employee,e.salary as
> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
> rnk from Department d join Employee e on e.departmentId=d.id ) a where
> rnk<=3
>
> Time Taken: 790 ms
>
> Thanks,
> Sid
>
>
> On Sun, Feb 27, 2022 at 11:35 PM Sean Owen  wrote:
>
>> Those two queries are identical?
>>
>> On Sun, Feb 27, 2022 at 11:30 AM Sid  wrote:
>>
>>> Hi Team,
>>>
>>> I am aware that if windowing functions are used, then at first it loads
>>> the entire dataset into one window,scans and then performs the other
>>> mentioned operations for that particular window which could be slower when
>>> dealing with trillions / billions of records.
>>>
>>> I did a POC where I used an example to find the max 3 highest salary for
>>> an employee per department. So, I wrote a below queries and compared the
>>> time for it:
>>>
>>> Windowing Query:
>>>
>>> select Department,Employee,Salary from (
>>> select d.name as Department, e.name as Employee,e.salary as
>>> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
>>> rnk from Department d join Employee e on e.departmentId=d.id ) a where
>>> rnk<=3
>>>
>>> Time taken: 790 ms
>>>
>>> Aggregation Query:
>>>
>>> select Department,Employee,Salary from (
>>> select d.name as Department, e.name as Employee,e.salary as
>>> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
>>> rnk from Department d join Employee e on e.departmentId=d.id ) a where
>>> rnk<=3
>>>
>>> Time taken: 1212 ms
>>>
>>> But as per my understanding, the aggregation should have run faster. So,
>>> my whole point is if the dataset is huge I should force some kind of map
>>> reduce jobs like we have an option called df.groupby().reduceByGroups()
>>>
>>> So I think the aggregation query is taking more time since the dataset
>>> size here is smaller and as we all know that map reduce works faster when
>>> there is a huge volume of data. Haven't tested it yet on big data but
>>> needed some expert guidance over here.
>>>
>>> Please correct me if I am wrong.
>>>
>>> TIA,
>>> Sid
>>>
>>>
>>>
>>>


Re: Difference between windowing functions and aggregation functions on big data

2022-02-27 Thread Mich Talebzadeh
Am I correct that with

.. WHERE (SELECT COUNT(DISTINCT(Salary)) ..

you will have to shuffle because of DISTINCT, as each worker will have to
read its data separately and perform the reduce task to get the local
distinct values, and then one final shuffle to get the actual distinct
values for all the data?



   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 27 Feb 2022 at 20:31, Sean Owen  wrote:

> "count distinct' does not have that problem, whether in a group-by or not.
> I'm still not sure these are equivalent queries but maybe not seeing it.
> Windowing makes sense when you need the whole window, or when you need
> sliding windows to express the desired groups.
> It may be unnecessary when your query does not need the window, just a
> summary stat like 'max'. Depends.
>
> On Sun, Feb 27, 2022 at 2:14 PM Bjørn Jørgensen 
> wrote:
>
>> You are using distinct which collects everything to the driver. So use
>> the other one :)
>>
>> søn. 27. feb. 2022 kl. 21:00 skrev Sid :
>>
>>> Basically, I am trying two different approaches for the same problem and
>>> my concern is how it will behave in the case of big data if you talk about
>>> millions of records. Which one would be faster? Is using windowing
>>> functions a better way since it will load the entire dataset into a single
>>> window and do the operations?
>>>
>>
>>


Re: Difference between windowing functions and aggregation functions on big data

2022-02-27 Thread Sean Owen
"count distinct' does not have that problem, whether in a group-by or not.
I'm still not sure these are equivalent queries but maybe not seeing it.
Windowing makes sense when you need the whole window, or when you need
sliding windows to express the desired groups.
It may be unnecessary when your query does not need the window, just a
summary stat like 'max'. Depends.

On Sun, Feb 27, 2022 at 2:14 PM Bjørn Jørgensen 
wrote:

> You are using distinct which collects everything to the driver. So use
> the other one :)
>
> søn. 27. feb. 2022 kl. 21:00 skrev Sid :
>
>> Basically, I am trying two different approaches for the same problem and
>> my concern is how it will behave in the case of big data if you talk about
>> millions of records. Which one would be faster? Is using windowing
>> functions a better way since it will load the entire dataset into a single
>> window and do the operations?
>>
>
>


Re: Difference between windowing functions and aggregation functions on big data

2022-02-27 Thread Bjørn Jørgensen
You are using distinct which collects everything to the driver. So use the
other one :)

søn. 27. feb. 2022 kl. 21:00 skrev Sid :

> Basically, I am trying two different approaches for the same problem and
> my concern is how it will behave in the case of big data if you talk about
> millions of records. Which one would be faster? Is using windowing
> functions a better way since it will load the entire dataset into a single
> window and do the operations?
>
> On Mon, Feb 28, 2022 at 12:26 AM Sean Owen  wrote:
>
>> Those queries look like they do fairly different things. One is selecting
>> top employees by salary, the other is ... selecting where there are less
>> than 3 distinct salaries or something.
>> Not sure what the intended comparison is then; these are not equivalent
>> ways of doing the same thing, or does not seem so as far as I can see.
>>
>> On Sun, Feb 27, 2022 at 12:30 PM Sid  wrote:
>>
>>> My bad.
>>>
>>> Aggregation Query:
>>>
>>> # Write your MySQL query statement below
>>>
>>>SELECT D.Name AS Department, E.Name AS Employee, E.Salary AS Salary
>>> FROM Employee E INNER JOIN Department D ON E.DepartmentId = D.Id
>>> WHERE (SELECT COUNT(DISTINCT(Salary)) FROM Employee
>>>WHERE DepartmentId = E.DepartmentId AND Salary > E.Salary) < 3
>>> ORDER by E.DepartmentId, E.Salary DESC
>>>
>>> Time Taken: 1212 ms
>>>
>>> Windowing Query:
>>>
>>> select Department,Employee,Salary from (
>>> select d.name as Department, e.name as Employee,e.salary as
>>> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
>>> rnk from Department d join Employee e on e.departmentId=d.id ) a where
>>> rnk<=3
>>>
>>> Time Taken: 790 ms
>>>
>>> Thanks,
>>> Sid
>>>
>>>
>>> On Sun, Feb 27, 2022 at 11:35 PM Sean Owen  wrote:
>>>
 Those two queries are identical?

 On Sun, Feb 27, 2022 at 11:30 AM Sid  wrote:

> Hi Team,
>
> I am aware that if windowing functions are used, then at first it
> loads the entire dataset into one window,scans and then performs the other
> mentioned operations for that particular window which could be slower when
> dealing with trillions / billions of records.
>
> I did a POC where I used an example to find the max 3 highest salary
> for an employee per department. So, I wrote a below queries and compared
> the time for it:
>
> Windowing Query:
>
> select Department,Employee,Salary from (
> select d.name as Department, e.name as Employee,e.salary as
> Salary,dense_rank() over(partition by d.name order by e.salary desc)
> as rnk from Department d join Employee e on e.departmentId=d.id ) a
> where rnk<=3
>
> Time taken: 790 ms
>
> Aggregation Query:
>
> select Department,Employee,Salary from (
> select d.name as Department, e.name as Employee,e.salary as
> Salary,dense_rank() over(partition by d.name order by e.salary desc)
> as rnk from Department d join Employee e on e.departmentId=d.id ) a
> where rnk<=3
>
> Time taken: 1212 ms
>
> But as per my understanding, the aggregation should have run faster.
> So, my whole point is if the dataset is huge I should force some kind of
> map reduce jobs like we have an option called 
> df.groupby().reduceByGroups()
>
> So I think the aggregation query is taking more time since the dataset
> size here is smaller and as we all know that map reduce works faster when
> there is a huge volume of data. Haven't tested it yet on big data but
> needed some expert guidance over here.
>
> Please correct me if I am wrong.
>
> TIA,
> Sid
>
>
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Difference between windowing functions and aggregation functions on big data

2022-02-27 Thread Sid
Basically, I am trying two different approaches for the same problem and my
concern is how it will behave in the case of big data if you talk about
millions of records. Which one would be faster? Is using windowing
functions a better way since it will load the entire dataset into a single
window and do the operations?

On Mon, Feb 28, 2022 at 12:26 AM Sean Owen  wrote:

> Those queries look like they do fairly different things. One is selecting
> top employees by salary, the other is ... selecting where there are less
> than 3 distinct salaries or something.
> Not sure what the intended comparison is then; these are not equivalent
> ways of doing the same thing, or does not seem so as far as I can see.
>
> On Sun, Feb 27, 2022 at 12:30 PM Sid  wrote:
>
>> My bad.
>>
>> Aggregation Query:
>>
>> # Write your MySQL query statement below
>>
>>SELECT D.Name AS Department, E.Name AS Employee, E.Salary AS Salary
>> FROM Employee E INNER JOIN Department D ON E.DepartmentId = D.Id
>> WHERE (SELECT COUNT(DISTINCT(Salary)) FROM Employee
>>WHERE DepartmentId = E.DepartmentId AND Salary > E.Salary) < 3
>> ORDER by E.DepartmentId, E.Salary DESC
>>
>> Time Taken: 1212 ms
>>
>> Windowing Query:
>>
>> select Department,Employee,Salary from (
>> select d.name as Department, e.name as Employee,e.salary as
>> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
>> rnk from Department d join Employee e on e.departmentId=d.id ) a where
>> rnk<=3
>>
>> Time Taken: 790 ms
>>
>> Thanks,
>> Sid
>>
>>
>> On Sun, Feb 27, 2022 at 11:35 PM Sean Owen  wrote:
>>
>>> Those two queries are identical?
>>>
>>> On Sun, Feb 27, 2022 at 11:30 AM Sid  wrote:
>>>
 Hi Team,

 I am aware that if windowing functions are used, then at first it loads
 the entire dataset into one window,scans and then performs the other
 mentioned operations for that particular window which could be slower when
 dealing with trillions / billions of records.

 I did a POC where I used an example to find the max 3 highest salary
 for an employee per department. So, I wrote a below queries and compared
 the time for it:

 Windowing Query:

 select Department,Employee,Salary from (
 select d.name as Department, e.name as Employee,e.salary as
 Salary,dense_rank() over(partition by d.name order by e.salary desc)
 as rnk from Department d join Employee e on e.departmentId=d.id ) a
 where rnk<=3

 Time taken: 790 ms

 Aggregation Query:

 select Department,Employee,Salary from (
 select d.name as Department, e.name as Employee,e.salary as
 Salary,dense_rank() over(partition by d.name order by e.salary desc)
 as rnk from Department d join Employee e on e.departmentId=d.id ) a
 where rnk<=3

 Time taken: 1212 ms

 But as per my understanding, the aggregation should have run faster.
 So, my whole point is if the dataset is huge I should force some kind of
 map reduce jobs like we have an option called df.groupby().reduceByGroups()

 So I think the aggregation query is taking more time since the dataset
 size here is smaller and as we all know that map reduce works faster when
 there is a huge volume of data. Haven't tested it yet on big data but
 needed some expert guidance over here.

 Please correct me if I am wrong.

 TIA,
 Sid






Re: Difference between windowing functions and aggregation functions on big data

2022-02-27 Thread Sid
Hi Enrico,

Thanks for your time :)

Consider a huge data volume scenario: if I don't use any keywords like
distinct, which one would be faster? A window with partitionBy, or normal SQL
aggregation methods? And how does df.groupBy().reduceByGroups() work
internally?

Thanks,
Sid

On Mon, Feb 28, 2022 at 12:59 AM Enrico Minack 
wrote:

> Sid,
>
> Your Aggregation Query selects all employees where less than three
> distinct salaries exist that are larger. So, both queries seem to do the
> same.
>
> The Windowing Query is explicit in what it does: give me the rank for
> salaries per department in the given order and pick the top 3 per
> department.
>
> The Aggregation Query is trying to get to this conclusion by constructing
> some comparison. The former is the better approach, the second scales badly
> as this is done by counting distinct salaries that are larger than each
> salary in E. This looks like a Cartesian product of Employees. You make
> this very hard to optimize or execute by the query engine.
>
> And as you say, your example is very small, so this will not give any
> insights into big data.
>
> Enrico
>
>
> Am 27.02.22 um 19:30 schrieb Sid:
>
> My bad.
>
> Aggregation Query:
>
> # Write your MySQL query statement below
>
>SELECT D.Name AS Department, E.Name AS Employee, E.Salary AS Salary
> FROM Employee E INNER JOIN Department D ON E.DepartmentId = D.Id
> WHERE (SELECT COUNT(DISTINCT(Salary)) FROM Employee
>WHERE DepartmentId = E.DepartmentId AND Salary > E.Salary) < 3
> ORDER by E.DepartmentId, E.Salary DESC
>
> Time Taken: 1212 ms
>
> Windowing Query:
>
> select Department,Employee,Salary from (
> select d.name as Department, e.name as Employee,e.salary as
> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
> rnk from Department d join Employee e on e.departmentId=d.id ) a where
> rnk<=3
>
> Time Taken: 790 ms
>
> Thanks,
> Sid
>
>
> On Sun, Feb 27, 2022 at 11:35 PM Sean Owen  wrote:
>
>> Those two queries are identical?
>>
>> On Sun, Feb 27, 2022 at 11:30 AM Sid  wrote:
>>
>>> Hi Team,
>>>
>>> I am aware that if windowing functions are used, then at first it loads
>>> the entire dataset into one window,scans and then performs the other
>>> mentioned operations for that particular window which could be slower when
>>> dealing with trillions / billions of records.
>>>
>>> I did a POC where I used an example to find the max 3 highest salary for
>>> an employee per department. So, I wrote a below queries and compared the
>>> time for it:
>>>
>>> Windowing Query:
>>>
>>> select Department,Employee,Salary from (
>>> select d.name as Department, e.name as Employee,e.salary as
>>> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
>>> rnk from Department d join Employee e on e.departmentId=d.id ) a where
>>> rnk<=3
>>>
>>> Time taken: 790 ms
>>>
>>> Aggregation Query:
>>>
>>> select Department,Employee,Salary from (
>>> select d.name as Department, e.name as Employee,e.salary as
>>> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
>>> rnk from Department d join Employee e on e.departmentId=d.id ) a where
>>> rnk<=3
>>>
>>> Time taken: 1212 ms
>>>
>>> But as per my understanding, the aggregation should have run faster. So,
>>> my whole point is if the dataset is huge I should force some kind of map
>>> reduce jobs like we have an option called df.groupby().reduceByGroups()
>>>
>>> So I think the aggregation query is taking more time since the dataset
>>> size here is smaller and as we all know that map reduce works faster when
>>> there is a huge volume of data. Haven't tested it yet on big data but
>>> needed some expert guidance over here.
>>>
>>> Please correct me if I am wrong.
>>>
>>> TIA,
>>> Sid
>>>
>>>
>>>
>>>
>


Re: Difference between windowing functions and aggregation functions on big data

2022-02-27 Thread Enrico Minack

Sid,

Your Aggregation Query selects all employees where less than three 
distinct salaries exist that are larger. So, both queries seem to do the 
same.


The Windowing Query is explicit in what it does: give me the rank for 
salaries per department in the given order and pick the top 3 per 
department.


The Aggregation Query is trying to get to this conclusion by 
constructing some comparison. The former is the better approach, the 
second scales badly as this is done by counting distinct salaries that 
are larger than each salary in E. This looks like a Cartesian product of 
Employees. You make this very hard to optimize or execute by the query 
engine.


And as you say, your example is very small, so this will not give any 
insights into big data.


Enrico


Am 27.02.22 um 19:30 schrieb Sid:

My bad.

Aggregation Query:

# Write your MySQL query statement below

   SELECT D.Name AS Department, E.Name AS Employee, E.Salary AS Salary
FROM Employee E INNER JOIN Department D ON E.DepartmentId = D.Id
WHERE (SELECT COUNT(DISTINCT(Salary)) FROM Employee
       WHERE DepartmentId = E.DepartmentId AND Salary > E.Salary) < 3
ORDER by E.DepartmentId, E.Salary DESC

Time Taken: 1212 ms

Windowing Query:

select Department,Employee,Salary from (
select d.name as Department, e.name as Employee,e.salary as Salary,
dense_rank() over(partition by d.name order by e.salary desc) as rnk
from Department d join Employee e on e.departmentId=d.id ) a where rnk<=3


Time Taken: 790 ms

Thanks,
Sid


On Sun, Feb 27, 2022 at 11:35 PM Sean Owen  wrote:

Those two queries are identical?

On Sun, Feb 27, 2022 at 11:30 AM Sid  wrote:

Hi Team,

I am aware that if windowing functions are used, then at first
it loads the entire dataset into one window,scans and then
performs the other mentioned operations for that particular
window which could be slower when dealing with trillions /
billions of records.

I did a POC where I used an example to find the max 3 highest
salary for an employee per department. So, I wrote a below
queries and compared the time for it:

Windowing Query:

select Department,Employee,Salary from (
select d.name as Department, e.name as Employee,e.salary as Salary,
dense_rank() over(partition by d.name order by e.salary desc) as rnk
from Department d join Employee e on e.departmentId=d.id ) a where rnk<=3

Time taken: 790 ms

Aggregation Query:

select Department,Employee,Salary from (
select d.name as Department, e.name as Employee,e.salary as Salary,
dense_rank() over(partition by d.name order by e.salary desc) as rnk
from Department d join Employee e on e.departmentId=d.id ) a where rnk<=3

Time taken: 1212 ms

But as per my understanding, the aggregation should have run
faster. So, my whole point is if the dataset is huge I should
force some kind of map reduce jobs like we have an option
called df.groupby().reduceByGroups()

So I think the aggregation query is taking more time since the
dataset size here is smaller and as we all know that map
reduce works faster when there is a huge volume of data.
Haven't tested it yet on big data but needed some expert
guidance over here.

Please correct me if I am wrong.

TIA,
Sid




Re: Difference between windowing functions and aggregation functions on big data

2022-02-27 Thread Sean Owen
Those queries look like they do fairly different things. One is selecting
top employees by salary, the other is ... selecting where there are less
than 3 distinct salaries or something.
Not sure what the intended comparison is then; these are not equivalent
ways of doing the same thing, or does not seem so as far as I can see.

On Sun, Feb 27, 2022 at 12:30 PM Sid  wrote:

> My bad.
>
> Aggregation Query:
>
> # Write your MySQL query statement below
>
>SELECT D.Name AS Department, E.Name AS Employee, E.Salary AS Salary
> FROM Employee E INNER JOIN Department D ON E.DepartmentId = D.Id
> WHERE (SELECT COUNT(DISTINCT(Salary)) FROM Employee
>WHERE DepartmentId = E.DepartmentId AND Salary > E.Salary) < 3
> ORDER by E.DepartmentId, E.Salary DESC
>
> Time Taken: 1212 ms
>
> Windowing Query:
>
> select Department,Employee,Salary from (
> select d.name as Department, e.name as Employee,e.salary as
> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
> rnk from Department d join Employee e on e.departmentId=d.id ) a where
> rnk<=3
>
> Time Taken: 790 ms
>
> Thanks,
> Sid
>
>
> On Sun, Feb 27, 2022 at 11:35 PM Sean Owen  wrote:
>
>> Those two queries are identical?
>>
>> On Sun, Feb 27, 2022 at 11:30 AM Sid  wrote:
>>
>>> Hi Team,
>>>
>>> I am aware that if windowing functions are used, then at first it loads
>>> the entire dataset into one window,scans and then performs the other
>>> mentioned operations for that particular window which could be slower when
>>> dealing with trillions / billions of records.
>>>
>>> I did a POC where I used an example to find the max 3 highest salary for
>>> an employee per department. So, I wrote a below queries and compared the
>>> time for it:
>>>
>>> Windowing Query:
>>>
>>> select Department,Employee,Salary from (
>>> select d.name as Department, e.name as Employee,e.salary as
>>> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
>>> rnk from Department d join Employee e on e.departmentId=d.id ) a where
>>> rnk<=3
>>>
>>> Time taken: 790 ms
>>>
>>> Aggregation Query:
>>>
>>> select Department,Employee,Salary from (
>>> select d.name as Department, e.name as Employee,e.salary as
>>> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
>>> rnk from Department d join Employee e on e.departmentId=d.id ) a where
>>> rnk<=3
>>>
>>> Time taken: 1212 ms
>>>
>>> But as per my understanding, the aggregation should have run faster. So,
>>> my whole point is if the dataset is huge I should force some kind of map
>>> reduce jobs like we have an option called df.groupby().reduceByGroups()
>>>
>>> So I think the aggregation query is taking more time since the dataset
>>> size here is smaller and as we all know that map reduce works faster when
>>> there is a huge volume of data. Haven't tested it yet on big data but
>>> needed some expert guidance over here.
>>>
>>> Please correct me if I am wrong.
>>>
>>> TIA,
>>> Sid
>>>
>>>
>>>
>>>


Re: Difference between windowing functions and aggregation functions on big data

2022-02-27 Thread Sid
My bad.

Aggregation Query:

# Write your MySQL query statement below

   SELECT D.Name AS Department, E.Name AS Employee, E.Salary AS Salary
FROM Employee E INNER JOIN Department D ON E.DepartmentId = D.Id
WHERE (SELECT COUNT(DISTINCT(Salary)) FROM Employee
   WHERE DepartmentId = E.DepartmentId AND Salary > E.Salary) < 3
ORDER by E.DepartmentId, E.Salary DESC

Time Taken: 1212 ms

Windowing Query:

select Department,Employee,Salary from (
select d.name as Department, e.name as Employee,e.salary as
Salary,dense_rank() over(partition by d.name order by e.salary desc) as rnk
from Department d join Employee e on e.departmentId=d.id ) a where rnk<=3

Time Taken: 790 ms

Thanks,
Sid


On Sun, Feb 27, 2022 at 11:35 PM Sean Owen  wrote:

> Those two queries are identical?
>
> On Sun, Feb 27, 2022 at 11:30 AM Sid  wrote:
>
>> Hi Team,
>>
>> I am aware that if windowing functions are used, then at first it loads
>> the entire dataset into one window,scans and then performs the other
>> mentioned operations for that particular window which could be slower when
>> dealing with trillions / billions of records.
>>
>> I did a POC where I used an example to find the max 3 highest salary for
>> an employee per department. So, I wrote a below queries and compared the
>> time for it:
>>
>> Windowing Query:
>>
>> select Department,Employee,Salary from (
>> select d.name as Department, e.name as Employee,e.salary as
>> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
>> rnk from Department d join Employee e on e.departmentId=d.id ) a where
>> rnk<=3
>>
>> Time taken: 790 ms
>>
>> Aggregation Query:
>>
>> select Department,Employee,Salary from (
>> select d.name as Department, e.name as Employee,e.salary as
>> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
>> rnk from Department d join Employee e on e.departmentId=d.id ) a where
>> rnk<=3
>>
>> Time taken: 1212 ms
>>
>> But as per my understanding, the aggregation should have run faster. So,
>> my whole point is if the dataset is huge I should force some kind of map
>> reduce jobs like we have an option called df.groupby().reduceByGroups()
>>
>> So I think the aggregation query is taking more time since the dataset
>> size here is smaller and as we all know that map reduce works faster when
>> there is a huge volume of data. Haven't tested it yet on big data but
>> needed some expert guidance over here.
>>
>> Please correct me if I am wrong.
>>
>> TIA,
>> Sid
>>
>>
>>
>>


Re: Difference between windowing functions and aggregation functions on big data

2022-02-27 Thread Sean Owen
Those two queries are identical?

On Sun, Feb 27, 2022 at 11:30 AM Sid  wrote:

> Hi Team,
>
> I am aware that if windowing functions are used, then at first it loads
> the entire dataset into one window,scans and then performs the other
> mentioned operations for that particular window which could be slower when
> dealing with trillions / billions of records.
>
> I did a POC where I used an example to find the max 3 highest salary for
> an employee per department. So, I wrote a below queries and compared the
> time for it:
>
> Windowing Query:
>
> select Department,Employee,Salary from (
> select d.name as Department, e.name as Employee,e.salary as
> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
> rnk from Department d join Employee e on e.departmentId=d.id ) a where
> rnk<=3
>
> Time taken: 790 ms
>
> Aggregation Query:
>
> select Department,Employee,Salary from (
> select d.name as Department, e.name as Employee,e.salary as
> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
> rnk from Department d join Employee e on e.departmentId=d.id ) a where
> rnk<=3
>
> Time taken: 1212 ms
>
> But as per my understanding, the aggregation should have run faster. So,
> my whole point is if the dataset is huge I should force some kind of map
> reduce jobs like we have an option called df.groupby().reduceByGroups()
>
> So I think the aggregation query is taking more time since the dataset
> size here is smaller and as we all know that map reduce works faster when
> there is a huge volume of data. Haven't tested it yet on big data but
> needed some expert guidance over here.
>
> Please correct me if I am wrong.
>
> TIA,
> Sid
>
>
>
>


Difference between windowing functions and aggregation functions on big data

2022-02-27 Thread Sid
Hi Team,

I am aware that if windowing functions are used, then at first it loads the
entire dataset into one window,scans and then performs the other mentioned
operations for that particular window which could be slower when dealing
with trillions / billions of records.

I did a POC where I used an example to find the max 3 highest salary for an
employee per department. So, I wrote a below queries and compared the time
for it:

Windowing Query:

select Department,Employee,Salary from (
select d.name as Department, e.name as Employee,e.salary as
Salary,dense_rank() over(partition by d.name order by e.salary desc) as rnk
from Department d join Employee e on e.departmentId=d.id ) a where rnk<=3

Time taken: 790 ms

Aggregation Query:

select Department,Employee,Salary from (
select d.name as Department, e.name as Employee,e.salary as
Salary,dense_rank() over(partition by d.name order by e.salary desc) as rnk
from Department d join Employee e on e.departmentId=d.id ) a where rnk<=3

Time taken: 1212 ms

But as per my understanding, the aggregation should have run faster. So, my
whole point is if the dataset is huge I should force some kind of map
reduce jobs like we have an option called df.groupby().reduceByGroups()

So I think the aggregation query is taking more time since the dataset size
here is smaller and as we all know that map reduce works faster when there
is a huge volume of data. Haven't tested it yet on big data but needed some
expert guidance over here.

Please correct me if I am wrong.

TIA,
Sid


Re: Difference in behavior for Spark 3.0 vs Spark 3.1 "create database "

2022-01-11 Thread Wenchen Fan
Hopefully, this StackOverflow answer can solve your problem:
https://stackoverflow.com/questions/47523037/how-do-i-configure-pyspark-to-write-to-hdfs-by-default

Spark doesn't control the behavior of qualifying paths. It's decided by
certain configs and/or config files.
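
As a minimal sketch of making that explicit (the namenode host/port and
database name are placeholders), either point the default filesystem at HDFS
through the Hadoop configuration or fully qualify the location:

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("create-db-sketch")
  // spark.hadoop.* entries are forwarded to the Hadoop Configuration;
  // this is normally set in core-site.xml instead.
  .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:8020")
  .enableHiveSupport()
  .getOrCreate()

// A fully qualified location removes the ambiguity entirely.
spark.sql("CREATE DATABASE test LOCATION 'hdfs://namenode:8020/user/hive/test.db'")
```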

On Tue, Jan 11, 2022 at 3:03 AM Pablo Langa Blanco  wrote:

> Hi Pralabh,
>
> If it helps, it is probably related to this change
> https://github.com/apache/spark/pull/28527
>
> Regards
>
> On Mon, Jan 10, 2022 at 10:42 AM Pralabh Kumar 
> wrote:
>
>> Hi Spark Team
>>
>> When creating a database via Spark 3.0 on Hive
>>
>> 1) spark.sql("create database test location '/user/hive'").  It creates
>> the database location on hdfs . As expected
>>
>> 2) When running the same command on 3.1 the database is created on the
>> local file system by default. I have to prefix with hdfs to create db on
>> hdfs.
>>
>> Why is there a difference in the behavior, Can you please point me to the
>> jira which causes this change.
>>
>> Note : spark.sql.warehouse.dir and hive.metastore.warehouse.dir both are
>> having default values(not explicitly set)
>>
>> Regards
>> Pralabh Kumar
>>
>


Difference in behavior for Spark 3.0 vs Spark 3.1 "create database "

2022-01-10 Thread Pralabh Kumar
Hi Spark Team

When creating a database via Spark 3.0 on Hive

1) spark.sql("create database test location '/user/hive'").  It creates the
database location on hdfs . As expected

2) When running the same command on 3.1 the database is created on the
local file system by default. I have to prefix with hdfs to create db on
hdfs.

Why is there a difference in the behavior, Can you please point me to the
jira which causes this change.

Note : spark.sql.warehouse.dir and hive.metastore.warehouse.dir both are
having default values(not explicitly set)

Regards
Pralabh Kumar


Find difference between two dataframes in spark structured streaming

2020-12-16 Thread act_coder
I am creating a spark structured streaming job, where I need to find the
difference between two dataframes.

Dataframe 1 :

[1, item1, value1]
[2, item2, value2]
[3, item3, value3]
[4, item4, value4]
[5, item5, value5]

Dataframe 2:

[4, item4, value4]
[5, item5, value5]

New Dataframe with difference between two D1-D2:

[1, item1, value1]
[2, item2, value2]
[3, item3, value3]

I tried using except() and left anti join(), but neither is supported in
Spark Structured Streaming.

Is there a way we can achieve this in Structured Streaming?
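
As a rough workaround sketch, assuming a spark-shell style session and that
Dataframe 2 is a *static* dataset (a stream-stream anti join is not covered
by this; column names and the rate source are placeholders): a stream-static
left outer join plus a null filter keeps exactly the streaming rows whose key
has no match in the static side.

```
import org.apache.spark.sql.functions._
import spark.implicits._

// Streaming side (D1): placeholder source producing an `id` column.
val d1 = spark.readStream.format("rate").option("rowsPerSecond", "5").load()
  .select($"value".as("id"))

// Static side (D2): the rows to subtract, with a marker column.
val d2 = Seq(4L, 5L).toDF("id").withColumn("seen", lit(true))

// D1 - D2 on `id`: keep streaming rows that found no static match.
val diff = d1.join(d2, Seq("id"), "left_outer")
  .filter($"seen".isNull)
  .drop("seen")

val query = diff.writeStream.format("console").start()
```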






Huge difference in speed between pyspark and scalaspark

2020-05-13 Thread Steven Van Ingelgem
Public

Hello all,


We noticed a HUGE difference between using pyspark and spark in scala.
Pyspark runs:

  *   on my work computer in +-350 seconds
  *   on my home computer in +- 130 seconds (Windows defender enabled)
  *   on my home computer in +- 105 seconds (Windows defender disabled)
  *   on my home computer as Scala code in +- 7 seconds

The script:
def setUp(self):
    self.left = self.parallelize([
        ('Wim', 46),
        ('Klaas', 18)
    ]).toDF('name: string, age: int')

    self.right = self.parallelize([
        ('Jiri', 25),
        ('Tomasz', 23)
    ]).toDF('name: string, age: int')

def test_simple_union(self):
    sut = self.left.union(self.right)

    self.assertDatasetEquals(sut, self.parallelize([
        ('Wim', 46),
        ('Klaas', 18),
        ('Jiri', 25),
        ('Tomasz', 23)
    ]).toDF('name: string, age: int'))



Performance difference between Dataframe and Dataset especially on parquet data.

2019-06-12 Thread Shivam Sharma
Hi all,

As we know, Parquet is stored in columnar format, so filtering on a column
should require reading only that column instead of the complete record.

However, when we create a Dataset[Class] and do a group by on the column, it
performs differently from the same steps on a DataFrame: the Dataset
operations are causing OOM issues with the same execution parameters.
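
As a minimal sketch of the two shapes being compared (Event, its columns and
the path are placeholders): the untyped groupBy can be pushed onto the
columnar Parquet scan so only the grouping column is read, while the typed
groupByKey first decodes whole Event objects, materializing every field of
the class for each record.

```
import org.apache.spark.sql.SparkSession

case class Event(key: String, payload: String, value: Long)

val spark = SparkSession.builder().appName("ds-vs-df-sketch").getOrCreate()
import spark.implicits._

val df = spark.read.parquet("/data/events")   // placeholder location

// DataFrame: column-pruned aggregation, only `key` is scanned.
val countsDF = df.groupBy("key").count()

// Dataset: every Event field is deserialized before grouping.
val countsDS = df.as[Event].groupByKey(_.key).count()
```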

Thanks

-- 
Shivam Sharma
Indian Institute Of Information Technology, Design and Manufacturing
Jabalpur
Email:- 28shivamsha...@gmail.com
LinkedIn: https://www.linkedin.com/in/28shivamsharma


Is there a difference between --proxy-user or HADOOP_USER_NAME in a non-Kerberized YARN cluster?

2019-05-16 Thread Jeff Evans
Let's suppose we're dealing with a non-secured (i.e. not Kerberized)
YARN cluster.  When I invoke spark-submit, is there a practical
difference between specifying --proxy-user=foo (supposing
impersonation is properly set up) or setting the environment variable
HADOOP_USER_NAME=foo?  Thanks for any insights or links to docs.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: What is the difference for the following UDFs?

2019-05-14 Thread Qian He
Hi Jacek,

Thanks for your reply. Your provided case was actually same as my second
option in my original email. What I'm wondering was the difference between
those two regarding query performance or efficiency.

On Tue, May 14, 2019 at 3:51 PM Jacek Laskowski  wrote:

> Hi,
>
> For this particular case I'd use Column.substr (
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Column),
> e.g.
>
> val ns = Seq(("hello world", 1, 5)).toDF("w", "b", "e")
> scala> ns.select($"w".substr($"b", $"e" - $"b" + 1) as "demo").show
> +-----+
> | demo|
> +-----+
> |hello|
> +-----+
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> Mastering Spark SQL https://bit.ly/mastering-spark-sql
> Spark Structured Streaming https://bit.ly/spark-structured-streaming
> Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Tue, May 14, 2019 at 5:08 PM Qian He  wrote:
>
>> For example, I have a dataframe with 3 columns: URL, START, END. For each
>> url from URL column, I want to fetch a substring of it starting from START
>> and ending at END.
>> +--------------+-----+---+
>> |URL           |START|END|
>> +--------------+-----+---+
>> |www.amazon.com|4    |14 |
>> |www.yahoo.com |4    |13 |
>> |www.amazon.com|4    |14 |
>> |www.google.com|4    |14 |
>> +--------------+-----+---+
>>
>> I have UDF1:
>>
>> def getSubString = (input: String, start: Int, end: Int) => {
>>input.substring(start, end)
>> }
>> val udf1 = udf(getSubString)
>>
>> and another UDF2:
>>
>> def getColSubString()(c1: Column, c2: Column, c3: Column): Column = {
>>c1.substr(c2, c3-c2)
>> }
>>
>> Let's assume they can both generate the result I want. But, from performance 
>> perspective, is there any difference between those two UDFs?
>>
>>
>>


Re: What is the difference for the following UDFs?

2019-05-14 Thread Jacek Laskowski
Hi,

For this particular case I'd use Column.substr (
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Column),
e.g.

val ns = Seq(("hello world", 1, 5)).toDF("w", "b", "e")
scala> ns.select($"w".substr($"b", $"e" - $"b" + 1) as "demo").show
+-----+
| demo|
+-----+
|hello|
+-----+

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski


On Tue, May 14, 2019 at 5:08 PM Qian He  wrote:

> For example, I have a dataframe with 3 columns: URL, START, END. For each
> url from URL column, I want to fetch a substring of it starting from START
> and ending at END.
> +--------------+------+----+
> |URL           |START |END |
> +--------------+------+----+
> |www.amazon.com|4     |14  |
> |www.yahoo.com |4     |13  |
> |www.amazon.com|4     |14  |
> |www.google.com|4     |14  |
> +--------------+------+----+
>
> I have UDF1:
>
> def getSubString = (input: String, start: Int, end: Int) => {
>input.substring(start, end)
> }
> val udf1 = udf(getSubString)
>
> and another UDF2:
>
> def getColSubString()(c1: Column, c2: Column, c3: Column): Column = {
>c1.substr(c2, c3-c2)
> }
>
> Let's assume they can both generate the result I want. But, from performance 
> perspective, is there any difference between those two UDFs?
>
>
>


What is the difference for the following UDFs?

2019-05-14 Thread Qian He
For example, I have a dataframe with 3 columns: URL, START, END. For each
url from URL column, I want to fetch a substring of it starting from START
and ending at END.
+--------------+------+----+
|URL           |START |END |
+--------------+------+----+
|www.amazon.com|4     |14  |
|www.yahoo.com |4     |13  |
|www.amazon.com|4     |14  |
|www.google.com|4     |14  |
+--------------+------+----+

I have UDF1:

def getSubString = (input: String, start: Int, end: Int) => {
   input.substring(start, end)
}
val udf1 = udf(getSubString)

and another UDF2:

def getColSubString()(c1: Column, c2: Column, c3: Column): Column = {
   c1.substr(c2, c3-c2)
}

Let's assume they can both generate the result I want. But, from a
performance perspective, is there any difference between those two
UDFs?


Re: Difference between 'cores' config params: spark submit on k8s

2019-04-20 Thread Li Gao
hi Battini,

The limit is a k8s construct that tells k8s how much cpu/cores your driver
*can* consume.

When you have the same value for 'spark.driver.cores' and
'spark.kubernetes.driver.limit.cores', your driver runs in the
'Guaranteed' k8s quality-of-service class, which makes it less likely
to be evicted by the scheduler.

The same goes with the executor settings.

https://kubernetes.io/docs/tasks/configure-pod-container/quality-service-pod/

The Guaranteed QoS class is important when you run a multi-tenant k8s cluster in
production.

Cheers,
Li


On Thu, Mar 7, 2019, 1:53 PM Battini Lakshman 
wrote:

> Hello,
>
> I understand we need to specify the 'spark.kubernetes.driver.limit.cores'
> and 'spark.kubernetes.executor.limit.cores' config parameters while
> submitting spark on k8s namespace with resource quota applied.
>
> There are also other config parameters 'spark.driver.cores' and
> 'spark.executor.cores' mentioned in documentation. What is the difference
> between 'spark.driver.cores' and 'spark.kubernetes.driver.limit.cores' please.
>
> Thanks!
>
> Best Regards,
> Lakshman B.
>
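
As a concrete illustration of keeping the request and the limit equal, a minimal Scala
sketch follows; the core counts are placeholders, and in a real deployment these values
are normally passed at submit time (for example via spark-submit --conf) because the
driver pod is created before user code runs:

```
import org.apache.spark.sql.SparkSession

// Requests and limits kept equal so the driver and executor pods can land in the
// "Guaranteed" QoS class; the core counts below are placeholders, not recommendations.
val spark = SparkSession.builder()
  .appName("k8s-qos-example")
  .config("spark.driver.cores", "2")
  .config("spark.kubernetes.driver.limit.cores", "2")
  .config("spark.executor.cores", "4")
  .config("spark.kubernetes.executor.limit.cores", "4")
  .getOrCreate()
```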


Re: Difference between Checkpointing and Persist

2019-04-19 Thread Gene Pang
Hi Subash,

I'm not sure how the checkpointing works, but with
StorageLevel.MEMORY_AND_DISK, Spark will store the RDD in on-heap memory,
and spill to disk if necessary. However, the data is only usable by that
Spark job. Saving the RDD will write the data out to an external storage
system, like HDFS or Alluxio
<http://www.alluxio.org/docs/1.8/en/compute/Spark.html?utm_source=spark>.

There are some advantages of saving the RDD, mainly allowing different jobs
or even different frameworks to read that data. One possibility is to save
the RDD to Alluxio, which can store the data in-memory, improving the
throughput by avoiding the disk. Here is an article discussing different
ways to store RDDs
<http://www.alluxio.com/blog/effective-spark-rdds-with-alluxio?utm_source=spark>
.

Thanks,
Gene

On Thu, Apr 18, 2019 at 10:49 AM Subash Prabakar 
wrote:

> Hi All,
>
> I have a doubt about checkpointing and persist/saving.
>
> Say we have one RDD - containing huge data,
> 1. We checkpoint and perform join
> 2. We persist as StorageLevel.MEMORY_AND_DISK and perform join
> 3. We save that intermediate RDD and perform join (using same RDD - saving
> is to just persist intermediate result before joining)
>
>
> Which of the above is faster and whats the difference?
>
>
> Thanks,
> Subash
>


Re: Difference between Checkpointing and Persist

2019-04-18 Thread Vadim Semenov
saving/checkpointing would be preferable in case of a big data set because:

- the RDD gets saved to HDFS and the DAG gets truncated so if some
partitions/executors fail it won't result in recomputing everything

- you don't use memory for caching therefore the JVM heap is going to be
smaller which helps GC and overall there'll be more memory for other
operations

- by saving to HDFS you're removing potential hotspots since partitions can
be fetched from many DataNodes vs when you get a hot partition that gets
requested a lot by other executors you may end up with an overwhelmed
executor

> We save that intermediate RDD and perform join (using same RDD - saving
> is to just persist intermediate result before joining)
Checkpointing is essentially saving the RDD and reading it back, however
you can't read checkpointed data if the job failed so it'd be nice to have
one part of the join saved in case of potential issues.

Overall, in my opinion, when working with big joins you should pay more
attention to reliability and fault-tolerance rather than pure speed as the
probability of having issues grows with increasing the dataset size and
cluster size.

On Thu, Apr 18, 2019 at 1:49 PM Subash Prabakar 
wrote:

> Hi All,
>
> I have a doubt about checkpointing and persist/saving.
>
> Say we have one RDD - containing huge data,
> 1. We checkpoint and perform join
> 2. We persist as StorageLevel.MEMORY_AND_DISK and perform join
> 3. We save that intermediate RDD and perform join (using same RDD - saving
> is to just persist intermediate result before joining)
>
>
> Which of the above is faster and whats the difference?
>
>
> Thanks,
> Subash
>


-- 
Sent from my iPhone
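
A minimal Scala sketch of the three options being compared, assuming a spark-shell where
`sc` is already available; the HDFS paths and the data are placeholders:

```
import org.apache.spark.storage.StorageLevel

// Placeholder paths and data, for illustration only.
sc.setCheckpointDir("hdfs:///tmp/checkpoints")

val big   = sc.parallelize(1 to 1000000).map(i => (i % 1000, i))
val other = sc.parallelize(1 to 1000).map(i => (i, s"row-$i"))

// 1. Checkpoint: writes the RDD to the checkpoint dir and truncates its lineage.
big.checkpoint()
val joined1 = big.join(other)

// 2. Persist: keeps the data in executor memory, spilling to local disk if needed.
val cached  = big.persist(StorageLevel.MEMORY_AND_DISK)
val joined2 = cached.join(other)

// 3. Save and re-read: the intermediate result survives the job and is readable elsewhere.
big.saveAsObjectFile("hdfs:///tmp/intermediate")
val reloaded = sc.objectFile[(Int, Int)]("hdfs:///tmp/intermediate")
val joined3 = reloaded.join(other)
```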


Re: Difference between Checkpointing and Persist

2019-04-18 Thread Jack Kolokasis

Hi,

    In my point of view, a good approach is to first persist your data with 
StorageLevel.MEMORY_AND_DISK and then perform the join. This will accelerate 
your computation because the data will be present in memory and on your local 
intermediate storage device.


--Iacovos

On 4/18/19 8:49 PM, Subash Prabakar wrote:

Hi All,

I have a doubt about checkpointing and persist/saving.

Say we have one RDD - containing huge data,
1. We checkpoint and perform join
2. We persist as StorageLevel.MEMORY_AND_DISK and perform join
3. We save that intermediate RDD and perform join (using same RDD - 
saving is to just persist intermediate result before joining)



Which of the above is faster and whats the difference?


Thanks,
Subash


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Difference between Checkpointing and Persist

2019-04-18 Thread Subash Prabakar
Hi All,

I have a doubt about checkpointing and persist/saving.

Say we have one RDD - containing huge data,
1. We checkpoint and perform join
2. We persist as StorageLevel.MEMORY_AND_DISK and perform join
3. We save that intermediate RDD and perform join (using same RDD - saving
is to just persist intermediate result before joining)


Which of the above is faster and whats the difference?


Thanks,
Subash


Re: what is the difference between udf execution and map(someLambda)?

2019-03-18 Thread Bobby Evans
Map and flatMap are RDD operations; a UDF is a dataframe operation.  The
big difference from a performance perspective is in the query optimizer.  A
UDF declares the set of input fields it needs and the set of output fields
it will produce, whereas map operates on the entire row at a time.  This means
the optimizer can move operations around, and potentially drop columns earlier,
to try and make the overall processing more efficient.  A map operation
requires the entire row as input, so the optimizer cannot do anything to it,
and it does not know what the output is going to look like unless you
explicitly tell it.  But in reality, UDFs are compiled down to map
operations on an RDD with some glue code to get the columns in the correct
place, so there should be little performance difference if you can manually
build a query that is similar to what the Catalyst optimizer would have
built.

On Sun, Mar 17, 2019 at 1:42 PM kant kodali  wrote:

> Hi All,
>
> I am wondering what is the difference between UDF execution and
> map(someLambda)? you can assume someLambda ~= UDF. Any performance
> difference?
>
> Thanks!
>
>
>
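
To make the contrast concrete, a small Scala sketch under assumed data (the column names
are made up); the UDF version only declares the column it needs, while the row-level map
sees the whole row:

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().appName("udf-vs-map").getOrCreate()
import spark.implicits._

val df = Seq(("alice", 3), ("bob", 5)).toDF("name", "n")

// Dataframe UDF: the optimizer knows only "name" is needed, so "n" can be pruned early.
val shout = udf((s: String) => s.toUpperCase)
df.select(shout($"name").as("shouted")).show()

// Row-level map: the whole row is handed to the lambda, so nothing can be pruned before it.
df.map(row => row.getString(0).toUpperCase).toDF("shouted").show()
```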


what is the difference between udf execution and map(someLambda)?

2019-03-17 Thread kant kodali
Hi All,

I am wondering what is the difference between UDF execution and
map(someLambda)? you can assume someLambda ~= UDF. Any performance
difference?

Thanks!


Difference between 'cores' config params: spark submit on k8s

2019-03-07 Thread Battini Lakshman
Hello,

I understand we need to specify the 'spark.kubernetes.driver.limit.cores'
and 'spark.kubernetes.executor.limit.cores' config parameters while
submitting spark on k8s namespace with resource quota applied.

There are also other config parameters 'spark.driver.cores' and
'spark.executor.cores' mentioned in documentation. What is the difference
between 'spark.driver.cores' and 'spark.kubernetes.driver.limit.cores'
please.

Thanks!

Best Regards,
Lakshman B.


Difference between One map vs multiple maps

2019-03-04 Thread Yeikel
Considering that I have a Dataframe df, I could run
df.map(operation1).map(operation2) or run df.map(logic for both operations).
In addition, I could also run df.map(operation3), where operation3 would be:

return operation2(operation1())


Similarly, with UDFs, I could build one UDF that does two things or two
different UDFs and call them sequentially.

Are there any performance differences (like casting back and forth from
Tungsten?) between the two? Or should I be more focused on separation of
concerns than on performance for this case?

Thank you.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
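
For what it's worth, a small Scala sketch of the two shapes described above, with
placeholder operations; chaining two typed maps does not add an extra Spark job, and the
optimizer can usually fuse them, so measuring on real data is the safest way to decide:

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("one-map-vs-two").getOrCreate()
import spark.implicits._

val ds = Seq(1, 2, 3, 4).toDS()

// Placeholder operations standing in for operation1 / operation2.
val operation1: Int => Int = _ + 1
val operation2: Int => Int = _ * 2

// Two chained maps.
val chained = ds.map(operation1).map(operation2)

// One map with the composed logic (operation3 in the question).
val composed = ds.map(x => operation2(operation1(x)))

chained.show()
composed.show()
```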



RE: Difference between Typed and untyped transformation in dataset API

2019-02-23 Thread email
From what I understand, if the transformation is untyped it will return a
DataFrame, otherwise it will return a Dataset. In the source code you will
see that the return type is a DataFrame instead of a Dataset, and such methods
should also be annotated with @group untypedrel. Thus, you can check the
signature of a method to determine whether it is untyped or not.

In general, anything that changes the type of a column or adds a new column to
a Dataset will be untyped. The idea of a Dataset is to stay constant when it
comes to the schema. The moment you try to modify the schema, we need to
fall back to a DataFrame.

For example, withColumn is untyped because it transforms the Dataset (typed)
into an untyped structure (a DataFrame).

 

From: Akhilanand  
Sent: Thursday, February 21, 2019 7:35 PM
To: user 
Subject: Difference between Typed and untyped transformation in dataset API

 

What is the key difference between Typed and untyped transformation in dataset 
API?

How do I determine if its typed or untyped?

Any gotchas when to use what apart from the reason that it does the job for me?
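
A small illustration of the signature check described above, using a made-up case class;
typed transformations keep the Dataset's element type, while untyped ones fall back to
DataFrame (i.e. Dataset[Row]):

```
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.lit

// Made-up case class for illustration.
case class Person(name: String, age: Int)

val spark = SparkSession.builder().appName("typed-vs-untyped").getOrCreate()
import spark.implicits._

val people: Dataset[Person] = Seq(Person("Wim", 46), Person("Klaas", 18)).toDS()

// Typed: the result is still Dataset[Person].
val adults: Dataset[Person] = people.filter(_.age >= 21)

// Untyped: withColumn changes the schema, so the result is a DataFrame.
val withFlag: DataFrame = people.withColumn("isAdult", lit(true))
```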

 

 



Difference between Typed and untyped transformation in dataset API

2019-02-21 Thread Akhilanand
What is the key difference between Typed and untyped transformation in
dataset API?
How do I determine if its typed or untyped?
Any gotchas when to use what apart from the reason that it does the job for
me?


Re: Difference between dataset and dataframe

2019-02-19 Thread Vadim Semenov
>
> 1) Is there any difference in terms performance when we use datasets over
> dataframes? Is it significant to choose 1 over other. I do realise there
> would be some overhead due case classes but how significant is that? Are
> there any other implications.


As long as you use the DataFrame functions the performance is going to be
the same since they operate directly with Tungsten rows, but as soon as you
try to do any typed-operations like `.map` performance is going to be hit
because Spark would have to create Java objects from Tungsten memory.

2) Is the Tungsten code generation done only for datasets or is there any
> internal process to generate bytecode for dataframes as well? Since its
> related to jvm , I think its just for datasets but I couldn’t find anything
> that tells it specifically. If its just for datasets , does that mean we
> miss out on the project tungsten optimisation for dataframes?


Code generation is done for both



On Mon, Feb 18, 2019 at 9:09 PM Akhilanand  wrote:

>
> Hello,
>
> I have been recently exploring about dataset and dataframes. I would
> really appreciate if someone could answer these questions:
>
> 1) Is there any difference in terms performance when we use datasets over
> dataframes? Is it significant to choose 1 over other. I do realise there
> would be some overhead due case classes but how significant is that? Are
> there any other implications.
>
> 2) Is the Tungsten code generation done only for datasets or is there any
> internal process to generate bytecode for dataframes as well? Since its
> related to jvm , I think its just for datasets but I couldn’t find anything
> that tells it specifically. If its just for datasets , does that mean we
> miss out on the project tungsten optimisation for dataframes?
>
>
>
> Regards,
> Akhilanand BV
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Sent from my iPhone
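
A short Scala sketch of that distinction, with placeholder data: the column-based version
stays on Tungsten rows end to end, while the typed map has to build a JVM object for every
row before the lambda runs:

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("df-vs-ds-ops").getOrCreate()
import spark.implicits._

val df = Seq((1, 10.0), (2, 20.0)).toDF("id", "value")

// DataFrame style: expressed on columns, fully visible to Catalyst and codegen.
val doubledCols = df.withColumn("value", col("value") * 2)

// Typed style: each Tungsten row is first deserialized into a Scala tuple.
val doubledTyped = df.as[(Int, Double)].map { case (id, v) => (id, v * 2) }

doubledCols.show()
doubledTyped.show()
```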


Re: Difference between dataset and dataframe

2019-02-19 Thread Koert Kuipers
dataframe operations are expressed as transformations on columns, basically
on locations inside the row objects. this specificity can be exploited by
catalyst to optimize these operations. since catalyst knows exactly what
positions in the row object you modified or not at any point and often also
what operation you did on them it can reason about these and do
optimizations like re-ordering of operations, compiling operations, and
running operations on serialized/internal formats.

when you use case classes and lambda operations not as much information is
available and the operation cannot be performed on the internal
representation. so conversions and/or deserializations are necessary.

On Tue, Feb 19, 2019 at 12:59 AM Lunagariya, Dhaval <
dhaval.lunagar...@citi.com> wrote:

> It does for dataframe also. Please try example.
>
>
>
> df1 = spark.range(2, 1000, 2)
>
> df2 = spark.range(2, 1000, 4)
>
> step1 = df1.repartition(5)
>
> step12 = df2.repartition(6)
>
> step2 = step1.selectExpr("id * 5 as id")
>
> step3 = step2.join(step12, ["id"])
>
> step4 = step3.selectExpr("sum(id)")
>
> step4.collect()
>
>
>
> step4._jdf.queryExecution().debug().codegen()
>
>
>
> You will see the generated code.
>
>
>
> Regards,
>
> Dhaval
>
>
>
> *From:* [External] Akhilanand 
> *Sent:* Tuesday, February 19, 2019 10:29 AM
> *To:* Koert Kuipers 
> *Cc:* user 
> *Subject:* Re: Difference between dataset and dataframe
>
>
>
> Thanks for the reply. But can you please tell why dataframes are
> performant than datasets? Any specifics would be helpful.
>
>
>
> Also, could you comment on the tungsten code gen part of my question?
>
>
> On Feb 18, 2019, at 10:47 PM, Koert Kuipers  wrote:
>
> in the api DataFrame is just Dataset[Row]. so this makes you think Dataset
> is the generic api. interestingly enough under the hood everything is
> really Dataset[Row], so DataFrame is really the "native" language for spark
> sql, not Dataset.
>
>
>
> i find DataFrame to be significantly more performant. in general if you
> use Dataset you miss out on some optimizations. also Encoders are not very
> pleasant to work with.
>
>
>
> On Mon, Feb 18, 2019 at 9:09 PM Akhilanand 
> wrote:
>
>
> Hello,
>
> I have been recently exploring about dataset and dataframes. I would
> really appreciate if someone could answer these questions:
>
> 1) Is there any difference in terms performance when we use datasets over
> dataframes? Is it significant to choose 1 over other. I do realise there
> would be some overhead due case classes but how significant is that? Are
> there any other implications.
>
> 2) Is the Tungsten code generation done only for datasets or is there any
> internal process to generate bytecode for dataframes as well? Since its
> related to jvm , I think its just for datasets but I couldn’t find anything
> that tells it specifically. If its just for datasets , does that mean we
> miss out on the project tungsten optimisation for dataframes?
>
>
>
> Regards,
> Akhilanand BV
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


RE: Difference between dataset and dataframe

2019-02-18 Thread Lunagariya, Dhaval
It does for dataframe also. Please try example.

df1 = spark.range(2, 1000, 2)
df2 = spark.range(2, 1000, 4)
step1 = df1.repartition(5)
step12 = df2.repartition(6)
step2 = step1.selectExpr("id * 5 as id")
step3 = step2.join(step12, ["id"])
step4 = step3.selectExpr("sum(id)")
step4.collect()

step4._jdf.queryExecution().debug().codegen()

You will see the generated code.

Regards,
Dhaval

From: [External] Akhilanand 
Sent: Tuesday, February 19, 2019 10:29 AM
To: Koert Kuipers 
Cc: user 
Subject: Re: Difference between dataset and dataframe

Thanks for the reply. But can you please tell why dataframes are performant 
than datasets? Any specifics would be helpful.

Also, could you comment on the tungsten code gen part of my question?

On Feb 18, 2019, at 10:47 PM, Koert Kuipers 
mailto:ko...@tresata.com>> wrote:
in the api DataFrame is just Dataset[Row]. so this makes you think Dataset is 
the generic api. interestingly enough under the hood everything is really 
Dataset[Row], so DataFrame is really the "native" language for spark sql, not 
Dataset.

i find DataFrame to be significantly more performant. in general if you use 
Dataset you miss out on some optimizations. also Encoders are not very pleasant 
to work with.

On Mon, Feb 18, 2019 at 9:09 PM Akhilanand 
mailto:akhilanand...@gmail.com>> wrote:

Hello,

I have been recently exploring about dataset and dataframes. I would really 
appreciate if someone could answer these questions:

1) Is there any difference in terms performance when we use datasets over 
dataframes? Is it significant to choose 1 over other. I do realise there would 
be some overhead due case classes but how significant is that? Are there any 
other implications.

2) Is the Tungsten code generation done only for datasets or is there any 
internal process to generate bytecode for dataframes as well? Since its related 
to jvm , I think its just for datasets but I couldn’t find anything that tells 
it specifically. If its just for datasets , does that mean we miss out on the 
project tungsten optimisation for dataframes?



Regards,
Akhilanand BV

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org>


Recall: Difference between dataset and dataframe

2019-02-18 Thread Lunagariya, Dhaval
Lunagariya, Dhaval [CCC-OT] would like to recall the message, "Difference 
between dataset and dataframe".
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: Difference between dataset and dataframe

2019-02-18 Thread Lunagariya, Dhaval
It does for dataframe also. Please try example.

df1 = spark.range(2, 1000, 2)
df2 = spark.range(2, 1000, 4)
step1 = df1.repartition(5)
step12 = df2.repartition(6)
step2 = step1.selectExpr("id * 5 as id")
step3 = step2.join(step12, ["id"])
step4 = step3.selectExpr("sum(id)")
step4.collect()

step4._jdf.queryExecution().debug().codegen()

You will see the generated code.

Best Regards
Dhaval Lunagariya
CitiRisk Retail, ETS, Pune
Desk : +91-20-6709 8557 | M : +91 7755966916

From: [External] Akhilanand 
Sent: Tuesday, February 19, 2019 10:29 AM
To: Koert Kuipers 
Cc: user 
Subject: Re: Difference between dataset and dataframe

Thanks for the reply. But can you please tell why dataframes are performant 
than datasets? Any specifics would be helpful.

Also, could you comment on the tungsten code gen part of my question?

On Feb 18, 2019, at 10:47 PM, Koert Kuipers 
mailto:ko...@tresata.com>> wrote:
in the api DataFrame is just Dataset[Row]. so this makes you think Dataset is 
the generic api. interestingly enough under the hood everything is really 
Dataset[Row], so DataFrame is really the "native" language for spark sql, not 
Dataset.

i find DataFrame to be significantly more performant. in general if you use 
Dataset you miss out on some optimizations. also Encoders are not very pleasant 
to work with.

On Mon, Feb 18, 2019 at 9:09 PM Akhilanand 
mailto:akhilanand...@gmail.com>> wrote:

Hello,

I have been recently exploring about dataset and dataframes. I would really 
appreciate if someone could answer these questions:

1) Is there any difference in terms performance when we use datasets over 
dataframes? Is it significant to choose 1 over other. I do realise there would 
be some overhead due case classes but how significant is that? Are there any 
other implications.

2) Is the Tungsten code generation done only for datasets or is there any 
internal process to generate bytecode for dataframes as well? Since its related 
to jvm , I think its just for datasets but I couldn’t find anything that tells 
it specifically. If its just for datasets , does that mean we miss out on the 
project tungsten optimisation for dataframes?



Regards,
Akhilanand BV

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org>


Re: Difference between dataset and dataframe

2019-02-18 Thread Akhilanand
Thanks for the reply. But can you please tell why dataframes are performant 
than datasets? Any specifics would be helpful.

Also, could you comment on the tungsten code gen part of my question?


> On Feb 18, 2019, at 10:47 PM, Koert Kuipers  wrote:
> 
> in the api DataFrame is just Dataset[Row]. so this makes you think Dataset is 
> the generic api. interestingly enough under the hood everything is really 
> Dataset[Row], so DataFrame is really the "native" language for spark sql, not 
> Dataset.
> 
> i find DataFrame to be significantly more performant. in general if you use 
> Dataset you miss out on some optimizations. also Encoders are not very 
> pleasant to work with.
> 
>> On Mon, Feb 18, 2019 at 9:09 PM Akhilanand  wrote:
>> 
>> Hello, 
>> 
>> I have been recently exploring about dataset and dataframes. I would really 
>> appreciate if someone could answer these questions:
>> 
>> 1) Is there any difference in terms performance when we use datasets over 
>> dataframes? Is it significant to choose 1 over other. I do realise there 
>> would be some overhead due case classes but how significant is that? Are 
>> there any other implications. 
>> 
>> 2) Is the Tungsten code generation done only for datasets or is there any 
>> internal process to generate bytecode for dataframes as well? Since its 
>> related to jvm , I think its just for datasets but I couldn’t find anything 
>> that tells it specifically. If its just for datasets , does that mean we 
>> miss out on the project tungsten optimisation for dataframes?
>> 
>> 
>> 
>> Regards,
>> Akhilanand BV
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 


Re: Difference between dataset and dataframe

2019-02-18 Thread Koert Kuipers
in the api DataFrame is just Dataset[Row]. so this makes you think Dataset
is the generic api. interestingly enough under the hood everything is
really Dataset[Row], so DataFrame is really the "native" language for spark
sql, not Dataset.

i find DataFrame to be significantly more performant. in general if you use
Dataset you miss out on some optimizations. also Encoders are not very
pleasant to work with.

On Mon, Feb 18, 2019 at 9:09 PM Akhilanand  wrote:

>
> Hello,
>
> I have been recently exploring about dataset and dataframes. I would
> really appreciate if someone could answer these questions:
>
> 1) Is there any difference in terms performance when we use datasets over
> dataframes? Is it significant to choose 1 over other. I do realise there
> would be some overhead due case classes but how significant is that? Are
> there any other implications.
>
> 2) Is the Tungsten code generation done only for datasets or is there any
> internal process to generate bytecode for dataframes as well? Since its
> related to jvm , I think its just for datasets but I couldn’t find anything
> that tells it specifically. If its just for datasets , does that mean we
> miss out on the project tungsten optimisation for dataframes?
>
>
>
> Regards,
> Akhilanand BV
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Difference between dataset and dataframe

2019-02-18 Thread Akhilanand


Hello, 

I have been recently exploring about dataset and dataframes. I would really 
appreciate if someone could answer these questions:

1) Is there any difference in terms performance when we use datasets over 
dataframes? Is it significant to choose 1 over other. I do realise there would 
be some overhead due case classes but how significant is that? Are there any 
other implications. 

2) Is the Tungsten code generation done only for datasets or is there any 
internal process to generate bytecode for dataframes as well? Since its related 
to jvm , I think its just for datasets but I couldn’t find anything that tells 
it specifically. If its just for datasets , does that mean we miss out on the 
project tungsten optimisation for dataframes?



Regards,
Akhilanand BV

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [SPARK SQL] Difference between 'Hive on spark' and Spark SQL

2018-12-20 Thread Jörn Franke
If you already have a lot of queries, then it makes sense to look at Hive (in a
recent version) + Tez + LLAP, with all tables in ORC format, partitioned and sorted on
the filter columns. That would be the easiest way and can improve performance
significantly.

If you want to use Spark, e.g. because you want to use additional features and it
could become part of your strategy justifying the investment:
* Hive on Spark - I don't think it is used as much as the above combination. I
am also not sure if it supports recent Spark versions and all Hive features. It
would also not really allow you to use Spark features beyond Hive features.
Basically you just set a different engine in Hive and execute the queries as
you do now.
* spark.sql: you would have to write all your Hive queries as Spark queries
and potentially integrate or rewrite Hive UDFs. Given that you can use the
HiveContext to execute queries, it may not require so much effort to rewrite
them. The pushdown possibilities are available in Spark. You have to write
Spark programs to execute queries. There are some servers that you can connect
to using SQL queries, but their maturity varies.

In the end you have to make an assessment of all your queries and investigate
whether they can be executed using either of the options.

> Am 20.12.2018 um 08:17 schrieb l...@china-inv.cn:
> 
> Hi, All, 
> 
> We are starting to migrate our data to Hadoop platform in hoping to use 'Big 
> Data' technologies to  
> improve our business. 
> 
> We are new in the area and want to get some help from you. 
> 
> Currently all our data is put into Hive and some complicated SQL query 
> statements are run daily. 
> 
> We want to improve the performance of these queries and have two options at 
> hand: 
> a. Turn on 'Hive on spark' feature and run HQLs and 
> b. Run those query statements with spark SQL 
> 
> What the difference between these options? 
> 
> Another question is: 
> There is a hive setting 'hive.optimze.ppd' to enable 'predicated pushdown' 
> query optimize 
> Is ther equivalent option in spark sql or the same setting also works for 
> spark SQL? 
> 
> Thanks in advance 
> 
> Boying 
> 
> 
>
> 
> 
> 
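
For option (b), a minimal Scala sketch of running an existing HiveQL statement through
Spark SQL with Hive support enabled; the table and column names are placeholders.
Predicate pushdown and partition pruning are applied by Catalyst where the source supports
them, so no separate hive.optimize.ppd-style switch is needed here:

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hiveql-on-spark-sql")
  .enableHiveSupport()          // use the existing Hive metastore
  .getOrCreate()

// Placeholder table and columns; the point is that existing HiveQL can often be
// submitted as-is through spark.sql(...).
val result = spark.sql(
  """SELECT region, SUM(amount) AS total
    |FROM sales
    |WHERE dt = '2018-12-19'
    |GROUP BY region""".stripMargin)

result.show()
```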


[SPARK SQL] Difference between 'Hive on spark' and Spark SQL

2018-12-19 Thread luby
Hi, All,

We are starting to migrate our data to the Hadoop platform, hoping to use 
'Big Data' technologies to 
improve our business.

We are new in the area and want to get some help from you.

Currently all our data is put into Hive and some complicated SQL query 
statements are run daily.

We want to improve the performance of these queries and have two options 
at hand: 
a. Turn on 'Hive on spark' feature and run HQLs and
b. Run those query statements with spark SQL

What is the difference between these options?

Another question is:
There is a Hive setting 'hive.optimize.ppd' to enable the 'predicate pushdown' 
query optimization.
Is there an equivalent option in Spark SQL, or does the same setting also work 
for Spark SQL?

Thanks in advance

Boying


 





SPARK-25959 - Difference in featureImportances results on computed vs saved models

2018-11-06 Thread Suraj Nayak
Hi Spark Users,

I tried to implement GBT and found that the feature Importance computed
while the model was fit is different when the same model was saved into a
storage and loaded back.



I also found that once the persistent model is loaded and saved back again
and loaded, the feature importance remains the same.



Not sure if its bug while storing and reading the model first time or am
missing some parameter that need to be set before saving the model (thus
model is picking some defaults - causing feature importance to change)



*Below is the test code:*

val testDF = Seq(
(1, 3, 2, 1, 1),
(3, 2, 1, 2, 0),
(2, 2, 1, 1, 0),
(3, 4, 2, 2, 0),
(2, 2, 1, 3, 1)
).toDF("a", "b", "c", "d", "e")


val featureColumns = testDF.columns.filter(_ != "e")
// Assemble the features into a vector
val assembler = new VectorAssembler().setInputCols(featureColumns).setOutputCol("features")
// Transform the data to get the feature data set
val featureDF = assembler.transform(testDF)

// Train a GBT model.
val gbt = new GBTClassifier()
.setLabelCol("e")
.setFeaturesCol("features")
.setMaxDepth(2)
.setMaxBins(5)
.setMaxIter(10)
.setSeed(10)
.fit(featureDF)

gbt.transform(featureDF).show(false)

// Write out the model

featureColumns.zip(gbt.featureImportances.toArray).sortBy(-_._2).take(20).foreach(println)
/* Prints

(d,0.5931875075767403)
(a,0.3747184548362353)
(b,0.03209403758702444)
(c,0.0)

*/
gbt.write.overwrite().save("file:///tmp/test123")

println("Reading model again")
val gbtload = GBTClassificationModel.load("file:///tmp/test123")

featureColumns.zip(gbtload.featureImportances.toArray).sortBy(-_._2).take(20).foreach(println)

/*

Prints

(d,0.6455841215290767)
(a,0.3316126797964181)
(b,0.022803198674505094)
(c,0.0)

*/


gbtload.write.overwrite().save("file:///tmp/test123_rewrite")

val gbtload2 = GBTClassificationModel.load("file:///tmp/test123_rewrite")

featureColumns.zip(gbtload2.featureImportances.toArray).sortBy(-_._2).take(20).foreach(println)

/* prints
(d,0.6455841215290767)
(a,0.3316126797964181)
(b,0.022803198674505094)
(c,0.0)

*/

Any help is appreciated in making sure the feature importances are
maintained as-is when the model is first stored.

Thanks!


Re: java vs scala for Apache Spark - is there a performance difference ?

2018-10-30 Thread Jörn Franke
Older versions of Spark indeed had lower performance for Python and R due to the 
conversion needed between JVM datatypes and Python/R datatypes. This changed in 
Spark 2.2, I think, with the integration of Apache Arrow.  However, what you do 
after the conversion in those languages can still be slower than, for instance, 
in Java if you do not use Spark-only functions. It could also be faster (e.g. if you 
use a Python module implemented natively in C and no translation into C datatypes 
is needed). 
Scala has in certain cases a more elegant syntax than Java (if you do not use 
lambdas). Sometimes this elegant syntax can lead to (unintentionally) inefficient 
constructs for which there is a better way to express them (e.g. implicit 
conversions, use of collection methods, etc.). However, there are better ways and 
you just have to spot these issues in the source code and address them, if 
needed. 
So a comparison between those languages does not really make sense - it always 
depends.

> Am 30.10.2018 um 07:00 schrieb akshay naidu :
> 
> how about Python. 
> java vs scala vs python vs R
> which is better.
> 
>> On Sat, Oct 27, 2018 at 3:34 AM karan alang  wrote:
>> Hello 
>> - is there a "performance" difference when using Java or Scala for Apache 
>> Spark ?
>> 
>> I understand, there are other obvious differences (less code with scala, 
>> easier to focus on logic etc), 
>> but wrt performance - i think there would not be much of a difference since 
>> both of them are JVM based, 
>> pls. let me know if this is not the case.
>> 
>> thanks!


Re: java vs scala for Apache Spark - is there a performance difference ?

2018-10-30 Thread akshay naidu
how about Python.
java vs scala vs python vs R
which is better.

On Sat, Oct 27, 2018 at 3:34 AM karan alang  wrote:

> Hello
> - is there a "performance" difference when using Java or Scala for Apache
> Spark ?
>
> I understand, there are other obvious differences (less code with scala,
> easier to focus on logic etc),
> but wrt performance - i think there would not be much of a difference
> since both of them are JVM based,
> pls. let me know if this is not the case.
>
> thanks!
>


Re: java vs scala for Apache Spark - is there a performance difference ?

2018-10-29 Thread Gourav Sengupta
I genuinely do not think that using Scala for Spark requires us to be experts in
Scala. There is in fact a tutorial called "Just Enough Scala for Spark"
which even with my IQ does not take more than 40 mins to go through. Also,
the syntax of Scala is almost always similar to that of Python.

Data processing is much more amenable to functional thinking, and therefore
Scala suits it best; also, Spark is written in Scala.

Regards,
Gourav

On Mon, Oct 29, 2018 at 11:33 PM kant kodali  wrote:

> Most people when they compare two different programming languages 99% of
> the time it all seems to boil down to syntax sugar.
>
> Performance I doubt Scala is ever faster than Java given that Scala likes
> Heap more than Java. I had also written some pointless micro-benchmarking
> code like (Random String Generation, hash computations, etc..) on Java,
> Scala and Golang and Java had outperformed both Scala and Golang as well on
> many occasions.
>
> Now that Java 11 had released things seem to get even better given the
> startup time is also very low.
>
> I am happy to change my view as long as I can see some code and benchmarks!
>
>
>
> On Mon, Oct 29, 2018 at 1:58 PM Jean Georges Perrin  wrote:
>
>> did not see anything, but curious if you find something.
>>
>> I think one of the big benefit of using Java, for data engineering in the
>> context of  Spark, is that you do not have to train a lot of your team to
>> Scala. Now if you want to do data science, Java is probably not the best
>> tool yet...
>>
>> On Oct 26, 2018, at 6:04 PM, karan alang  wrote:
>>
>> Hello
>> - is there a "performance" difference when using Java or Scala for Apache
>> Spark ?
>>
>> I understand, there are other obvious differences (less code with scala,
>> easier to focus on logic etc),
>> but wrt performance - i think there would not be much of a difference
>> since both of them are JVM based,
>> pls. let me know if this is not the case.
>>
>> thanks!
>>
>>
>>


Re: java vs scala for Apache Spark - is there a performance difference ?

2018-10-29 Thread kant kodali
When most people compare two different programming languages, 99% of
the time it all seems to boil down to syntactic sugar.

Performance-wise, I doubt Scala is ever faster than Java, given that Scala likes
the heap more than Java does. I had also written some pointless micro-benchmarking
code (random string generation, hash computations, etc.) in Java,
Scala and Golang, and Java outperformed both Scala and Golang on
many occasions.

Now that Java 11 has been released, things seem to get even better, given that the
startup time is also very low.

I am happy to change my view as long as I can see some code and benchmarks!



On Mon, Oct 29, 2018 at 1:58 PM Jean Georges Perrin  wrote:

> did not see anything, but curious if you find something.
>
> I think one of the big benefit of using Java, for data engineering in the
> context of  Spark, is that you do not have to train a lot of your team to
> Scala. Now if you want to do data science, Java is probably not the best
> tool yet...
>
> On Oct 26, 2018, at 6:04 PM, karan alang  wrote:
>
> Hello
> - is there a "performance" difference when using Java or Scala for Apache
> Spark ?
>
> I understand, there are other obvious differences (less code with scala,
> easier to focus on logic etc),
> but wrt performance - i think there would not be much of a difference
> since both of them are JVM based,
> pls. let me know if this is not the case.
>
> thanks!
>
>
>


Re: java vs scala for Apache Spark - is there a performance difference ?

2018-10-29 Thread Jean Georges Perrin
did not see anything, but curious if you find something.

I think one of the big benefits of using Java, for data engineering in the 
context of Spark, is that you do not have to train a lot of your team in 
Scala. Now if you want to do data science, Java is probably not the best tool 
yet...

> On Oct 26, 2018, at 6:04 PM, karan alang  wrote:
> 
> Hello 
> - is there a "performance" difference when using Java or Scala for Apache 
> Spark ?
> 
> I understand, there are other obvious differences (less code with scala, 
> easier to focus on logic etc), 
> but wrt performance - i think there would not be much of a difference since 
> both of them are JVM based, 
> pls. let me know if this is not the case.
> 
> thanks!



Re: java vs scala for Apache Spark - is there a performance difference ?

2018-10-26 Thread Battini Lakshman
On Oct 27, 2018 3:34 AM, "karan alang"  wrote:

Hello
- is there a "performance" difference when using Java or Scala for Apache
Spark ?

I understand, there are other obvious differences (less code with scala,
easier to focus on logic etc),
but wrt performance - i think there would not be much of a difference since
both of them are JVM based,
pls. let me know if this is not the case.

thanks!


java vs scala for Apache Spark - is there a performance difference ?

2018-10-26 Thread karan alang
Hello
- is there a "performance" difference when using Java or Scala for Apache
Spark ?

I understand, there are other obvious differences (less code with scala,
easier to focus on logic etc),
but wrt performance - i think there would not be much of a difference since
both of them are JVM based,
pls. let me know if this is not the case.

thanks!


Re: Timestamp Difference/operations

2018-10-16 Thread Paras Agarwal
Thanks Srabasti,


I am trying to convert Teradata SQL to Spark SQL.


TERADATA:
select * from Table1 where Date '1974-01-02' > CAST(birth_date AS TIMESTAMP(0)) 
+ (TIME '12:34:34' - TIME '00:00:00' HOUR TO SECOND);

HIVE (with some tweaks I can write):
SELECT * FROM foodmart.trimmed_employee WHERE Date '1974-01-02' > 
CAST(CAST(CURRENT_DATE AS TIMESTAMP) + (CAST('2000-01-01 12:34:34' AS 
TIMESTAMP) - (CAST('2000-01-01 00:00:00' AS TIMESTAMP))) AS DATE)

SPARK (so I need the Spark equivalent):

SELECT * FROM foodmart.trimmed_employee WHERE Date '1974-01-02' > 
CAST(CAST(CURRENT_DATE AS TIMESTAMP) + (??) AS DATE)


I need to fill in the ?? above so that I can proceed.


Thanks & Regards,

Paras

9130006036


From: Srabasti Banerjee 
Sent: Tuesday, October 16, 2018 6:45:26 AM
To: Paras Agarwal; John Zhuge
Cc: user; dev
Subject: Re: Timestamp Difference/operations

Hi Paras,
Check out this link: "Spark Scala: DateDiff of two columns by hour or minute"
<https://stackoverflow.com/questions/37058016/spark-scala-datediff-of-two-columns-by-hour-or-minute>









Looks like you can get the difference in seconds as well.
Hopefully this helps!
Are you looking for a specific usecase? Can you please elaborate with an 
example?

Thanks
Srabasti Banerjee


Sent from Yahoo Mail on Android

On Sun, Oct 14, 2018 at 23:41, Paras Agarwal
 wrote:

Thanks John,


Actually need full date and  time difference not just date difference,

which I guess not supported.


Let me know if its possible, or any UDF available for the same.


Thanks And Regards,

Paras


From: John Zhuge 
Sent: Friday, October 12, 2018 9:48:47 PM
To: Paras Agarwal
Cc: user; dev
Subject: Re: Timestamp Difference/operations

Yeah, operator "-" does not seem to be supported, however, you can use 
"datediff" function:

In [9]: select datediff(CAST('2000-02-01 12:34:34' AS TIMESTAMP), CAST('2000-01-01 00:00:00' AS TIMESTAMP))
Out[9]:
+---------------------------------------------------------------------------------------------------------------------+
|datediff(CAST(CAST(2000-02-01 12:34:34 AS TIMESTAMP) AS DATE), CAST(CAST(2000-01-01 00:00:00 AS TIMESTAMP) AS DATE))|
+---------------------------------------------------------------------------------------------------------------------+
|                                                                                                                   31|
+---------------------------------------------------------------------------------------------------------------------+

In [10]: select datediff('2000-02-01 12:34:34', '2000-01-01 00:00:00')
Out[10]:
+-------------------------------------------------------------------------------+
|datediff(CAST(2000-02-01 12:34:34 AS DATE), CAST(2000-01-01 00:00:00 AS DATE))|
+-------------------------------------------------------------------------------+
|                                                                             31|
+-------------------------------------------------------------------------------+

In [11]: select datediff(timestamp '2000-02-01 12:34:34', timestamp '2000-01-01 00:00:00')
Out[11]:
+-------------------------------------------------------------------------------------------------------------+
|datediff(CAST(TIMESTAMP('2000-02-01 12:34:34.0') AS DATE), CAST(TIMESTAMP('2000-01-01 00:00:00.0') AS DATE))|
+-------------------------------------------------------------------------------------------------------------+
|                                                                                                           31|
+-------------------------------------------------------------------------------------------------------------+

On Fri, Oct 12, 2018 at 7:01 AM Paras Agarwal 
mailto:paras.agar...@datametica.com>> wrote:

Hello Spark Community,

Currently in hive we can do operations on Timestamp Like :
CAST('2000-01-01 12:34:34' AS TIMESTAMP) - CAST('2000-01-01 00:00:00' AS 
TIMESTAMP)

Seems its not supporting in spark.
Is there any way available.

Kindly provide some insight on this.


Paras
9130006036


--
John


Re: Timestamp Difference/operations

2018-10-15 Thread Brandon Geise
How about 

 

select unix_timestamp(timestamp2) - unix_timestamp(timestamp1)?

 

From: Paras Agarwal 
Date: Monday, October 15, 2018 at 2:41 AM
To: John Zhuge 
Cc: user , dev 
Subject: Re: Timestamp Difference/operations

 

Thanks John,

 

Actually need full date and  time difference not just date difference, 

which I guess not supported.

 

Let me know if its possible, or any UDF available for the same.

 

Thanks And Regards,

Paras

From: John Zhuge 
Sent: Friday, October 12, 2018 9:48:47 PM
To: Paras Agarwal
Cc: user; dev
Subject: Re: Timestamp Difference/operations 

 

Yeah, operator "-" does not seem to be supported, however, you can use 
"datediff" function: 

 

In [9]: select datediff(CAST('2000-02-01 12:34:34' AS TIMESTAMP), CAST('2000-01-01 00:00:00' AS TIMESTAMP))
Out[9]:
+---------------------------------------------------------------------------------------------------------------------+
|datediff(CAST(CAST(2000-02-01 12:34:34 AS TIMESTAMP) AS DATE), CAST(CAST(2000-01-01 00:00:00 AS TIMESTAMP) AS DATE))|
+---------------------------------------------------------------------------------------------------------------------+
|                                                                                                                   31|
+---------------------------------------------------------------------------------------------------------------------+

In [10]: select datediff('2000-02-01 12:34:34', '2000-01-01 00:00:00')
Out[10]:
+-------------------------------------------------------------------------------+
|datediff(CAST(2000-02-01 12:34:34 AS DATE), CAST(2000-01-01 00:00:00 AS DATE))|
+-------------------------------------------------------------------------------+
|                                                                             31|
+-------------------------------------------------------------------------------+

In [11]: select datediff(timestamp '2000-02-01 12:34:34', timestamp '2000-01-01 00:00:00')
Out[11]:
+-------------------------------------------------------------------------------------------------------------+
|datediff(CAST(TIMESTAMP('2000-02-01 12:34:34.0') AS DATE), CAST(TIMESTAMP('2000-01-01 00:00:00.0') AS DATE))|
+-------------------------------------------------------------------------------------------------------------+
|                                                                                                           31|
+-------------------------------------------------------------------------------------------------------------+

 

On Fri, Oct 12, 2018 at 7:01 AM Paras Agarwal  
wrote:

Hello Spark Community,

Currently in hive we can do operations on Timestamp Like :
CAST('2000-01-01 12:34:34' AS TIMESTAMP) - CAST('2000-01-01 00:00:00' AS 
TIMESTAMP)

Seems its not supporting in spark.
Is there any way available.

Kindly provide some insight on this.


Paras
9130006036


 

-- 

John
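
Building on the unix_timestamp suggestion above, a small Scala sketch that yields the full
difference in seconds rather than whole days; the column names are made up:

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.unix_timestamp

val spark = SparkSession.builder().appName("timestamp-diff").getOrCreate()
import spark.implicits._

val df = Seq(("2000-01-01 00:00:00", "2000-02-01 12:34:34")).toDF("ts1", "ts2")

// Difference in whole seconds; divide further for minutes or hours if needed.
val withDiff = df.withColumn("diff_seconds", unix_timestamp($"ts2") - unix_timestamp($"ts1"))
withDiff.show(false)

// The same thing in SQL form.
df.createOrReplaceTempView("t")
spark.sql("SELECT unix_timestamp(ts2) - unix_timestamp(ts1) AS diff_seconds FROM t").show(false)
```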



Re: Timestamp Difference/operations

2018-10-15 Thread Paras Agarwal
Thanks John,


Actually, I need the full date and time difference, not just the date difference,

which I guess is not supported.


Let me know if it's possible, or if any UDF is available for the same.


Thanks And Regards,

Paras


From: John Zhuge 
Sent: Friday, October 12, 2018 9:48:47 PM
To: Paras Agarwal
Cc: user; dev
Subject: Re: Timestamp Difference/operations

Yeah, operator "-" does not seem to be supported, however, you can use 
"datediff" function:

In [9]: select datediff(CAST('2000-02-01 12:34:34' AS TIMESTAMP), CAST('2000-01-01 00:00:00' AS TIMESTAMP))
Out[9]:
+---------------------------------------------------------------------------------------------------------------------+
|datediff(CAST(CAST(2000-02-01 12:34:34 AS TIMESTAMP) AS DATE), CAST(CAST(2000-01-01 00:00:00 AS TIMESTAMP) AS DATE))|
+---------------------------------------------------------------------------------------------------------------------+
|                                                                                                                   31|
+---------------------------------------------------------------------------------------------------------------------+

In [10]: select datediff('2000-02-01 12:34:34', '2000-01-01 00:00:00')
Out[10]:
+-------------------------------------------------------------------------------+
|datediff(CAST(2000-02-01 12:34:34 AS DATE), CAST(2000-01-01 00:00:00 AS DATE))|
+-------------------------------------------------------------------------------+
|                                                                             31|
+-------------------------------------------------------------------------------+

In [11]: select datediff(timestamp '2000-02-01 12:34:34', timestamp '2000-01-01 00:00:00')
Out[11]:
+-------------------------------------------------------------------------------------------------------------+
|datediff(CAST(TIMESTAMP('2000-02-01 12:34:34.0') AS DATE), CAST(TIMESTAMP('2000-01-01 00:00:00.0') AS DATE))|
+-------------------------------------------------------------------------------------------------------------+
|                                                                                                           31|
+-------------------------------------------------------------------------------------------------------------+

On Fri, Oct 12, 2018 at 7:01 AM Paras Agarwal 
mailto:paras.agar...@datametica.com>> wrote:

Hello Spark Community,

Currently in hive we can do operations on Timestamp Like :
CAST('2000-01-01 12:34:34' AS TIMESTAMP) - CAST('2000-01-01 00:00:00' AS 
TIMESTAMP)

Seems its not supporting in spark.
Is there any way available.

Kindly provide some insight on this.


Paras
9130006036


--
John


Re: Timestamp Difference/operations

2018-10-12 Thread John Zhuge
Yeah, operator "-" does not seem to be supported, however, you can use
"datediff" function:

In [9]: select datediff(CAST('2000-02-01 12:34:34' AS TIMESTAMP), CAST('2000-01-01 00:00:00' AS TIMESTAMP))
Out[9]:
+---------------------------------------------------------------------------------------------------------------------+
|datediff(CAST(CAST(2000-02-01 12:34:34 AS TIMESTAMP) AS DATE), CAST(CAST(2000-01-01 00:00:00 AS TIMESTAMP) AS DATE))|
+---------------------------------------------------------------------------------------------------------------------+
|                                                                                                                   31|
+---------------------------------------------------------------------------------------------------------------------+

In [10]: select datediff('2000-02-01 12:34:34', '2000-01-01 00:00:00')
Out[10]:
+-------------------------------------------------------------------------------+
|datediff(CAST(2000-02-01 12:34:34 AS DATE), CAST(2000-01-01 00:00:00 AS DATE))|
+-------------------------------------------------------------------------------+
|                                                                             31|
+-------------------------------------------------------------------------------+

In [11]: select datediff(timestamp '2000-02-01 12:34:34', timestamp '2000-01-01 00:00:00')
Out[11]:
+-------------------------------------------------------------------------------------------------------------+
|datediff(CAST(TIMESTAMP('2000-02-01 12:34:34.0') AS DATE), CAST(TIMESTAMP('2000-01-01 00:00:00.0') AS DATE))|
+-------------------------------------------------------------------------------------------------------------+
|                                                                                                           31|
+-------------------------------------------------------------------------------------------------------------+

On Fri, Oct 12, 2018 at 7:01 AM Paras Agarwal 
wrote:

> Hello Spark Community,
>
> Currently in hive we can do operations on Timestamp Like :
> CAST('2000-01-01 12:34:34' AS TIMESTAMP) - CAST('2000-01-01 00:00:00' AS
> TIMESTAMP)
>
> Seems its not supporting in spark.
> Is there any way available.
>
> Kindly provide some insight on this.
>
>
> Paras
> 9130006036
>


-- 
John


Timestamp Difference/operations

2018-10-12 Thread Paras Agarwal
Hello Spark Community,

Currently in hive we can do operations on Timestamp Like :
CAST('2000-01-01 12:34:34' AS TIMESTAMP) - CAST('2000-01-01 00:00:00' AS 
TIMESTAMP)

It seems this is not supported in Spark.
Is there any way to do it?

Kindly provide some insight on this.


Paras
9130006036


Re: Parallelism: behavioural difference in version 1.2 and 2.1!?

2018-08-29 Thread Jeevan K. Srivatsa
Dear Apostolos,

Thanks for the response!

Our version is built on 2.1; the problem is that the state-of-the-art
system I'm trying to compare against is built on version 1.2. So I have to deal
with it.

If I understand the level of parallelism correctly, --total-executor-cores
is set to the number of workers multiplied by the executor cores of each
worker, in this case 32 as well. I use a similar script in both
cases, so it shouldn't change.

Thanks and regards,
Jeevan K. Srivatsa


On Wed, 29 Aug 2018 at 16:07, Apostolos N. Papadopoulos <
papad...@csd.auth.gr> wrote:

> Dear Jeevan,
>
> Spark 1.2 is quite old, and If I were you I would go for a newer version.
>
> However, is there a parallelism level (e.g., 20, 30) that works for both
> installations?
>
> regards,
>
> Apostolos
>
>
>
> On 29/08/2018 04:55 μμ, jeevan.ks wrote:
> > Hi,
> >
> > I've two systems. One is built on Spark 1.2 and the other on 2.1. I am
> > benchmarking both with the same benchmarks (wordcount, grep, sort, etc.)
> > with the same data set from S3 bucket (size ranges from 50MB to 10 GB).
> The
> > Spark cluster I made use of is r3.xlarge, 8 instances, 4 cores each, and
> > 28GB RAM. I observed a strange behaviour while running the benchmarks
> and is
> > as follows:
> >
> > - When I ran Spark 1.2 version with default partition number
> > (sc.defaultParallelism), the jobs would take forever to complete. So I
> > changed it to the number of cores, i.e., 32 times 3 = 96. This did a
> magic
> > and the jobs completed quickly.
> >
> > - However, when I tried the above magic number on the version 2.1, the
> jobs
> > are taking forever. Deafult parallelism works better, but not that
> > efficient.
> >
> > I'm having problem to rationalise this and compare both the systems. My
> > question is: what changes were made from 1.2 to 2.1 with respect to
> default
> > parallelism for this behaviour to occur? How can I have both versions
> behave
> > similary on the same software/hardware configuration so that I can
> compare?
> >
> > I'd really appreciate your help on this!
> >
> > Cheers,
> > Jeevan
> >
> >
> >
> > --
> > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
> --
> Apostolos N. Papadopoulos, Associate Professor
> Department of Informatics
> Aristotle University of Thessaloniki
> Thessaloniki, GREECE
> tel: ++0030312310991918
> email: papad...@csd.auth.gr
> twitter: @papadopoulos_ap
> web: http://delab.csd.auth.gr/~apostol
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
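
One way to take the guesswork out of the comparison is to pin the partition counts
explicitly on both clusters; a sketch follows, where 96 is just the cores-times-three
figure from this thread and the S3 path is a placeholder:

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parallelism-comparison")
  .config("spark.default.parallelism", "96")      // RDD shuffle default (here: cores * 3)
  .config("spark.sql.shuffle.partitions", "96")   // DataFrame/SQL shuffle partitions
  .getOrCreate()

val sc = spark.sparkContext

// Or fix the partition count per operation instead of relying on defaults.
val words  = sc.textFile("s3a://some-bucket/input", minPartitions = 96)
val counts = words.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _, numPartitions = 96)
counts.count()
```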


Re: Parallelism: behavioural difference in version 1.2 and 2.1!?

2018-08-29 Thread Apostolos N. Papadopoulos

Dear Jeevan,

Spark 1.2 is quite old, and if I were you I would go for a newer version.

However, is there a parallelism level (e.g., 20, 30) that works for both 
installations?


regards,

Apostolos



On 29/08/2018 04:55 μμ, jeevan.ks wrote:

Hi,

I've two systems. One is built on Spark 1.2 and the other on 2.1. I am
benchmarking both with the same benchmarks (wordcount, grep, sort, etc.)
with the same data set from S3 bucket (size ranges from 50MB to 10 GB). The
Spark cluster I made use of is r3.xlarge, 8 instances, 4 cores each, and
28GB RAM. I observed a strange behaviour while running the benchmarks and is
as follows:

- When I ran Spark 1.2 version with default partition number
(sc.defaultParallelism), the jobs would take forever to complete. So I
changed it to the number of cores, i.e., 32 times 3 = 96. This did a magic
and the jobs completed quickly.

- However, when I tried the above magic number on the version 2.1, the jobs
are taking forever. Deafult parallelism works better, but not that
efficient.

I'm having problem to rationalise this and compare both the systems. My
question is: what changes were made from 1.2 to 2.1 with respect to default
parallelism for this behaviour to occur? How can I have both versions behave
similary on the same software/hardware configuration so that I can compare?

I'd really appreciate your help on this!

Cheers,
Jeevan



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://delab.csd.auth.gr/~apostol


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Parallelism: behavioural difference in version 1.2 and 2.1!?

2018-08-29 Thread jeevan.ks
Hi,

I have two systems. One is built on Spark 1.2 and the other on 2.1. I am
benchmarking both with the same benchmarks (wordcount, grep, sort, etc.)
with the same data set from an S3 bucket (size ranges from 50MB to 10 GB). The
Spark cluster I made use of is r3.xlarge, 8 instances, 4 cores each, and
28GB RAM. I observed a strange behaviour while running the benchmarks, which is
as follows:

- When I ran the Spark 1.2 version with the default partition number
(sc.defaultParallelism), the jobs would take forever to complete. So I
changed it to a multiple of the number of cores, i.e., 32 times 3 = 96. This did the magic
and the jobs completed quickly.

- However, when I tried the above magic number on version 2.1, the jobs
take forever. Default parallelism works better, but is not that
efficient.

I'm having trouble rationalising this and comparing both systems. My
question is: what changes were made from 1.2 to 2.1 with respect to default
parallelism for this behaviour to occur? How can I have both versions behave
similarly on the same software/hardware configuration so that I can compare?

I'd really appreciate your help on this!

Cheers,
Jeevan 



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Is there a difference between df.cache() vs df.rdd.cache()

2017-10-14 Thread Supun Nakandala
Hi Weichen,

Thank you very much for the explanation.

On Fri, Oct 13, 2017 at 6:56 PM, Weichen Xu <weichen...@databricks.com>
wrote:

> Hi Supun,
>
> Dataframe API is NOT using the old RDD implementation under the covers,
> dataframe has its own implementation. (Dataframe use binary row format and
> columnar storage when cached). So dataframe has no relationship with the
> `RDD[Row]` you want get.
>
> When calling `df.rdd`, and then cache, it need to turn this dataframe into
> rdd, it will extract each row from dataframe, unserialize them, and compose
> the new RDD.
>
> Thanks!
>
> On Sat, Oct 14, 2017 at 6:17 AM, Stephen Boesch <java...@gmail.com> wrote:
>
>> @Vadim   Would it be true to say the `.rdd` *may* be creating a new job -
>> depending on whether the DataFrame/DataSet had already been materialized
>> via an action or checkpoint?   If the only prior operations on the
>> DataFrame had been transformations then the dataframe would still not have
>> been calculated.  In that case would it also be true that a subsequent
>> action/checkpoint on the DataFrame (not the rdd) would then generate a
>> separate job?
>>
>> 2017-10-13 14:50 GMT-07:00 Vadim Semenov <vadim.seme...@datadoghq.com>:
>>
>>> When you do `Dataset.rdd` you actually create a new job
>>>
>>> here you can see what it does internally:
>>> https://github.com/apache/spark/blob/master/sql/core/src/mai
>>> n/scala/org/apache/spark/sql/Dataset.scala#L2816-L2828
>>>
>>>
>>>
>>> On Fri, Oct 13, 2017 at 5:24 PM, Supun Nakandala <
>>> supun.nakand...@gmail.com> wrote:
>>>
>>>> Hi Weichen,
>>>>
>>>> Thank you for the reply.
>>>>
>>>> My understanding was Dataframe API is using the old RDD implementation
>>>> under the covers though it presents a different API. And calling
>>>> df.rdd will simply give access to the underlying RDD. Is this assumption
>>>> wrong? I would appreciate if you can shed more insights on this issue or
>>>> point me to documentation where I can learn them.
>>>>
>>>> Thank you in advance.
>>>>
>>>> On Fri, Oct 13, 2017 at 3:19 AM, Weichen Xu <weichen...@databricks.com>
>>>> wrote:
>>>>
>>>>> You should use `df.cache()`
>>>>> `df.rdd.cache()` won't work, because `df.rdd` generate a new RDD from
>>>>> the original `df`. and then cache the new RDD.
>>>>>
>>>>> On Fri, Oct 13, 2017 at 3:35 PM, Supun Nakandala <
>>>>> supun.nakand...@gmail.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I have been experimenting with cache/persist/unpersist methods with
>>>>>> respect to both Dataframes and RDD APIs. However, I am experiencing
>>>>>> different behaviors Ddataframe API compared RDD API such Dataframes are 
>>>>>> not
>>>>>> getting cached when count() is called.
>>>>>>
>>>>>> Is there a difference between how these operations act wrt to
>>>>>> Dataframe and RDD APIs?
>>>>>>
>>>>>> Thank You.
>>>>>> -Supun
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Is there a difference between df.cache() vs df.rdd.cache()

2017-10-13 Thread Weichen Xu
Hi Supun,

The Dataframe API is NOT using the old RDD implementation under the covers;
dataframes have their own implementation (a dataframe uses a binary row format
and columnar storage when cached). So the dataframe has no relationship with
the `RDD[Row]` you want to get.

When you call `df.rdd` and then cache, Spark needs to turn the dataframe into
an RDD: it extracts each row from the dataframe, deserializes it, and composes
a new RDD.

Thanks!
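
A minimal sketch of the point above, assuming Spark 2.x: caching the DataFrame
and caching the RDD produced by `df.rdd` are two separate, unrelated caches.
The data and column name below are made up for illustration.

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cache-sketch").master("local[*]").getOrCreate()
import spark.implicits._

val df = (1 to 1000).toDF("value")

// Caches the DataFrame itself (binary row format, columnar when cached);
// the cache is populated lazily, so an action is needed to materialize it.
df.cache()
df.count()

// This does NOT reuse the cache above: df.rdd builds a new RDD[Row] by
// converting every row, and cache() then caches that derived RDD.
val rowRdd = df.rdd
rowRdd.cache()
rowRdd.count()
```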

On Sat, Oct 14, 2017 at 6:17 AM, Stephen Boesch <java...@gmail.com> wrote:

> @Vadim   Would it be true to say the `.rdd` *may* be creating a new job -
> depending on whether the DataFrame/DataSet had already been materialized
> via an action or checkpoint?   If the only prior operations on the
> DataFrame had been transformations then the dataframe would still not have
> been calculated.  In that case would it also be true that a subsequent
> action/checkpoint on the DataFrame (not the rdd) would then generate a
> separate job?
>
> 2017-10-13 14:50 GMT-07:00 Vadim Semenov <vadim.seme...@datadoghq.com>:
>
>> When you do `Dataset.rdd` you actually create a new job
>>
>> here you can see what it does internally:
>> https://github.com/apache/spark/blob/master/sql/core/src/mai
>> n/scala/org/apache/spark/sql/Dataset.scala#L2816-L2828
>>
>>
>>
>> On Fri, Oct 13, 2017 at 5:24 PM, Supun Nakandala <
>> supun.nakand...@gmail.com> wrote:
>>
>>> Hi Weichen,
>>>
>>> Thank you for the reply.
>>>
>>> My understanding was Dataframe API is using the old RDD implementation
>>> under the covers though it presents a different API. And calling
>>> df.rdd will simply give access to the underlying RDD. Is this assumption
>>> wrong? I would appreciate if you can shed more insights on this issue or
>>> point me to documentation where I can learn them.
>>>
>>> Thank you in advance.
>>>
>>> On Fri, Oct 13, 2017 at 3:19 AM, Weichen Xu <weichen...@databricks.com>
>>> wrote:
>>>
>>>> You should use `df.cache()`
>>>> `df.rdd.cache()` won't work, because `df.rdd` generate a new RDD from
>>>> the original `df`. and then cache the new RDD.
>>>>
>>>> On Fri, Oct 13, 2017 at 3:35 PM, Supun Nakandala <
>>>> supun.nakand...@gmail.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I have been experimenting with cache/persist/unpersist methods with
>>>>> respect to both Dataframes and RDD APIs. However, I am experiencing
>>>>> different behaviors Ddataframe API compared RDD API such Dataframes are 
>>>>> not
>>>>> getting cached when count() is called.
>>>>>
>>>>> Is there a difference between how these operations act wrt to
>>>>> Dataframe and RDD APIs?
>>>>>
>>>>> Thank You.
>>>>> -Supun
>>>>>
>>>>
>>>>
>>>
>>
>


Re: Is there a difference between df.cache() vs df.rdd.cache()

2017-10-13 Thread Stephen Boesch
@Vadim   Would it be true to say the `.rdd` *may* be creating a new job -
depending on whether the DataFrame/DataSet had already been materialized
via an action or checkpoint?   If the only prior operations on the
DataFrame had been transformations then the dataframe would still not have
been calculated.  In that case would it also be true that a subsequent
action/checkpoint on the DataFrame (not the rdd) would then generate a
separate job?

2017-10-13 14:50 GMT-07:00 Vadim Semenov <vadim.seme...@datadoghq.com>:

> When you do `Dataset.rdd` you actually create a new job
>
> here you can see what it does internally:
> https://github.com/apache/spark/blob/master/sql/core/src/
> main/scala/org/apache/spark/sql/Dataset.scala#L2816-L2828
>
>
>
> On Fri, Oct 13, 2017 at 5:24 PM, Supun Nakandala <
> supun.nakand...@gmail.com> wrote:
>
>> Hi Weichen,
>>
>> Thank you for the reply.
>>
>> My understanding was Dataframe API is using the old RDD implementation
>> under the covers though it presents a different API. And calling
>> df.rdd will simply give access to the underlying RDD. Is this assumption
>> wrong? I would appreciate if you can shed more insights on this issue or
>> point me to documentation where I can learn them.
>>
>> Thank you in advance.
>>
>> On Fri, Oct 13, 2017 at 3:19 AM, Weichen Xu <weichen...@databricks.com>
>> wrote:
>>
>>> You should use `df.cache()`
>>> `df.rdd.cache()` won't work, because `df.rdd` generate a new RDD from
>>> the original `df`. and then cache the new RDD.
>>>
>>> On Fri, Oct 13, 2017 at 3:35 PM, Supun Nakandala <
>>> supun.nakand...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I have been experimenting with cache/persist/unpersist methods with
>>>> respect to both Dataframes and RDD APIs. However, I am experiencing
>>>> different behaviors Ddataframe API compared RDD API such Dataframes are not
>>>> getting cached when count() is called.
>>>>
>>>> Is there a difference between how these operations act wrt to Dataframe
>>>> and RDD APIs?
>>>>
>>>> Thank You.
>>>> -Supun
>>>>
>>>
>>>
>>
>


Re: Is there a difference between df.cache() vs df.rdd.cache()

2017-10-13 Thread Vadim Semenov
When you do `Dataset.rdd` you actually create a new job

here you can see what it does internally:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2816-L2828



On Fri, Oct 13, 2017 at 5:24 PM, Supun Nakandala <supun.nakand...@gmail.com>
wrote:

> Hi Weichen,
>
> Thank you for the reply.
>
> My understanding was Dataframe API is using the old RDD implementation
> under the covers though it presents a different API. And calling
> df.rdd will simply give access to the underlying RDD. Is this assumption
> wrong? I would appreciate if you can shed more insights on this issue or
> point me to documentation where I can learn them.
>
> Thank you in advance.
>
> On Fri, Oct 13, 2017 at 3:19 AM, Weichen Xu <weichen...@databricks.com>
> wrote:
>
>> You should use `df.cache()`
>> `df.rdd.cache()` won't work, because `df.rdd` generate a new RDD from the
>> original `df`. and then cache the new RDD.
>>
>> On Fri, Oct 13, 2017 at 3:35 PM, Supun Nakandala <
>> supun.nakand...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I have been experimenting with cache/persist/unpersist methods with
>>> respect to both Dataframes and RDD APIs. However, I am experiencing
>>> different behaviors Ddataframe API compared RDD API such Dataframes are not
>>> getting cached when count() is called.
>>>
>>> Is there a difference between how these operations act wrt to Dataframe
>>> and RDD APIs?
>>>
>>> Thank You.
>>> -Supun
>>>
>>
>>
>


Re: Is there a difference between df.cache() vs df.rdd.cache()

2017-10-13 Thread Supun Nakandala
Hi Weichen,

Thank you for the reply.

My understanding was that the Dataframe API uses the old RDD implementation
under the covers even though it presents a different API, and that calling
df.rdd simply gives access to the underlying RDD. Is this assumption
wrong? I would appreciate it if you could shed more light on this issue or
point me to documentation where I can learn more.

Thank you in advance.

On Fri, Oct 13, 2017 at 3:19 AM, Weichen Xu <weichen...@databricks.com>
wrote:

> You should use `df.cache()`
> `df.rdd.cache()` won't work, because `df.rdd` generate a new RDD from the
> original `df`. and then cache the new RDD.
>
> On Fri, Oct 13, 2017 at 3:35 PM, Supun Nakandala <
> supun.nakand...@gmail.com> wrote:
>
>> Hi all,
>>
>> I have been experimenting with cache/persist/unpersist methods with
>> respect to both Dataframes and RDD APIs. However, I am experiencing
>> different behaviors Ddataframe API compared RDD API such Dataframes are not
>> getting cached when count() is called.
>>
>> Is there a difference between how these operations act wrt to Dataframe
>> and RDD APIs?
>>
>> Thank You.
>> -Supun
>>
>
>


Re: Is there a difference between df.cache() vs df.rdd.cache()

2017-10-13 Thread Weichen Xu
You should use `df.cache()`.
`df.rdd.cache()` won't do what you expect, because `df.rdd` generates a new RDD
from the original `df` and then caches that new RDD.

On Fri, Oct 13, 2017 at 3:35 PM, Supun Nakandala <supun.nakand...@gmail.com>
wrote:

> Hi all,
>
> I have been experimenting with cache/persist/unpersist methods with
> respect to both Dataframes and RDD APIs. However, I am experiencing
> different behaviors Ddataframe API compared RDD API such Dataframes are not
> getting cached when count() is called.
>
> Is there a difference between how these operations act wrt to Dataframe
> and RDD APIs?
>
> Thank You.
> -Supun
>


Is there a difference between df.cache() vs df.rdd.cache()

2017-10-13 Thread Supun Nakandala
Hi all,

I have been experimenting with the cache/persist/unpersist methods with respect
to both the Dataframe and RDD APIs. However, I am seeing different behavior in
the Dataframe API compared to the RDD API; for example, Dataframes are not
getting cached when count() is called.

Is there a difference between how these operations behave with respect to the
Dataframe and RDD APIs?

Thank You.
-Supun
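
A minimal sketch of one way to check whether a DataFrame actually got cached,
assuming Spark 2.1+ (where Dataset.storageLevel is available). The data is made
up, and cache() is lazy, so an action such as count() is what actually
materializes the cache.

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().appName("cache-check-sketch").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("a", 1), ("b", 2)).toDF("key", "value")

println(df.storageLevel == StorageLevel.NONE)   // true: nothing cached yet

df.cache()   // marks the DataFrame for caching (lazy)
df.count()   // action that actually materializes the cache

println(df.storageLevel)                            // non-NONE after materialization
println(spark.sparkContext.getPersistentRDDs.size)  // the cached data typically shows up here too
```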


[Structured Streaming] How to compute the difference between two rows of a streaming dataframe?

2017-09-29 Thread 张万新
Hi,

I want to compute the difference between two rows in a streaming dataframe.
Is there a feasible API for this? Maybe some function like the window function
*lag* on a normal dataframe, but it seems that this function is unavailable on
streaming dataframes.

Thanks.
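
For reference, a minimal sketch of the batch version of what is being asked
for, using the lag window function on a normal (non-streaming) DataFrame; as
noted above, this is not supported on streaming DataFrames. The data and column
names are made up.

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag}

val spark = SparkSession.builder().appName("lag-sketch").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("sensor-1", 1L, 10.0), ("sensor-1", 2L, 12.5), ("sensor-1", 3L, 11.0))
  .toDF("key", "seq", "value")

val w = Window.partitionBy("key").orderBy("seq")

// diff = current value minus the previous row's value within each key
df.withColumn("diff", col("value") - lag("value", 1).over(w)).show()
```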


Re: Is there a difference between these aggregations

2017-07-24 Thread Aseem Bansal
Any difference between using agg or select to do the aggregations?
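
A minimal sketch of one way to check this: compare the plans the two forms
produce. The data and column name are made up; as the reply quoted below shows,
mean() is simply an alias for avg().

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, mean}

val spark = SparkSession.builder().appName("agg-vs-select-sketch").master("local[*]").getOrCreate()

val ds = spark.range(100).toDF("mycol")

ds.select(mean("mycol")).explain()   // global aggregation expressed via select
ds.agg(avg("mycol")).explain()       // agg without groupBy is also a global aggregation

// If the printed physical plans are the same, the two forms are interchangeable here.
```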

On Mon, Jul 24, 2017 at 5:08 PM, yohann jardin <yohannjar...@hotmail.com>
wrote:

> Seen directly in the code:
>
>
>   /**
>* Aggregate function: returns the average of the values in a group.
>* Alias for avg.
>*
>* @group agg_funcs
>* @since 1.4.0
>*/
>   def mean(e: Column): Column = avg(e)
>
>
> That's the same when the argument is the column name.
>
> So no difference between mean and avg functions.
>
>
> --
> *From:* Aseem Bansal <asmbans...@gmail.com>
> *Sent:* Monday, 24 July 2017 13:34
> *To:* user
> *Subject:* Is there a difference between these aggregations
>
> If I want to aggregate mean and subtract from my column I can do either of
> the following in Spark 2.1.0 Java API. Is there any difference between
> these? Couldn't find anything from reading the docs.
>
> dataset.select(mean("mycol"))
> dataset.agg(mean("mycol"))
>
> dataset.select(avg("mycol"))
> dataset.agg(avg("mycol"))
>


RE: Is there a difference between these aggregations

2017-07-24 Thread yohann jardin
Seen directly in the code:


  /**
   * Aggregate function: returns the average of the values in a group.
   * Alias for avg.
   *
   * @group agg_funcs
   * @since 1.4.0
   */
  def mean(e: Column): Column = avg(e)



That's the same when the argument is the column name.

So no difference between mean and avg functions.



From: Aseem Bansal <asmbans...@gmail.com>
Sent: Monday, 24 July 2017 13:34
To: user
Subject: Is there a difference between these aggregations

If I want to aggregate mean and subtract from my column I can do either of the 
following in Spark 2.1.0 Java API. Is there any difference between these? 
Couldn't find anything from reading the docs.

dataset.select(mean("mycol"))
dataset.agg(mean("mycol"))

dataset.select(avg("mycol"))
dataset.agg(avg("mycol"))


Is there a difference between these aggregations

2017-07-24 Thread Aseem Bansal
If I want to aggregate mean and subtract from my column I can do either of
the following in Spark 2.1.0 Java API. Is there any difference between
these? Couldn't find anything from reading the docs.

dataset.select(mean("mycol"))
dataset.agg(mean("mycol"))

dataset.select(avg("mycol"))
dataset.agg(avg("mycol"))


Re: What is the real difference between Kafka streaming and Spark Streaming?

2017-06-26 Thread ??????????
Hi Kodali,


I feel puzzled about the statement
"Kafka Streaming can indeed do map, reduce, join and window operations".

Do you mean that Kafka has APIs like map, or that Kafka doesn't have such APIs
but can still do it?
As far as I remember, Kafka does not have APIs like map and so on.




 
---Original---
From: "kant kodali"<kanth...@gmail.com>
Date: 2017/6/12 06:41:27
To: "Mohammed Guller"<moham...@glassbeam.com>;
Cc: "user"<user@spark.apache.org>;"yohann 
jardin"<yohannjar...@hotmail.com>;"vaquar khan"<vaquar.k...@gmail.com>;"vincent 
gromakowski"<vincent.gromakow...@gmail.com>;
Subject: Re: What is the real difference between Kafka streaming and Spark 
Streaming?


Also another difference I see is some thing like Spark Sql where there are 
logical plans, physical plans, Code generation and all those optimizations I 
don't see them in Kafka Streaming at this time.

On Sun, Jun 11, 2017 at 2:19 PM, kant kodali <kanth...@gmail.com> wrote:
I appreciate the responses however I see the other side of the argument and I 
actually feel they are competitors now in Streaming space in some sense. 

Kafka Streaming can indeed do map, reduce, join and window operations and Like 
wise data can be ingested from many sources in Kafka and send the results out 
to many sinks. Look up "Kafka Connect"

Regarding Event at a time vs Micro-batch. I hear arguments from a group of 
people saying Spark Streaming is real time and other group of people is Kafka 
streaming is the true real time. so do we say Micro-batch is real time or Event 
at a time is real time?

It is well known fact that Spark is more popular with Data scientists who want 
to run ML Algorithms and so on but I also hear that people can use H2O package 
along with Kafka Streaming. so efficient each of these approaches are is 
something I have no clue.


The major difference I see is actually the Spark Scheduler I don't think Kafka 
Streaming has anything like this instead it just allows you to run lambda 
expressions on a stream and write it out to specific topic/partition and from 
there one can use Kafka Connect to write it out to any sink. so In short, All 
the optimizations built into spark scheduler don't seem to exist in Kafka 
Streaming so if I were to make a decision on which framework to use this is an 
additional question I would think about like "Do I want my stream to go through 
the scheduler and if so, why or why not"


Above all, please correct me if I am wrong :) 








On Sun, Jun 11, 2017 at 12:41 PM, Mohammed Guller <moham...@glassbeam.com>
wrote:

Just to elaborate more on what Vincent wrote – Kafka streaming provides true
record-at-a-time processing capabilities whereas Spark Streaming provides
micro-batching capabilities on top of Spark. Depending on your use case, you
may find one better than the other. Both provide stateless and stateful stream
processing capabilities.

A few more things to consider:

If you don't already have a Spark cluster, but have a Kafka cluster, it may be
easier to use Kafka streaming since you don't need to set up and manage
another cluster.

On the other hand, if you already have a Spark cluster, but don't have a Kafka
cluster (in case you are using some other messaging system), Spark streaming
is a better option.

If you already know and use Spark, you may find it easier to program with the
Spark Streaming API even if you are using Kafka.

Spark Streaming may give you better throughput. So you have to decide what is
more important for your stream processing application – latency or throughput?

Kafka streaming is relatively new and less mature than Spark Streaming

Mohammed

From: vincent gromakowski [mailto:vincent.gromakow...@gmail.com]
Sent: Sunday, June 11, 2017 12:09 PM
To: yohann jardin <yohannjar...@hotmail.com>
Cc: kant kodali <kanth...@gmail.com>; vaquar khan <vaquar.k...@gmail.com>;
user <user@spark.apache.org>
Subject: Re: What is the real difference between Kafka streaming and Spark
Streaming?

I think Kafka streams is good when the processing of each row is independent
from the others (row parsing, data cleaning...)

Spark is better when processing groups of rows (group by, ml, window func...)

On 11 June 2017 at 8:15 PM, "yohann jardin" <yohannjar...@hotmail.com> wrote:

Hey,

Kafka can also do streaming on its own:
https://kafka.apache.org/documentation/streams
I don't know much about it unfortunately. I can only repeat what I heard at
conferences, saying that one should give Kafka streaming a try when the
whole pipeline is using Kafka. I have no pros/cons to argue on this topic.

Yohann Jardin

On 6/11/2017 at 7:08 PM, vaquar khan wrote:

Hi Kant,

Kafka is the message broker that is used by Producers and Consumers and Spark
Strea

difference between spark-integrated hive and original hive

2017-06-17 Thread wuchang
I want to build Hive and Spark so that my Hive runs on the Spark engine.
I chose Hive 2.3.0 and Spark 2.0.0, which the Hive official documentation claims
are compatible.
According to the Hive official documentation, I have to build Spark without the
Hive profile to avoid a conflict between the original Hive and the Spark-integrated
Hive. I built it successfully, but then the problem comes: I cannot use spark-sql
anymore, because spark-sql relies on the Hive library and my Spark is a no-hive
build.

I don't know the relationship between the Spark-integrated Hive and the original Hive.
Below are the Spark-integrated Hive jars:

hive-beeline-1.2.1.spark2.jar
hive-cli-1.2.1.spark2.jar
hive-exec-1.2.1.spark2.jar
hive-jdbc-1.2.1.spark2.jar
hive-metastore-1.2.1.spark2.jar
spark-hive_2.11-2.0.0.jar
spark-hive-thriftserver_2.11-2.0.0.jar
It seems that Spark 2.0.0 relies on Hive 1.2.1.

Can I just add my 2.3.0 hive's libs to the classpath of Spark?


Re: What is the real difference between Kafka streaming and Spark Streaming?

2017-06-15 Thread Michael Armbrust
Continuous processing is still a work in progress.  I would really like to
at least have a basic version in Spark 2.3.

The announcement about 2.2 is that we are planning to remove the
experimental tag from Structured Streaming.

On Thu, Jun 15, 2017 at 11:53 AM, kant kodali <kanth...@gmail.com> wrote:

> vow! you caught the 007!  Is continuous processing mode available in 2.2?
> The ticket says the target version is 2.3 but the talk in the Video says
> 2.2 and beyond so I am just curious if it is available in 2.2 or should I
> try it from the latest build?
>
> Thanks!
>
> On Wed, Jun 14, 2017 at 5:32 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> This a good question. I really like using Kafka as a centralized source
>> for streaming data in an organization and, with Spark 2.2, we have full
>> support for reading and writing data to/from Kafka in both streaming and
>> batch
>> <https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html>.
>> I'll focus here on what I think the advantages are of Structured Streaming
>> over Kafka Streams (a stream processing library that reads from Kafka).
>>
>>  - *High level productive APIs* - Streaming queries in Spark can be
>> expressed using DataFrames, Datasets or even plain SQL.  Streaming
>> DataFrames/SQL are supported in Scala, Java, Python and even R.  This means
>> that for common operations like filtering, joining, aggregating, you can
>> use built-in operations.  For complicated custom logic you can use UDFs and
>> lambda functions. In contrast, Kafka Streams mostly requires you to express
>> your transformations using lambda functions.
>>  - *High Performance* - Since it is built on Spark SQL, streaming
>> queries take advantage of the Catalyst optimizer and the Tungsten execution
>> engine. This design leads to huge performance wins
>> <https://databricks.com/blog/2017/06/06/simple-super-fast-streaming-engine-apache-spark.html>,
>> which means you need less hardware to accomplish the same job.
>>  - *Ecosystem* - Spark has connectors for working with all kinds of data
>> stored in a variety of systems.  This means you can join a stream with data
>> encoded in parquet and stored in S3/HDFS.  Perhaps more importantly, it
>> also means that if you decide that you don't want to manage a Kafka cluster
>> anymore and would rather use Kinesis, you can do that too.  We recently
>> moved a bunch of our pipelines from Kafka to Kinesis and had to only change
>> a few lines of code! I think its likely that in the future Spark will also
>> have connectors for Google's PubSub and Azure's streaming offerings.
>>
>> Regarding latency, there has been a lot of discussion about the inherent
>> latencies of micro-batch.  Fortunately, we were very careful to leave
>> batching out of the user facing API, and as we demo'ed last week, this
>> makes it possible for the Spark Streaming to achieve sub-millisecond
>> latencies <https://www.youtube.com/watch?v=qAZ5XUz32yM>.  Watch
>> SPARK-20928 <https://issues.apache.org/jira/browse/SPARK-20928> for more
>> on this effort to eliminate micro-batch from Spark's execution model.
>>
>> At the far other end of the latency spectrum...  For those with jobs that
>> run in the cloud on data that arrives sporadically, you can run streaming
>> jobs that only execute every few hours or every few days, shutting the
>> cluster down in between.  This architecture can result in a huge cost
>> savings for some applications
>> <https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html>
>> .
>>
>> Michael
>>
>> On Sun, Jun 11, 2017 at 1:12 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I am trying hard to figure out what is the real difference between Kafka
>>> Streaming vs Spark Streaming other than saying one can be used as part of
>>> Micro services (since Kafka streaming is just a library) and the other is a
>>> Standalone framework by itself.
>>>
>>> If I can accomplish same job one way or other this is a sort of a
>>> puzzling question for me so it would be great to know what Spark streaming
>>> can do that Kafka Streaming cannot do efficiently or whatever ?
>>>
>>> Thanks!
>>>
>>>
>>
>


Re: What is the real difference between Kafka streaming and Spark Streaming?

2017-06-15 Thread kant kodali
vow! you caught the 007!  Is continuous processing mode available in 2.2?
The ticket says the target version is 2.3 but the talk in the Video says
2.2 and beyond so I am just curious if it is available in 2.2 or should I
try it from the latest build?

Thanks!

On Wed, Jun 14, 2017 at 5:32 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> This a good question. I really like using Kafka as a centralized source
> for streaming data in an organization and, with Spark 2.2, we have full
> support for reading and writing data to/from Kafka in both streaming and
> batch
> <https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html>.
> I'll focus here on what I think the advantages are of Structured Streaming
> over Kafka Streams (a stream processing library that reads from Kafka).
>
>  - *High level productive APIs* - Streaming queries in Spark can be
> expressed using DataFrames, Datasets or even plain SQL.  Streaming
> DataFrames/SQL are supported in Scala, Java, Python and even R.  This means
> that for common operations like filtering, joining, aggregating, you can
> use built-in operations.  For complicated custom logic you can use UDFs and
> lambda functions. In contrast, Kafka Streams mostly requires you to express
> your transformations using lambda functions.
>  - *High Performance* - Since it is built on Spark SQL, streaming queries
> take advantage of the Catalyst optimizer and the Tungsten execution engine.
> This design leads to huge performance wins
> <https://databricks.com/blog/2017/06/06/simple-super-fast-streaming-engine-apache-spark.html>,
> which means you need less hardware to accomplish the same job.
>  - *Ecosystem* - Spark has connectors for working with all kinds of data
> stored in a variety of systems.  This means you can join a stream with data
> encoded in parquet and stored in S3/HDFS.  Perhaps more importantly, it
> also means that if you decide that you don't want to manage a Kafka cluster
> anymore and would rather use Kinesis, you can do that too.  We recently
> moved a bunch of our pipelines from Kafka to Kinesis and had to only change
> a few lines of code! I think its likely that in the future Spark will also
> have connectors for Google's PubSub and Azure's streaming offerings.
>
> Regarding latency, there has been a lot of discussion about the inherent
> latencies of micro-batch.  Fortunately, we were very careful to leave
> batching out of the user facing API, and as we demo'ed last week, this
> makes it possible for the Spark Streaming to achieve sub-millisecond
> latencies <https://www.youtube.com/watch?v=qAZ5XUz32yM>.  Watch
> SPARK-20928 <https://issues.apache.org/jira/browse/SPARK-20928> for more
> on this effort to eliminate micro-batch from Spark's execution model.
>
> At the far other end of the latency spectrum...  For those with jobs that
> run in the cloud on data that arrives sporadically, you can run streaming
> jobs that only execute every few hours or every few days, shutting the
> cluster down in between.  This architecture can result in a huge cost
> savings for some applications
> <https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html>
> .
>
> Michael
>
> On Sun, Jun 11, 2017 at 1:12 AM, kant kodali <kanth...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am trying hard to figure out what is the real difference between Kafka
>> Streaming vs Spark Streaming other than saying one can be used as part of
>> Micro services (since Kafka streaming is just a library) and the other is a
>> Standalone framework by itself.
>>
>> If I can accomplish same job one way or other this is a sort of a
>> puzzling question for me so it would be great to know what Spark streaming
>> can do that Kafka Streaming cannot do efficiently or whatever ?
>>
>> Thanks!
>>
>>
>


Re: What is the real difference between Kafka streaming and Spark Streaming?

2017-06-14 Thread Michael Armbrust
This a good question. I really like using Kafka as a centralized source for
streaming data in an organization and, with Spark 2.2, we have full support
for reading and writing data to/from Kafka in both streaming and batch
<https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html>.
I'll focus here on what I think the advantages are of Structured Streaming
over Kafka Streams (a stream processing library that reads from Kafka).

 - *High level productive APIs* - Streaming queries in Spark can be
expressed using DataFrames, Datasets or even plain SQL.  Streaming
DataFrames/SQL are supported in Scala, Java, Python and even R.  This means
that for common operations like filtering, joining, aggregating, you can
use built-in operations.  For complicated custom logic you can use UDFs and
lambda functions. In contrast, Kafka Streams mostly requires you to express
your transformations using lambda functions.
 - *High Performance* - Since it is built on Spark SQL, streaming queries
take advantage of the Catalyst optimizer and the Tungsten execution engine.
This design leads to huge performance wins
<https://databricks.com/blog/2017/06/06/simple-super-fast-streaming-engine-apache-spark.html>,
which means you need less hardware to accomplish the same job.
 - *Ecosystem* - Spark has connectors for working with all kinds of data
stored in a variety of systems.  This means you can join a stream with data
encoded in parquet and stored in S3/HDFS.  Perhaps more importantly, it
also means that if you decide that you don't want to manage a Kafka cluster
anymore and would rather use Kinesis, you can do that too.  We recently
moved a bunch of our pipelines from Kafka to Kinesis and had to only change
a few lines of code! I think it's likely that in the future Spark will also
have connectors for Google's PubSub and Azure's streaming offerings.

Regarding latency, there has been a lot of discussion about the inherent
latencies of micro-batch.  Fortunately, we were very careful to leave
batching out of the user facing API, and as we demo'ed last week, this
makes it possible for the Spark Streaming to achieve sub-millisecond
latencies <https://www.youtube.com/watch?v=qAZ5XUz32yM>.  Watch SPARK-20928
<https://issues.apache.org/jira/browse/SPARK-20928> for more on this effort
to eliminate micro-batch from Spark's execution model.

At the far other end of the latency spectrum...  For those with jobs that
run in the cloud on data that arrives sporadically, you can run streaming
jobs that only execute every few hours or every few days, shutting the
cluster down in between.  This architecture can result in a huge cost
savings for some applications
<https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html>
.

Michael
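
A minimal sketch of the Kafka integration described above, assuming Spark 2.2+
with the spark-sql-kafka-0-10 package on the classpath; the broker address,
topic and checkpoint path are placeholders.

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-structured-streaming-sketch").master("local[*]").getOrCreate()

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder broker
  .option("subscribe", "events")                        // placeholder topic
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

// Ordinary DataFrame operations (and SQL) apply to the stream
val counts = events.groupBy("value").count()

val query = counts.writeStream
  .outputMode("complete")
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoint-placeholder")  // placeholder path
  .start()

query.awaitTermination()
```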

On Sun, Jun 11, 2017 at 1:12 AM, kant kodali <kanth...@gmail.com> wrote:

> Hi All,
>
> I am trying hard to figure out what is the real difference between Kafka
> Streaming vs Spark Streaming other than saying one can be used as part of
> Micro services (since Kafka streaming is just a library) and the other is a
> Standalone framework by itself.
>
> If I can accomplish same job one way or other this is a sort of a puzzling
> question for me so it would be great to know what Spark streaming can do
> that Kafka Streaming cannot do efficiently or whatever ?
>
> Thanks!
>
>


Re: What is the real difference between Kafka streaming and Spark Streaming?

2017-06-13 Thread Paolo Patierno
I think a big advantage of not using Spark Streaming when your solution is
already based on Kafka is that you don't have to deal with another cluster. I
mean...
Imagine that your solution already uses Kafka as the ingestion system for your
events and then you need to do some real-time analysis on streams.
Adding Spark means adding a new cluster with a master and one or more nodes,
and then Spark will distribute jobs for you. Using the lightweight streams library
from Kafka means just developing a new application that gets events from the
same cluster. You can deploy more instances of the same application for load
balancing, and all of that is handled by Kafka itself.
I think that in terms of deployment this is a big advantage of using Kafka
Streams in the same Kafka cluster instead of adding Spark.

Paolo

From: kant kodali <kanth...@gmail.com>
Sent: Monday, June 12, 2017 12:40:37 AM
To: Mohammed Guller
Cc: vincent gromakowski; yohann jardin; vaquar khan; user
Subject: Re: What is the real difference between Kafka streaming and Spark 
Streaming?

Also another difference I see is some thing like Spark Sql where there are 
logical plans, physical plans, Code generation and all those optimizations I 
don't see them in Kafka Streaming at this time.

On Sun, Jun 11, 2017 at 2:19 PM, kant kodali 
<kanth...@gmail.com<mailto:kanth...@gmail.com>> wrote:
I appreciate the responses however I see the other side of the argument and I 
actually feel they are competitors now in Streaming space in some sense.

Kafka Streaming can indeed do map, reduce, join and window operations and Like 
wise data can be ingested from many sources in Kafka and send the results out 
to many sinks. Look up "Kafka Connect"

Regarding Event at a time vs Micro-batch. I hear arguments from a group of 
people saying Spark Streaming is real time and other group of people is Kafka 
streaming is the true real time. so do we say Micro-batch is real time or Event 
at a time is real time?

It is well known fact that Spark is more popular with Data scientists who want 
to run ML Algorithms and so on but I also hear that people can use H2O package 
along with Kafka Streaming. so efficient each of these approaches are is 
something I have no clue.

The major difference I see is actually the Spark Scheduler I don't think Kafka 
Streaming has anything like this instead it just allows you to run lambda 
expressions on a stream and write it out to specific topic/partition and from 
there one can use Kafka Connect to write it out to any sink. so In short, All 
the optimizations built into spark scheduler don't seem to exist in Kafka 
Streaming so if I were to make a decision on which framework to use this is an 
additional question I would think about like "Do I want my stream to go through 
the scheduler and if so, why or why not"

Above all, please correct me if I am wrong :)




On Sun, Jun 11, 2017 at 12:41 PM, Mohammed Guller 
<moham...@glassbeam.com<mailto:moham...@glassbeam.com>> wrote:
Just to elaborate more on Vincent wrote – Kafka streaming provides true 
record-at-a-time processing capabilities whereas Spark Streaming provides 
micro-batching capabilities on top of Spark. Depending on your use case, you 
may find one better than the other. Both provide stateless ad stateful stream 
processing capabilities.

A few more things to consider:

  1.  If you don’t already have a Spark cluster, but have Kafka cluster, it may 
be easier to use Kafka streaming since you don’t need to setup and manage 
another cluster.
  2.  On the other hand, if you already have a spark cluster, but don’t have a 
Kafka cluster (in case you are using some other messaging system), Spark 
streaming is a better option.
  3.  If you already know and use Spark, you may find it easier to program with 
Spark Streaming API even if you are using Kafka.
  4.  Spark Streaming may give you better throughput. So you have to decide 
what is more important for your stream processing application – latency or 
throughput?
  5.  Kafka streaming is relatively new and less mature than Spark Streaming

Mohammed

From: vincent gromakowski 
[mailto:vincent.gromakow...@gmail.com<mailto:vincent.gromakow...@gmail.com>]
Sent: Sunday, June 11, 2017 12:09 PM
To: yohann jardin <yohannjar...@hotmail.com<mailto:yohannjar...@hotmail.com>>
Cc: kant kodali <kanth...@gmail.com<mailto:kanth...@gmail.com>>; vaquar khan 
<vaquar.k...@gmail.com<mailto:vaquar.k...@gmail.com>>; user 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: What is the real difference between Kafka streaming and Spark 
Streaming?

I think Kafka streams is good when the processing of each row is independant 
from each other (row parsing, data cleaning...)
Spark is better when processing group of rows (group by, ml, window func...)

Le 11 juin 2017 8:15 PM, "yohann jardin&quo

RE: What is the real difference between Kafka streaming and Spark Streaming?

2017-06-12 Thread Mohammed Guller
Regarding Spark scheduler – if you are referring to the ability to distribute 
workload and scale, Kafka Streaming also provides that capability. It is 
deceptively simple in that regard if you already have a Kafka cluster. You can 
launch multiple instances of your Kafka streaming application and Kafka 
streaming will automatically balance the workload across different instances. 
It rebalances the workload as you add or remove instances. Similarly, if an
instance fails or crashes, it will automatically detect that.

Regarding real-time – rather than debating which one is real-time, I would look 
at the latency requirements of my application. For most applications, the near 
real time capabilities of Spark Streaming might be good enough. For others, it 
may not.  For example, if I was building a high-frequency trading application, 
where I want to process individual trades as soon as they happen, I might lean 
towards Kafka streaming.

Agree about the benefits of using SQL with structured streaming.

Mohammed

From: kant kodali [mailto:kanth...@gmail.com]
Sent: Sunday, June 11, 2017 3:41 PM
To: Mohammed Guller <moham...@glassbeam.com>
Cc: vincent gromakowski <vincent.gromakow...@gmail.com>; yohann jardin 
<yohannjar...@hotmail.com>; vaquar khan <vaquar.k...@gmail.com>; user 
<user@spark.apache.org>
Subject: Re: What is the real difference between Kafka streaming and Spark 
Streaming?

Also another difference I see is some thing like Spark Sql where there are 
logical plans, physical plans, Code generation and all those optimizations I 
don't see them in Kafka Streaming at this time.

On Sun, Jun 11, 2017 at 2:19 PM, kant kodali 
<kanth...@gmail.com<mailto:kanth...@gmail.com>> wrote:
I appreciate the responses however I see the other side of the argument and I 
actually feel they are competitors now in Streaming space in some sense.

Kafka Streaming can indeed do map, reduce, join and window operations and Like 
wise data can be ingested from many sources in Kafka and send the results out 
to many sinks. Look up "Kafka Connect"

Regarding Event at a time vs Micro-batch. I hear arguments from a group of 
people saying Spark Streaming is real time and other group of people is Kafka 
streaming is the true real time. so do we say Micro-batch is real time or Event 
at a time is real time?

It is well known fact that Spark is more popular with Data scientists who want 
to run ML Algorithms and so on but I also hear that people can use H2O package 
along with Kafka Streaming. so efficient each of these approaches are is 
something I have no clue.

The major difference I see is actually the Spark Scheduler I don't think Kafka 
Streaming has anything like this instead it just allows you to run lambda 
expressions on a stream and write it out to specific topic/partition and from 
there one can use Kafka Connect to write it out to any sink. so In short, All 
the optimizations built into spark scheduler don't seem to exist in Kafka 
Streaming so if I were to make a decision on which framework to use this is an 
additional question I would think about like "Do I want my stream to go through 
the scheduler and if so, why or why not"

Above all, please correct me if I am wrong :)




On Sun, Jun 11, 2017 at 12:41 PM, Mohammed Guller 
<moham...@glassbeam.com<mailto:moham...@glassbeam.com>> wrote:
Just to elaborate more on Vincent wrote – Kafka streaming provides true 
record-at-a-time processing capabilities whereas Spark Streaming provides 
micro-batching capabilities on top of Spark. Depending on your use case, you 
may find one better than the other. Both provide stateless ad stateful stream 
processing capabilities.

A few more things to consider:

  1.  If you don’t already have a Spark cluster, but have Kafka cluster, it may 
be easier to use Kafka streaming since you don’t need to setup and manage 
another cluster.
  2.  On the other hand, if you already have a spark cluster, but don’t have a 
Kafka cluster (in case you are using some other messaging system), Spark 
streaming is a better option.
  3.  If you already know and use Spark, you may find it easier to program with 
Spark Streaming API even if you are using Kafka.
  4.  Spark Streaming may give you better throughput. So you have to decide 
what is more important for your stream processing application – latency or 
throughput?
  5.  Kafka streaming is relatively new and less mature than Spark Streaming

Mohammed

From: vincent gromakowski 
[mailto:vincent.gromakow...@gmail.com<mailto:vincent.gromakow...@gmail.com>]
Sent: Sunday, June 11, 2017 12:09 PM
To: yohann jardin <yohannjar...@hotmail.com<mailto:yohannjar...@hotmail.com>>
Cc: kant kodali <kanth...@gmail.com<mailto:kanth...@gmail.com>>; vaquar khan 
<vaquar.k...@gmail.com<mailto:vaquar.k...@gmail.com>>; user 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: What

Re: What is the real difference between Kafka streaming and Spark Streaming?

2017-06-11 Thread kant kodali
Another difference I see is something like Spark SQL, where there are
logical plans, physical plans, code generation and all those optimizations;
I don't see them in Kafka Streaming at this time.

On Sun, Jun 11, 2017 at 2:19 PM, kant kodali <kanth...@gmail.com> wrote:

> I appreciate the responses however I see the other side of the argument
> and I actually feel they are competitors now in Streaming space in some
> sense.
>
> Kafka Streaming can indeed do map, reduce, join and window operations and
> Like wise data can be ingested from many sources in Kafka and send the
> results out to many sinks. Look up "Kafka Connect"
>
> Regarding Event at a time vs Micro-batch. I hear arguments from a group of
> people saying Spark Streaming is real time and other group of people is
> Kafka streaming is the true real time. so do we say Micro-batch is real
> time or Event at a time is real time?
>
> It is well known fact that Spark is more popular with Data scientists who
> want to run ML Algorithms and so on but I also hear that people can use H2O
> package along with Kafka Streaming. so efficient each of these approaches
> are is something I have no clue.
>
> The major difference I see is actually the *Spark Scheduler* I don't
> think Kafka Streaming has anything like this instead it just allows you to
> run lambda expressions on a stream and write it out to specific
> topic/partition and from there one can use Kafka Connect to write it out to
> any sink. so In short, All the optimizations built into spark scheduler
> don't seem to exist in Kafka Streaming so if I were to make a decision on
> which framework to use this is an additional question I would think about
> like "Do I want my stream to go through the scheduler and if so, why or why
> not"
>
> Above all, please correct me if I am wrong :)
>
>
>
>
> On Sun, Jun 11, 2017 at 12:41 PM, Mohammed Guller <moham...@glassbeam.com>
> wrote:
>
>> Just to elaborate more on Vincent wrote – Kafka streaming provides true
>> record-at-a-time processing capabilities whereas Spark Streaming provides
>> micro-batching capabilities on top of Spark. Depending on your use case,
>> you may find one better than the other. Both provide stateless ad stateful
>> stream processing capabilities.
>>
>>
>>
>> A few more things to consider:
>>
>>1. If you don’t already have a Spark cluster, but have Kafka cluster,
>>it may be easier to use Kafka streaming since you don’t need to setup and
>>manage another cluster.
>>2. On the other hand, if you already have a spark cluster, but don’t
>>have a Kafka cluster (in case you are using some other messaging system),
>>Spark streaming is a better option.
>>3. If you already know and use Spark, you may find it easier to
>>program with Spark Streaming API even if you are using Kafka.
>>4. Spark Streaming may give you better throughput. So you have to
>>decide what is more important for your stream processing application –
>>latency or throughput?
>>5. Kafka streaming is relatively new and less mature than Spark
>>Streaming
>>
>>
>>
>> Mohammed
>>
>>
>>
>> *From:* vincent gromakowski [mailto:vincent.gromakow...@gmail.com]
>> *Sent:* Sunday, June 11, 2017 12:09 PM
>> *To:* yohann jardin <yohannjar...@hotmail.com>
>> *Cc:* kant kodali <kanth...@gmail.com>; vaquar khan <
>> vaquar.k...@gmail.com>; user <user@spark.apache.org>
>> *Subject:* Re: What is the real difference between Kafka streaming and
>> Spark Streaming?
>>
>>
>>
>> I think Kafka streams is good when the processing of each row is
>> independant from each other (row parsing, data cleaning...)
>>
>> Spark is better when processing group of rows (group by, ml, window
>> func...)
>>
>>
>>
>> Le 11 juin 2017 8:15 PM, "yohann jardin" <yohannjar...@hotmail.com> a
>> écrit :
>>
>> Hey,
>>
>> Kafka can also do streaming on its own: https://kafka.apache.org/docum
>> entation/streams
>> I don’t know much about it unfortunately. I can only repeat what I heard
>> in conferences, saying that one should give a try to Kafka streaming when
>> its whole pipeline is using Kafka. I have no pros/cons to argument on this
>> topic.
>>
>> *Yohann Jardin*
>>
>> Le 6/11/2017 à 7:08 PM, vaquar khan a écrit :
>>
>> Hi Kant,
>>
>> Kafka is the message broker that using as Producers and Consumers and
>> Spark Streaming is used as the real time processing ,Kafka and Spark
>>

Re: What is the real difference between Kafka streaming and Spark Streaming?

2017-06-11 Thread kant kodali
I appreciate the responses; however, I see the other side of the argument, and
I actually feel they are competitors now in the streaming space in some sense.

Kafka Streaming can indeed do map, reduce, join and window operations, and
likewise data can be ingested from many sources into Kafka and the results sent
out to many sinks. Look up "Kafka Connect".

Regarding event-at-a-time vs micro-batch: I hear arguments from one group of
people saying Spark Streaming is real time, and from another group saying
Kafka streaming is the true real time. So do we say micro-batch is real
time, or event-at-a-time is real time?

It is a well-known fact that Spark is more popular with data scientists who
want to run ML algorithms and so on, but I also hear that people can use the H2O
package along with Kafka Streaming. How efficient each of these approaches
is, I have no clue.

The major difference I see is actually the *Spark scheduler*. I don't think
Kafka Streaming has anything like this; instead it just allows you to run
lambda expressions on a stream and write the output to a specific topic/partition,
and from there one can use Kafka Connect to write it out to any sink. So in
short, all the optimizations built into the Spark scheduler don't seem to exist
in Kafka Streaming, so if I were to make a decision on which framework to
use, this is an additional question I would think about, like "Do I want my
stream to go through the scheduler, and if so, why or why not?"

Above all, please correct me if I am wrong :)




On Sun, Jun 11, 2017 at 12:41 PM, Mohammed Guller <moham...@glassbeam.com>
wrote:

> Just to elaborate more on Vincent wrote – Kafka streaming provides true
> record-at-a-time processing capabilities whereas Spark Streaming provides
> micro-batching capabilities on top of Spark. Depending on your use case,
> you may find one better than the other. Both provide stateless ad stateful
> stream processing capabilities.
>
>
>
> A few more things to consider:
>
>1. If you don’t already have a Spark cluster, but have Kafka cluster,
>it may be easier to use Kafka streaming since you don’t need to setup and
>manage another cluster.
>2. On the other hand, if you already have a spark cluster, but don’t
>have a Kafka cluster (in case you are using some other messaging system),
>Spark streaming is a better option.
>3. If you already know and use Spark, you may find it easier to
>program with Spark Streaming API even if you are using Kafka.
>4. Spark Streaming may give you better throughput. So you have to
>decide what is more important for your stream processing application –
>latency or throughput?
>5. Kafka streaming is relatively new and less mature than Spark
>Streaming
>
>
>
> Mohammed
>
>
>
> *From:* vincent gromakowski [mailto:vincent.gromakow...@gmail.com]
> *Sent:* Sunday, June 11, 2017 12:09 PM
> *To:* yohann jardin <yohannjar...@hotmail.com>
> *Cc:* kant kodali <kanth...@gmail.com>; vaquar khan <vaquar.k...@gmail.com>;
> user <user@spark.apache.org>
> *Subject:* Re: What is the real difference between Kafka streaming and
> Spark Streaming?
>
>
>
> I think Kafka streams is good when the processing of each row is
> independant from each other (row parsing, data cleaning...)
>
> Spark is better when processing group of rows (group by, ml, window
> func...)
>
>
>
> Le 11 juin 2017 8:15 PM, "yohann jardin" <yohannjar...@hotmail.com> a
> écrit :
>
> Hey,
>
> Kafka can also do streaming on its own: https://kafka.apache.org/
> documentation/streams
> I don’t know much about it unfortunately. I can only repeat what I heard
> in conferences, saying that one should give a try to Kafka streaming when
> its whole pipeline is using Kafka. I have no pros/cons to argument on this
> topic.
>
> *Yohann Jardin*
>
> Le 6/11/2017 à 7:08 PM, vaquar khan a écrit :
>
> Hi Kant,
>
> Kafka is the message broker that using as Producers and Consumers and
> Spark Streaming is used as the real time processing ,Kafka and Spark
> Streaming work together not competitors.
>
> Spark Streaming is reading data from Kafka and process into micro batching
> for streaming data, In easy terms collects data for some time, build RDD
> and then process these micro batches.
>
>
>
>
>
> Please read doc : https://spark.apache.org/docs/latest/streaming-
> programming-guide.html
>
>
>
> Spark Streaming is an extension of the core Spark API that enables
> scalable, high-throughput, fault-tolerant stream processing of live data
> streams. Data can be ingested from many sources like *Kafka, Flume,
> Kinesis, or TCP sockets*, and can be processed using complex algorithms
> expressed with high-level fu

RE: What is the real difference between Kafka streaming and Spark Streaming?

2017-06-11 Thread Mohammed Guller
Just to elaborate more on what Vincent wrote – Kafka streaming provides true
record-at-a-time processing capabilities whereas Spark Streaming provides
micro-batching capabilities on top of Spark. Depending on your use case, you
may find one better than the other. Both provide stateless and stateful stream
processing capabilities.

A few more things to consider:

  1.  If you don’t already have a Spark cluster, but have a Kafka cluster, it may
be easier to use Kafka streaming since you don’t need to set up and manage
another cluster.
  2.  On the other hand, if you already have a Spark cluster, but don’t have a
Kafka cluster (in case you are using some other messaging system), Spark
streaming is a better option.
  3.  If you already know and use Spark, you may find it easier to program with
the Spark Streaming API even if you are using Kafka.
  4.  Spark Streaming may give you better throughput. So you have to decide
what is more important for your stream processing application – latency or
throughput?
  5.  Kafka streaming is relatively new and less mature than Spark Streaming

Mohammed

From: vincent gromakowski [mailto:vincent.gromakow...@gmail.com]
Sent: Sunday, June 11, 2017 12:09 PM
To: yohann jardin <yohannjar...@hotmail.com>
Cc: kant kodali <kanth...@gmail.com>; vaquar khan <vaquar.k...@gmail.com>; user 
<user@spark.apache.org>
Subject: Re: What is the real difference between Kafka streaming and Spark 
Streaming?

I think Kafka streams is good when the processing of each row is independant 
from each other (row parsing, data cleaning...)
Spark is better when processing group of rows (group by, ml, window func...)

Le 11 juin 2017 8:15 PM, "yohann jardin" 
<yohannjar...@hotmail.com<mailto:yohannjar...@hotmail.com>> a écrit :

Hey,
Kafka can also do streaming on its own: 
https://kafka.apache.org/documentation/streams
I don’t know much about it unfortunately. I can only repeat what I heard in 
conferences, saying that one should give a try to Kafka streaming when its 
whole pipeline is using Kafka. I have no pros/cons to argument on this topic.

Yohann Jardin
Le 6/11/2017 à 7:08 PM, vaquar khan a écrit :

Hi Kant,

Kafka is the message broker that using as Producers and Consumers and Spark 
Streaming is used as the real time processing ,Kafka and Spark Streaming work 
together not competitors.
Spark Streaming is reading data from Kafka and process into micro batching for 
streaming data, In easy terms collects data for some time, build RDD and then 
process these micro batches.


Please read doc : 
https://spark.apache.org/docs/latest/streaming-programming-guide.html


Spark Streaming is an extension of the core Spark API that enables scalable, 
high-throughput, fault-tolerant stream processing of live data streams. Data 
can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, 
and can be processed using complex algorithms expressed with high-level 
functions like map, reduce, join and window. Finally, processed data can be 
pushed out to filesystems, databases, and live dashboards. In fact, you can 
apply Spark’s machine 
learning<https://spark.apache.org/docs/latest/ml-guide.html> and graph 
processing<https://spark.apache.org/docs/latest/graphx-programming-guide.html> 
algorithms on data streams.


Regards,

Vaquar khan

On Sun, Jun 11, 2017 at 3:12 AM, kant kodali 
<kanth...@gmail.com<mailto:kanth...@gmail.com>> wrote:
Hi All,

I am trying hard to figure out what is the real difference between Kafka 
Streaming vs Spark Streaming other than saying one can be used as part of Micro 
services (since Kafka streaming is just a library) and the other is a 
Standalone framework by itself.

If I can accomplish same job one way or other this is a sort of a puzzling 
question for me so it would be great to know what Spark streaming can do that 
Kafka Streaming cannot do efficiently or whatever ?

Thanks!




--
Regards,
Vaquar Khan
+1 -224-436-0783<tel:(224)%20436-0783>
Greater Chicago




Re: What is the real difference between Kafka streaming and Spark Streaming?

2017-06-11 Thread vincent gromakowski
I think Kafka streams is good when the processing of each row is
independent from the others (row parsing, data cleaning...)
Spark is better when processing groups of rows (group by, ml, window func...)

On 11 June 2017 at 8:15 PM, "yohann jardin" <yohannjar...@hotmail.com>
wrote:

Hey,
Kafka can also do streaming on its own: https://kafka.apache.org/
documentation/streams
I don’t know much about it unfortunately. I can only repeat what I heard in
conferences, saying that one should give a try to Kafka streaming when its
whole pipeline is using Kafka. I have no pros/cons to argument on this
topic.

*Yohann Jardin*
On 6/11/2017 at 7:08 PM, vaquar khan wrote:

Hi Kant,

Kafka is the message broker that is used by Producers and Consumers, and Spark
Streaming is used for the real-time processing; Kafka and Spark Streaming
work together, they are not competitors.
Spark Streaming reads data from Kafka and processes it as micro batches of
streaming data: in easy terms, it collects data for some time, builds an RDD,
and then processes these micro batches.
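
A minimal sketch of that micro-batching model, assuming the DStream API with
the spark-streaming-kafka-0-10 integration; the broker address, topic and
group id are placeholders.

```
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val conf = new SparkConf().setAppName("dstream-sketch").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))   // each 5-second batch becomes an RDD

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",            // placeholder broker
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "placeholder-group"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("events"), kafkaParams))

// map / reduce / window over the micro-batches, as in the description above
stream.map(record => record.value())
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(30))
  .print()

ssc.start()
ssc.awaitTermination()
```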


Please read doc : https://spark.apache.org/docs/latest/streaming-
programming-guide.html

Spark Streaming is an extension of the core Spark API that enables
scalable, high-throughput, fault-tolerant stream processing of live data
streams. Data can be ingested from many sources like *Kafka, Flume,
Kinesis, or TCP sockets*, and can be processed using complex algorithms
expressed with high-level functions like map, reduce, join and window.
Finally, processed data can be pushed out to filesystems, databases, and
live dashboards. In fact, you can apply Spark’s machine learning
<https://spark.apache.org/docs/latest/ml-guide.html> and graph processing
<https://spark.apache.org/docs/latest/graphx-programming-guide.html> algorithms
on data streams.

Regards,

Vaquar khan

On Sun, Jun 11, 2017 at 3:12 AM, kant kodali <kanth...@gmail.com> wrote:

> Hi All,
>
> I am trying hard to figure out what is the real difference between Kafka
> Streaming vs Spark Streaming other than saying one can be used as part of
> Micro services (since Kafka streaming is just a library) and the other is a
> Standalone framework by itself.
>
> If I can accomplish same job one way or other this is a sort of a puzzling
> question for me so it would be great to know what Spark streaming can do
> that Kafka Streaming cannot do efficiently or whatever ?
>
> Thanks!
>
>


-- 
Regards,
Vaquar Khan
+1 -224-436-0783 <(224)%20436-0783>
Greater Chicago


Re: What is the real difference between Kafka streaming and Spark Streaming?

2017-06-11 Thread yohann jardin
Hey,

Kafka can also do streaming on its own: 
https://kafka.apache.org/documentation/streams
I don’t know much about it unfortunately. I can only repeat what I heard at
conferences, saying that one should give Kafka streaming a try when the
whole pipeline is using Kafka. I have no pros/cons to argue on this topic.

Yohann Jardin

Le 6/11/2017 à 7:08 PM, vaquar khan a écrit :

Hi Kant,

Kafka is the message broker, used with producers and consumers, and Spark
Streaming is used for the real-time processing; Kafka and Spark Streaming work
together, they are not competitors.

Spark Streaming reads data from Kafka and processes it in micro-batches. In
simple terms, it collects data for some time, builds an RDD, and then
processes these micro-batches.


Please read the doc:
https://spark.apache.org/docs/latest/streaming-programming-guide.html


Spark Streaming is an extension of the core Spark API that enables scalable, 
high-throughput, fault-tolerant stream processing of live data streams. Data 
can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, 
and can be processed using complex algorithms expressed with high-level 
functions like map, reduce, join and window. Finally, processed data can be 
pushed out to filesystems, databases, and live dashboards. In fact, you can 
apply Spark’s machine 
learning<https://spark.apache.org/docs/latest/ml-guide.html> and graph 
processing<https://spark.apache.org/docs/latest/graphx-programming-guide.html> 
algorithms on data streams.


Regards,

Vaquar khan

On Sun, Jun 11, 2017 at 3:12 AM, kant kodali <kanth...@gmail.com> wrote:
Hi All,

I am trying hard to figure out the real difference between Kafka Streams and
Spark Streaming, other than that one can be used as part of microservices
(since Kafka Streams is just a library) and the other is a standalone
framework by itself.

If I can accomplish the same job either way, this is a puzzling question for
me, so it would be great to know what Spark Streaming can do that Kafka
Streams cannot do efficiently.

Thanks!




--
Regards,
Vaquar Khan
+1 -224-436-0783
Greater Chicago



Re: What is the real difference between Kafka streaming and Spark Streaming?

2017-06-11 Thread vaquar khan
Hi Kant,

Kafka is the message broker, used with producers and consumers, and Spark
Streaming is used for the real-time processing; Kafka and Spark Streaming
work together, they are not competitors.
Spark Streaming reads data from Kafka and processes it in micro-batches.
In simple terms, it collects data for some time, builds an RDD, and then
processes these micro-batches.


Please read the doc:
https://spark.apache.org/docs/latest/streaming-programming-guide.html

Spark Streaming is an extension of the core Spark API that enables
scalable, high-throughput, fault-tolerant stream processing of live data
streams. Data can be ingested from many sources like *Kafka, Flume,
Kinesis, or TCP sockets*, and can be processed using complex algorithms
expressed with high-level functions like map, reduce, join and window.
Finally, processed data can be pushed out to filesystems, databases, and
live dashboards. In fact, you can apply Spark’s machine learning
<https://spark.apache.org/docs/latest/ml-guide.html> and graph processing
<https://spark.apache.org/docs/latest/graphx-programming-guide.html> algorithms
on data streams.
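
A minimal sketch of the micro-batch flow described above, using the DStream API and the spark-streaming-kafka-0-10 connector; the broker address, topic, group id, and batch interval are placeholders:

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object MicroBatchDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MicroBatchDemo")
    // Collect data for 5 seconds at a time: each interval becomes one RDD (micro-batch).
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "micro-batch-demo",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("events"), kafkaParams)
    )

    // Each micro-batch arrives as an RDD of ConsumerRecord[String, String].
    stream.foreachRDD { rdd =>
      println(s"Records in this micro-batch: ${rdd.count()}")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```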

Regards,

Vaquar khan

On Sun, Jun 11, 2017 at 3:12 AM, kant kodali <kanth...@gmail.com> wrote:

> Hi All,
>
> I am trying hard to figure out the real difference between Kafka
> Streams and Spark Streaming, other than that one can be used as part of
> microservices (since Kafka Streams is just a library) and the other is a
> standalone framework by itself.
>
> If I can accomplish the same job either way, this is a puzzling
> question for me, so it would be great to know what Spark Streaming can do
> that Kafka Streams cannot do efficiently.
>
> Thanks!
>
>


-- 
Regards,
Vaquar Khan
+1 -224-436-0783
Greater Chicago


What is the real difference between Kafka streaming and Spark Streaming?

2017-06-11 Thread kant kodali
Hi All,

I am trying hard to figure out the real difference between Kafka Streams and
Spark Streaming, other than that one can be used as part of microservices
(since Kafka Streams is just a library) and the other is a standalone
framework by itself.

If I can accomplish the same job either way, this is a puzzling question for
me, so it would be great to know what Spark Streaming can do that Kafka
Streams cannot do efficiently.

Thanks!


Re: what is the difference between json format vs kafka format?

2017-05-15 Thread Michael Armbrust
>> ... and get the schema from the Dataset created (i.e. Dataset.schema).
>>
>> - Then you can run the real streaming query with from_json and the learnt
>> schema.
>>
>> Make sure that the generated text files have sufficient data to infer the
>> full schema. Let me know if this works for you.
>>
>> TD
>>
>>
>> On Sat, May 13, 2017 at 6:04 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> Thanks for the response. It looks like from_json requires the schema ahead of
>>> time. Is there any function I can use to infer the schema from the JSON
>>> messages I am receiving through Kafka? I tried the code below; however,
>>> I get the following exception.
>>>
>>> org.apache.spark.sql.AnalysisException: Queries with streaming sources
>>> must be executed with writeStream.start()
>>>
>>>
>>> //code
>>> val ds = datasetRows.selectExpr("CAST(value AS STRING)").toJSON //
>>> datasetRows is of type DataSet that I get from loading from Kafka
>>>
>>> val foo = ds.select("*").count()
>>> val query = foo.writeStream.outputMode("complete").format("console").start();
>>> query.awaitTermination()
>>>
>>> I am just trying to parse JSON messages from Kafka into a DataFrame or
>>> Dataset without providing a schema, and then do a simple count.
>>>
>>> Thanks!
>>>
>>>
>>>
>>> On Sat, May 13, 2017 at 3:29 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> I understand the confusion. The "json" format is for JSON-encoded files
>>>> being written in a directory. For Kafka, use the "kafka" format. Then, to decode
>>>> the binary data as JSON, you can use the function "from_json" (Spark 2.1
>>>> and above). Here is our blog post on this.
>>>>
>>>> https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
>>>>
>>>> And my talk also explains this.
>>>>
>>>> https://spark-summit.org/east-2017/events/making-structured-streaming-ready-for-production-updates-and-future-directions/
>>>>
>>>> On Sat, May 13, 2017 at 3:42 AM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> HI All,
>>>>>
>>>>> What is the difference between sparkSession.readStream.format("kafka")
>>>>> vs sparkSession.readStream.format("json") ?
>>>>> I am sending json encoded messages in Kafka and I am not sure which
>>>>> one of the above I should use?
>>>>>
>>>>> Thanks!
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: what is the difference between json format vs kafka format?

2017-05-13 Thread kant kodali
athagata.das1...@gmail.com> wrote:
>>
>>> I understand the confusion. The "json" format is for JSON-encoded files
>>> being written in a directory. For Kafka, use the "kafka" format. Then, to decode
>>> the binary data as JSON, you can use the function "from_json" (Spark 2.1
>>> and above). Here is our blog post on this.
>>>
>>> https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
>>>
>>> And my talk also explains this.
>>>
>>> https://spark-summit.org/east-2017/events/making-structured-streaming-ready-for-production-updates-and-future-directions/
>>>
>>> On Sat, May 13, 2017 at 3:42 AM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> HI All,
>>>>
>>>> What is the difference between sparkSession.readStream.format("kafka")
>>>> vs sparkSession.readStream.format("json") ?
>>>> I am sending json encoded messages in Kafka and I am not sure which one
>>>> of the above I should use?
>>>>
>>>> Thanks!
>>>>
>>>>
>>>
>>
>


Re: what is the difference between json format vs kafka format?

2017-05-13 Thread Tathagata Das
You cant do ".count()" directly on streaming DataFrames. This is because
"count" is an Action (remember RDD actions) that executes and returns a
result immediately which can be done only when the data is bounded (e.g.
batch/interactive queries). For streaming queries, you have to let it run
in the background continuously by starting it using writeStreamstart().

And for streaming queries, you have specify schema from before so that at
runtime it explicitly fails when schema is incorrectly changed.

In your case, what you can do is the following.
- Run a streaming query that converts the binary data from KAFka to string,
and saves as text files (i.e. *writeStream.format("text").start("path") *)

- Then run a batch query on the saved text files with format json
(i.e.  *read.format("json").load(path)
*)  with schema inference, and get the schema from the Dataset created (i.e
Dataset.schema ).

- Then you can run the real streaming query with from_json and the learnt
schema.

Make sure that the generated text file have sufficient data to infer the
full schema. Let me know if this works for you.

TD
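
A sketch of that three-step recipe, plus a streaming-friendly way to get the count asked about below; the topic name, paths, and broker address are placeholders, and it assumes Spark 2.1+ with the spark-sql-kafka-0-10 connector on the classpath:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json

object InferThenStream {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("InferThenStream").getOrCreate()
    import spark.implicits._

    // Kafka source with the binary value cast to a JSON string.
    def kafkaSource() = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")
      .load()
      .selectExpr("CAST(value AS STRING) AS json")

    // Step 1: dump a sample of the Kafka values as plain text files.
    val sampler = kafkaSource().writeStream
      .format("text")
      .option("checkpointLocation", "/tmp/json-sample-ckpt")
      .start("/tmp/json-sample")

    // ... let the sampler run long enough to capture representative data ...
    // sampler.stop()

    // Step 2: batch-read the sample with schema inference and grab the schema.
    val inferredSchema = spark.read.format("json").load("/tmp/json-sample").schema

    // Step 3: the real streaming query, parsing with the learnt schema.
    val parsed = kafkaSource()
      .select(from_json($"json", inferredSchema).as("data"))
      .select("data.*")

    // A running count must itself be a streaming query, not .count() on the frame.
    parsed.groupBy().count()
      .writeStream
      .outputMode("complete")
      .format("console")
      .start()
      .awaitTermination()
  }
}
```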


On Sat, May 13, 2017 at 6:04 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi!
>
> Thanks for the response. It looks like from_json requires the schema ahead of
> time. Is there any function I can use to infer the schema from the JSON
> messages I am receiving through Kafka? I tried the code below; however,
> I get the following exception.
>
> org.apache.spark.sql.AnalysisException: Queries with streaming sources
> must be executed with writeStream.start()
>
>
> //code
> val ds = datasetRows.selectExpr("CAST(value AS STRING)").toJSON //
> datasetRows is of type DataSet that I get from loading from Kafka
>
> val foo = ds.select("*").count()
> val query = foo.writeStream.outputMode("complete").format("console").start();
> query.awaitTermination()
>
> I am just trying to parse JSON messages from Kafka into a DataFrame or
> Dataset without providing a schema, and then do a simple count.
>
> Thanks!
>
>
>
> On Sat, May 13, 2017 at 3:29 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> I understand the confusion. The "json" format is for JSON-encoded files being
>> written in a directory. For Kafka, use the "kafka" format. Then, to decode the
>> binary data as JSON, you can use the function "from_json" (Spark 2.1 and
>> above). Here is our blog post on this.
>>
>> https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
>>
>> And my talk also explains this.
>>
>> https://spark-summit.org/east-2017/events/making-structured-streaming-ready-for-production-updates-and-future-directions/
>>
>> On Sat, May 13, 2017 at 3:42 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> HI All,
>>>
>>> What is the difference between sparkSession.readStream.format("kafka")
>>> vs sparkSession.readStream.format("json") ?
>>> I am sending json encoded messages in Kafka and I am not sure which one
>>> of the above I should use?
>>>
>>> Thanks!
>>>
>>>
>>
>


Re: what is the difference between json format vs kafka format?

2017-05-13 Thread kant kodali
Hi!

Thanks for the response. It looks like from_json requires the schema ahead of
time. Is there any function I can use to infer the schema from the JSON
messages I am receiving through Kafka? I tried the code below; however,
I get the following exception.

org.apache.spark.sql.AnalysisException: Queries with streaming sources must
be executed with writeStream.start()


//code
val ds = datasetRows.selectExpr("CAST(value AS STRING)").toJSON //
datasetRows is of type DataSet that I get from loading from Kafka

val foo = ds.select("*").count()
val query =
foo.writeStream.outputMode("complete").format("console").start();
query.awaitTermination()

I am just trying to parse JSON messages from Kafka into a DataFrame or
Dataset without providing a schema, and then do a simple count.

Thanks!



On Sat, May 13, 2017 at 3:29 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> I understand the confusion. The "json" format is for JSON-encoded files being
> written in a directory. For Kafka, use the "kafka" format. Then, to decode the
> binary data as JSON, you can use the function "from_json" (Spark 2.1 and
> above). Here is our blog post on this.
>
> https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
>
> And my talk also explains this.
>
> https://spark-summit.org/east-2017/events/making-structured-streaming-ready-for-production-updates-and-future-directions/
>
> On Sat, May 13, 2017 at 3:42 AM, kant kodali <kanth...@gmail.com> wrote:
>
>> HI All,
>>
>> What is the difference between sparkSession.readStream.format("kafka")
>> vs sparkSession.readStream.format("json") ?
>> I am sending json encoded messages in Kafka and I am not sure which one
>> of the above I should use?
>>
>> Thanks!
>>
>>
>


Re: what is the difference between json format vs kafka format?

2017-05-13 Thread Tathagata Das
I understand the confusion. The "json" format is for JSON-encoded files being
written in a directory. For Kafka, use the "kafka" format. Then, to decode the
binary data as JSON, you can use the function "from_json" (Spark 2.1 and
above). Here is our blog post on this.

https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html

And my talk also explains this.

https://spark-summit.org/east-2017/events/making-structured-streaming-ready-for-production-updates-and-future-directions/
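
A small sketch of the difference between the two sources, with a hypothetical schema, directory, topic, and broker address (the "kafka" source needs the spark-sql-kafka-0-10 connector):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{DoubleType, StringType, StructType}

object JsonVsKafkaSource {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("JsonVsKafkaSource").getOrCreate()
    import spark.implicits._

    // Hypothetical schema for the JSON payload.
    val schema = new StructType()
      .add("id", StringType)
      .add("amount", DoubleType)

    // "json" source: streams JSON *files* as they land in a directory.
    val fromFiles = spark.readStream
      .format("json")
      .schema(schema)
      .load("/data/incoming-json")

    // "kafka" source: streams records from a topic; the value arrives as binary,
    // so it is cast to string and decoded with from_json afterwards.
    val fromKafka = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")
      .load()
      .selectExpr("CAST(value AS STRING) AS json")
      .select(from_json($"json", schema).as("data"))
      .select("data.*")

    // Either frame is then written out with writeStream...start().
    fromKafka.writeStream.format("console").start().awaitTermination()
  }
}
```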

On Sat, May 13, 2017 at 3:42 AM, kant kodali <kanth...@gmail.com> wrote:

> HI All,
>
> What is the difference between sparkSession.readStream.format("kafka") vs
> sparkSession.readStream.format("json") ?
> I am sending json encoded messages in Kafka and I am not sure which one of
> the above I should use?
>
> Thanks!
>
>


what is the difference between json format vs kafka format?

2017-05-13 Thread kant kodali
HI All,

What is the difference between sparkSession.readStream.format("kafka") and
sparkSession.readStream.format("json")?
I am sending JSON-encoded messages to Kafka, and I am not sure which one of
the two I should use.

Thanks!


Re: take the difference between two columns of a dataframe in pyspark

2017-05-08 Thread Gourav Sengupta
Hi,

Convert them to a temporary table and write SQL; that will also work.
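
For example, a Scala sketch of that approach (the PySpark calls are line-for-line the same); the sample values mirror the A/B frame quoted below, and the withColumn variant shows the built-in alternative to a UDF:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object ColumnDiff {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("ColumnDiff").getOrCreate()
    import spark.implicits._

    // A tiny frame mirroring the A/B columns in the question below.
    val df = Seq((786.31999, 786.12), (786.42, 786.12), (827.72, 786.02)).toDF("A", "B")

    // Temp-table-plus-SQL, as suggested above.
    df.createOrReplaceTempView("fares")
    spark.sql("SELECT A, B, A - B AS diff FROM fares").show()

    // Equivalent column expression -- no UDF needed, so it stays inside
    // Catalyst (a Python UDF in PySpark would add serialization overhead).
    df.withColumn("diff", col("A") - col("B")).show()

    spark.stop()
  }
}
```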


Regards,
Gourav

On Sun, May 7, 2017 at 2:49 AM, Zeming Yu <zemin...@gmail.com> wrote:

> Say I have the following dataframe with two numeric columns A and B,
> what's the best way to add a column showing the difference between the two
> columns?
>
> +---------+------+
> |        A|     B|
> +---------+------+
> |786.31999|786.12|
> |   786.12|786.12|
> |   786.42|786.12|
> |   786.72|786.12|
> |   786.92|786.12|
> |   786.92|786.12|
> |   786.72|786.12|
> |   786.72|786.12|
> |   827.72|786.02|
> |   827.72|786.02|
> +---------+------+
>
>
> I could probably figure out how to do this via a UDF, but is a UDF generally
> slower?
>
>
> Thanks!
>
>

