[Spark SQL] Issue about difference in memory size between DataFrame and RDD

2020-04-19 Thread Lyx
Hello,

 I'm using Spark for my project these days, and I noticed that when loading
data stored in Hadoop HDFS, there seems to be a huge difference in JVM memory
size between using a DataFrame and using the RDD format. Below is the script I
ran in spark-shell. My original file (testData) is an ordinary text file of
about 11GB on disk; each line has the format "Id1,Id2", where both Id1 and Id2
are random int32 numbers.

/* code segment 

import java.io.DataOutputStream
import java.util
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import scala.collection.mutable.ArrayBuffer

// this text file's size is 11GB in hard disk
var filePath = "hdfs://10.10.23.105:9000/testData"


val fields = Array.range(0, 2).map(i => StructField(s"col$i", IntegerType))
val schema: StructType = new StructType(fields)

val df: Dataset[Row] = spark.read.format("csv").schema(schema).load(filePath)

// the first DataFrame, which turns out to be 5.5GB in memory
df.cache()
df.count()

// the second one (DataFrame.rdd), which turns out to be 95GB in memory
df.rdd.cache()
df.rdd.count()

// the third one, a plain RDD, which turns out to be 88GB in memory
val pureRDD = spark.sparkContext.textFile(filePath)
pureRDD.cache()
pureRDD.count()

// the line below goes wrong when I use collect(), even though the driver has
// 200GB and the executor has 300GB of memory allocated
df.collect()

*/




 So here I encountered 2 problems:

Q1: I loaded and cached the very same raw file in 3 formats respectively, as
shown above: DataFrame, DataFrame.rdd, and RDD. I found that the DataFrame
used just 5.5GB in my JVM, while df.rdd used nearly 95GB and the plain RDD
about 69GB. So I'm wondering why the RDD and DataFrame.rdd take so much
memory space when the original file is comparatively small?
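
For context: a cached DataFrame is stored in Tungsten's compact binary
columnar format, while a cached RDD holds deserialized JVM objects (boxed
values, String and object-header overhead), which commonly inflates the
footprint several-fold. A minimal sketch of serialized RDD caching, which
usually narrows the gap (same path as in the script above):

import org.apache.spark.storage.StorageLevel

// A sketch, not a fix: cache the RDD in serialized form instead of as
// deserialized JVM objects; registering Kryo can shrink it further.
val serRDD = spark.sparkContext.textFile("hdfs://10.10.23.105:9000/testData")
serRDD.persist(StorageLevel.MEMORY_ONLY_SER)
serRDD.count()  // materializes the cache; sizes appear on the UI's Storage tab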




Q2: I also noticed that when I call df.collect(), it keeps blocking without
an exception or any further information, while RDD.collect() doesn't have
this problem and returns the result successfully.

(P.S. My driver is allocated 200GB, along with a 300GB executor JVM heap,
which should be sufficient for such a collect action.)
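
A workaround sketch (not an explanation of the hang): pull rows to the driver
incrementally instead of materializing everything at once with collect().

import scala.collection.JavaConverters._

// toLocalIterator() fetches one partition at a time, bounding what the
// driver must hold in memory at any moment.
val rows = df.toLocalIterator().asScala
var n = 0L
rows.foreach(_ => n += 1)
println(s"iterated $n rows on the driver")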

 

 Hoping for your attention and help.

 Best regards with thanks!


Department of Engineering Mechanics
Zhejiang University
Hangzhou 310027, P.R. China
Mobile: (+86)15158859317
E-mail: lyx_z...@zju.edu.cn

Sent from my iPhone

[Spark SQL] [Beginner] Dataset[Row] collect to driver throws java.io.EOFException: Premature EOF: no length prefix available

2020-04-19 Thread maqy1...@outlook.com
Hi all,
 I get a Dataset[Row] through the following code:

val df: Dataset[Row] =
spark.read.format("csv").schema(schema).load("hdfs://master:9000/mydata")

 After that I want to collect it to the driver:

val df_rows: Array[Row] = df.collect()

The Spark web UI shows that all tasks ran successfully, but the
application did not stop. After more than ten minutes, an error is
generated in the shell:

java.io.EOFException: Premature EOF: no length prefix available
 
Environment:
Spark 2.4.3
Hadoop 2.7.7
About 800,000,000 rows of data, 12GB in total
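
A hedged aside worth ruling out here: spark.driver.maxResultSize (default 1g)
caps the serialized size of all partitions a collect() may return, though
exceeding it normally raises a SparkException rather than hanging. A sketch
of raising it at session build time, with an illustrative value:

import org.apache.spark.sql.SparkSession

// The limit must cover the serialized size of every collected partition.
val spark = SparkSession.builder()
  .appName("large-collect")
  .config("spark.driver.maxResultSize", "20g")
  .getOrCreate()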

More detailed information can be seen here:
https://stackoverflow.com/questions/61202566/spark-sql-datasetrow-collect-to-driver-throw-java-io-eofexception-premature-e
Does anyone know the reason?

Best regards,
maqy



Re: Understanding spark structured streaming checkpointing system

2020-04-19 Thread Ruijing Li
It’s not intermittent; it seems to happen every time. Spark fails when it
starts up from the last checkpoint and complains that the offset is old. I
checked, and the offset had indeed expired on the Kafka side. My version of
Spark is 2.4.4, using Kafka 0.10.
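
For anyone debugging the same thing, the checkpoint layout is easy to
inspect; a sketch with a hypothetical checkpoint path. On restart Spark
should replay the newest batch in offsets/ that has no matching entry in
commits/:

import org.apache.hadoop.fs.{FileSystem, Path}

// List the batch ids recorded in offsets/ and commits/; a mismatch shows
// which batch will be replayed after a restart.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
Seq("offsets", "commits").foreach { dir =>
  val ids = fs.listStatus(new Path(s"/checkpoints/myquery/$dir"))
    .map(_.getPath.getName).sorted
  println(s"$dir: ${ids.mkString(", ")}")
}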

On Sun, Apr 19, 2020 at 3:38 PM Jungtaek Lim 
wrote:

> That sounds odd. Is it intermittent, or always reproducible if you start
> with the same checkpoint? What's the version of Spark?
>
> On Fri, Apr 17, 2020 at 6:17 AM Ruijing Li  wrote:
>
>> Hi all,
>>
>> I have a question on how structured streaming does checkpointing. I’m
>> noticing that spark is not reading from the max / latest offset it’s seen.
>> For example, in HDFS, I see it stored offset file 30 which contains
>> partition: offset {1: 2000}
>>
>> But instead after stopping the job and restarting it, I see it instead
>> reads from offset file 9 which contains {1:1000}
>>
>> Can someone explain why spark doesn’t take the max offset?
>>
>> Thanks.
>> --
>> Cheers,
>> Ruijing Li
>>
> --
Cheers,
Ruijing Li


Re: [Structured Streaming] Checkpoint file compact file grows big

2020-04-19 Thread Jungtaek Lim
Deleting the latest .compact file would lose the exactly-once guarantee and
cause Spark to fail to read from the output directory. If you're reading the
output directory from outside Spark, the metadata on the output directory
doesn't matter, but then there's no exactly-once (exactly-once is achieved by
leveraging the metadata, which only Spark can read).
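
For readers following along: when Spark batch-reads a file-sink output it
consults the _spark_metadata log in that directory rather than listing files,
which is why a deleted compact file breaks Spark-side reads even though the
data files are intact. A sketch with a hypothetical path:

// Spark-side read: only files recorded by committed batches in
// <out>/_spark_metadata are visible, giving the exactly-once view.
val committedView = spark.read.parquet("hdfs:///streams/out")

// A plain directory listing (what a non-Spark reader effectively does)
// bypasses that log and may also see uncommitted or duplicate files.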

Btw, what you've encountered is one of the known issues with the file stream
sink - two different JIRA issues have been filed for it so far (reported by
end users):

https://issues.apache.org/jira/browse/SPARK-24295
https://issues.apache.org/jira/browse/SPARK-29995

I've proposed retention of output files in the file stream sink, but the
proposal hasn't gotten much attention (meaning it's not guaranteed to be
addressed):

https://issues.apache.org/jira/browse/SPARK-27188

Given that the patch is stale, I'm planning to rework it against the latest
master soon.

Btw, I've also proposed other improvements to help address latency issues in
the file stream source and file stream sink, but these haven't gotten much
attention from committers either (no guarantee they'll be addressed):

https://issues.apache.org/jira/browse/SPARK-30804
https://issues.apache.org/jira/browse/SPARK-30866
https://issues.apache.org/jira/browse/SPARK-30900
https://issues.apache.org/jira/browse/SPARK-30915
https://issues.apache.org/jira/browse/SPARK-30946

SPARK-30946 is closely related to this issue - it makes the checkpoint file
much smaller and the compaction much faster. Efficiency depends on the
compression ratio, but it can compact about 5 times faster with output about
80% smaller (1/5 of the original), which greatly delays the point at which
you hit the bad state, even without a TTL. Say, if you reached the bad state
in 2 weeks, the patch would delay it by about 8 more weeks (10 weeks to reach
the bad state).

That said, it doesn't completely remove the need for a TTL, but it opens the
chance to run with a longer TTL without encountering the bad state.

If you're adventurous you can apply these patches to your version of Spark
and see whether they help.
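
Until then, the only knobs I know of are the internal file-sink log settings
below; a sketch with illustrative values. Note they only change the
compaction cadence and cleanup, not the unbounded growth itself, and as
internal Spark 2.x options they may change between releases.

// Compact every 100th batch instead of the default 10th, and wait 10
// minutes before cleaning up superseded log files.
spark.conf.set("spark.sql.streaming.fileSink.log.compactInterval", "100")
spark.conf.set("spark.sql.streaming.fileSink.log.cleanupDelay", "600000")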

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Thu, Apr 16, 2020 at 9:24 AM Ahn, Daniel 
wrote:

> Are Spark Structured Streaming checkpoint files expected to grow over time
> indefinitely? Is there a recommended way to safely age-off old checkpoint
> data?
>
>
>
> Currently we have a Spark Structured Streaming process reading from Kafka
> and writing to an HDFS sink, with checkpointing enabled and writing to a
> location on HDFS. This streaming application has been running for 4 months,
> and over time we have noticed that with every 10th job within the
> application there is about a 5-minute delay between when a job finishes and
> the next job starts, which we have attributed to the checkpoint compaction
> process. At this point the .compact file that is written is about 2GB in
> size, and its contents show it keeps track of files processed at the very
> origin of the streaming application.
>
>
>
> This issue can be reproduced with any Spark Structured Streaming process
> that writes checkpoint files.
>
>
>
> Is the best approach for handling the growth of these files to simply
> delete the latest .compact file within the checkpoint directory, and are
> there any associated risks with doing so?
>


Re: Using startingOffsets latest - no data from structured streaming kafka query

2020-04-19 Thread Jungtaek Lim
Did you provide more records to the topic "after" you started the query?
That's the only cause I can imagine based on the information given.
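
A minimal sketch of the setup in question, with hypothetical broker and topic
names. Note that "latest" only applies on the first start against an empty
checkpoint; on a restart the checkpointed offsets win:

// Start reading from the end of the topic (only on the very first run).
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "mytopic")
  .option("startingOffsets", "latest")
  .load()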

On Fri, Apr 17, 2020 at 9:13 AM Ruijing Li  wrote:

> Hi all,
>
> Apologies if this has been asked before, but I could not find the answer
> to this question. We have a structured streaming job, but for some reason,
> if we use startingOffsets = latest with foreachbatch mode, it doesn’t
> produce any data.
>
> Rather, in the logs I see it repeat the message “Fetcher [Consumer]
> Resetting offset for partition to offset” over and over again.
>
> However with startingOffsets=earliest, we don’t get this issue. I’m
> wondering then how we can use startingOffsets=latest as I wish to start
> from the latest offset available.
> --
> Cheers,
> Ruijing Li
>


Re: Spark structured streaming - Fallback to earliest offset

2020-04-19 Thread Jungtaek Lim
You may want to check "where" the job is stuck by taking a thread dump - it
could be in the Kafka consumer, in the Spark codebase, etc. Without that
information it's hard to say.
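
A driver-side sketch of that: print every JVM thread's stack to see where
execution is blocked (for executors, the "Thread Dump" link on the Spark UI's
Executors tab does the same).

import scala.collection.JavaConverters._

// Dump all thread stacks in the driver JVM.
Thread.getAllStackTraces.asScala.foreach { case (t, frames) =>
  println(s"--- ${t.getName} [${t.getState}]")
  frames.foreach(f => println(s"    at $f"))
}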

On Thu, Apr 16, 2020 at 4:22 PM Ruijing Li  wrote:

> Thanks Jungtaek, that makes sense.
>
> I tried Burak’s solution of just setting failOnDataLoss to false, but
> instead of failing, the job is stuck. I’m guessing that the offsets are
> being deleted faster than the job can process them and it will stay stuck
> unless I increase resources? Or, once the exception happens, will Spark
> hang?
>
> On Tue, Apr 14, 2020 at 10:48 PM Jungtaek Lim <
> kabhwan.opensou...@gmail.com> wrote:
>
>> I think Spark is trying to ensure that it reads the input "continuously"
>> without any gaps. Technically it may be valid to call the situation a kind
>> of "data loss", as the query couldn't process the offsets that were thrown
>> out, and the owner of the query needs to be careful as it affects the
>> result.
>>
>> If your streaming query keeps up with the input rate, then it's pretty
>> rare for the query to fall behind retention. Even if it lags a bit, it'd
>> be safe if retention is set to a long enough period. The ideal state is
>> ensuring your query processes all offsets before retention throws them out
>> (don't leave the query lagging behind - either increase processing power
>> or increase retention duration, though most probably you'll need the
>> former), but if you can't ensure that and you understand the risk, then
>> yes, you can turn off the option and take the risk.
>>
>>
>> On Wed, Apr 15, 2020 at 9:24 AM Ruijing Li  wrote:
>>
>>> I see, I wasn’t sure if that would work as expected. The docs seem to
>>> suggest being careful before turning off that option, and I’m not sure
>>> why failOnDataLoss is true by default.
>>>
>>> On Tue, Apr 14, 2020 at 5:16 PM Burak Yavuz  wrote:
>>>
 Just set `failOnDataLoss=false` as an option in readStream?

 On Tue, Apr 14, 2020 at 4:33 PM Ruijing Li 
 wrote:

> Hi all,
>
> I have a spark structured streaming app that is consuming from a kafka
> topic with retention set up. Sometimes I face an issue where my query has
> not finished processing a message but the retention kicks in and deletes
> the offset, which since I use the default setting of “failOnDataLoss=true”
> causes my query to fail. The solution I currently have is manual, deleting
> the offsets directory and rerunning.
>
> I instead like to have spark automatically fall back to the earliest
> offset available. The solutions I saw recommend setting auto.offset =
> earliest, but for structured streaming, you cannot set that. How do I do
> this for structured streaming?
>
> Thanks!
> --
> Cheers,
> Ruijing Li
>
 --
>>> Cheers,
>>> Ruijing Li
>>>
>> --
> Cheers,
> Ruijing Li
>
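
For reference, Burak's suggestion from this thread looks like the sketch
below; broker and topic names are hypothetical. With the option off, the
query skips expired offsets instead of failing, at the cost of silently
losing that data.

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "mytopic")
  .option("failOnDataLoss", "false")  // default is true
  .load()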


Re: Understanding spark structured streaming checkpointing system

2020-04-19 Thread Jungtaek Lim
That sounds odd. Is it intermittent, or always reproducible if you start
with the same checkpoint? What's the version of Spark?

On Fri, Apr 17, 2020 at 6:17 AM Ruijing Li  wrote:

> Hi all,
>
> I have a question on how structured streaming does checkpointing. I’m
> noticing that spark is not reading from the max / latest offset it’s seen.
> For example, in HDFS, I see it stored offset file 30 which contains
> partition: offset {1: 2000}
>
> But instead after stopping the job and restarting it, I see it instead
> reads from offset file 9 which contains {1:1000}
>
> Can someone explain why spark doesn’t take the max offset?
>
> Thanks.
> --
> Cheers,
> Ruijing Li
>


Re: How to pass a constant value to a partitioned hive table in spark

2020-04-19 Thread Mich Talebzadeh
Many thanks Ayan.

I tried that as well, as follows:

import org.apache.spark.sql.functions.lit

// I assume this will be sent as a constant for the batch
val broadcastValue = "123456789"
val df = spark.read.
  format("com.databricks.spark.xml").
  option("rootTag", "hierarchy").
  option("rowTag", "sms_request").
  load("/tmp/broadcast.xml")

val newDF = df.withColumn("broadcastId", lit(broadcastValue))

So this column broadcastId is a static partition in the Hive table, whereas
the other column, brand, is considered a dynamic partition.

newDF.createOrReplaceTempView("tmp")
// Need to create and populate target Parquet table michtest.BroadcastStaging
//
HiveContext.sql("""DROP TABLE IF EXISTS michtest.BroadcastStaging""")

  var sqltext = """
  CREATE TABLE IF NOT EXISTS michtest.BroadcastStaging (
 partyId STRING
   , phoneNumber STRING
  )
  PARTITIONED BY (
 broadcastId STRING
   , brand STRING
)
  STORED AS PARQUET
  """
  HiveContext.sql(sqltext)
  //
  // Put data in Hive table
  //
 // Dynamic partitioning is disabled by default. We turn it on
 //spark.sql("SET hive.exec.dynamic.partition = true")
 spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
 // spark.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")

  sqltext = """
  INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
broadcastValue, brand)
  SELECT
  ocis_party_id AS partyId
, target_mobile_no AS phoneNumber
, brand
  FROM tmp
  """

org.apache.spark.sql.catalyst.parser.ParseException:
missing STRING at ','(line 2, pos 85)

== SQL ==

  INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
broadcastValue, brand)
-^^^
  SELECT
  ocis_party_id AS partyId
, target_mobile_no AS phoneNumber
   , brand
  FROM tmp

The thing is that if I pass (broadcastId = "123456789", brand) it works
with no problem!
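
That points at the fix: the parser receives the bare identifier
broadcastValue rather than its value. Splicing the Scala variable into the
SQL text via string interpolation (quoted, since broadcastId is a STRING
column) should behave exactly like the literal case; a sketch:

  // The partition spec now reaches the parser as a string literal,
  // identical to writing broadcastId = "123456789" by hand.
  sqltext = s"""
  INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId = '$broadcastValue', brand)
  SELECT
      ocis_party_id AS partyId
    , target_mobile_no AS phoneNumber
    , brand
  FROM tmp
  """
  spark.sql(sqltext)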

Regards,

Dr Mich Talebzadeh

LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 16 Apr 2020 at 13:05, ayan guha  wrote:

> Hi Mitch
>
> Add it in the DF first
>
> from pyspark.sql.functions import lit
>
> df = df.withColumn('broadcastId', lit(broadcastValue))
>
> Then you will be able to access the column in the temp view
>
> Re: Partitioning, DataFrame.write also supports partitionBy clause and you
> can use it along with saveAsTable.
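
A sketch of that partitionBy route, using names from this thread: the
DataFrame (with the literal broadcastId column already added) is written
straight into a partitioned table, skipping the INSERT statement entirely.

newDF.write
  .mode("append")
  .partitionBy("broadcastId", "brand")
  .format("parquet")
  .saveAsTable("michtest.BroadcastStaging")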
>
>
> On Thu, Apr 16, 2020 at 9:47 PM Mich Talebzadeh 
> wrote:
>
>> Thanks Zhang,
>>
>> That is not working. I need to send the value of the variable
>> broadcastValue; it cannot interpret it.
>>
>>  scala>   sqltext = """
>>  |   INSERT INTO TABLE michtest.BroadcastStaging PARTITION
>> (broadcastId = broadcastValue, brand = "dummy")
>>  |   SELECT
>>  |   ocis_party_id AS partyId
>>  | , target_mobile_no AS phoneNumber
>>  |   FROM tmp
>>  |   """
>> sqltext: String =
>>   INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
>> broadcastValue, brand = "dummy")
>>   SELECT
>>   ocis_party_id AS partyId
>> , target_mobile_no AS phoneNumber
>>   FROM tmp
>>
>>
*scala> spark.sql(sqltext)*
>> org.apache.spark.sql.catalyst.parser.ParseException:
>> missing STRING at ','(line 2, pos 85)
>>
>> == SQL ==
>>
>>   INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
>> broadcastValue, brand = "dummy")
>>
>> -^^^
>>   SELECT
>>   ocis_party_id AS partyId
>> , target_mobile_no AS phoneNumber
>>   FROM tmp
>>
>>
>>   at
>> org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:241)
>>   at
>> org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:117)
>>   at
>> org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48)
>>   at
>> org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:69)
>>   at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
>>   ... 55 elided
>>
>>
>>
>> Dr Mich Talebzadeh