[pyspark 2.4.3] small input csv ~3.4GB gets 40K tasks created

2019-08-29 Thread Rishi Shah
Hi All,

I am scratching my head over this weird behavior: a DataFrame (read from
CSV) of size ~3.4GB gets cross joined with itself and creates ~50K tasks!
How do I correlate input size with the number of tasks in this case?
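
For reference, a cross join is typically executed as a cartesian product, and that stage gets one task per pair of input partitions, so the task count is the product of the two sides' partition counts rather than a function of raw input size (for example, ~200 partitions per side already yields ~40,000 tasks). A quick sanity check, sketched in Scala with a placeholder path (the same calls exist in PySpark):

// Sketch only; the path is hypothetical. A cartesian (cross) join creates
// one task per (left partition, right partition) pair.
val df = spark.read.option("header", "true").csv("/path/to/input.csv")
val n = df.rdd.getNumPartitions
println(s"partitions per side: $n, tasks in the cross-join stage: ~${n.toLong * n}")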

-- 
Regards,

Rishi Shah


Re: Structured Streaming Dataframe Size

2019-08-29 Thread Tathagata Das
Responses inline.

On Wed, Aug 28, 2019 at 8:42 AM Nick Dawes  wrote:

> Thank you, TD. Couple of follow up questions please.
>
> 1) "It only keeps around the minimal intermediate state data"
>
> How do you define "minimal" here? Is there a configuration property to
> control the time or size of Streaming Dataframe?
>
That's what watermarks are for. You can tune how much late data to consider
and, accordingly, how much past information needs to be buffered as state.
More lateness tolerance = more state in memory to manage.
Shameless plug, but see my deep dive talk -
https://databricks.com/session/a-deep-dive-into-stateful-stream-processing-in-structured-streaming
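
For example, a minimal sketch (the DataFrame and column names here are hypothetical): a 10-minute watermark on a 1-hour windowed count lets Spark drop a window's state roughly 10 minutes after that window closes.

import org.apache.spark.sql.functions.{col, window}

// eventsDF is assumed to be a streaming DataFrame with an "eventTime" timestamp column.
val counts = eventsDF
  .withWatermark("eventTime", "10 minutes")      // tolerate up to 10 minutes of lateness
  .groupBy(window(col("eventTime"), "1 hour"))   // 1-hour tumbling windows
  .count()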


>
> 2) I'm not writing anything out to any database or S3. My requirement is
> to find out a count (real-time) in a 1-hour window. I would like to get
> this count from a BI tool. So can I register it as a temp view and access it
> from the BI tool?
>
> I tried something like this in my streaming application:
>
> AggStreamingDF.createOrReplaceGlobalTempView("streaming_table")
>
> Then, in the BI tool, I queried like this...
>
> select * from streaming_table
>
> Error:  Queries with streaming sources must be executed with
> writeStream.start()
>
> Any suggestions to make this work?
>
>
There are two ways of doing this:

1. Write the aggregates to an in-memory table (driver's memory) and query
that.

AggStreamingDF.writeStream.format("memory").outputMode("complete").queryName("myAggTable").start()

Then:
select * from myAggTable

2. Write the aggregates to files using the Delta Lake project (docs).

AggStreamingDF.writeStream.format("delta").outputMode("complete").start("path/to/delta/table")

Then you can query the Delta table using Spark:

spark.read.format("delta").load("path/to/delta/table").createOrReplaceGlobalTempView("myAggTable")

Then:
select * from global_temp.myAggTable

This gives ACID transactional guarantees between reads and writes. Read more
on the linked website (full disclosure, I work on that project as well).
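
A slightly fuller sketch of option 2, with placeholder paths; note that a checkpoint location is generally required for file-based streaming sinks:

// Streaming side: continuously maintain the aggregate as a Delta table.
val query = AggStreamingDF.writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/path/to/checkpoints")  // placeholder path
  .start("/path/to/delta/table")                         // placeholder path

// Query side: expose the Delta table to SQL clients as a global temp view.
spark.read.format("delta")
  .load("/path/to/delta/table")
  .createOrReplaceGlobalTempView("myAggTable")
// select * from global_temp.myAggTable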



> Thank you very much for your help!
>
>
> On Tue, Aug 27, 2019, 6:42 PM Tathagata Das 
> wrote:
>
>>
>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#basic-concepts
>>
>> *Note that Structured Streaming does not materialize the entire table*.
>>> It reads the latest available data from the streaming data source,
>>> processes it incrementally to update the result, and then discards the
>>> source data. It only keeps around the minimal intermediate *state* data
>>> as required to update the result (e.g. intermediate counts in the earlier
>>> example).
>>>
>>
>>
>> On Tue, Aug 27, 2019 at 1:21 PM Nick Dawes  wrote:
>>
>>> I have a quick newbie question.
>>>
>>> Spark Structured Streaming creates an unbounded dataframe that keeps
>>> appending rows to it.
>>>
>>> So what's the max size of data it can hold? What if the size becomes
>>> bigger than the JVM heap? Will it spill to disk? I'm using S3 as storage, so
>>> will it write temp data to S3 or to the local file system of the cluster?
>>>
>>> Nick
>>>
>>


Re: Can this use-case be handled with spark-sql streaming and Cassandra?

2019-08-29 Thread Jörn Franke
1) This is not a use case but a technical solution, hence nobody can tell you
whether it makes sense or not.
2) Do an upsert in Cassandra. However, keep in mind that the application
submitting to the Kafka topic and the one consuming from the Kafka topic need
to ensure that they process messages in the right order. This is not always
guaranteed, e.g. in case of errors, and they need to avoid overwriting new data
with old data. This is also not a Kafka setting; it has to be dealt with at the
producer and consumer level.
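
A minimal sketch of the write path, assuming the DataStax spark-cassandra-connector is on the classpath and that the DataFrame, keyspace, and table names below are placeholders. Because Cassandra writes are upserts on the primary key, no read-before-write is needed for insert-or-update:

import org.apache.spark.sql.DataFrame

// parsedDF is a hypothetical streaming DataFrame already parsed from the Kafka topic.
val query = parsedDF.writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    batch.write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "my_ks", "table" -> "my_table"))
      .mode("append")   // append is effectively an upsert keyed on the Cassandra primary key
      .save()
  }
  .option("checkpointLocation", "/path/to/checkpoints")  // placeholder path
  .start()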

> On 29.08.2019 at 13:21, Shyam P wrote:
> 
> Hi,
> I need to do a PoC for a business use-case.
> 
> Use case: Need to update a record in a Cassandra table if it exists.
> 
> Will Spark streaming support comparing each record and updating the existing
> Cassandra record?
> 
> For each record received from the Kafka topic, I want to check whether it is
> already there in Cassandra or not; if yes, update the record, else insert a
> new record.
> 
> How can this be done using Spark Structured Streaming and Cassandra? Any
> snippet or sample would help.
> 
> Thank you,
> 
> Shyam


Re: Can this use-case be handled with spark-sql streaming and Cassandra?

2019-08-29 Thread Aayush Ranaut
What exactly is your requirement? 
Is the read before write mandatory?
Are you maintaining states in Cassandra?


Regards
Prathmesh Ranaut
https://linkedin.com/in/prathmeshranaut


> On Aug 29, 2019, at 3:35 PM, Shyam P  wrote:
> 
> 
> Thanks Aayush. For every record, do I need to get the data from the Cassandra
> table and update it? Otherwise it may not update the existing record.
> 
> What is this datastax-spark-connector? Is that not a "Cassandra
> connector library written for Spark"?
> If not, how do we write one ourselves?
> Where and how do we start? Can you please guide me.
> 
> 
> 
> Thank you.
> Shyam
> 
> 
> 
> 
> On Thu, Aug 29, 2019 at 5:03 PM Aayush Ranaut wrote:
> 
>> Cassandra is upsert, you should be able to do what you need with a single 
>> statement unless you’re looking to maintain counters. 
>> 
>> I’m not sure if there is a Cassandra connector library written for spark 
>> streaming because we wrote one ourselves when we wanted to do the same.
>> 
>> Regards
>> Prathmesh Ranaut
>> https://linkedin.com/in/prathmeshranaut
>> 
>> 
>> On Aug 29, 2019, at 7:21 AM, Shyam P wrote:
>> 
>> 
>>> Hi,
>>> I need to do a PoC for a business use-case.
>>> 
>>> Use case: Need to update a record in a Cassandra table if it exists.
>>> 
>>> Will Spark streaming support comparing each record and updating the existing
>>> Cassandra record?
>>> 
>>> For each record received from the Kafka topic, I want to check whether it is
>>> already there in Cassandra or not; if yes, update the record, else insert a
>>> new record.
>>> 
>>> How can this be done using Spark Structured Streaming and Cassandra? Any
>>> snippet or sample would help.
>>> 
>>> Thank you,
>>> 
>>> Shyam
>>> 
>>> 
>>> 
>>> 
>>> 
>> 
>> 
> 


Re: Control Sqoop job from Spark job

2019-08-29 Thread Chetan Khatri
Sorry,
I call the Sqoop job from the function quoted below. Can you help me resolve this?

Thanks

On Fri, Aug 30, 2019 at 1:31 AM Chetan Khatri 
wrote:

> Hi Users,
> I am launching a Sqoop job from a Spark job and would like to FAIL the Spark
> job if the Sqoop job fails.
>
> def executeSqoopOriginal(serverName: String, schemaName: String, username: String, password: String,
>     query: String, splitBy: String, fetchSize: Int, numMappers: Int, targetDir: String, jobName: String, dateColumns: String) = {
>
>   val connectionString = "jdbc:sqlserver://" + serverName + ";" + "databaseName=" + schemaName
>   var parameters = Array("import")
>   parameters = parameters :+ "-Dmapreduce.job.user.classpath.first=true"
>   parameters = parameters :+ "--connect"
>   parameters = parameters :+ connectionString
>   parameters = parameters :+ "--mapreduce-job-name"
>   parameters = parameters :+ jobName
>   parameters = parameters :+ "--username"
>   parameters = parameters :+ username
>   parameters = parameters :+ "--password"
>   parameters = parameters :+ password
>   parameters = parameters :+ "--hadoop-mapred-home"
>   parameters = parameters :+ "/usr/hdp/2.6.5.0-292/hadoop-mapreduce/"
>   parameters = parameters :+ "--hadoop-home"
>   parameters = parameters :+ "/usr/hdp/2.6.5.0-292/hadoop/"
>   parameters = parameters :+ "--query"
>   parameters = parameters :+ query
>   parameters = parameters :+ "--split-by"
>   parameters = parameters :+ splitBy
>   parameters = parameters :+ "--fetch-size"
>   parameters = parameters :+ fetchSize.toString
>   parameters = parameters :+ "--num-mappers"
>   parameters = parameters :+ numMappers.toString
>   if (dateColumns.length() > 0) {
> parameters = parameters :+ "--map-column-java"
> parameters = parameters :+ dateColumns
>   }
>   parameters = parameters :+ "--target-dir"
>   parameters = parameters :+ targetDir
>   parameters = parameters :+ "--delete-target-dir"
>   parameters = parameters :+ "--as-avrodatafile"
>
> }
>
>
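
One way to surface a Sqoop failure to the Spark driver, a sketch that assumes Sqoop 1.x is on the driver classpath, is to run the tool programmatically at the end of the function and fail on a non-zero exit code:

import org.apache.hadoop.conf.Configuration
import org.apache.sqoop.Sqoop

// Sqoop.runTool returns the tool's exit code instead of calling System.exit,
// so the driver can turn a failed import into a job failure.
val exitCode = Sqoop.runTool(parameters, new Configuration())
if (exitCode != 0) {
  throw new RuntimeException(s"Sqoop import '$jobName' failed with exit code $exitCode")
}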


Control Sqoop job from Spark job

2019-08-29 Thread Chetan Khatri
Hi Users,
I am launching a Sqoop job from a Spark job and would like to FAIL the Spark
job if the Sqoop job fails.

def executeSqoopOriginal(serverName: String, schemaName: String, username: String, password: String,
    query: String, splitBy: String, fetchSize: Int, numMappers: Int, targetDir: String, jobName: String, dateColumns: String) = {

  val connectionString = "jdbc:sqlserver://" + serverName + ";" + "databaseName=" + schemaName
  var parameters = Array("import")
  parameters = parameters :+ "-Dmapreduce.job.user.classpath.first=true"
  parameters = parameters :+ "--connect"
  parameters = parameters :+ connectionString
  parameters = parameters :+ "--mapreduce-job-name"
  parameters = parameters :+ jobName
  parameters = parameters :+ "--username"
  parameters = parameters :+ username
  parameters = parameters :+ "--password"
  parameters = parameters :+ password
  parameters = parameters :+ "--hadoop-mapred-home"
  parameters = parameters :+ "/usr/hdp/2.6.5.0-292/hadoop-mapreduce/"
  parameters = parameters :+ "--hadoop-home"
  parameters = parameters :+ "/usr/hdp/2.6.5.0-292/hadoop/"
  parameters = parameters :+ "--query"
  parameters = parameters :+ query
  parameters = parameters :+ "--split-by"
  parameters = parameters :+ splitBy
  parameters = parameters :+ "--fetch-size"
  parameters = parameters :+ fetchSize.toString
  parameters = parameters :+ "--num-mappers"
  parameters = parameters :+ numMappers.toString
  if (dateColumns.length() > 0) {
parameters = parameters :+ "--map-column-java"
parameters = parameters :+ dateColumns
  }
  parameters = parameters :+ "--target-dir"
  parameters = parameters :+ targetDir
  parameters = parameters :+ "--delete-target-dir"
  parameters = parameters :+ "--as-avrodatafile"

}


Re: Can this use-case be handled with spark-sql streaming and Cassandra?

2019-08-29 Thread Shyam P
Thanks Aayush.
For every record, do I need to get the data from the Cassandra table and
update it? Otherwise it may not update the existing record.

What is this datastax-spark-connector? Is that not a "Cassandra
connector library written for Spark"?
If not, how do we write one ourselves?
Where and how do we start? Can you please guide me.

Thank you.
Shyam


On Thu, Aug 29, 2019 at 5:03 PM Aayush Ranaut 
wrote:

> Cassandra is upsert, you should be able to do what you need with a single
> statement unless you’re looking to maintain counters.
>
> I’m not sure if there is a Cassandra connector library written for spark
> streaming because we wrote one ourselves when we wanted to do the same.
>
> Regards
> Prathmesh Ranaut
> https://linkedin.com/in/prathmeshranaut
>
> On Aug 29, 2019, at 7:21 AM, Shyam P  wrote:
>
> Hi,
>
> I need to do a PoC for a business use-case.
>
> *Use case:* Need to update a record in a Cassandra table if it exists.
>
> Will Spark streaming support comparing each record and updating the existing
> Cassandra record?
>
> For each record received from the Kafka topic, I want to check whether it is
> already there in Cassandra or not; if yes, update the record, else insert a
> new record.
>
> How can this be done using Spark Structured Streaming and Cassandra? Any
> snippet or sample would help.
>
> Thank you,
>
> Shyam
>
>


Re: Can this use-case be handled with spark-sql streaming and Cassandra?

2019-08-29 Thread Aayush Ranaut
Cassandra is upsert, you should be able to do what you need with a single 
statement unless you’re looking to maintain counters. 

I’m not sure if there is a Cassandra connector library written for spark 
streaming because we wrote one ourselves when we wanted to do the same.

Regards
Prathmesh Ranaut
https://linkedin.com/in/prathmeshranaut

> On Aug 29, 2019, at 7:21 AM, Shyam P  wrote:
> 
> Hi,
> I need to do a PoC for a business use-case.
> 
> Use case: Need to update a record in a Cassandra table if it exists.
> 
> Will Spark streaming support comparing each record and updating the existing
> Cassandra record?
> 
> For each record received from the Kafka topic, I want to check whether it is
> already there in Cassandra or not; if yes, update the record, else insert a
> new record.
> 
> How can this be done using Spark Structured Streaming and Cassandra? Any
> snippet or sample would help.
> 
> Thank you,
> 
> Shyam


Can this use-case be handled with spark-sql streaming and Cassandra?

2019-08-29 Thread Shyam P
Hi,

I need to do a PoC for a business use-case.

*Use case:* Need to update a record in a Cassandra table if it exists.

Will Spark streaming support comparing each record and updating the existing
Cassandra record?

For each record received from the Kafka topic, I want to check whether it is
already there in Cassandra or not; if yes, update the record, else insert a
new record.

How can this be done using Spark Structured Streaming and Cassandra? Any
snippet or sample would help.

Thank you,

Shyam


Re: [Spark SQL] failure in query

2019-08-29 Thread Subash Prabakar
What is the number of part files in that big table? And what is the
distribution of request_id? Is the variance of the column low or high?
The window's partitionBy clause moves all rows with the same request_id to one
executor, so a very heavy key can overload that executor.
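
A quick way to check that distribution, sketched here assuming the raw 1.5TB table can be loaded as a DataFrame named rawDF:

import org.apache.spark.sql.functions.desc

// Rows per request_id; a few very large groups mean the window's
// partitionBy will funnel those rows through single tasks.
rawDF.groupBy("request_id")
  .count()
  .orderBy(desc("count"))
  .show(20, truncate = false)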

On Sun, 25 Aug 2019 at 16:56, Tzahi File  wrote:

> Hi,
>
> I encountered some issues running a Spark SQL query, and would be happy to get
> some advice.
> I'm trying to run a query on a very big data set (around 1.5TB), and it keeps
> failing in all of my tries. A template of the query is as below:
> insert overwrite table partition(part)
> select  /*+ BROADCAST(c) */
>  *, row_number() over (partition by request_id order by economic_value
> DESC) row_number
> from (
> select a,b,c,d,e
> from table (raw data 1.5TB))
> left join small_table
>
> The heavy part of this query is the window function.
> I'm using 65 spot instances of type 5.4x.large.
> The spark settings:
> --conf spark.driver.memory=10g
> --conf spark.sql.shuffle.partitions=1200
> --conf spark.executor.memory=22000M
> --conf spark.shuffle.service.enabled=false
>
>
> You can see below an example of the errors that I get:
> [image: image.png]
>
>
> any suggestions?
>
>
>
> Thanks!
> Tzahi
>


In Catalyst expressions, when is it appropriate to use codegen

2019-08-29 Thread Arwin Tio
Hi,

I am exploring the usage of Catalyst expression functions to avoid the 
performance issues associated with UDFs.

One thing that I noticed is that there is a trait called CodegenFallback and 
there are some Catalyst expressions in Spark that inherit from it [0].

My question is: is there a technical limitation for some Catalyst expressions,
like datetimeExpressions, that makes codegen unsuitable? How do you evaluate
whether or not a Catalyst expression should use codegen?

Thanks,

Arwin

[0] 
https://github.com/apache/spark/blob/3a4afce96c6840431ed45280742f9e969be19639/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L95


How to Load a Graphx Graph from a parquet file?

2019-08-29 Thread Alexander Czech
Hey all,
I want to load a parquet file containing my edges into a Graph. My code so far
looks like this:

val edgesDF = spark.read.parquet("/path/to/edges/parquet/")
val edgesRDD = edgesDF.rdd
val graph = Graph.fromEdgeTuples(edgesRDD, 1)

But this simply produces an error:

[error]  found   : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
[error]  required: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.VertexId)]
[error]     (which expands to)  org.apache.spark.rdd.RDD[(Long, Long)]
[error] Error occurred in an application involving default arguments.
[error]     val graph = Graph.fromEdgeTuples(edgesRDD, 1)

I tried to declare the edgesRDD with an explicit type, but this just moves
the error:

val edgesDF = spark.read.parquet("/path/to/edges/parquet/")
val edgesRDD : RDD[(Long,Long)] = edgesDF.rdd
val graph = Graph.fromEdgeTuples(edgesRDD, 1)

[error] /home/alex/ownCloud/JupyterNotebooks/Diss_scripte/Webgraph_analysis/pagerankscala/src/main/scala/pagerank.scala:17:44: type mismatch;
[error]  found   : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
[error]  required: org.apache.spark.rdd.RDD[(Long, Long)]
[error]     val edgesRDD : RDD[(Long,Long)] = edgesDF.rdd

So I guess I have to transform org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
into org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId,
org.apache.spark.graphx.VertexId)] (which expands to
org.apache.spark.rdd.RDD[(Long, Long)]).

How can I achieve this?
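
For reference, one way to do that conversion, a sketch that assumes the first two columns of the parquet file are the source and destination vertex ids stored as longs:

import org.apache.spark.graphx.Graph
import org.apache.spark.rdd.RDD

val edgesDF = spark.read.parquet("/path/to/edges/parquet/")

// Map each Row to a (srcId, dstId) tuple; getLong assumes columns 0 and 1 are LongType.
val edgesRDD: RDD[(Long, Long)] =
  edgesDF.rdd.map(row => (row.getLong(0), row.getLong(1)))

val graph = Graph.fromEdgeTuples(edgesRDD, 1)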