Spark streaming connecting to two kafka clusters

2018-07-17 Thread Sathi Chowdhury
Hi,
My question is about the ability to integrate Spark streaming with multiple Kafka clusters. Is this a supported use case? An example: two topics are owned by different groups, and each group has its own Kafka infrastructure. Can I have two dataframes as a result of spark.readStream listening to different Kafka clusters in the same Spark streaming job? Has anyone solved this use case before?

Thanks,
Sathi
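
For illustration, a minimal sketch of what this could look like with Structured Streaming (it assumes the spark-sql-kafka connector is on the classpath; broker addresses and topic names are made-up placeholders):

import org.apache.spark.sql.SparkSession

// One streaming job, two Kafka sources on different clusters.
val spark = SparkSession.builder().appName("two-kafka-clusters").getOrCreate()

// Source on the first Kafka cluster (hypothetical brokers/topic)
val df1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster1-broker1:9092,cluster1-broker2:9092")
  .option("subscribe", "topicA")
  .load()

// Source on the second Kafka cluster; only the bootstrap servers and topic differ
val df2 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster2-broker1:9092")
  .option("subscribe", "topicB")
  .load()

Each source carries its own kafka.bootstrap.servers, so the two dataframes can be processed independently or joined within the same job.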

Re: Tune hive query launched thru spark-yarn job.

2019-09-05 Thread Sathi Chowdhury
What I can immediately think of is this: since you are using IN in the WHERE clause over a series of timestamps, consider breaking them up. For each epoch timestamp, load the results into an intermediate staging table, and then run a final aggregate from that table keeping the same GROUP BY. Since the measures are SUMs, the aggregation can be done in two steps.
hth
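
A rough sketch of that two-step idea, assuming Spark 1.6 with a HiveContext; the table and column names (source_table, staging_agg, dim_col, event_ts, m1, m2) are hypothetical stand-ins, not the identifiers from the original query:

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext()
val hiveContext = new HiveContext(sc)

// Hypothetical epoch timestamps taken from the original IN (...) list
val timestamps = Seq(1567641600L, 1567645200L, 1567648800L)

// Step 1: aggregate each timestamp separately into a staging table
// (staging_agg is assumed to already exist with matching columns)
timestamps.foreach { ts =>
  hiveContext.sql(s"""
    INSERT INTO TABLE staging_agg
    SELECT dim_col, SUM(m1) AS m1, SUM(m2) AS m2
    FROM source_table
    WHERE event_ts = $ts
    GROUP BY dim_col
  """)
}

// Step 2: final aggregate over the much smaller staging table, keeping
// the same GROUP BY; a SUM of partial SUMs gives the same totals
val result = hiveContext.sql(
  "SELECT dim_col, SUM(m1) AS m1, SUM(m2) AS m2 FROM staging_agg GROUP BY dim_col")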



On Thursday, September 5, 2019, 5:10 AM, Himali Patel wrote:

 
Hello all,

We have one use case where we are aggregating billions of rows, and it does a huge shuffle.

Example: per the 'Job' tab on the YARN UI, when the input size is around 350 GB, the shuffle size is >3 TB. This increases non-DFS usage beyond the warning limit and thus affects the entire cluster.

It seems we need to tune our query / resources. Any suggestions?
 
  
 
1. Our data is high in cardinality:
# input rows are ~15 billion
# output rows are ~13 billion
 
2. Spark version is 1.6, Hive is 1.1 (it's CDH). We query using a HiveContext in the Spark job (YARN is the resource manager).
The HiveContext has configs as:
.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
.setConf("hive.exec.dynamic.partition", "true")
.setConf("hive.exec.stagingdir", "/tmp/hive/")
 
  
 
3. Our aggregation is done using a single query, as below:

SELECT
  ,
  SUM(m1) AS m1, SUM(m2) AS m2, SUM(m3) AS m3, SUM(m4) AS m4, SUM(m5) AS m5,
  (c1, 'HEX', 'UNION') AS c1,
  (c2, 'HEX', 'UNION') AS c2,
  (c3, 'HEX', 'UNION') AS c3,
  (c4, 'HEX', 'UNION') AS c4,
  (c5, 'HEX', 'UNION') AS c5,
  AS , AS
FROM
WHERE  IN (, , , , , , , )
GROUP BY .
 
  
 
4. Configs are:
spark.master=yarn-client
spark.yarn.queue=default
spark.executor.instances=52
spark.executor.cores=4
spark.executor.memory=30g
spark.driver.memory=25g
spark.memory.fraction=0.8
spark.memory.storageFraction=0.1
spark.yarn.executor.memoryOverhead=9500
spark.yarn.driver.memoryOverhead=5120
spark.core.connection.ack.wait.timeout=1000
spark.eventLog.enabled=True
spark.eventLog.dir=<>
spark.eventLog.overwrite=True
spark.sql.shuffle.partitions=1000

How to tune this job?

 
  
 




Re: BUG :: UI Spark

2024-05-26 Thread Sathi Chowdhury
Can you please explain how you realized it's wrong? Did you check CloudWatch for the same metrics and compare? Also, are you using df.cache() and expecting the shuffle read/write to go away?




On Sunday, May 26, 2024, 7:53 AM, Prem Sahoo  wrote:

Can anyone please assist me ?
On Fri, May 24, 2024 at 12:29 AM Prem Sahoo  wrote:

Does anyone have a clue ?
On Thu, May 23, 2024 at 11:40 AM Prem Sahoo  wrote:

Hello Team,
In the Spark DAG UI we have the Stages tab. Once you click on a stage, you can view its tasks.
For each task there is a column "Shuffle Write Size / Records". That column prints wrong data when the stage gets its data from cache/persist: it typically shows the wrong record count even though the data size is correct, e.g. 3.2G / 7400, which is wrong.
Please advise.
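
For illustration, a minimal sketch of the kind of job described above (data and column names are made up); after caching, the aggregation stage's "Shuffle Write Size / Records" column on the Stages tab is where the reported mismatch would show up:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("shuffle-ui-check").getOrCreate()

// Build a DataFrame, cache it, and materialize the cache with an action
val df = spark.range(0L, 100000000L).withColumn("key", col("id") % 1000)
df.cache()
df.count()

// This aggregation reads from the cache and shuffles; compare the
// "Shuffle Write Size / Records" shown on the Stages tab with the
// actual record counts.
df.groupBy("key").count().collect()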