Current state of the Dataset API

2021-10-04 Thread Magnus Nilsson
Hi,

I tried using the (typed) Dataset API about three years ago. At the time
there were limitations with predicate pushdown, serialization overhead
and maybe more things I've forgotten. Ultimately we chose the
Dataframe API as the sweet spot.

Does anyone know of a good overview of the current state of the
Dataset API, pros/cons as of Spark 3?

Is it fully usable? Do you get the advantages of a strongly typed
dataframe? Any known limitations or drawbacks to take into account?

br,

Magnus

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Scala vs Python for ETL with Spark

2020-10-17 Thread Magnus Nilsson
Holy war is a bit dramatic, don't you think? The difference between Scala
and Python will always be very relevant when choosing between Spark and
Pyspark. I wouldn't call it irrelevant to the original question.

br,

molotch

On Sat, 17 Oct 2020 at 16:57, "Yuri Oleynikov (‫יורי אולייניקוב‬‎)" <
yur...@gmail.com> wrote:

> It seems that thread converted to holy war that has nothing to do with
> original question. If it is, it’s super disappointing
>
> Sent from iPhone
>
> > On 17 Oct 2020, at 15:53, Molotch  wrote:
> >
> > I would say the pros and cons of Python vs Scala is both down to Spark,
> the
> > languages in themselves and what kind of data engineer you will get when
> you
> > try to hire for the different solutions.
> >
> > With Pyspark you get less functionality and increased complexity with the
> > py4j java interop compared to vanilla Spark. Why would you want that?
> Maybe
> > you want the Python ML tools and have a clear use case, then go for it.
> If
> > not, avoid the increased complexity and reduced functionality of Pyspark.
> >
> > Python vs Scala? Idiomatic Python is a lesson in bad programming
> > habits/ideas, there's no other way to put it. Do you really want
> programmers
> > enjoying coding in such a language hacking away at your system?
> >
> > Scala might be far from perfect with the plethora of ways to express
> > yourself. But Python < 3.5 is not fit for anything except simple
> scripting
> > IMO.
> >
> > Doing exploratory data analysis in a Jupyter notebook, Pyspark seems
> like a
> > fine idea. Coding an entire ETL library including state management, the
> > whole kitchen including the sink, Scala every day of the week.
> >
> >
> >
> > --
> > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Scala vs Python for ETL with Spark

2020-10-17 Thread Magnus Nilsson
I'm sorry you were offended. I'm not an expert in Python and I wasn't
trying to attack you personally. It's just an opinion about what makes a
language better or worse, it's not the single source of truth. You don't
have to take offense. In the end it's about context and what you're trying
to achieve under what circumstances.

I know a little about both programming and ETL. To say I know nothing is
taking it a bit far. I don't know everything worth knowing, that's for sure
and it goes without saying.

It's fine to love Python and good for you being able to write Python
programs wiping Java commercial stacks left and right. It's just my opinion
that mutable dynamically typed languages encourage/enforce bad habits.

The larger the application and team gets, the worse off you are (again, just
an opinion). Not everyone agrees (just look at Python's popularity) but it's
definitely a relevant aspect when deciding between Spark and Pyspark.


br,

molotch

On Sat, 17 Oct 2020 at 16:40, Sasha Kacanski  wrote:

> And you are an expert on python! Idiomatic...
> Please do everyone a favor and stop commenting on things you have no
> idea...
> I build ETL systems python that wiped java commercial stacks left and
> right. Pyspark was and is  and will be a second class citizen in spark
> world. That has nothing to do with python.
> And as far as scala is concerned good luck with it...
>
>
>
>
>
> On Sat, Oct 17, 2020, 8:53 AM Molotch  wrote:
>
>> I would say the pros and cons of Python vs Scala is both down to Spark,
>> the
>> languages in themselves and what kind of data engineer you will get when
>> you
>> try to hire for the different solutions.
>>
>> With Pyspark you get less functionality and increased complexity with the
>> py4j java interop compared to vanilla Spark. Why would you want that?
>> Maybe
>> you want the Python ML tools and have a clear use case, then go for it. If
>> not, avoid the increased complexity and reduced functionality of Pyspark.
>>
>> Python vs Scala? Idiomatic Python is a lesson in bad programming
>> habits/ideas, there's no other way to put it. Do you really want
>> programmers
>> enjoying coding in such a language hacking away at your system?
>>
>> Scala might be far from perfect with the plethora of ways to express
>> yourself. But Python < 3.5 is not fit for anything except simple scripting
>> IMO.
>>
>> Doing exploratory data analysis in a Jupyter notebook, Pyspark seems like
>> a
>> fine idea. Coding an entire ETL library including state management, the
>> whole kitchen including the sink, Scala every day of the week.
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-03 Thread Magnus Nilsson
Thank you, so that would mean Spark gets the current latest offset(s) when
the trigger fires and then processes all available messages in the topic up to
and including that offset, as long as maxOffsetsPerTrigger is the default of
None (or large enough to handle all available messages).

I think the word micro-batch confused me (more like mega-batch in some
cases). It makes sense though: this makes Trigger.Once a fixed-interval
trigger that only fires once rather than repeatedly.
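A minimal sketch of that pattern, assuming a Kafka source; broker, topic and paths are placeholders:

import org.apache.spark.sql.streaming.Trigger

// Runs one micro-batch that drains whatever is available at start, then stops.
// In Spark 3.0+ (SPARK-30669) Trigger.Once ignores maxOffsetsPerTrigger for built-in sources.
val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")    // placeholder
  .option("subscribe", "events")                       // placeholder topic
  .load()
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "/checkpoints/events") // placeholder path
  .option("path", "/data/events")                      // placeholder path
  .trigger(Trigger.Once())
  .start()

query.awaitTermination()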


On Sun, May 3, 2020 at 3:20 AM Jungtaek Lim 
wrote:

> If I understand correctly, Trigger.once executes only one micro-batch and
> terminates, that's all. Your understanding of structured streaming applies
> there as well.
>
> It's like a hybrid approach as bringing incremental processing from
> micro-batch but having processing interval as batch. That said, while it
> enables to get both sides of benefits, it's basically structured streaming,
> inheriting all the limitations on the structured streaming, compared to the
> batch query.
>
> Spark 3.0.0 will bring some change on Trigger.once (SPARK-30669 [1]) -
> Trigger.once will "ignore" the read limit per micro-batch on data source
> (like maxOffsetsPerTrigger) and process all available input as possible.
> (Data sources should migrate to the new API to take effect, but works for
> built-in data sources like file and Kafka.)
>
> 1. https://issues.apache.org/jira/browse/SPARK-30669
>
> On Sat, 2 May 2020 at 5:35 PM, Magnus Nilsson wrote:
>
>> I've always had a question about Trigger.Once that I never got around to
>> ask or test for myself. If you have a 24/7 stream to a Kafka topic.
>>
>> Will Trigger.Once get the last offset(s) when it starts and then quit
>> once it hits this offset(s) or will the job run until no new messages are
>> added to the topic for a particular amount of time?
>>
>> br,
>>
>> Magnus
>>
>> On Sat, May 2, 2020 at 1:22 AM Burak Yavuz  wrote:
>>
>>> Hi Rishi,
>>>
>>> That is exactly why Trigger.Once was created for Structured Streaming.
>>> The way we look at streaming is that it doesn't have to be always real
>>> time, or 24-7 always on. We see streaming as a workflow that you have to
>>> repeat indefinitely. See this blog post for more details!
>>>
>>> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
>>>
>>> Best,
>>> Burak
>>>
>>> On Fri, May 1, 2020 at 2:55 PM Rishi Shah 
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I recently started playing with spark streaming, and checkpoint
>>>> location feature looks very promising. I wonder if anyone has an opinion
>>>> about using spark streaming with checkpoint location option as a slow batch
>>>> processing solution. What would be the pros and cons of utilizing streaming
>>>> with checkpoint location feature to achieve fault tolerance in batch
>>>> processing application?
>>>>
>>>> --
>>>> Regards,
>>>>
>>>> Rishi Shah
>>>>
>>>


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-02 Thread Magnus Nilsson
I've always had a question about Trigger.Once that I never got around to
asking or testing for myself. Say you have a 24/7 stream to a Kafka topic.

Will Trigger.Once get the last offset(s) when it starts and then quit once
it hits this offset(s), or will the job run until no new messages are added
to the topic for a particular amount of time?

br,

Magnus

On Sat, May 2, 2020 at 1:22 AM Burak Yavuz  wrote:

> Hi Rishi,
>
> That is exactly why Trigger.Once was created for Structured Streaming. The
> way we look at streaming is that it doesn't have to be always real time, or
> 24-7 always on. We see streaming as a workflow that you have to repeat
> indefinitely. See this blog post for more details!
>
> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
>
> Best,
> Burak
>
> On Fri, May 1, 2020 at 2:55 PM Rishi Shah 
> wrote:
>
>> Hi All,
>>
>> I recently started playing with spark streaming, and checkpoint location
>> feature looks very promising. I wonder if anyone has an opinion about using
>> spark streaming with checkpoint location option as a slow batch processing
>> solution. What would be the pros and cons of utilizing streaming with
>> checkpoint location feature to achieve fault tolerance in batch processing
>> application?
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>


Re: Unable to get to_timestamp with Timezone Information

2020-03-31 Thread Magnus Nilsson
And to answer your question (sorry, read too fast): the string is not
proper ISO 8601. The extended form must be used throughout, i.e.
2020-04-11T20:40:00-05:00; there's a colon (:) missing in the UTC offset.
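A hedged sketch of both options; the exact pattern letters differ between Spark 2.x's legacy parser and Spark 3's, so treat the pattern as an assumption to verify against your version's datetime docs:

import spark.implicits._
import org.apache.spark.sql.functions.to_timestamp

val df = Seq("2020-04-11T20:40:00-0500").toDF("value")

// Option 1: parse the basic-form offset directly; 'Z' accepts the "-0500" form.
df.select(to_timestamp($"value", "yyyy-MM-dd'T'HH:mm:ssZ")).show(false)

// Option 2: fix the string to extended ISO 8601 ("-05:00"); Spark then parses
// it without an explicit pattern.
val fixed = Seq("2020-04-11T20:40:00-05:00").toDF("value")
fixed.select(to_timestamp($"value")).show(false)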

br,

Magnus

On Tue, Mar 31, 2020 at 7:11 PM Magnus Nilsson  wrote:

> Timestamps aren't timezoned. If you parse ISO8601 strings they will be
> converted to UTC automatically.
>
> If you parse timestamps without timezone they will be converted to the
> timezone the server running Spark uses. You can change the timezone
> Spark uses with spark.conf.set("spark.sql.session.timeZone", "UTC").
> Timestamps represent a point in time, the clock representation of that
> instant is dependent on Spark's timezone settings, both for parsing (non
> ISO8601) strings and showing timestamps.
>
> br,
>
> Magnus
>
> On Tue, Mar 31, 2020 at 6:14 PM Chetan Khatri 
> wrote:
>
>> Hi Spark Users,
>>
>> I am losing the timezone value from below format, I tried couple of
>> formats but not able to make it. Can someone throw lights?
>>
>> scala> val sampleDF = Seq("2020-04-11T20:40:00-0500").toDF("value")
>> sampleDF: org.apache.spark.sql.DataFrame = [value: string]
>>
>> scala> sampleDF.select('value, to_timestamp('value,
>> "yyyy-MM-dd\'T\'HH:mm:ss")).show(false)
>>
>> +------------------------+------------------------------------------------+
>> |value                   |to_timestamp(`value`, 'yyyy-MM-dd\'T\'HH:mm:ss')|
>> +------------------------+------------------------------------------------+
>> |2020-04-11T20:40:00-0500|2020-04-11 20:40:00                             |
>> +------------------------+------------------------------------------------+
>>
>> Thanks
>>
>


Re: Unable to get to_timestamp with Timezone Information

2020-03-31 Thread Magnus Nilsson
Timestamps aren't timezoned. If you parse ISO8601 strings they will be
converted to UTC automatically.

If you parse timestamps without timezone they will be converted to the
timezone the server running Spark uses. You can change the timezone
Spark uses with spark.conf.set("spark.sql.session.timeZone", "UTC").
Timestamps represent a point in time, the clock representation of that
instant is dependent on Spark's timezone settings, both for parsing (non
ISO8601) strings and showing timestamps.
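A small sketch of that setting, reusing the value string from the thread (the rendered clock times in the comments are what I would expect; the stored instant is the same either way):

import spark.implicits._
import org.apache.spark.sql.functions.to_timestamp

spark.conf.set("spark.sql.session.timeZone", "UTC")
Seq("2020-04-11T20:40:00-05:00").toDF("value")
  .select(to_timestamp($"value") as "ts")
  .show(false)                                  // rendered as 2020-04-12 01:40:00 in UTC

spark.conf.set("spark.sql.session.timeZone", "America/Chicago")
Seq("2020-04-11T20:40:00-05:00").toDF("value")
  .select(to_timestamp($"value") as "ts")
  .show(false)                                  // same instant rendered as 2020-04-11 20:40:00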

br,

Magnus

On Tue, Mar 31, 2020 at 6:14 PM Chetan Khatri 
wrote:

> Hi Spark Users,
>
> I am losing the timezone value from below format, I tried couple of
> formats but not able to make it. Can someone throw lights?
>
> scala> val sampleDF = Seq("2020-04-11T20:40:00-0500").toDF("value")
> sampleDF: org.apache.spark.sql.DataFrame = [value: string]
>
> scala> sampleDF.select('value, to_timestamp('value,
> "yyyy-MM-dd\'T\'HH:mm:ss")).show(false)
> +------------------------+------------------------------------------------+
> |value                   |to_timestamp(`value`, 'yyyy-MM-dd\'T\'HH:mm:ss')|
> +------------------------+------------------------------------------------+
> |2020-04-11T20:40:00-0500|2020-04-11 20:40:00                             |
> +------------------------+------------------------------------------------+
>
> Thanks
>


Re: Optimising multiple hive table join and query in spark

2020-03-15 Thread Magnus Nilsson
Been a while, but I remember reading on Stack Overflow that you can use a UDF
as a join condition to trick Catalyst into not reshuffling the partitions, i.e.
use regular equality on the column you partitioned or bucketed by and your
custom comparer for the other columns. Never got around to trying it out
though. I really would like a native way to tell Catalyst not to reshuffle
just because you use more columns in the join condition.
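A rough sketch of that trick, with the same caveat that it is untested; dfA/dfB and the column names are made up:

import org.apache.spark.sql.functions.udf

// Hypothetical: both frames are bucketed/partitioned by "country". Keeping the
// equi-join key to that column alone preserves the existing distribution; the
// extra column comparison is hidden inside a UDF so Catalyst does not add it
// to the shuffle keys.
val sameRegion = udf((a: String, b: String) => a == b)

val joined = dfA.join(
  dfB,
  dfA("country") === dfB("country") && sameRegion(dfA("region"), dfB("region")))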

On Sun, Mar 15, 2020 at 6:04 AM Manjunath Shetty H 
wrote:

> Hi All,
>
> We have 10 tables in data warehouse (hdfs/hive) written using ORC format.
> We are serving a usecase on top of that by joining 4-5 tables using Hive as
> of now. But it is not fast as we wanted it to be, so we are thinking of
> using spark for this use case.
>
> Any suggestion on this ? Is it good idea to use the Spark for this use
> case ? Can we get better performance by using spark ?
>
> Any pointers would be helpful.
>
> *Notes*:
>
>- Data is partitioned by date (yyyyMMdd) as integer.
>- Query will fetch data for last 7 days from some tables while joining
>with other tables.
>
>
> *Approach we thought of as now :*
>
>- Create dataframe for each table and partition by same column for all
>tables ( Lets say Country as partition column )
>- Register all tables as temporary tables
>- Run the sql query with joins
>
> But the problem we are seeing with this approach is , even though we
> already partitioned using country it still does hashPartitioning +
> shuffle during join. All the table join contain `Country` column with some
> extra column based on the table.
>
> Is there any way to avoid these shuffles ? and improve performance ?
>
>
> Thanks and regards
> Manjunath
>


Re: Schema store for Parquet

2020-03-04 Thread Magnus Nilsson
Apache Atlas is the apache data catalog. Maybe want to look into that. It
depends on what your use case is.

On Wed, Mar 4, 2020 at 8:01 PM Ruijing Li  wrote:

> Thanks Lucas and Magnus,
>
> Would there be any open source solutions other than Apache Hive metastore,
> if we don’t wish to use Apache Hive and spark?
>
> Thanks.
>
> On Wed, Mar 4, 2020 at 10:40 AM lucas.g...@gmail.com 
> wrote:
>
>> Or AWS glue catalog if you're in AWS
>>
>> On Wed, 4 Mar 2020 at 10:35, Magnus Nilsson  wrote:
>>
>>> Google hive metastore.
>>>
>>> On Wed, Mar 4, 2020 at 7:29 PM Ruijing Li  wrote:
>>>
>>>> Hi all,
>>>>
>>>> Has anyone explored efforts to have a centralized storage of schemas of
>>>> different parquet files? I know there is schema management for Avro, but
>>>> couldn’t find solutions for parquet schema management. Thanks!
>>>> --
>>>> Cheers,
>>>> Ruijing Li
>>>>
>>> --
> Cheers,
> Ruijing Li
>


Re: Schema store for Parquet

2020-03-04 Thread Magnus Nilsson
Google hive metastore.

On Wed, Mar 4, 2020 at 7:29 PM Ruijing Li  wrote:

> Hi all,
>
> Has anyone explored efforts to have a centralized storage of schemas of
> different parquet files? I know there is schema management for Avro, but
> couldn’t find solutions for parquet schema management. Thanks!
> --
> Cheers,
> Ruijing Li
>


Re: Questions for platform to choose

2019-08-21 Thread Magnus Nilsson
Well, you are posting on the Spark mailing list. Though for streaming I'd
recommend Flink over Spark any day of the week. Flink was written as a
streaming platform from the beginning, quickly aligning its API with the
theoretical framework of Google's Dataflow whitepaper. It's awesome for
streaming; Spark not so much so far. It might get better, though the initial
use case for Spark wasn't streaming, and they may or may not overcome that. I'd
still go with Flink for streaming.

If you need cross platform support you can take a look at Beam. Beam has
Dataflow, Spark and Flink runners among others.

Regards,

Magnus

On Wed, Aug 21, 2019 at 8:43 AM Eliza  wrote:

> Hello,
>
> We have all of spark, flink, storm, kafka installed.
> For realtime streaming calculation, which one is the best above?
> Like other big players, the logs in our stack are huge.
>
> Thanks.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


CPUs per task

2019-07-17 Thread Magnus Nilsson
Hello all,

TLDR; Can the number of cores used by a task vary or is it always one core
per task? Is there a UI, metrics or logs I can check to see the number of
cores used by the task?

I have an ETL pipeline where I do some transformations. In one of the
stages, which ought to be quite CPU-heavy, there's only a single task running
for a few minutes. I'm trying to determine whether this means only one CPU core
is in use, or whether a single task could use many cores under the covers.

When I read data from an Event Hub the stage includes as many tasks as
there are partitions in the Event Hub, up to the maximum number of cores
available in the cluster. Clearly those tasks use one core each and are
limited in parallelism by the cluster size.
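For reference, a hedged sketch of the knobs involved (defaults and names assumed):

// By default each task is scheduled on spark.task.cpus = 1 core, so a stage
// with a single task uses one core regardless of cluster size.
spark.conf.get("spark.task.cpus", "1")     // typically "1" unless overridden at submit time

// If a CPU-heavy stage collapses to a single task, raising the partition count
// spreads the work over more tasks (and therefore more cores).
val widened = heavyDf.repartition(64)      // heavyDf and 64 are placeholders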

Regards,

Magnus


Re: Spark structural streaming sinks output late

2019-07-10 Thread Magnus Nilsson
Well, you should get updates every 10 seconds as long as there are events
surviving your quite aggressive watermark condition. Spark will try to drop
(not guaranteed) all events with a timestamp more than 500 milliseconds
older than the latest event timestamp seen so far. Try to increase the
watermark timespan and collect max("timestamp") alongside the count on every
trigger to see what's going on in your stream. It could be that you have one
producer out of sync (clock sync) adding one message every two minutes. That
will make you drop all the other messages when you run with such a low
watermark tolerance.
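A hedged sketch of that debugging step, reusing the column and dataframe names from the code quoted below:

import spark.implicits._
import org.apache.spark.sql.functions.{window, count, max, lit}

// A wider watermark plus max(timestamp) per window makes late or clock-skewed
// producers visible instead of silently dropping their neighbours.
val debugDf = personJsonDf
  .withWatermark("timestamp", "10 minutes")              // far more tolerant than 500 ms
  .groupBy(window($"timestamp", "10 seconds"))
  .agg(count(lit(1)) as "cnt", max("timestamp") as "maxEventTime")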

Regards,

Magnus



On Wed, Jul 10, 2019 at 9:20 AM Kamalanathan Venkatesan <
kamalanatha...@in.ey.com> wrote:

> Hello,
>
>
>
> Any observations on what am I doing wrong?
>
>
>
> Thanks,
>
> -Kamal
>
>
>
> *From:* Kamalanathan Venkatesan
> *Sent:* Tuesday, July 09, 2019 7:25 PM
> *To:* 'user@spark.apache.org' 
> *Subject:* Spark structural streaming sinks output late
>
>
>
> Hello,
>
>
>
> I have below spark structural streaming code and I was expecting the
> results to be printed on the console every 10 seconds. But, I notice the
> sink to console happening at every ~2 mins and above.
>
> May I know what am I doing wrong?
>
>
>
> def streaming(): Unit = {
>   System.setProperty("hadoop.home.dir", "/Documents/ ")
>   val conf: SparkConf = new SparkConf().setAppName("Histogram").setMaster("local[8]")
>   conf.set("spark.eventLog.enabled", "false");
>   val sc: SparkContext = new SparkContext(conf)
>   val sqlcontext = new SQLContext(sc)
>   val spark = SparkSession.builder().config(conf).getOrCreate()
>
>   import sqlcontext.implicits._
>   import org.apache.spark.sql.functions.window
>
>   val inputDf = spark.readStream.format("kafka")
>     .option("kafka.bootstrap.servers", "localhost:9092")
>     .option("subscribe", "wonderful")
>     .option("startingOffsets", "latest")
>     .load()
>   import scala.concurrent.duration._
>
>   val personJsonDf = inputDf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
>     .withWatermark("timestamp", "500 milliseconds")
>     .groupBy(
>       window($"timestamp", "10 seconds")).count()
>
>   val consoleOutput = personJsonDf.writeStream
>     .outputMode("complete")
>     .format("console")
>     .option("truncate", "false")
>     .outputMode(OutputMode.Update())
>     .start()
>   consoleOutput.awaitTermination()
> }
>
> object SparkExecutor {
>   val spE: SparkExecutor = new SparkExecutor();
>   def main(args: Array[String]): Unit = {
>     println("test")
>     spE.streaming
>   }
> }
>
> The information contained in this communication is intended solely for the
> use of the individual or entity to whom it is addressed and others
> authorized to receive it. It may contain confidential or legally privileged
> information. If you are not the intended recipient you are hereby notified
> that any disclosure, copying, distribution or taking any action in reliance
> on the contents of this information is strictly prohibited and may be
> unlawful. If you have received this communication in error, please notify
> us immediately by responding to this email and then delete it from your
> system. The firm is neither liable for the proper and complete transmission
> of the information contained in this communication nor for any delay in its
> receipt.
>


Re: Structured Streaming foreach function

2019-06-23 Thread Magnus Nilsson
Row is a generic ordered collection of fields that most likely carries a
schema of StructType. You need to keep track of the datatypes of the fields
yourself.

If you want compile-time safety of datatypes (and IntelliSense support) you
need to use RDDs or the Dataset[T] API. Dataset[T] might incur overhead
and break partition filtering pushdown etc. if you don't take care, but it
will give you compile-time errors. You still need to make sure the real
underlying data types conform to the schema when you cast the Dataframe
though. There's no Dataset API for Python though.

https://spark.apache.org/docs/2.4.2/api/java/org/apache/spark/sql/Row.html

Basically you need to check the schema of your input and treat your columns
accordingly.

DataType reference.
http://spark.apache.org/docs/latest/sql-reference.html
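An illustrative sketch of the two options in Scala (the question below is PySpark, where only the untyped Row access applies); df and the field names are assumptions:

import spark.implicits._
import org.apache.spark.sql.Row

// Untyped: you look fields up by name/position and supply the type yourself.
def processRow(row: Row): Unit = {
  val name  = row.getAs[String]("name")     // assumed column
  val score = row.getAs[Double]("score")    // assumed column
  println(s"$name -> $score")
}

// Typed: Dataset[T] gives compile-time checked fields, at the cost of extra
// serialization and possibly lost pushdown optimisations.
case class Event(name: String, score: Double)
val typed = df.as[Event]                    // df must actually match the schema
typed.map(e => e.name.toUpperCase).show()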


On Sun, Jun 23, 2019 at 11:15 AM RanXin  wrote:

> I use spark 2.4.3, python to build a structured streaming. May I know the
> data type of the parameter "row" in process_row function? The following
> codes is how the official programming guide instruct us to deal with
> foreach
> function:
> def process_row(row):
>   # Write row to storage
>   pass
>
> query = streamingDF.writeStream.foreach(process_row).start()
>
> Thanks a lot.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: adding a column to a groupBy (dataframe)

2019-06-06 Thread Magnus Nilsson
Well, you could do a repartition on cityname/nrOfCities and use the
spark_partition_id function or the mapPartitionsWithIndex dataframe method
to add a city id column. Then just split the dataframe into two subsets. Be
careful of hash collisions on the repartition key though, or more than one
city might end up in the same partition (you can use a custom partitioner).

It all depends on what kind of id you want/need for the city value, i.e.
will you later need to append new city ids or not, and do you always handle
the entire dataset when you make this change or not.

On the other hand, getting a distinct list of city names is a relatively
cheap operation: add a row_number column and do a broadcast join with the
original dataset, then split into two subsets. Probably a bit faster than
reshuffling the entire dataframe. As always, the proof is in the pudding.
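A hedged sketch of that second approach, using the dataframe and column names from the question below:

import spark.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{row_number, broadcast}

// Small distinct list of countries, numbered; the single-partition window is
// acceptable here precisely because the list is small.
val countries = denormalized_cities
  .select($"COUNTRY").distinct()
  .withColumn("COUNTRY_ID", row_number().over(Window.orderBy($"COUNTRY")))

// Broadcast the id lookup back onto the full frame, then split into the two outputs.
val cities = denormalized_cities
  .join(broadcast(countries), Seq("COUNTRY"))
  .select($"COUNTRY_ID", $"CITY", $"CITY_NICKNAME")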

//Magnus

On Thu, Jun 6, 2019 at 2:53 PM Marcelo Valle 
wrote:

> Akshay,
>
> First of all, thanks for the answer. I *am* using monotonically increasing
> id, but that's not my problem.
> My problem is I want to output 2 tables from 1 data frame, 1 parent table
> with ID for the group by and 1 child table with the parent id without the
> group by.
>
> I was able to solve this problem by grouping by, generating a parent data
> frame with an id, then joining the parent dataframe with the original one
> to get a child dataframe with a parent id.
>
> I would like to find a solution without this second join, though.
>
> Thanks,
> Marcelo.
>
>
> On Thu, 6 Jun 2019 at 10:49, Akshay Bhardwaj <
> akshay.bhardwaj1...@gmail.com> wrote:
>
>> Hi Marcelo,
>>
>> If you are using spark 2.3+ and dataset API/SparkSQL,you can use this
>> inbuilt function "monotonically_increasing_id" in Spark.
>> A little tweaking using Spark sql inbuilt functions can enable you to
>> achieve this without having to write code or define RDDs with map/reduce
>> functions.
>>
>> Akshay Bhardwaj
>> +91-97111-33849
>>
>>
>> On Thu, May 30, 2019 at 4:05 AM Marcelo Valle 
>> wrote:
>>
>>> Hi all,
>>>
>>> I am new to spark and I am trying to write an application using
>>> dataframes that normalize data.
>>>
>>> So I have a dataframe `denormalized_cities` with 3 columns:  COUNTRY,
>>> CITY, CITY_NICKNAME
>>>
>>> Here is what I want to do:
>>>
>>>
>>>1. Map by country, then for each country generate a new ID and write
>>>to a new dataframe `countries`, which would have COUNTRY_ID, COUNTRY -
>>>country ID would be generated, probably using 
>>> `monotonically_increasing_id`.
>>>2. For each country, write several lines on a new dataframe
>>>`cities`, which would have COUNTRY_ID, ID, CITY, CITY_NICKNAME. 
>>> COUNTRY_ID
>>>would be the same generated on country table and ID would be another ID I
>>>generate.
>>>
>>> What's the best way to do this, hopefully using only dataframes (no low
>>> level RDDs) unless it's not possible?
>>>
>>> I clearly see a MAP/Reduce process where for each KEY mapped I generate
>>> a row in countries table with COUNTRY_ID and for every value I write a row
>>> in cities table. But how to implement this in an easy and efficient way?
>>>
>>> I thought about using a `GroupBy Country` and then using `collect` to
>>> collect all values for that country, but then I don't know how to generate
>>> the country id and I am not sure about memory efficiency of `collect` for a
>>> country with too many cities (bare in mind country/city is just an example,
>>> my real entities are different).
>>>
>>> Could anyone point me to the direction of a good solution?
>>>
>>> Thanks,
>>> Marcelo.
>>>
>>> This email is confidential [and may be protected by legal privilege]. If
>>> you are not the intended recipient, please do not copy or disclose its
>>> content but contact the sender immediately upon receipt.
>>>
>>> KTech Services Ltd is registered in England as company number 10704940.
>>>
>>> Registered Office: The River Building, 1 Cousin Lane, London EC4R 3TE,
>>> United Kingdom
>>>
>>
> This email is confidential [and may be protected by legal privilege]. If
> you are not the intended recipient, please do not copy or disclose its
> content but contact the sender immediately upon receipt.
>
> KTech Services Ltd is registered in England as company number 10704940.
>
> Registered Office: The River Building, 1 Cousin Lane, London EC4R 3TE,
> United Kingdom
>


Re: Upsert for hive tables

2019-05-30 Thread Magnus Nilsson
Since Parquet doesn't support updates you have to backfill your dataset. If
that is your regular scenario you should partition your Parquet files so
backfilling becomes easier.

As the data is structured now you have to update everything just to upsert
quite a small amount of changed data. Look at your data, look at your use
case and use partitioning (and bucketing if you want to eliminate/reduce
shuffle joins) to store your data in a more optimal way.

Let's say your large table is a timeline of events stretching three years
back but your updated data is only from the last week or month. If you'd
partitioned by year/month/week/day you could just backfill the partitions
that were updated. Adapt the pattern to your particular scenario and data
size.

If everything is random and there is no sure way to tell which partition an
update will land in, you could just break down your dataset by key %
(suitable_partition_size/assumed_total_size_of_dataset). There are a lot of
partitioning schemes, but the point is you have to limit the amount of data
you read from disk, filter and write back to get better performance.
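A hedged sketch of a partition-scoped backfill, assuming date partitions, a "key" column and Spark 2.3+ dynamic partition overwrite; dataframe names and the path are placeholders:

import org.apache.spark.sql.functions.broadcast

// Only partitions present in the written result get replaced.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

val affected = updates.select("year", "month").distinct()   // partitions touched by the new data

// Existing rows in those partitions that were NOT updated, plus all updates.
// If `existing` is read from the same path being overwritten, persist/stage it first.
val untouchedRows = existing
  .join(broadcast(affected), Seq("year", "month"))
  .join(updates.select("key"), Seq("key"), "left_anti")

untouchedRows.unionByName(updates)
  .write
  .mode("overwrite")
  .partitionBy("year", "month")
  .parquet("/warehouse/big_table")                           // placeholder path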

regards,

Magnus

On Wed, May 29, 2019 at 7:20 PM Tomasz Krol  wrote:

> Hey Guys,
>
> I am wondering what would be your approach to following scenario:
>
> I have two tables - one (Table A) is relatively small (e.g 50GB) and
> second one (Table B) much bigger (e.g. 3TB). Both are parquet tables.
>
>  I want to ADD all records from Table A to Table B which dont exist in
> Table B yet. I use only one field (e.g. key) to check existence for
> specific record.
>
> Then I want to UPDATE (by values from Table A) all records in Table B
> which also exist in Table A. To determine if specific record exist I use
> also the same "key" field.
>
> To achieve above I run following sql queries:
>
> 1. Find existing records and insert into temp table
>
> insert into temp_table select a.cols from Table A a left semi join Table B
> b on a.key = b.key
>
> 2. Find new records and insert them into temp table
>
> insert into temp_table select a.cols from Table A a left anti join Table B
> b on a.key = b.key
>
> 3. Find existing records in Table B which dont exist in   Table A
>
> insert into temp_table select b.cols from Table B b left anti join Table A
> a on a.key = b.key
>
> In that way I built Table B updated with records from Table A.
> However, the problem here is the step 3, because I am inserting almost 3
> TB of data that takes obviously some time.
> I was trying different approaches but no luck.
>
> I am wondering whats your ideas how can we perform this scenario
> efficiently in Spark?
>
> Cheers
>
> Tom
> --
> Tomasz Krol
> patric...@gmail.com
>


Logging DataFrame API pipelines

2019-04-02 Thread Magnus Nilsson
Hello all,

How do you log what is happening inside your Spark Dataframe pipelines?

I would like to collect statistics along the way, mostly the count of rows at
particular steps, to see where rows were filtered and what not. Is there
any other way to do this than calling .count on the dataframe?
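One hedged alternative is to bump an accumulator inside a pass-through filter, so the count comes for free with the pipeline's normal action (note accumulators can over-count on task retries); stage names and the output path are placeholders:

val rowsAfterFilter = spark.sparkContext.longAccumulator("rowsAfterFilter")

val stage2 = stage1                                   // stage1/stage2 are placeholders
  .filter(r => { rowsAfterFilter.add(1); true })      // pass-through, only counts

stage2.write.mode("overwrite").parquet("/tmp/pipeline_out")   // the real action populates the accumulator
println(s"rows after filter: ${rowsAfterFilter.value}")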

 Regards,

Magnus


Re: Windowing LAG function Usage in Spark2.2 Dataset scala

2019-03-14 Thread Magnus Nilsson
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

val partitionBy = Window.partitionBy("name", "sit").orderBy("data_date")

val newDf = df.withColumn("PreviousDate", lag("uniq_im", 1).over(partitionBy))

Cheers...

On Thu, Mar 14, 2019 at 4:55 AM anbu  wrote:

> Hi,
>
> To calculate LAG functions difference for the two data_date(current date
> and
> previous date) on the same column
> Could you please help me to implement the below scenario using scala spark
> Dataset.
> uniq_im - LAG(uniq_im,1,0) OVER PARTITION BY(name,sit,plc,country,state)
> order by (data_date) as calc_value.
>
> could you please help me how to implement using spark scala Dataset.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: How can I parse an "unnamed" json array present in a column?

2019-02-24 Thread Magnus Nilsson
Well, I'm guessing the file is small enough so you don't have any memory
issues.

If you're using spark to read the file use the spark.sql.functions.concat
function. If you use Scala use the string method concat.

import org.apache.spark.sql.functions.{concat, lit, from_json}

val prepend = """{"next":"""
val append = "}"

df.select(concat(lit(prepend), $"rawStringFromFileColumn", lit(append)) as "values")
or
val df =
Seq(prepend.concat(rawStringFromFile).concat(append)).toDF("values")

val df2 = df.select(from_json($"values", schema))

Haven't tried it, might be a comma wrong somewhere.

Extracting the function, compiling it and using it in your own library, you
mean? I wouldn't even bother looking into it to be honest. Might work,
might not. Might have to pull in half of Spark to compile it, might not. Is
it really worth your time investigating when the workaround is so simple
and you know it will be fixed once you upgrade to a newer Spark version?


On Sun, Feb 24, 2019 at 10:17 PM  wrote:

> Unfortunately , I can’t change the source system , so changing the JSON at
> runtime is the best I can do right now.
>
>
>
> Is there any preferred way to modify the String other than an UDF or map
> on the string?
>
>
>
> At the moment I am modifying it returning a generic type “t” so I can use
> the same UDF  for many different JSONs that have the same issue.
>
>
>
> Also , is there any advantage(if possible) to extract the function from
> the original source code and run it on an older version of Spark?
>
>
>
>
>
> *From:* Magnus Nilsson 
> *Sent:* Sunday, February 24, 2019 5:34 AM
> *To:* Yeikel 
> *Cc:* user@spark.apache.org
> *Subject:* Re: How can I parse an "unnamed" json array present in a
> column?
>
>
>
> That's a bummer, if you're unable to upgrade to Spark 2.3+ your best bet
> is probably to prepend/append the jsonarray-string and locate the json
> array as the value of a root attribute in a json-document (as in your first
> work around). I mean, it's such an easy and safe fix, you can still do it
> even if you stream the file.
>
>
>
> Even better, make the source system create a JSON-lines file instead of an
> json array if possible.
>
>
>
> When I use Datasets (Tungsten) I basically try to stay there and use the
> available column functions unless I have no choice but to serialize and run
> custom advanced calculations/parsings. In your case just modifying the
> string and use the tested from_json function beats the available
> alternatives if you ask me.
>
>
>
>
>
> On Sun, Feb 24, 2019 at 1:13 AM  wrote:
>
> What you suggested works in Spark 2.3 , but in the version that I am using
> (2.1) it produces the following exception :
>
>
>
> found   : org.apache.spark.sql.types.ArrayType
>
> required: org.apache.spark.sql.types.StructType
>
>ds.select(from_json($"news", schema) as "news_parsed").show(false)
>
>
>
> Is it viable/possible to export a function from 2.3 to 2.1?  What other
> options do I have?
>
>
>
> Thank you.
>
>
>
>
>
> *From:* Magnus Nilsson 
> *Sent:* Saturday, February 23, 2019 3:43 PM
> *Cc:* user@spark.apache.org
> *Subject:* Re: How can I parse an "unnamed" json array present in a
> column?
>
>
>
> Use spark.sql.types.ArrayType instead of a Scala Array as the root type
> when you define the schema and it will work.
>
>
>
> Regards,
>
>
>
> Magnus
>
>
>
> On Fri, Feb 22, 2019 at 11:15 PM Yeikel  wrote:
>
> I have an "unnamed" json array stored in a *column*.
>
> The format is the following :
>
> column name : news
>
> Data :
>
> [
>   {
> "source": "source1",
> "name": "News site1"
>   },
>{
> "source": "source2",
> "name": "News site2"
>   }
> ]
>
>
> Ideally , I'd like to parse it as :
>
> news ARRAY<STRUCT<source: STRING, name: STRING>>
>
> I've tried the following :
>
> import org.apache.spark.sql.Encoders
> import org.apache.spark.sql.types._;
>
> val entry = scala.io.Source.fromFile("1.txt").mkString
>
> val ds = Seq(entry).toDF("news")
>
> val schema = Array(new StructType().add("name", StringType).add("source",
> StringType))
>
> ds.select(from_json($"news", schema) as "news_parsed").show(false)
>
> But this is not allowed :
>
> found   : Array[org.apache.spark.sql.types.StructType]
> required: org.apache.spark.sql.types.StructType
>
>
> I also tried passing the following schema :
>
> val schema = StructType(new StructType().add("name",
> StringType).add("source", StringType))

Re: How can I parse an "unnamed" json array present in a column?

2019-02-24 Thread Magnus Nilsson
That's a bummer. If you're unable to upgrade to Spark 2.3+ your best bet is
probably to prepend/append the JSON-array string and locate the JSON array
as the value of a root attribute in a JSON document (as in your first
workaround). I mean, it's such an easy and safe fix, you can still do it even
if you stream the file.

Even better, make the source system create a JSON Lines file instead of a
JSON array if possible.

When I use Datasets (Tungsten) I basically try to stay there and use the
available column functions unless I have no choice but to serialize and run
custom advanced calculations/parsings. In your case, just modifying the
string and using the tested from_json function beats the available
alternatives if you ask me.


On Sun, Feb 24, 2019 at 1:13 AM  wrote:

> What you suggested works in Spark 2.3 , but in the version that I am using
> (2.1) it produces the following exception :
>
>
>
> found   : org.apache.spark.sql.types.ArrayType
>
> required: org.apache.spark.sql.types.StructType
>
>ds.select(from_json($"news", schema) as "news_parsed").show(false)
>
>
>
> Is it viable/possible to export a function from 2.3 to 2.1?  What other
> options do I have?
>
>
>
> Thank you.
>
>
>
>
>
> *From:* Magnus Nilsson 
> *Sent:* Saturday, February 23, 2019 3:43 PM
> *Cc:* user@spark.apache.org
> *Subject:* Re: How can I parse an "unnamed" json array present in a
> column?
>
>
>
> Use spark.sql.types.ArrayType instead of a Scala Array as the root type
> when you define the schema and it will work.
>
>
>
> Regards,
>
>
>
> Magnus
>
>
>
> On Fri, Feb 22, 2019 at 11:15 PM Yeikel  wrote:
>
> I have an "unnamed" json array stored in a *column*.
>
> The format is the following :
>
> column name : news
>
> Data :
>
> [
>   {
> "source": "source1",
> "name": "News site1"
>   },
>{
> "source": "source2",
> "name": "News site2"
>   }
> ]
>
>
> Ideally , I'd like to parse it as :
>
> news ARRAY<STRUCT<source: STRING, name: STRING>>
>
> I've tried the following :
>
> import org.apache.spark.sql.Encoders
> import org.apache.spark.sql.types._;
>
> val entry = scala.io.Source.fromFile("1.txt").mkString
>
> val ds = Seq(entry).toDF("news")
>
> val schema = Array(new StructType().add("name", StringType).add("source",
> StringType))
>
> ds.select(from_json($"news", schema) as "news_parsed").show(false)
>
> But this is not allowed :
>
> found   : Array[org.apache.spark.sql.types.StructType]
> required: org.apache.spark.sql.types.StructType
>
>
> I also tried passing the following schema :
>
> val schema = StructType(new StructType().add("name",
> StringType).add("source", StringType))
>
> But this only parsed the first record :
>
> +--------------------+
> |news_parsed         |
> +--------------------+
> |[News site1,source1]|
> +--------------------+
>
>
> I am aware that if I fix the JSON like this :
>
> {
>   "news": [
> {
>   "source": "source1",
>   "name": "News site1"
> },
> {
>   "source": "source2",
>   "name": "News site2"
> }
>   ]
> }
>
> The parsing works as expected , but I would like to avoid doing that if
> possible.
>
> Another approach that I can think of is to map on it and parse it using
> third party libraries like Gson , but  I am not sure if this is any better
> than fixing the json beforehand.
>
> I am running Spark 2.1
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: How can I parse an "unnamed" json array present in a column?

2019-02-23 Thread Magnus Nilsson
Use spark.sql.types.ArrayType instead of a Scala Array as the root type
when you define the schema and it will work.
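A short sketch of that schema, reusing the names from the quoted snippet below (as noted elsewhere in the thread, from_json with an ArrayType root needs Spark 2.3+):

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema = ArrayType(new StructType().add("name", StringType).add("source", StringType))

ds.select(from_json($"news", schema) as "news_parsed").show(false)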

Regards,

Magnus

On Fri, Feb 22, 2019 at 11:15 PM Yeikel  wrote:

> I have an "unnamed" json array stored in a *column*.
>
> The format is the following :
>
> column name : news
>
> Data :
>
> [
>   {
> "source": "source1",
> "name": "News site1"
>   },
>{
> "source": "source2",
> "name": "News site2"
>   }
> ]
>
>
> Ideally , I'd like to parse it as :
>
> news ARRAY<STRUCT<source: STRING, name: STRING>>
>
> I've tried the following :
>
> import org.apache.spark.sql.Encoders
> import org.apache.spark.sql.types._;
>
> val entry = scala.io.Source.fromFile("1.txt").mkString
>
> val ds = Seq(entry).toDF("news")
>
> val schema = Array(new StructType().add("name", StringType).add("source",
> StringType))
>
> ds.select(from_json($"news", schema) as "news_parsed").show(false)
>
> But this is not allowed :
>
> found   : Array[org.apache.spark.sql.types.StructType]
> required: org.apache.spark.sql.types.StructType
>
>
> I also tried passing the following schema :
>
> val schema = StructType(new StructType().add("name",
> StringType).add("source", StringType))
>
> But this only parsed the first record :
>
> +--------------------+
> |news_parsed         |
> +--------------------+
> |[News site1,source1]|
> +--------------------+
>
>
> I am aware that if I fix the JSON like this :
>
> {
>   "news": [
> {
>   "source": "source1",
>   "name": "News site1"
> },
> {
>   "source": "source2",
>   "name": "News site2"
> }
>   ]
> }
>
> The parsing works as expected , but I would like to avoid doing that if
> possible.
>
> Another approach that I can think of is to map on it and parse it using
> third party libraries like Gson , but  I am not sure if this is any better
> than fixing the json beforehand.
>
> I am running Spark 2.1
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Structured Streaming restart results in illegal state exception

2018-11-21 Thread Magnus Nilsson
Hello,

I'm evaluating Structured Streaming trying to understand how resilient the
pipeline is to failures. I ran a small test streaming data from an Azure
Event Hub using Azure Databricks saving the data into a parquet file on the
Databricks filesystem dbfs:/.

I did an unclean shutdown by cancelling the query. Then tried to restart
the query without changing any parameters. This lead to an aborted job due
to an illegal state exception.

"Caused by: java.lang.IllegalStateException: dbfs:/test02/_spark_metadata/2
doesn't exist when compacting batch 9 (compactInterval: 10)"

Now I'm trying to reconcile how stream checkpointing works with the commit
logs in the _spark_metadata/ directory in the data output folder.

How does Spark know there ought to be a _spark_metadata/2 file? And why is
the missing file considered an illegal state? How is the commit metadata in
the dbfs:/ file system integrated with structured streaming checkpointing?
I haven't found any metadata that links a particular committed file (i.e.
where there's a corresponding log file in the _spark_metadata/ folder) to
the batch that created it. As far as I know, checkpointing and commit logs
are separate from the file store's commit metadata. Somewhere Spark needs to
track which files were created from which batch to be able to uphold
exactly-once processing to file stores.

If it does, one would think Spark could clean up the dirty writes in the
sink folder and restart the stream from the last known good offset. This is
what I had hoped would happen. No such luck though.

I want to start over from the last known good state and resume the stream
from there. Any input from the list on this issue is greatly appreciated.

Is there any good blog post or other documentation on the file sink
metadata handling in Spark? I've looked but only found synoptic
documentation and nothing that explains the handling in detail.

Thanks,

Magnus


Structured Streaming to file sink results in illegal state exception

2018-11-21 Thread Magnus Nilsson
I'm evaluating Structured Streaming trying to understand how resilient the
pipeline is. I ran a small test streaming data from an Azure Event Hub
using Azure Databricks saving the data into a parquet file on the
Databricks filesystem dbfs:/.

I did an unclean shutdown by cancelling the query. Then tried to restart
the query without changing any parameters. This lead to an illegal state
exception.

"Caused by: java.lang.IllegalStateException: dbfs:/test02/_spark_metadata/2
doesn't exist when compacting batch 9 (compactInterval: 10)"

Now I'm trying to reconcile how checkpointing works with the commit logs in
the _spark_metadata/ directory in the data output folder.

There isn't any _spark_metadata/2 file, that is correct. How does Spark
know there ought to be one? The streaming query created 9 offset log files
and 8 commit log files in the checkpoint directory. The data folder's
_spark_metadata/ folder contains two files listing two files each, the data
directory itself contains 10 parquet files.

If I understand correctly, on the input side this means that nine trigger
batches have been started and eight have been finished. On the output side 10
files have been started and four have been finished (committed). Six of them
are "uncommitted", i.e. dirty or in-progress writes as far as Spark is
concerned.

I have yet to find where the translation from batch to output files is
logged. If the pipeline is capable of exactly-once delivery semantics to a
file store, shouldn't the translation from batch per partition to resulting
committed file in the data folder be logged somewhere?

I.e. in this scenario shouldn't Spark look up which batches are saved in the
committed output files, clean up the dirty writes, then replay the stream
from the last known good position?

I want to go back to the last known good state and resume the stream from
there. Any input from the list is greatly appreciated.

Is there any good blog post or other documentation on the metadata handling
in Spark? I've looked but only found synoptic documentation.

Thanks,

Magnus


Re: Spark DataSets and multiple write(.) calls

2018-11-19 Thread Magnus Nilsson
I had the same requirements. As far as I know the only way is to extend the
ForeachWriter, cache the micro-batch result and write to each output.

https://docs.databricks.com/spark/latest/structured-streaming/foreach.html

Unfortunately it seems as if you have to make a new connection "per batch"
instead of creating one long-lasting connection for the pipeline as such.
I.e. you might have to implement some sort of connection pooling yourself,
depending on the sink.
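For what it's worth, on Spark 2.4+ foreachBatch gives the "cache the micro-batch and write it to each output" pattern with less ceremony than a ForeachWriter; a hedged sketch with placeholder dataframe and paths:

import org.apache.spark.sql.DataFrame

// Each micro-batch is cached once, so the shared upstream work runs once,
// then the batch is written to two sinks.
val writeBoth: (DataFrame, Long) => Unit = (batch, batchId) => {
  batch.persist()
  batch.write.mode("append").parquet("/sinks/output_a")   // placeholder paths
  batch.write.mode("append").parquet("/sinks/output_b")
  batch.unpersist()
}

val query = streamingDf.writeStream                        // streamingDf is a placeholder
  .foreachBatch(writeBoth)
  .option("checkpointLocation", "/checkpoints/multi_sink")
  .start()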

Regards,

Magnus


On Mon, Nov 19, 2018 at 9:13 AM Dipl.-Inf. Rico Bergmann <
i...@ricobergmann.de> wrote:

> Hi!
>
> I have a SparkSQL programm, having one input and 6 ouputs (write). When
> executing this programm every call to write(.) executes the plan. My
> problem is, that I want all these writes to happen in parallel (inside
> one execution plan), because all writes have a common and compute
> intensive subpart, that can be shared by all plans. Is there a
> possibility to do this? (Caching is not a solution because the input
> dataset is way to large...)
>
> Hoping for advises ...
>
> Best, Rico B.
>
>
> ---
> Diese E-Mail wurde von Avast Antivirus-Software auf Viren geprüft.
> https://www.avast.com/antivirus
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Event Hubs properties kvp-value adds " to strings

2018-10-31 Thread Magnus Nilsson
Hello all,

I have this peculiar problem where quote " characters are added to the
beginning and end of my string values.

I get data using Structured Streaming from an Azure Event Hub using a Scala
Notebook in Azure Databricks.

The Dataframe schema received contains a property of type Map named
"properties" containing string/string key/value pairs. Only one pair in my
case.

I input data from a C# program where the "properties" property in the
EventData schema is a Dictionary into which I put a string/string property
pair.

If I input a string value of _Hello_ in C# I get back a string value of
_"Hello"_ in my Dataframe.

In the body tag of the schema I input a UTF-8 byte[] and get back a UTF-8
byte[] which I cast to string. This turns out correct; no quotes are added to
the string value.

If I try to pass in a UTF-8 string as a byte[] as the value of the KvP I
get a serialization exception so that's a no go.

Any idea on why or where the quote characters "" are added to the value of
the string when I read them in Spark?

Any ideas would greatly be appreciated.

//Magnus