how to refresh the loaded non-streaming dataframe for each streaming batch?

2019-09-05 Thread Shyam P
Hi,

I am using spark-sql 2.4.1 for streaming in my PoC.

How can I refresh a dataframe loaded from an HDFS/Cassandra table every time a
new batch of the stream is processed? What is the general practice for
handling this kind of scenario?

Below is the Stack Overflow link with more details.

https://stackoverflow.com/questions/57815645/how-to-refresh-the-contents-of-non-streaming-dataframe
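
To make the setup concrete, here is a rough, simplified sketch of the kind of
code I mean (the Cassandra source, keyspace, topic, and column names are all
made up, not my actual code):

// Illustrative sketch only: a static lookup DataFrame is loaded once at start
// and joined to every micro-batch; the question is how to refresh it.
val staticDf = spark.read
  .format("org.apache.spark.sql.cassandra")                 // assumes the Cassandra connector
  .options(Map("keyspace" -> "my_ks", "table" -> "lookup")) // made-up keyspace/table
  .load()

val streamDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")         // made-up broker
  .option("subscribe", "events")                            // made-up topic
  .load()
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")

// Stream-static join: staticDf is read once up front and never refreshed.
val joined = streamDf.join(staticDf, Seq("key"), "left_outer")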

Thank you,
Shyam


org.apache.spark.sql.AnalysisException: Detected implicit cartesian product

2019-09-05 Thread kyunam
"left join" complains and tells me I need to turn on
"spark.sql.crossJoin.enabled=true".

But when I persist one dataframe, it runs fine.

Why do you have to "persist"?

org.apache.spark.sql.AnalysisException: Detected implicit cartesian product
for INNER join between logical plans

SELECT * FROM LHS left join RHS on LHS.R = RHS.R

The above happens in both Spark 2.3.3 and 2.4.4.
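
For reference, a simplified sketch of the shape of the code (table and column
names are made up):

// Simplified shape of what I am running.
val lhs = spark.table("LHS")
val rhs = spark.table("RHS")

// In my case this version fails with:
//   AnalysisException: Detected implicit cartesian product for INNER join ...
val joined = lhs.join(rhs, lhs("R") === rhs("R"), "left")

// But if one side is persisted first, the same join runs fine.
val rhsPersisted = rhs.persist()
val joinedOk = lhs.join(rhsPersisted, lhs("R") === rhsPersisted("R"), "left")
joinedOk.count()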

Thanks,
Kyunam






Re: Start point to read source codes

2019-09-05 Thread David Zhou
Hi Hichame,
Thanks a lot. I forked it. There is a lot of code, though; I need some
documentation to guide me on which part I should start from.

On Thu, Sep 5, 2019 at 1:30 PM Hichame El Khalfi 
wrote:

> Hey David,
>
> You can find the source code on GitHub:
> https://github.com/apache/spark
>
> Hope this helps,
>
> Hichame
> *From:* zhou10...@gmail.com
> *Sent:* September 5, 2019 4:11 PM
> *To:* user@spark.apache.org
> *Subject:* Start point to read source codes
>
> Hi,
>
> I want to read the source code. Is there any doc, wiki, or book that
> introduces the source code?
>
> Thanks in advance.
>
> David
>


Re: Start point to read source codes

2019-09-05 Thread Hichame El Khalfi
Hey David,

You can find the source code on GitHub:
https://github.com/apache/spark

Hope this helps,

Hichame
From: zhou10...@gmail.com
Sent: September 5, 2019 4:11 PM
To: user@spark.apache.org
Subject: Start point to read source codes


Hi,

I want to read the source code. Is there any doc, wiki, or book that
introduces the source code?

Thanks in advance.

David


Start point to read source codes

2019-09-05 Thread da zhou
Hi,

I want to read the source code. Is there any doc, wiki, or book that
introduces the source code?

Thanks in advance.

David


Re: [Spark Streaming Kafka 0-10] - What was the reason for adding "spark-executor-" prefix to group id in executor configurations

2019-09-05 Thread Sethupathi T
Gabor,

Thanks for the quick response and for sharing the Spark 3.0 details. We need
to use Spark Streaming (KafkaUtils.createDirectStream) rather than Structured
Streaming, following this document:
https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html

To restate the issue for clarity: the spark-streaming-kafka-0-10 Kafka
connector prefixes group.id with "spark-executor-" on the executors, while
the driver uses the original group id.

*Here is the code where the executor constructs the executor-specific group
id* (line # 212):

https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
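
In effect, what the linked code does on the executor side is roughly this (a
paraphrase for illustration, not a verbatim copy of KafkaUtils.scala):

// The group id the executors actually use is the configured one with a
// "spark-executor-" prefix; the group name here is made up.
val configuredGroupId = "my-authorized-group"   // whatever GROUP_ID_CONFIG is set to
val executorGroupId   = "spark-executor-" + configuredGroupId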

*It would be great if you could provide explanations for the following
questions.*

#1 What was the specific reason for prefixing the group id on the executors?

#2 Will it be harmful if I build a custom spark-streaming-kafka-0-10 library
with the group id prefix removed (line # 212 in KafkaUtils.scala)?

#3 KafkaUtils.scala is marked as @Experimental. What does that mean? Is it
advisable to use in production?

*Here is my Spark Streaming code snippet:*

val kafkaParams = Map[String, Object](

  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
  ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
classOf[MessageDeserializer],
  "security.protocol" -> "SSL",
  "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
  "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
  "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
  "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
  "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
)

val stream = KafkaUtils.createDirectStream[String, Message](
  ssc,
  PreferConsistent,
  Subscribe[String, Message](topicsSet, kafkaParams)
)

---
Thanks in Advance,
Sethupathi.T

On Thu, Sep 5, 2019 at 9:17 AM Gabor Somogyi 
wrote:

> Hi,
>
> Let me share Spark 3.0 documentation part (Structured Streaming and not
> DStreams what you've mentioned but still relevant):
>
> kafka.group.id string none streaming and batch The Kafka group id to use
> in Kafka consumer while reading from Kafka. Use this with caution. By
> default, each query generates a unique group id for reading data. This
> ensures that each Kafka source has its own consumer group that does not
> face interference from any other consumer, and therefore can read all of
> the partitions of its subscribed topics. In some scenarios (for example,
> Kafka group-based authorization), you may want to use a specific authorized
> group id to read data. You can optionally set the group id. However, do
> this with extreme caution as it can cause unexpected behavior. Concurrently
> running queries (both, batch and streaming) or sources with the same group
> id are likely interfere with each other causing each query to read only
> part of the data. This may also occur when queries are started/restarted in
> quick succession. To minimize such issues, set the Kafka consumer session
> timeout (by setting option "kafka.session.timeout.ms") to be very small.
> When this is set, option "groupIdPrefix" will be ignored.
> I think it answers your questions.
>
> As a general suggestion maybe it worth to revisit Spark 3.0 because
> Structured Streaming has another interesting feature:
> groupIdPrefix string spark-kafka-source streaming and batch Prefix of
> consumer group identifiers (`group.id`) that are generated by structured
> streaming queries. If "kafka.group.id" is set, this option will be
> ignored.
>
> BR,
> G
>
>
> On Thu, Sep 5, 2019 at 10:05 AM Sethupathi T
>  wrote:
>
>> Hi Team,
>>
>> We have secured Kafka cluster (which only allows to consume from the
>> pre-configured, authorized consumer group), there is a scenario where we
>> want to use spark streaming to consume from secured kafka. so we have
>> decided to use spark-streaming-kafka-0-10
>>  
>> (it
>> supports SSL/TSL, Direct DStream, new Kafka consumer API, etc) api. When i
>> deploy the application in cluster mode, i realized that the actual group id
>> has been prefixed with "spark-executor" in executor configuration (executor
>> as trying to connect to kafka with "spark-executor" + actual group id,
>> which is not really exists and getting exception).
>>
>> *Here is the code where executor construct executor specific group id *
>>
>>
>> https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
>>  line
>> # 212,
>>
>> *Here are my Questions*
>>

Re: Collecting large dataset

2019-09-05 Thread Marcin Tustin
Stop using collect for this purpose. Either continue your further
processing in Spark (maybe you need to use streaming), or sink the data to
something that can accept it (GCS/S3/Azure
Storage/Redshift/Elasticsearch/whatever), and have the further processing
read from that sink.
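
For example, a rough sketch of the sink-then-read pattern (the format, bucket,
and path are just illustrative):

// Instead of resultDf.collect(), write the result to durable storage...
val resultDf = spark.table("whatever_you_were_collecting")   // placeholder
resultDf.write
  .mode("overwrite")
  .parquet("s3a://some-bucket/exports/result")               // made-up bucket/path

// ...and have the downstream application read it back from that sink,
// rather than pulling ~2 GB of rows through the driver.
val downstream = spark.read.parquet("s3a://some-bucket/exports/result")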

On Thu, Sep 5, 2019 at 2:23 PM Rishikesh Gawade 
wrote:

> Hi.
> I have been trying to collect a large dataset(about 2 gb in size, 30
> columns, more than a million rows) onto the driver side. I am aware that
> collecting such a huge dataset isn't suggested, however, the application
> within which the spark driver is running requires that data.
> While collecting the dataframe, the spark job throws an error,
> TaskResultLost( resultset lost from blockmanager).
> I searched for solutions around this and set the following properties:
> spark.blockManager.port, maxResultSize to 0(unlimited), 
> spark.driver.blockManager.port
> and the application within which spark driver is running has 28 gb of max
> heap size.
> And yet the error arises again.
> There are 22 executors running in my cluster.
> Is there any config/necessary step that i am missing before collecting
> such large data?
> Or is there any other effective approach that would guarantee collecting
> such large data without failure?
>
> Thanks,
> Rishikesh
>


Collecting large dataset

2019-09-05 Thread Rishikesh Gawade
Hi.
I have been trying to collect a large dataset (about 2 GB in size, 30
columns, more than a million rows) onto the driver side. I am aware that
collecting such a huge dataset isn't recommended; however, the application
within which the Spark driver is running requires that data.
While collecting the dataframe, the Spark job throws an error:
TaskResultLost (result set lost from block manager).
I searched for solutions and set the following properties:
spark.blockManager.port, spark.driver.blockManager.port, and
spark.driver.maxResultSize set to 0 (unlimited); the application within
which the Spark driver is running has 28 GB of max heap size.
And yet the error arises again.
There are 22 executors running in my cluster.
Is there any config/necessary step that I am missing before collecting such
large data?
Or is there any other effective approach that would guarantee collecting
such large data without failure?

Thanks,
Rishikesh


Re: read image or binary files / spark 2.3

2019-09-05 Thread Peter Liu
Hello experts,

I have a quick question: which API allows me to read image files or binary
files (for SparkSession.readStream) from a local/Hadoop file system in
Spark 2.3?

I have been browsing the following documentation and googling for it, and
didn't find a good example:

https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html
https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.package
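
For context, the only non-streaming way I know of is SparkContext.binaryFiles;
a minimal sketch of that is below (the path is made up). What I am really
after is a readStream-based equivalent of this in 2.3:

import org.apache.spark.input.PortableDataStream

// Batch (non-streaming) read of binary files as (path, PortableDataStream) pairs.
val binaries = spark.sparkContext.binaryFiles("hdfs:///data/images")   // made-up path

val sizes = binaries.map { case (path, stream: PortableDataStream) =>
  (path, stream.toArray().length)   // materialize the bytes and report the size
}
sizes.take(5).foreach(println)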

any hint/help would be very much appreciated!

thanks!

Peter


Re: Tune hive query launched thru spark-yarn job.

2019-09-05 Thread Himali Patel
Hi Sathi,

Thanks for the quick reply. This (the list of epoch times in the IN clause) is
already part of the 30-day aggregation. Given our input-to-output aggregation
ratio, our cardinality is too high, so we are looking for query-tuning
suggestions; we cannot assign additional resources to this job.


From: Sathi Chowdhury 
Date: Thursday, 5 September 2019 at 8:10 PM
To: Himali Patel , "user@spark.apache.org" 

Subject: Re: Tune hive query launched thru spark-yarn job.

What I can immediately think of is: since you are doing IN in the WHERE clause
for a series of timestamps, you could consider breaking them up and, for each
epoch timestamp, load the results into an intermediate staging table, then do
a final aggregate from that table keeping the same GROUP BY. As it is a SUM,
it can be done in two steps.
hth






On Thursday, September 5, 2019, 5:10 AM, Himali Patel 
 wrote:

Hello all,

We have one use-case where we are aggregating billions of rows. It does a huge
shuffle.

Example: as per the 'Job' tab on the YARN UI, when the input size is about
350 GB, the shuffle size is >3 TB. This increases non-DFS usage beyond the
warning limit and thus affects the entire cluster.

It seems we need to tune our query / resources. Any suggestions?

1.
Our data is high in cardinality:
# input rows are ~15 billion
# output rows are ~13 billion

2.
Spark version is 1.6
Hive is 1.1
It's CDH.
We query using hive context in the spark job (yarn is the resource manager).
Hive context has configs as:
.setConf("hive.exec.dynamic.partition.mode","nonstrict")
.setConf("hive.exec.dynamic.partition","true")
.setConf("hive.exec.stagingdir","/tmp/hive/")

3.
Our aggregation is done using a single query as below:
SELECT
,
SUM(m1) AS m1, SUM(m2) AS m2, sum(m3) as m3, sum(m4) as m4, SUM(m5) AS m5,
(c1, 'HEX', 'UNION') AS c1,
(c2, 'HEX', 'UNION') AS c2,
(c3, 'HEX', 'UNION') AS c3,
(c4, 'HEX', 'UNION') AS c4,
(c5, 'HEX', 'UNION') AS c5,
 AS ,  AS 
FROM 
WHERE  IN ( , , , , , , , )
GROUP BY .

4.
Configs are:
spark.master=yarn-client
spark.yarn.queue=default
spark.executor.instances=52
spark.executor.cores=4
spark.executor.memory=30g
spark.driver.memory=25g
spark.memory.fraction=0.8
spark.memory.storageFraction=0.1
spark.yarn.executor.memoryOverhead=9500
spark.yarn.driver.memoryOverhead=5120
spark.core.connection.ack.wait.timeout=1000
spark.eventLog.enabled=True
spark.eventLog.dir=<>
spark.eventLog.overwrite=True
spark.sql.shuffle.partitions=1000

How to tune this job?


Re: [Spark Streaming Kafka 0-10] - What was the reason for adding "spark-executor-" prefix to group id in executor configurations

2019-09-05 Thread Gabor Somogyi
Hi,

Let me share part of the Spark 3.0 documentation (for Structured Streaming,
not the DStreams you mentioned, but still relevant):

kafka.group.id (string, default: none, applies to streaming and batch
queries): The Kafka group id to use in the Kafka consumer while reading from
Kafka. Use this with caution. By default, each query generates a unique group
id for reading data. This ensures that each Kafka source has its own consumer
group that does not face interference from any other consumer, and therefore
can read all of the partitions of its subscribed topics. In some scenarios
(for example, Kafka group-based authorization), you may want to use a specific
authorized group id to read data. You can optionally set the group id.
However, do this with extreme caution as it can cause unexpected behavior.
Concurrently running queries (both batch and streaming) or sources with the
same group id are likely to interfere with each other, causing each query to
read only part of the data. This may also occur when queries are
started/restarted in quick succession. To minimize such issues, set the Kafka
consumer session timeout (by setting option "kafka.session.timeout.ms") to be
very small. When this is set, option "groupIdPrefix" will be ignored.

I think it answers your questions.

As a general suggestion, it may be worth revisiting Spark 3.0, because
Structured Streaming has another interesting feature:

groupIdPrefix (string, default: spark-kafka-source, applies to streaming and
batch queries): Prefix of consumer group identifiers (`group.id`) that are
generated by Structured Streaming queries. If "kafka.group.id" is set, this
option will be ignored.
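
For illustration, a minimal Structured Streaming sketch of using the
authorized group id in Spark 3.0 (broker, topic, and group names are made up):

// Spark 3.0 Structured Streaming Kafka source with a pre-authorized group id.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9093")   // made-up broker
  .option("subscribe", "secured-topic")                // made-up topic
  .option("kafka.group.id", "my-authorized-group")     // instead of a generated group id
  .option("kafka.security.protocol", "SSL")
  .load()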

BR,
G


On Thu, Sep 5, 2019 at 10:05 AM Sethupathi T
 wrote:

> Hi Team,
>
> We have secured Kafka cluster (which only allows to consume from the
> pre-configured, authorized consumer group), there is a scenario where we
> want to use spark streaming to consume from secured kafka. so we have
> decided to use spark-streaming-kafka-0-10
>  
> (it
> supports SSL/TSL, Direct DStream, new Kafka consumer API, etc) api. When i
> deploy the application in cluster mode, i realized that the actual group id
> has been prefixed with "spark-executor" in executor configuration (executor
> as trying to connect to kafka with "spark-executor" + actual group id,
> which is not really exists and getting exception).
>
> *Here is the code where executor construct executor specific group id *
>
>
> https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
>  line
> # 212,
>
> *Here are my Questions*
>
> #1 What was the specific reason for prefixing group id in executor ?
>
> #2 Will it be harmful if i build the custom spark-streaming-kafka-0-10
> 
> library by removing the group id prefix?
>
> #3 KafkaUtils.scala is marked as @Experimental what does it mean? is it
> advisable to use in production?
>
> *Here is the my spark streaming code snippet*
>
> val kafkaParams = Map[String, Object](
>
>   ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
>   ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
>   ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
>   ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
>   ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
>   ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> 
> classOf[MessageDeserializer],
>   "security.protocol" -> "SSL",
>   "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
>   "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
>   "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
>   "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
>   "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
> )
>
> val stream = KafkaUtils.createDirectStream[String, Message](
>   ssc,
>   PreferConsistent,
>   Subscribe[String, Message](topicsSet, kafkaParams)
> )
>
> ---
> Thanks in Advance,
> Sethupathi.T
>


Re: Tune hive query launched thru spark-yarn job.

2019-09-05 Thread Sathi Chowdhury
What I can immediately think of is: since you are doing IN in the WHERE clause
for a series of timestamps, you could consider breaking them up and, for each
epoch timestamp, load the results into an intermediate staging table, then do
a final aggregate from that table keeping the same GROUP BY. As it is a SUM,
it can be done in two steps.
hth
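
A rough sketch of the two-step idea (every table, column, and timestamp value
below is a placeholder, since the real names were elided in the mail):

// Spark 1.6 style, reusing the job's HiveContext.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// Step 1: aggregate each epoch timestamp from the IN list into a staging table.
val epochs = Seq(1567641600L, 1567728000L)   // placeholder epoch timestamps
epochs.foreach { ts =>
  hiveContext.sql(
    s"""INSERT INTO TABLE staging_agg
       |SELECT dim_col, SUM(m1) AS m1, SUM(m2) AS m2
       |FROM source_table
       |WHERE epoch_ts = $ts
       |GROUP BY dim_col""".stripMargin)
}

// Step 2: final aggregate over the much smaller staging table with the same
// GROUP BY; SUM composes cleanly across the two steps.
val finalDf = hiveContext.sql(
  """SELECT dim_col, SUM(m1) AS m1, SUM(m2) AS m2
    |FROM staging_agg
    |GROUP BY dim_col""".stripMargin)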



On Thursday, September 5, 2019, 5:10 AM, Himali Patel 
 wrote:

 
Hello all,

We have one use-case where we are aggregating billions of rows. It does a huge
shuffle.

Example: as per the 'Job' tab on the YARN UI, when the input size is about
350 GB, the shuffle size is >3 TB. This increases non-DFS usage beyond the
warning limit and thus affects the entire cluster.

It seems we need to tune our query / resources. Any suggestions?

1.
Our data is high in cardinality:
# input rows are ~15 billion
# output rows are ~13 billion

2.
Spark version is 1.6
Hive is 1.1
It's CDH.
We query using hive context in the spark job (yarn is the resource manager).
Hive context has configs as:
.setConf("hive.exec.dynamic.partition.mode","nonstrict")
.setConf("hive.exec.dynamic.partition","true")
.setConf("hive.exec.stagingdir","/tmp/hive/")

3.
Our aggregation is done using a single query as below:
SELECT
,
SUM(m1) AS m1, SUM(m2) AS m2, sum(m3) as m3, sum(m4) as m4, SUM(m5) AS m5,
(c1, 'HEX', 'UNION') AS c1,
(c2, 'HEX', 'UNION') AS c2,
(c3, 'HEX', 'UNION') AS c3,
(c4, 'HEX', 'UNION') AS c4,
(c5, 'HEX', 'UNION') AS c5,
 AS ,  AS 
FROM 
WHERE  IN ( , , , , , , , )
GROUP BY .

4.
Configs are:
spark.master=yarn-client
spark.yarn.queue=default
spark.executor.instances=52
spark.executor.cores=4
spark.executor.memory=30g
spark.driver.memory=25g
spark.memory.fraction=0.8
spark.memory.storageFraction=0.1
spark.yarn.executor.memoryOverhead=9500
spark.yarn.driver.memoryOverhead=5120
spark.core.connection.ack.wait.timeout=1000
spark.eventLog.enabled=True
spark.eventLog.dir=<>
spark.eventLog.overwrite=True
spark.sql.shuffle.partitions=1000

How to tune this job?

 
  
 




Tune hive query launched thru spark-yarn job.

2019-09-05 Thread Himali Patel
Hello all,

We have one use-case where we are aggregating billions of rows. It does a huge
shuffle.
Example:
As per the 'Job' tab on the YARN UI, when the input size is about 350 GB, the
shuffle size is >3 TB. This increases non-DFS usage beyond the warning limit
and thus affects the entire cluster.

It seems we need to tune our query / resources. Any suggestions?

1.
Our data is high in cardinality :
# input rows are ~15 billion
# output rows are ~13 billion

2.
Spark version is 1.6
Hive is 1.1
It’s CDH.
We query using hive context in the spark job (yarn is the resource manager).
Hive context has configs as :

.setConf("hive.exec.dynamic.partition.mode","nonstrict")

.setConf("hive.exec.dynamic.partition","true")

.setConf("hive.exec.stagingdir","/tmp/hive/")

3.
Our aggregation is done using single query as below :
SELECT
,
SUM(m1) AS m1, SUM(m2) AS m2,sum(m3) as m3,sum(m4) as m4, SUM(m5) AS m5,
(c1, 'HEX', 'UNION') AS c1, 
(c2, 'HEX', 'UNION') AS c2, 
(c3, 'HEX', 'UNION') AS c3, 
(c4, 'HEX', 'UNION') AS c4, 
(c5, 'HEX', 'UNION') AS c5,
  AS ,  AS 
FROM 
WHERE  IN ( , ,  , 
 ,  ,  ,  , )
GROUP BY .

4.
Configs are :

spark.master=yarn-client
spark.yarn.queue=default
spark.executor.instances=52
spark.executor.cores=4
spark.executor.memory=30g
spark.driver.memory=25g
spark.memory.fraction=0.8
spark.memory.storageFraction=0.1
spark.yarn.executor.memoryOverhead=9500
spark.yarn.driver.memoryOverhead=5120
spark.core.connection.ack.wait.timeout=1000
spark.eventLog.enabled=True
spark.eventLog.dir=<>

spark.eventLog.overwrite=True
spark.sql.shuffle.partitions=1000





How to tune this job?

Test mail

2019-09-05 Thread Himali Patel



[Spark Streaming Kafka 0-10] - What was the reason for adding "spark-executor-" prefix to group id in executor configurations

2019-09-05 Thread Sethupathi T
Hi Team,

We have a secured Kafka cluster (which only allows consuming from a
pre-configured, authorized consumer group), and there is a scenario where we
want to use Spark Streaming to consume from it. So we have decided to use the
spark-streaming-kafka-0-10 API (it supports SSL/TLS, Direct DStream, the new
Kafka consumer API, etc.). When I deploy the application in cluster mode, I
realized that the actual group id gets prefixed with "spark-executor-" in the
executor configuration (the executor tries to connect to Kafka with
"spark-executor-" + the actual group id, which does not exist, and it gets an
exception).

*Here is the code where the executor constructs the executor-specific group
id* (line # 212):

https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala

*Here are my Questions*

#1 What was the specific reason for prefixing the group id on the executors?

#2 Will it be harmful if I build a custom spark-streaming-kafka-0-10 library
with the group id prefix removed?

#3 KafkaUtils.scala is marked as @Experimental. What does that mean? Is it
advisable to use in production?

*Here is my Spark Streaming code snippet:*

val kafkaParams = Map[String, Object](

  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
  ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
classOf[MessageDeserializer],
  "security.protocol" -> "SSL",
  "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
  "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
  "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
  "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
  "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
)

val stream = KafkaUtils.createDirectStream[String, Message](
  ssc,
  PreferConsistent,
  Subscribe[String, Message](topicsSet, kafkaParams)
)

---
Thanks in Advance,
Sethupathi.T


How to query StructField's metadata in spark sql?

2019-09-05 Thread kyunam
Using SQL, is it possible to query a column's metadata?
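
For context, this is how I can reach the metadata from Scala (table, column,
and key names are made up); what I am after is a way to do the same in SQL:

// Accessing a column's metadata through the DataFrame / StructField API.
val df = spark.table("some_table")            // made-up table name
val meta = df.schema("some_col").metadata     // org.apache.spark.sql.types.Metadata
println(meta.json)                            // all metadata for the column as JSON
// println(meta.getString("comment"))         // or read one key, if it exists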

Thanks,
Kyunam


