Re: [pyspark 2.4+] BucketBy SortBy doesn't retain sort order

2020-03-03 Thread Rishi Shah
Hi All,

Just checking in to see if anyone has any advice on this.

Thanks,
Rishi

On Mon, Mar 2, 2020 at 9:21 PM Rishi Shah  wrote:

> Hi All,
>
> I have two large tables (~1TB). I used the following to save both tables,
> but when I then join them on join_column, the plan still shows a shuffle
> and sort before the join. Could someone please help?
>
> df.repartition(2000).write.bucketBy(1, join_column)
>   .sortBy(join_column).saveAsTable(tablename)
>
> --
> Regards,
>
> Rishi Shah
>
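
For reference, a minimal sketch of the recipe that normally lets Spark 2.4
skip both the shuffle and the sort at join time (shown in Scala; the same
calls exist in PySpark). Both tables need the same bucket count on the join
column, and repartitioning on that column before the write keeps each bucket
to a single sorted file, which is what the planner requires to reuse the
sortBy ordering. Bucket count, DataFrame and table names are placeholders:

import org.apache.spark.sql.functions.col

val numBuckets = 200  // placeholder; pick based on data size

// Repartitioning on the join column before bucketBy keeps each bucket to one
// file per table, so the per-file sort from sortBy is also a per-bucket sort.
dfA.repartition(numBuckets, col("join_column"))
  .write
  .bucketBy(numBuckets, "join_column")
  .sortBy("join_column")
  .saveAsTable("table_a")

dfB.repartition(numBuckets, col("join_column"))
  .write
  .bucketBy(numBuckets, "join_column")
  .sortBy("join_column")
  .saveAsTable("table_b")

// Join the saved tables (not the in-memory DataFrames) so the bucketing
// metadata from the metastore is picked up.
val joined = spark.table("table_a").join(spark.table("table_b"), "join_column")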


-- 
Regards,

Rishi Shah


Stateful Spark Streaming: Required attribute 'value' not found

2020-03-03 Thread Something Something
In a stateful Spark Structured Streaming application I am emitting an
'OutputRow' from 'updateAcrossEvents', but I keep getting this error (*Required
attribute 'value' not found*) when it tries to write to Kafka. I know from the
documentation that a 'value' attribute needs to be set, but how do I do that in
stateful Structured Streaming? Where and how do I add this 'value' attribute
in the following code? *Note: I am using Spark 2.3.1*

withEventTime
  .as[R00tJsonObject]
  .withWatermark("event_time", "5 minutes")
  .groupByKey(row => (row.value.Id, row.value.time.toString, row.value.cId))
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "myTopic")
  .option("checkpointLocation", "/Users/username/checkpointLocation")
  .outputMode("update")
  .start()
  .awaitTermination()
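
The Kafka sink only accepts a DataFrame with a 'value' column (plus optional
'key' and 'topic' columns), so the output of mapGroupsWithState has to be
serialized into such a column before writeStream. A minimal sketch, assuming
'statefulStream' stands for the Dataset[OutputRow] produced by the
mapGroupsWithState call above:

// Pack every field of OutputRow into one JSON string column named "value",
// which is the shape the Kafka sink expects.
val toKafka = statefulStream.selectExpr("to_json(struct(*)) AS value")

toKafka.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "myTopic")
  .option("checkpointLocation", "/Users/username/checkpointLocation")
  .outputMode("update")
  .start()
  .awaitTermination()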


Example of Stateful Spark Structured Streaming with Kafka

2020-03-03 Thread Something Something
There are lots of examples of stateful Structured Streaming in 'The
Definitive Guide' book, but all of them read JSON from a 'path'. That's
working for me.

Now I need to read from Kafka.

I Googled but couldn't find any example. I am struggling to map the 'value'
of the Kafka message to my JSON. Any help would be appreciated. Here's what
I am trying:

val query = withEventTime
  .as[R00tJsonObject]
  .withWatermark("event_time", "5 minutes")
  .groupByKey(row => (row.report.id, row.report.time.toString, row.report.cId))
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "myTopic")
  .option("checkpointLocation", "/Users/username/checkpointLocation")
  .outputMode("update")
  .start()
  .awaitTermination()


But it fails with: cannot resolve 'arrivalTime' given input columns: [value, event_time];
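
The Kafka source only exposes [key, value, topic, partition, offset,
timestamp, timestampType], which is why fields like arrivalTime cannot be
resolved: the JSON payload is still inside the binary 'value' column. A rough
sketch of the missing step, assuming R00tJsonObject is a case class and that
event_time comes from one of its fields (the topic name and the arrivalTime
field are assumptions):

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.{col, from_json}

// Derive a schema from the case class, parse the Kafka 'value' column as
// JSON, and flatten it so the typed .as[R00tJsonObject] conversion can see
// the fields.
val jsonSchema = Encoders.product[R00tJsonObject].schema

val withEventTime = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "sourceTopic")  // assumed input topic name
  .load()
  .select(from_json(col("value").cast("string"), jsonSchema).as("json"))
  .select("json.*")
  // assuming the payload carries a timestamp-compatible arrivalTime field
  .withColumn("event_time", col("arrivalTime").cast("timestamp"))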


Re: How to collect Spark dataframe write metrics

2020-03-03 Thread Zohar Stiro
Hi,

To get DataFrame-level write metrics, you can take a look at the following
trait:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala
and a basic implementation example:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala


and here is an example of how it is being used in FileStreamSink:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala#L178
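
Not the WriteStatsTracker route, but if all you need is a row count, a
lighter-weight option is to sum Spark's own task-level output metrics with a
SparkListener and compare the total against the expected count. A rough
sketch, assuming 'df' is the DataFrame being written and with a made-up
output path:

import java.util.concurrent.atomic.LongAdder
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Accumulates the records-written metric that Spark reports per task.
class RecordsWrittenListener extends SparkListener {
  val recordsWritten = new LongAdder
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics can be null for some failed tasks
    Option(taskEnd.taskMetrics).foreach(m => recordsWritten.add(m.outputMetrics.recordsWritten))
  }
}

val listener = new RecordsWrittenListener
spark.sparkContext.addSparkListener(listener)

val expected = df.count()
df.write.parquet("hdfs:///tmp/validation_example")  // made-up path

// Listener events arrive asynchronously, so wait for the listener bus to
// drain before comparing the totals.
assert(listener.recordsWritten.sum() == expected)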

About whether it is good practice: it depends on your use case, but generally
speaking I would not do it, at least not for checking your logic or checking
that Spark is working correctly.

On Sun, Mar 1, 2020 at 14:32 Manjunath Shetty H <manjunathshe...@live.com>
wrote:

> Hi all,
>
> Basically my use case is to validate the DataFrame row count before and
> after writing to HDFS. Is this even good practice? Or should I rely on
> Spark for guaranteed writes?
>
> If it is a good practice to follow, then how do I get the DataFrame-level
> write metrics?
>
> Any pointers would be helpful.
>
>
> Thanks and Regards
> Manjunath
>