[no subject]

2023-08-23 Thread ayan guha
Unsubscribe
--
Best Regards,
Ayan Guha


Re: What is the best way to organize a join within a foreach?

2023-04-26 Thread ayan guha
e or two other columns like fictitious email etc.
>>>>>
>>>>> Also for each user_id, provide 10 rows of orders table, meaning that
>>>>> orders table has 5 x 10 rows for each user_id.
>>>>>
>>>>> both as comma separated csv file
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh,
>>>>> Lead Solutions Architect/Engineering Lead
>>>>> Palantir Technologies Limited
>>>>> London
>>>>> United Kingdom
>>>>>
>>>>>
>>>>>view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, 25 Apr 2023 at 14:07, Marco Costantini <
>>>>> marco.costant...@rocketfncl.com> wrote:
>>>>>
>>>>>> Thanks Mich,
>>>>>> I have not but I will certainly read up on this today.
>>>>>>
>>>>>> To your point that all of the essential data is in the 'orders'
>>>>>> table; I agree! That distills the problem nicely. Yet, I still have some
>>>>>> questions on which someone may be able to shed some light.
>>>>>>
>>>>>> 1) If my 'orders' table is very large, and will need to be aggregated
>>>>>> by 'user_id', how will Spark intelligently optimize on that constraint
>>>>>> (only read data for relevent 'user_id's). Is that something I have to
>>>>>> instruct Spark to do?
>>>>>>
>>>>>> 2) Without #1, even with windowing, am I asking each partition to
>>>>>> search too much?
>>>>>>
>>>>>> Please, if you have any links to documentation I can read on *how*
>>>>>> Spark works under the hood for these operations, I would appreciate it if
>>>>>> you give them. Spark has become a pillar on my team and knowing it in 
>>>>>> more
>>>>>> detail is warranted.
>>>>>>
>>>>>> Slightly pivoting the subject here; I have tried something. It was a
>>>>>> suggestion by an AI chat bot and it seemed reasonable. In my main Spark
>>>>>> script I now have the line:
>>>>>>
>>>>>> ```
>>>>>> grouped_orders_df =
>>>>>> orders_df.groupBy('user_id').agg(collect_list(to_json(struct('user_id',
>>>>>> 'timestamp', 'total', 'description'))).alias('orders'))
>>>>>> ```
>>>>>> (json is ultimately needed)
>>>>>>
>>>>>> This actually achieves my goal by putting all of the 'orders' in a
>>>>>> single Array column. Now my worry is, will this column become too large 
>>>>>> if
>>>>>> there are a great many orders. Is there a limit? I have search for
>>>>>> documentation on such a limit but could not find any.
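For reference, a minimal sketch of the aggregate-then-send pattern being discussed here might look like the following. The table names, the email column, and the send_email helper are assumptions for illustration, not something defined in this thread:

```python
# Sketch only: group each user's orders into a JSON array, then let the
# executors send the statements. send_email() is a hypothetical helper.
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list, struct, to_json

spark = SparkSession.builder.getOrCreate()

users_df = spark.read.table("users")      # assumed source
orders_df = spark.read.table("orders")    # assumed source

statements_df = (
    orders_df
    .groupBy("user_id")
    .agg(collect_list(to_json(struct("user_id", "timestamp", "total",
                                     "description"))).alias("orders"))
    .join(users_df.select("user_id", "email"), on="user_id")
)

def send_statements(rows):
    # Runs on the executors, one iterator of Rows per partition.
    for row in rows:
        send_email(row["email"], row["orders"])   # hypothetical helper

statements_df.foreachPartition(send_statements)
```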
>>>>>>
>>>>>> I truly appreciate your help Mich and team,
>>>>>> Marco.
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 25, 2023 at 5:40 AM Mich Talebzadeh <
>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> Have you thought of using  windowing function
>>>>>>> <https://sparkbyexamples.com/spark/spark-sql-window-functions/>s to
>>>>>>> achieve this?
>>>>>>>
>>>>>>> Effectively all your information is in the orders table.
>>>>>>>
>>>>>>> HTH
>>>>>>>
>>>>>>> Mich Talebzadeh,
>>>>>>> Lead Solutions Architect/Engineering Lead
>>>>>>> Palantir Technologies Limited
>>>>>>> London
>>>>>>> United Kingdom
>>>>>>>
>>>>>>>
>>>>>>>view my Linkedin profile
>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>
>>>>>>>
>>>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>> for any loss, damage or destruction of data or any other property which 
>>>>>>> may
>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>> disclaimed. The author will in no case be liable for any monetary 
>>>>>>> damages
>>>>>>> arising from such loss, damage or destruction.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, 25 Apr 2023 at 00:15, Marco Costantini <
>>>>>>> marco.costant...@rocketfncl.com> wrote:
>>>>>>>
>>>>>>>> I have two tables: {users, orders}. In this example, let's say that
>>>>>>>> for each 1 User in the users table, there are 10 Orders in the 
>>>>>>>> orders
>>>>>>>> table.
>>>>>>>>
>>>>>>>> I have to use pyspark to generate a statement of Orders for each
>>>>>>>> User. So, a single user will need his/her own list of Orders. 
>>>>>>>> Additionally,
>>>>>>>> I need to send this statement to the real-world user via email (for
>>>>>>>> example).
>>>>>>>>
>>>>>>>> My first intuition was to apply a DataFrame.foreach() on the users
>>>>>>>> DataFrame. This way, I can rely on the spark workers to handle the 
>>>>>>>> email
>>>>>>>> sending individually. However, I now do not know the best way to get 
>>>>>>>> each
>>>>>>>> User's Orders.
>>>>>>>>
>>>>>>>> I will soon try the following (pseudo-code):
>>>>>>>>
>>>>>>>> ```
>>>>>>>> users_df = 
>>>>>>>> orders_df = 
>>>>>>>>
>>>>>>>> #this is poorly named for max understandability in this context
>>>>>>>> def foreach_function(row):
>>>>>>>>   user_id = row.user_id
>>>>>>>>   user_orders_df = orders_df.select(f'user_id = {user_id}')
>>>>>>>>
>>>>>>>>   #here, I'd get any User info from 'row'
>>>>>>>>   #then, I'd convert all 'user_orders' to JSON
>>>>>>>>   #then, I'd prepare the email and send it
>>>>>>>>
>>>>>>>> users_df.foreach(foreach_function)
>>>>>>>> ```
>>>>>>>>
>>>>>>>> It is my understanding that if I do my user-specific work in the
>>>>>>>> foreach function, I will capitalize on Spark's scalability when doing 
>>>>>>>> that
>>>>>>>> work. However, I am worried of two things:
>>>>>>>>
>>>>>>>> If I take all Orders up front...
>>>>>>>>
>>>>>>>> Will that work?
>>>>>>>> Will I be taking too much? Will I be taking Orders on partitions
>>>>>>>> who won't handle them (different User).
>>>>>>>>
>>>>>>>> If I create the orders_df (filtered) within the foreach function...
>>>>>>>>
>>>>>>>> Will it work?
>>>>>>>> Will that be too much IO to DB?
>>>>>>>>
>>>>>>>> The question ultimately is: How can I achieve this goal efficiently?
>>>>>>>>
>>>>>>>> I have not yet tried anything here. I am doing so as we speak, but
>>>>>>>> am suffering from choice-paralysis.
>>>>>>>>
>>>>>>>> Please and thank you.
>>>>>>>>
>>>>>>> --
Best Regards,
Ayan Guha


Re: Got Error Creating permanent view in Postgresql through Pyspark code

2023-01-05 Thread ayan guha
Hi

What you are trying to do does not make sense. I suggest you look into how
views work in SQL. IMHO you are better off creating a table.

Ayan
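If the end goal is simply to have the transformed data queryable in Postgres, a plain JDBC table write is the usual route. A minimal sketch, where the URL, credentials and table name are placeholders:

```python
# Sketch: persist the dataframe as a regular Postgres table via JDBC.
# The PostgreSQL JDBC driver must be on the Spark classpath.
(df.write
   .format("jdbc")
   .option("url", "jdbc:postgresql://host:5432/mydb")   # placeholder
   .option("dbtable", "public.temp1")                   # placeholder
   .option("user", "username")                          # placeholder
   .option("password", "password")                      # placeholder
   .mode("overwrite")
   .save())
```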

On Fri, 6 Jan 2023 at 12:20 am, Stelios Philippou 
wrote:

> Vajiha,
>
> I dont see your query working as you hope it will.
>
> spark.sql will execute a query on a database level
>
> to retrieve the temp view you need to go from the sessions.
> i.e
>
> session.sql("SELECT * FROM TEP_VIEW")
>
> You might need to retrieve the data in a collection and iterate over them
> to do batch insertion using spark.sql("INSERt ...");
>
> Hope this helps
>
> Stelios
>
>
> --
> Hi Stelios Philippou,
> I need to create a view table in Postgresql DB using pyspark code. But I'm
> unable to create a view table, I can able to create table through pyspark
> code.
> I need to know Whether through Pyspark code can I create view table in
> postgresql database or not. Thanks for you reply
>
> Pyspark Code:
> df.createOrReplaceTempView("TEMP_VIEW")
> spark.sql("CREATE VIEW TEMP1 AS SELECT * FROM TEMP_VIEW")
>
> On Wed, 4 Jan 2023 at 15:10, Vajiha Begum S A <
> vajihabegu...@maestrowiz.com> wrote:
>
>>
>> I have tried to Create a permanent view in Postgresql DB through Pyspark
>> code, but I have received the below error message. Kindly help me to create
>> a permanent view table in the database.How shall create permanent view
>> using Pyspark code. Please do reply.
>>
>> *Error Message::*
>> *Exception has occurred: Analysis Exception*
>> Not allowed to create a permanent view `default`.`TEMP1` by referencing a
>> temporary view TEMP_VIEW. Please create a temp view instead by CREATE TEMP
>> VIEW
>>
>>
>> Regards,
>> Vajiha
>> Research Analyst
>> MW Solutions
>>
> --
Best Regards,
Ayan Guha


Re: Profiling data quality with Spark

2022-12-27 Thread ayan guha
The way I would approach this is to evaluate GE, Deequ (there is a Python
binding called pydeequ) and others like Delta Live Tables with expectations,
from a data quality feature perspective. All these tools have their pros and
cons, and all of them are compatible with Spark as a compute engine.

Also, you may want to look at dbt-based DQ toolsets if SQL is your thing.
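Before committing to any framework, a quick profile in plain PySpark can already surface the obvious issues (row counts, nulls, distinct values); tools like Deequ or GE essentially formalize these kinds of checks. A rough sketch, with the source table as a placeholder:

```python
# Rough profiling sketch: row count, null count and distinct count per column.
from pyspark.sql import functions as F

df = spark.read.table("my_table")   # placeholder source

total_rows = df.count()
profile = df.select(
    [F.sum(F.col(c).isNull().cast("int")).alias(f"{c}_nulls") for c in df.columns]
    + [F.countDistinct(F.col(c)).alias(f"{c}_distinct") for c in df.columns]
)
profile.show(truncate=False)
print(f"total rows: {total_rows}")
```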

On Wed, 28 Dec 2022 at 3:14 pm, Sean Owen  wrote:

> I think this is kind of mixed up. Data warehouses are simple SQL
> creatures; Spark is (also) a distributed compute framework. Kind of like
> comparing maybe a web server to Java.
> Are you thinking of Spark SQL? then I dunno sure you may well find it more
> complicated, but it's also just a data warehousey SQL surface.
>
> But none of that relates to the question of data quality tools. You could
> use GE with Redshift, or indeed with Spark - are you familiar with it? It's
> probably one of the most common tools people use with Spark for this in
> fact. It's just a Python lib at heart and you can apply it with Spark, but
> _not_ with a data warehouse, so I'm not sure what you're getting at.
>
> Deequ is also commonly seen. It's actually built on Spark, so again,
> confused about this "use Redshift or Snowflake not Spark".
>
> On Tue, Dec 27, 2022 at 9:55 PM Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> SPARK is just another querying engine with a lot of hype.
>>
>> I would highly suggest using Redshift (storage and compute decoupled
>> mode) or Snowflake without all this super complicated understanding of
>> containers/ disk-space, mind numbing variables, rocket science tuning, hair
>> splitting failure scenarios, etc. After that try to choose solutions like
>> Athena, or Trino/ Presto, and then come to SPARK.
>>
>> Try out solutions like  "great expectations" if you are looking for data
>> quality and not entirely sucked into the world of SPARK and want to keep
>> your options open.
>>
>> Dont get me wrong, SPARK used to be great in 2016-2017, but there are
>> superb alternatives now and the industry, in this recession, should focus
>> on getting more value for every single dollar they spend.
>>
>> Best of luck.
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Tue, Dec 27, 2022 at 7:30 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Well, you need to qualify your statement on data quality. Are you
>>> talking about data lineage here?
>>>
>>> HTH
>>>
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Tue, 27 Dec 2022 at 19:25, rajat kumar 
>>> wrote:
>>>
>>>> Hi Folks
>>>> Hoping you are doing well, I want to implement data quality to detect
>>>> issues in data in advance. I have heard about few frameworks like GE/Deequ.
>>>> Can anyone pls suggest which one is good and how do I get started on it?
>>>>
>>>> Regards
>>>> Rajat
>>>>
>>> --
Best Regards,
Ayan Guha


Re: [EXTERNAL] Re: Re: Re: Re: Stage level scheduling - lower the number of executors when using GPUs

2022-11-06 Thread ayan guha
ic:
> Suppose I start the application with maxExecutors=500, executors.cores=2,
> because that's the amount of resources needed for the ETL part. But for the
> DL part I only need 20 GPUs. SLS API only allows to set the resources per
> executor/task, so Spark would (try to) allocate up to 500 GPUs, assuming I
> configure the profile with 1 GPU per executor.
> So, the question is how do I limit the stage resources to 20 GPUs total?
>
> Thanks again,
> Shay
>
> --
> *From:* Artemis User 
> *Sent:* Thursday, November 3, 2022 5:23 PM
>
> *To:* user@spark.apache.org 
> *Subject:* [EXTERNAL] Re: Re: Stage level scheduling - lower the number
> of executors when using GPUs
>
>
> *ATTENTION:* This email originated from outside of GM.
>
>   Shay,  You may find this video helpful (with some API code samples that
> you are looking for).  https://www.youtube.com/watch?v=JNQu-226wUc=171s.
> The issue here isn't how to limit the number of executors but to request
> for the right GPU-enabled executors dynamically.  Those executors used in
> pre-GPU stages should be returned back to resource managers with dynamic
> resource allocation enabled (and with the right DRA policies).  Hope this
> helps..
>
> Unfortunately there isn't a lot of detailed docs for this topic since GPU
> acceleration is kind of new in Spark (not straightforward like in TF).   I
> wish the Spark doc team could provide more details in the next release...
>
> On 11/3/22 2:37 AM, Shay Elbaz wrote:
>
> Thanks Artemis. We are *not* using Rapids, but rather using GPUs through
> the Stage Level Scheduling feature with ResourceProfile. In Kubernetes
> you have to turn on shuffle tracking for dynamic allocation, anyhow.
> The question is how we can limit the *number of executors *when building
> a new ResourceProfile, directly (API) or indirectly (some advanced
> workaround).
>
> Thanks,
> Shay
>
>
> --
> *From:* Artemis User  
> *Sent:* Thursday, November 3, 2022 1:16 AM
> *To:* user@spark.apache.org 
> 
> *Subject:* [EXTERNAL] Re: Stage level scheduling - lower the number of
> executors when using GPUs
>
>
> *ATTENTION:* This email originated from outside of GM.
>
>   Are you using Rapids for GPU support in Spark?  Couple of options you
> may want to try:
>
>1. In addition to dynamic allocation turned on, you may also need to
>turn on external shuffling service.
>2. Sounds like you are using Kubernetes.  In that case, you may also
>need to turn on shuffle tracking.
>3. The "stages" are controlled by the APIs.  The APIs for dynamic
>resource request (change of stage) do exist, but only for RDDs (e.g.
>TaskResourceRequest and ExecutorResourceRequest).
>
>
> On 11/2/22 11:30 AM, Shay Elbaz wrote:
>
> Hi,
>
> Our typical applications need less *executors* for a GPU stage than for a
> CPU stage. We are using dynamic allocation with stage level scheduling, and
> Spark tries to maximize the number of executors also during the GPU stage,
> causing a bit of resources chaos in the cluster. This forces us to use a
> lower value for 'maxExecutors' in the first place, at the cost of the CPU
> stages performance. Or try to solve this in the Kubernets scheduler level,
> which is not straightforward and doesn't feel like the right way to go.
>
> Is there a way to effectively use less executors in Stage Level
> Scheduling? The API does not seem to include such an option, but maybe
> there is some more advanced workaround?
>
> Thanks,
> Shay
>
>
>
>
>
>
>
> --
Best Regards,
Ayan Guha


Re: log transfering into hadoop/spark

2022-08-02 Thread ayan guha
ELK or Splunk agents, typically.

Or if you are in the cloud, there are cloud-native solutions which can
forward logs to object storage, which can then be read like HDFS.

On Tue, 2 Aug 2022 at 4:43 pm, pengyh  wrote:

> since flume is not continued to develop.
> what's the current opensource tool to transfer webserver logs into
> hdfs/spark?
>
> thank you.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Best Regards,
Ayan Guha


Re: [pyspark delta] [delta][Spark SQL]: Getting an Analysis Exception. The associated location (path) is not empty

2022-08-02 Thread ayan guha
ark.sparkContext.parallelize(string_20210609.split('\n'))
>>>
>>> # FILES WILL SHOW UP ON THE LEFT UNDER THE FOLDER ICON IF YOU WANT TO 
>>> BROWSE THEM
>>> OUTPUT_DELTA_PATH = './output/delta/'
>>>
>>> spark.sql('CREATE DATABASE IF NOT EXISTS EXERCISE')
>>>
>>> spark.sql('''
>>> CREATE TABLE IF NOT EXISTS EXERCISE.WORKED_HOURS(
>>> worked_date date
>>> , worker_id int
>>> , delete_flag string
>>> , hours_worked double
>>> ) USING DELTA
>>>
>>>
>>> PARTITIONED BY (worked_date)
>>> LOCATION "{0}"
>>> '''.format(OUTPUT_DELTA_PATH)
>>> )
>>>
>>> *Error Message:*
>>>
>>> AnalysisException Traceback (most recent call 
>>> last) in   4 spark.sql('CREATE 
>>> DATABASE IF NOT EXISTS EXERCISE')  5 > 6 spark.sql('''  7 
>>> CREATE TABLE IF NOT EXISTS EXERCISE.WORKED_HOURS(  8 
>>> worked_date date
>>> /Users/kyjan/spark-3.0.3-bin-hadoop2.7\python\pyspark\sql\session.py in 
>>> sql(self, sqlQuery)647 [Row(f1=1, f2=u'row1'), Row(f1=2, 
>>> f2=u'row2'), Row(f1=3, f2=u'row3')]648 """--> 649 
>>> return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)650   
>>>   651 @since(2.0)
>>> \Users\kyjan\spark-3.0.3-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py
>>>  in __call__(self, *args)   13021303 answer = 
>>> self.gateway_client.send_command(command)-> 1304 return_value = 
>>> get_return_value(   1305 answer, self.gateway_client, 
>>> self.target_id, self.name)   1306
>>> /Users/kyjan/spark-3.0.3-bin-hadoop2.7\python\pyspark\sql\utils.py in 
>>> deco(*a, **kw)132 # Hide where the exception came from 
>>> that shows a non-Pythonic133 # JVM exception 
>>> message.--> 134 raise_from(converted)135 
>>> else:136 raise
>>> /Users/kyjan/spark-3.0.3-bin-hadoop2.7\python\pyspark\sql\utils.py in 
>>> raise_from(e)
>>> AnalysisException: Cannot create table ('`EXERCISE`.`WORKED_HOURS`'). The 
>>> associated location ('output/delta') is not empty.;
>>>
>>>
>>> --
>>> Best Wishes,
>>> Kumba Janga
>>>
>>> "The only way of finding the limits of the possible is by going beyond
>>> them into the impossible"
>>> -Arthur C. Clarke
>>>
>>
>
> --
> Best Wishes,
> Kumba Janga
>
> "The only way of finding the limits of the possible is by going beyond
> them into the impossible"
> -Arthur C. Clarke
>
-- 
Best Regards,
Ayan Guha


Re: Salting technique doubt

2022-07-31 Thread ayan guha
One option is to create a separate column in table A with salting. Use it as
the partition key, and use the original column for joining.

Ayan
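A PySpark sketch of the explode-based salting variant described further down the thread (salt the skewed table's key with a random suffix, expand the other table with every possible suffix). The dataframe and column names, and the salt value of 2, are assumptions:

```python
# Sketch: salting a skewed join. table_a is skewed on join_col.
from pyspark.sql import functions as F

SALT = 2

# Table A: append a random salt (1..SALT) to the join key.
a_salted = table_a.withColumn(
    "new_join_col",
    F.concat_ws("_", "join_col", (F.floor(F.rand() * SALT) + 1).cast("string"))
)

# Table B: explode every key into all possible salted keys.
b_exploded = (
    table_b
    .withColumn("salt", F.explode(F.array([F.lit(str(i)) for i in range(1, SALT + 1)])))
    .withColumn("new_join_col", F.concat_ws("_", "join_col", "salt"))
)

# Every salted key in A is now guaranteed a matching key in B.
joined = a_salted.join(b_exploded, on="new_join_col")
```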

On Sun, 31 Jul 2022 at 6:45 pm, Jacob Lynn  wrote:

> The key is this line from Amit's email (emphasis added):
>
> > Change the join_col to *all possible values* of the sale.
>
> The two tables are treated asymmetrically:
>
> 1. The skewed table gets random salts appended to the join key.
> 2. The other table gets all possible salts appended to the join key (e.g.
> using a range array literal + explode).
>
> Thus guarantees that every row in the skewed table will match a row in the
> other table. This StackOverflow answer
> <https://stackoverflow.com/a/57951114/1892435> gives an example.
>
> Op zo 31 jul. 2022 om 10:41 schreef Amit Joshi  >:
>
>> Hi Sid,
>>
>> I am not sure I understood your question.
>> But the keys cannot be different post salting in both the tables, this is
>> what i have shown in the explanation.
>> You salt Table A and then explode Table B to create all possible values.
>>
>> In your case, I do not understand, what Table B has x_8/9. It should be
>> all possible values which you used to create salt.
>>
>> I hope you understand.
>>
>> Thanks
>>
>>
>>
>> On Sun, Jul 31, 2022 at 10:02 AM Sid  wrote:
>>
>>> Hi Amit,
>>>
>>> Thanks for your reply. However, your answer doesn't seem different from
>>> what I have explained.
>>>
>>> My question is after salting if the keys are different like in my
>>> example then post join there would be no results assuming the join type as
>>> inner join because even though the keys are segregated in different
>>> partitions based on unique keys they are not matching because x_1/x_2
>>> !=x_8/x_9
>>>
>>> How do you ensure that the results are matched?
>>>
>>> Best,
>>> Sid
>>>
>>> On Sun, Jul 31, 2022 at 1:34 AM Amit Joshi 
>>> wrote:
>>>
>>>> Hi Sid,
>>>>
>>>> Salting is normally a technique to add random characters to existing
>>>> values.
>>>> In big data we can use salting to deal with the skewness.
>>>> Salting in join cas be used as :
>>>> * Table A-*
>>>> Col1, join_col , where join_col values are {x1, x2, x3}
>>>> x1
>>>> x1
>>>> x1
>>>> x2
>>>> x2
>>>> x3
>>>>
>>>> *Table B-*
>>>> join_col, Col3 , where join_col  value are {x1, x2}
>>>> x1
>>>> x2
>>>>
>>>> *Problem: *Let say for table A, data is skewed on x1
>>>> Now salting goes like this.  *Salt value =2*
>>>> For
>>>> *table A, *create a new col with values by salting join col
>>>> *New_Join_Col*
>>>> x1_1
>>>> x1_2
>>>> x1_1
>>>> x2_1
>>>> x2_2
>>>> x3_1
>>>>
>>>> For *Table B,*
>>>> Change the join_col to all possible values of the sale.
>>>> join_col
>>>> x1_1
>>>> x1_2
>>>> x2_1
>>>> x2_2
>>>>
>>>> And then join it like
>>>> table1.join(table2, where tableA.new_join_col == tableB. join_col)
>>>>
>>>> Let me know if you have any questions.
>>>>
>>>> Regards
>>>> Amit Joshi
>>>>
>>>>
>>>> On Sat, Jul 30, 2022 at 7:16 PM Sid  wrote:
>>>>
>>>>> Hi Team,
>>>>>
>>>>> I was trying to understand the Salting technique for the column where
>>>>> there would be a huge load on a single partition because of the same keys.
>>>>>
>>>>> I referred to one youtube video with the below understanding:
>>>>>
>>>>> So, using the salting technique we can actually change the joining
>>>>> column values by appending some random number in a specified range.
>>>>>
>>>>> So, suppose I have these two values in a partition of two different
>>>>> tables:
>>>>>
>>>>> Table A:
>>>>> Partition1:
>>>>> x
>>>>> .
>>>>> .
>>>>> .
>>>>> x
>>>>>
>>>>> Table B:
>>>>> Partition1:
>>>>> x
>>>>> .
>>>>> .
>>>>> .
>>>>> x
>>>>>
>>>>> After Salting it would be something like the below:
>>>>>
>>>>> Table A:
>>>>> Partition1:
>>>>> x_1
>>>>>
>>>>> Partition 2:
>>>>> x_2
>>>>>
>>>>> Table B:
>>>>> Partition1:
>>>>> x_3
>>>>>
>>>>> Partition 2:
>>>>> x_8
>>>>>
>>>>> Now, when I inner join these two tables after salting in order to
>>>>> avoid data skewness problems, I won't get a match since the keys are
>>>>> different after applying salting techniques.
>>>>>
>>>>> So how does this resolves the data skewness issue or if there is some
>>>>> understanding gap?
>>>>>
>>>>> Could anyone help me in layman's terms?
>>>>>
>>>>> TIA,
>>>>> Sid
>>>>>
>>>> --
Best Regards,
Ayan Guha


Re: very simple UI on webpage to display x/y plots+histogram of data stored in hive

2022-07-18 Thread ayan guha
If possible, start with a Jupyter or databricks notebook

On Tue, 19 Jul 2022 at 7:35 am, Joris Billen 
wrote:

> Thank you - looks like it COULD do it.
> Have to try if I can have a simple UI, user selects one out of 100
> options, and receives the correct x/y plot and correct histogram of data
> stored in hive and retrieved with spark into pandas…
>
> Many thanks for your suggestion!
>
>
> On 18 Jul 2022, at 15:08, Sean Owen  wrote:
>
> You pull your data via Spark to a pandas DF and do whatever you want
>
>
> --
Best Regards,
Ayan Guha


Re: reading each JSON file from dataframe...

2022-07-12 Thread ayan guha
-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
>>
>> +-+--+---+
>>
>> Cheers,
>> Enrico
>>
>>
>>
>>
>> Am 10.07.22 um 09:11 schrieb Muthu Jayakumar:
>>
>> Hello there,
>>
>> I have a dataframe with the following...
>>
>>
>> +-+---+---+
>> |entity_id|file_path
>>  |other_useful_id|
>>
>> +-+---+---+
>>
>> |id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
>>
>> |id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
>>
>> |id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
>>
>> |id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
>>
>> +-+---+---+
>>
>> I would like to read each row from `file_path` and write the result to
>> another dataframe containing `entity_id`, `other_useful_id`,
>> `json_content`, `file_path`.
>> Assume that I already have the required HDFS url libraries in my
>> classpath.
>>
>> Please advice,
>> Muthu
>>
>>
>>
>>
>

-- 
Best Regards,
Ayan Guha


Re: Question about bucketing and custom partitioners

2022-04-11 Thread ayan guha
IMHO you should ask this on the dev mailing list for better responses and suggestions.

On Tue, 12 Apr 2022 at 1:47 am, David Diebold 
wrote:

> Hello,
>
> I have a few questions related to bucketing and custom partitioning in
> dataframe api.
>
> I am considering bucketing to perform one-side free shuffle join in
> incremental jobs, but there is one thing that I'm not happy with.
> Data is likely to grow/skew over time. At some point, i would need to
> change amount of buckets which would provoke shuffle.
>
> Instead of this, I would like to use a custom partitioner, that would
> replace shuffle by narrow transformation.
> That is something that was feasible with RDD developer api. For example, I
> could use such partitioning scheme:
> partition_id = (nb_partitions-1) * ( hash(column) - Int.minValue) /
> (Int.maxValue - Int.minValue)
> When I multiply amount of partitions by 2 each new partition depends only
> on one partition from parent (=> narrow transformation)
>
> So, here are my questions :
>
> 1/ Is it possible to use custom partitioner when saving a dataframe with
> bucketing ?
> 2/ Still with the API dataframe, is it possible to apply custom
> partitioner to a dataframe ?
> Is it possible to repartition the dataframe with a narrow
> transformation like what could be done with RDD ?
> Is there some sort of dataframe developer API ? Do you have any
> pointers on this ?
>
> Thanks !
>
> David
>
-- 
Best Regards,
Ayan Guha


Re: how to change data type for columns of dataframe

2022-04-01 Thread ayan guha
Please use cast. Also, I would strongly recommend going through the Spark
documentation; it's pretty good.
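For example, a minimal sketch (the column name is a placeholder):

```python
# Sketch: cast an Int column to Float on an existing dataframe.
from pyspark.sql.functions import col

df = df.withColumn("my_int_col", col("my_int_col").cast("float"))
```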

On Sat, 2 Apr 2022 at 12:43 pm,  wrote:

> Hi
>
> I got a dataframe object from other application, it means this obj is
> not generated by me.
> How can I change the data types for some columns in this dataframe?
>
> For example, change the column type from Int to Float.
>
> Thanks.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Best Regards,
Ayan Guha


Re: pivoting panda dataframe

2022-03-16 Thread ayan guha
;>
>>>>> df = ps.DataFrame({'A': range(3), 'B': range(1, 4)})
>>>>>
>>>>> df.transform(lambda x: x + 1)
>>>>>
>>>>> You will now see that all numbers are +1
>>>>>
>>>>> You can find more information about pandas API on spark transform
>>>>> https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transform.html?highlight=pyspark%20pandas%20dataframe%20transform#pyspark.pandas.DataFrame.transform
>>>>> or in yours notbook
>>>>> df.transform?
>>>>>
>>>>> Signature:
>>>>> df.transform(
>>>>> func: Callable[..., ForwardRef('Series')],
>>>>> axis: Union[int, str] = 0,
>>>>> *args: Any,
>>>>> **kwargs: Any,) -> 'DataFrame'Docstring:
>>>>> Call ``func`` on self producing a Series with transformed values
>>>>> and that has the same length as its input.
>>>>>
>>>>> See also `Transform and apply a function
>>>>> <https://koalas.readthedocs.io/en/latest/user_guide/transform_apply.html>`_.
>>>>>
>>>>> .. note:: this API executes the function once to infer the type which is
>>>>>  potentially expensive, for instance, when the dataset is created 
>>>>> after
>>>>>  aggregations or sorting.
>>>>>
>>>>>  To avoid this, specify return type in ``func``, for instance, as 
>>>>> below:
>>>>>
>>>>>  >>> def square(x) -> ps.Series[np.int32]:
>>>>>  ... return x ** 2
>>>>>
>>>>>  pandas-on-Spark uses return type hint and does not try to infer the 
>>>>> type.
>>>>>
>>>>> .. note:: the series within ``func`` is actually multiple pandas series 
>>>>> as the
>>>>> segments of the whole pandas-on-Spark series; therefore, the length 
>>>>> of each series
>>>>> is not guaranteed. As an example, an aggregation against each series
>>>>> does work as a global aggregation but an aggregation of each segment. 
>>>>> See
>>>>> below:
>>>>>
>>>>> >>> def func(x) -> ps.Series[np.int32]:
>>>>> ... return x + sum(x)
>>>>>
>>>>> Parameters
>>>>> --
>>>>> func : function
>>>>> Function to use for transforming the data. It must work when pandas 
>>>>> Series
>>>>> is passed.
>>>>> axis : int, default 0 or 'index'
>>>>> Can only be set to 0 at the moment.
>>>>> *args
>>>>> Positional arguments to pass to func.
>>>>> **kwargs
>>>>> Keyword arguments to pass to func.
>>>>>
>>>>> Returns
>>>>> ---
>>>>> DataFrame
>>>>> A DataFrame that must have the same length as self.
>>>>>
>>>>> Raises
>>>>> --
>>>>> Exception : If the returned DataFrame has a different length than self.
>>>>>
>>>>> See Also
>>>>> 
>>>>> DataFrame.aggregate : Only perform aggregating type operations.
>>>>> DataFrame.apply : Invoke function on DataFrame.
>>>>> Series.transform : The equivalent function for Series.
>>>>>
>>>>> Examples
>>>>> 
>>>>> >>> df = ps.DataFrame({'A': range(3), 'B': range(1, 4)}, columns=['A', 
>>>>> >>> 'B'])
>>>>> >>> df
>>>>>A  B
>>>>> 0  0  1
>>>>> 1  1  2
>>>>> 2  2  3
>>>>>
>>>>> >>> def square(x) -> ps.Series[np.int32]:
>>>>> ... return x ** 2
>>>>> >>> df.transform(square)
>>>>>A  B
>>>>> 0  0  1
>>>>> 1  1  4
>>>>> 2  4  9
>>>>>
>>>>> You can omit the type hint and let pandas-on-Spark infer its type.
>>>>>
>>>>> >>> df.transform(lambda x: x ** 2)
>>>>>A  B
>>>>> 0  0  1
>>>>> 1  1  4
>>>>> 2  4  9
>>>>>
>>>>> For multi-index columns:
>>>>>
>>>>> >>> df.columns = [('X', 'A'), ('X', 'B')]
>>>>> >>> df.transform(square)  # doctest: +NORMALIZE_WHITESPACE
>>>>>X
>>>>>A  B
>>>>> 0  0  1
>>>>> 1  1  4
>>>>> 2  4  9
>>>>>
>>>>> >>> (df * -1).transform(abs)  # doctest: +NORMALIZE_WHITESPACE
>>>>>X
>>>>>A  B
>>>>> 0  0  1
>>>>> 1  1  2
>>>>> 2  2  3
>>>>>
>>>>> You can also specify extra arguments.
>>>>>
>>>>> >>> def calculation(x, y, z) -> ps.Series[int]:
>>>>> ... return x ** y + z
>>>>> >>> df.transform(calculation, y=10, z=20)  # doctest: 
>>>>> >>> +NORMALIZE_WHITESPACE
>>>>>   X
>>>>>   A  B
>>>>> 020 21
>>>>> 121   1044
>>>>> 2  1044  59069File:  /opt/spark/python/pyspark/pandas/frame.pyType:   
>>>>>method
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> tir. 15. mar. 2022 kl. 19:33 skrev Andrew Davidson >>>> >:
>>>>>
>>>>>> Hi Bjorn
>>>>>>
>>>>>>
>>>>>>
>>>>>> I have been looking for spark transform for a while. Can you send me
>>>>>> a link to the pyspark function?
>>>>>>
>>>>>>
>>>>>>
>>>>>> I assume pandas transform is not really an option. I think it will
>>>>>> try to pull the entire dataframe into the drivers memory.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Kind regards
>>>>>>
>>>>>>
>>>>>>
>>>>>> Andy
>>>>>>
>>>>>>
>>>>>>
>>>>>> p.s. My real problem is that spark does not allow you to bind
>>>>>> columns. You can use union() to bind rows. I could get the equivalent of
>>>>>> cbind() using union().transform()
>>>>>>
>>>>>>
>>>>>>
>>>>>> *From: *Bjørn Jørgensen 
>>>>>> *Date: *Tuesday, March 15, 2022 at 10:37 AM
>>>>>> *To: *Mich Talebzadeh 
>>>>>> *Cc: *"user @spark" 
>>>>>> *Subject: *Re: pivoting panda dataframe
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.transpose.html
>>>>>>  we
>>>>>> have that transpose in pandas api for spark to.
>>>>>>
>>>>>>
>>>>>>
>>>>>> You also have stack() and multilevel
>>>>>> https://pandas.pydata.org/pandas-docs/stable/user_guide/reshaping.html
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> tir. 15. mar. 2022 kl. 17:50 skrev Mich Talebzadeh <
>>>>>> mich.talebza...@gmail.com>:
>>>>>>
>>>>>>
>>>>>> hi,
>>>>>>
>>>>>>
>>>>>>
>>>>>> Is it possible to pivot a panda dataframe by making the row column
>>>>>> heading?
>>>>>>
>>>>>>
>>>>>>
>>>>>> thanks
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>  [image: Image removed by sender.]  view my Linkedin profile
>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>
>>>>>>
>>>>>>
>>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>
>>>>>>
>>>>>>
>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>> for any loss, damage or destruction of data or any other property which 
>>>>>> may
>>>>>> arise from relying on this email's technical content is explicitly
>>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>>> arising from such loss, damage or destruction.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Bjørn Jørgensen
>>>>>> Vestre Aspehaug 4
>>>>>> <https://www.google.com/maps/search/Vestre+Aspehaug+4?entry=gmail=g>,
>>>>>> 6010 Ålesund
>>>>>> Norge
>>>>>>
>>>>>> +47 480 94 297
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Bjørn Jørgensen
>>>>> Vestre Aspehaug 4
>>>>> <https://www.google.com/maps/search/Vestre+Aspehaug+4?entry=gmail=g>,
>>>>> 6010 Ålesund
>>>>> Norge
>>>>>
>>>>> +47 480 94 297
>>>>>
>>>> --
Best Regards,
Ayan Guha


Re: Decompress Gzip files from EventHub with Structured Streaming

2022-03-08 Thread ayan guha
Hi

IMHO this is not the best use of Spark. I would suggest using a simple Azure
Function to unzip.

Is there any specific reason to use gzip over Event Hub?

If you can wait 10-20 sec to process, you can use Event Hub Capture to write
data to storage and then process it.

It all depends on the compute you are willing to pay for; a job scheduled
every 3 sec should not give you any benefit over streaming.

Best
Ayan

On Wed, 9 Mar 2022 at 5:42 am, Data Guy  wrote:

> Hi everyone,
>
> **
>
> Context: I have events coming into Databricks from an Azure Event Hub in a
> Gzip compressed format. Currently, I extract the files with a UDF and send
> the unzipped data into the silver layer in my Delta Lake with .write. Note
> that even though data comes in continuously I do not use .writeStream as of
> now.
>
> I have a few design-related questions that I hope someone with experience
> could help me with!
>
>1. Is there a better way to extract Gzip files than a UDF?
>2. Is Spark Structured Streaming or Batch with Databricks Jobs better?
>(Pipeline runs every 3 hours once, but the data is continuously coming from
>Event Hub)
>3. Should I use Autoloader or just simply stream data into Databricks
>using Event Hubs?
>
> I am especially curious about the trade-offs and the best way forward. I
> don't have massive amounts of data.
>
> Thank you very much in advance!
>
> Best wishes,
> Maurizio Vancho Argall
>
> --
Best Regards,
Ayan Guha


Re: Question about spark.sql min_by

2022-02-21 Thread ayan guha
Why can this not be done by a window function? Or is min_by just a shorthand?
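For reference, a window-function sketch that should produce the same result as min_by for the case described below; the dataframe and column names are taken from the question:

```python
# Sketch: per productId, keep the row with the lowest price
# (the window-function equivalent of min_by(sellerId, price)).
from pyspark.sql import Window
from pyspark.sql import functions as F

w = Window.partitionBy("productId").orderBy(F.col("price").asc())

cheapest_sellers_df = (
    df.withColumn("rn", F.row_number().over(w))
      .filter("rn = 1")
      .select("productId", "sellerId", "price")
)
```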

On Tue, 22 Feb 2022 at 12:42 am, Sean Owen  wrote:

> From the source code, looks like this function was added to pyspark in
> Spark 3.3, up for release soon. It exists in SQL. You can still use it in
> SQL with `spark.sql(...)` in Python though, not hard.
>
> On Mon, Feb 21, 2022 at 4:01 AM David Diebold 
> wrote:
>
>> Hello all,
>>
>> I'm trying to use the spark.sql min_by aggregation function with pyspark.
>> I'm relying on this distribution of spark : spark-3.2.1-bin-hadoop3.2
>>
>> I have a dataframe made of these columns:
>> - productId : int
>> - sellerId : int
>> - price : double
>>
>> For each product, I want to get the seller who sells the product for the
>> cheapest price.
>>
>> Naive approach would be to do this, but I would expect two shuffles:
>>
>> import spark.sql.functions as F
>> cheapest_prices_df  =
>> df.groupby('productId').agg(F.min('price').alias('price'))
>> cheapest_sellers_df = df.join(cheapest_prices_df, on=['productId',
>> 'price'])
>>
>> I would had loved to do this instead :
>>
>> import spark.sql.functions as F
>> cheapest_sellers_df  = df.groupby('productId').agg(F.min('price'),
>> F.min_by('sellerId', 'price'))
>>
>> Unfortunately min_by does not seem available in pyspark sql functions,
>> whereas I can see it in the doc :
>> https://spark.apache.org/docs/latest/api/sql/index.html
>>
>> I have managed to use min_by with this approach but it looks slow (maybe
>> because of temp table creation ?):
>>
>> df.createOrReplaceTempView("table")
>> cheapest_sellers_df = spark.sql("select min_by(sellerId, price) sellerId,
>> min(price) from table group by productId")
>>
>> Is there a way I can rely on min_by directly in groupby ?
>> Is there some code missing in pyspark wrapper to make min_by visible
>> somehow ?
>>
>> Thank you in advance for your help.
>>
>> Cheers
>> David
>>
> --
Best Regards,
Ayan Guha


Re: Cast int to string not possible?

2022-02-17 Thread ayan guha
Can you try to cast any other Int field which is NOT a partition column?

On Thu, 17 Feb 2022 at 7:34 pm, Gourav Sengupta 
wrote:

> Hi,
>
> This appears interesting, casting INT to STRING has never been an issue
> for me.
>
> Can you just help us with the output of : df.printSchema()  ?
>
> I prefer to use SQL, and the method I use for casting is: CAST(<column name> AS STRING) <column name>.
>
> Regards,
> Gourav
>
>
>
>
>
>
> On Thu, Feb 17, 2022 at 6:02 AM Rico Bergmann 
> wrote:
>
>> Here is the code snippet:
>>
>> var df = session.read().parquet(basepath);
>> for(Column partition : partitionColumnsList){
>>   df = df.withColumn(partition.getName(),
>> df.col(partition.getName()).cast(partition.getType()));
>> }
>>
>> Column is a class containing Schema Information, like for example the
>> name of the column and the data type of the column.
>>
>> Best, Rico.
>>
>> > Am 17.02.2022 um 03:17 schrieb Morven Huang :
>> >
>> > Hi Rico, you have any code snippet? I have no problem casting int to
>> string.
>> >
>> >> 2022年2月17日 上午12:26,Rico Bergmann  写道:
>> >>
>> >> Hi!
>> >>
>> >> I am reading a partitioned dataFrame into spark using automatic type
>> inference for the partition columns. For one partition column the data
>> contains an integer, therefor Spark uses IntegerType for this column. In
>> general this is supposed to be a StringType column. So I tried to cast this
>> column to StringType. But this fails with AnalysisException “cannot cast
>> int to string”.
>> >>
>> >> Is this a bug? Or is it really not allowed to cast an int to a string?
>> >>
>> >> I’m using Spark 3.1.1
>> >>
>> >> Best regards
>> >>
>> >> Rico.
>> >>
>> >> -
>> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >>
>> >
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>> --
Best Regards,
Ayan Guha


Re: Does spark support something like the bind function in R?

2022-02-08 Thread ayan guha
Hi

In Python, or in general in Spark, you can just "read" the files and select
the column. I am assuming you are reading each file individually into a
separate dataframe and joining them. Instead, you can read all the files into
a single dataframe and select the one column.
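A sketch of that suggestion, with the path pattern and column name as placeholders; input_file_name() keeps track of which file each row came from:

```python
# Sketch: read every file in one go, keep only the needed column,
# and tag each row with its source file.
from pyspark.sql.functions import input_file_name

df = (
    spark.read.parquet("gs://bucket/samples/*.parquet")   # placeholder path/format
         .select("quant_value")                           # placeholder column name
         .withColumn("source_file", input_file_name())
)
```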

On Wed, Feb 9, 2022 at 2:55 AM Andrew Davidson 
wrote:

> I need to create a single table by selecting one column from thousands of
> files. The columns are all of the same type, have the same number of rows
> and rows names. I am currently using join. I get OOM on mega-mem cluster
> with 2.8 TB.
>
>
>
> Does spark have something like cbind() “Take a sequence of vector, matrix
> or data-frame arguments and combine by *c*olumns or *r*ows,
> respectively. “
>
>
>
> https://www.rdocumentation.org/packages/base/versions/3.6.2/topics/cbind
>
>
>
> Digging through the spark documentation I found a udf example
>
> https://spark.apache.org/docs/latest/sparkr.html#dapply
>
>
>
> ```
>
> *# Convert waiting time from hours to seconds.*
>
> *# Note that we can apply UDF to DataFrame.*
>
> schema <- structType(structField("eruptions", "double"), structField(
> "waiting", "double"),
>
>  structField("waiting_secs", "double"))
>
> df1 <- dapply(df, *function*(x) { x <- cbind(x, x$waiting * 60) }, schema)
>
> head(collect(df1))
>
> *##  eruptions waiting waiting_secs*
>
> *##1 3.600  79 4740*
>
> *##2 1.800  54 3240*
>
> *##3 3.333  74 4440*
>
> *##4 2.283  62 3720*
>
> *##5 4.533  85 5100*
>
> *##6 2.883  55 3300*
>
> ```
>
>
>
> I wonder if this is just a wrapper around join? If so it is probably not
> going to help me out.
>
>
>
> Also I would prefer to work in python
>
>
>
> Any thoughts?
>
>
>
> Kind regards
>
>
>
> Andy
>
>
>
>
>


-- 
Best Regards,
Ayan Guha


Re: add an auto_increment column

2022-02-07 Thread ayan guha
For this requirement you can use rank or dense_rank.
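A sketch against the example shown below in the thread; dense_rank gives tied amounts the same rank, while row_number would force unique values:

```python
# Sketch: add a "top" column (1, 2, 3, ...) ordered by amount descending.
from pyspark.sql import Window
from pyspark.sql import functions as F

# Note: a window with no partitionBy pulls all rows into a single partition,
# which is fine for small data but worth knowing about.
w = Window.orderBy(F.col("amount").desc())

df.withColumn("top", F.dense_rank().over(w)).show()
```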

On Tue, 8 Feb 2022 at 1:12 pm,  wrote:

> Hello,
>
> For this query:
>
> >>> df.select("*").orderBy("amount",ascending=False).show()
> +--+--+
> | fruit|amount|
> +--+--+
> |tomato| 9|
> | apple| 6|
> |cherry| 5|
> |orange| 3|
> +--+--+
>
>
> I want to add a column "top", in which the value is: 1,2,3... meaning
> top1, top2, top3...
>
> How can I do it?
>
> Thanks.
>
>
>
>
> On 07/02/2022 21:18, Gourav Sengupta wrote:
> > Hi,
> >
> > can we understand the requirement first?
> >
> > What is that you are trying to achieve by auto increment id? Do you
> > just want different ID's for rows, or you may want to keep track of
> > the record count of a table as well, or do you want to do use them for
> > surrogate keys?
> >
> > If you are going to insert records multiple times in a table, and
> > still have different values?
> >
> > I think without knowing the requirements all the above responses, like
> > everything else where solutions are reached before understanding the
> > problem, has high chances of being wrong.
> >
> > Regards,
> > Gourav Sengupta
> >
> > On Mon, Feb 7, 2022 at 2:21 AM Siva Samraj 
> > wrote:
> >
> >> Monotonically_increasing_id() will give the same functionality
> >>
> >> On Mon, 7 Feb, 2022, 6:57 am ,  wrote:
> >>
> >>> For a dataframe object, how to add a column who is auto_increment
> >>> like
> >>> mysql's behavior?
> >>>
> >>> Thank you.
> >>>
> >>>
> >>
> > -
> >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Best Regards,
Ayan Guha


Re: add an auto_increment column

2022-02-06 Thread ayan guha
Try this:
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.monotonically_increasing_id.html
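A small sketch; note that the generated ids are increasing and unique but not consecutive, so this is not a drop-in replacement for MySQL-style auto-increment:

```python
# Sketch: add a unique, monotonically increasing (not consecutive) id column.
from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn("id", monotonically_increasing_id())
```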



On Mon, 7 Feb 2022 at 12:27 pm,  wrote:

> For a dataframe object, how to add a column who is auto_increment like
> mysql's behavior?
>
> Thank you.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Best Regards,
Ayan Guha


Re: How to delete the record

2022-01-27 Thread ayan guha
>>>>
>>>>> So you mean if I use those file formats it will do the work of CDC
>>>>> automatically or I would have to handle it via code ?
>>>>>
>>>>> Hi Mich,
>>>>>
>>>>> Not sure if I understood you. Let me try to explain my scenario.
>>>>> Suppose there is a Id "1" which is inserted today, so I transformed and
>>>>> ingested it. Now suppose if this user id is deleted from the source 
>>>>> itself.
>>>>> Then how can I delete it in my transformed db
>>>>> ?
>>>>>
>>>>>
>>>>>
>>>>> On Thu, 27 Jan 2022, 22:44 Sean Owen,  wrote:
>>>>>
>>>>>> This is what storage engines like Delta, Hudi, Iceberg are for. No
>>>>>> need to manage it manually or use a DBMS. These formats allow deletes,
>>>>>> upserts, etc of data, using Spark, on cloud storage.
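As an illustration only, assuming the transformed data were kept as a Delta table at some known path, the delete could look like this; Hudi and Iceberg offer analogous APIs:

```python
# Illustration: delete records from a Delta table once the source reports
# them as deleted. Path and predicate are assumptions.
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "s3://bucket/transformed/orders")  # placeholder
target.delete("id IN (1, 42)")   # ids removed at the source
```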
>>>>>>
>>>>>> On Thu, Jan 27, 2022 at 10:56 AM Mich Talebzadeh <
>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> Where ETL data is stored?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *But now the main problem is when the record at the source is
>>>>>>> deleted, it should be deleted in my final transformed record too.*
>>>>>>>
>>>>>>>
>>>>>>> If your final sync (storage) is data warehouse, it should be soft
>>>>>>> flagged with op_type (Insert/Update/Delete) and op_time (timestamp).
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> HTH
>>>>>>>
>>>>>>>
>>>>>>>view my Linkedin profile
>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>> for any loss, damage or destruction of data or any other property which 
>>>>>>> may
>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>> disclaimed. The author will in no case be liable for any monetary 
>>>>>>> damages
>>>>>>> arising from such loss, damage or destruction.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, 27 Jan 2022 at 15:48, Sid Kal 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I am using Spark incremental approach for bringing the latest data
>>>>>>>> everyday. Everything works fine.
>>>>>>>>
>>>>>>>> But now the main problem is when the record at the source is
>>>>>>>> deleted, it should be deleted in my final transformed record too.
>>>>>>>>
>>>>>>>> How do I capture such changes and change my table too ?
>>>>>>>>
>>>>>>>> Best regards,
>>>>>>>> Sid
>>>>>>>>
>>>>>>>> --
Best Regards,
Ayan Guha


Re: What happens when a partition that holds data under a task fails

2022-01-23 Thread ayan guha
ords distributed across 5 nodes and the
>>>>> partition of the first node holding 2 records failed. I understand that it
>>>>> will re-process this partition but how will it come to know that XYZ
>>>>> partition was holding XYZ data so that it will pick again only those
>>>>> records and reprocess it? In case of failure of a partition, is there a
>>>>> data loss? or is it stored somewhere?
>>>>>
>>>>> Maybe my question is very naive but I am trying to understand it in a
>>>>> better way.
>>>>>
>>>>> On Fri, Jan 21, 2022 at 11:32 PM Sean Owen  wrote:
>>>>>
>>>>>> In that case, the file exists in parts across machines. No, tasks
>>>>>> won't re-read the whole file; no task does or can do that. Failed
>>>>>> partitions are reprocessed, but as in the first pass, the same partition 
>>>>>> is
>>>>>> processed.
>>>>>>
>>>>>> On Fri, Jan 21, 2022 at 12:00 PM Siddhesh Kalgaonkar <
>>>>>> kalgaonkarsiddh...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello team,
>>>>>>>
>>>>>>> I am aware that in case of memory issues when a task fails, it will
>>>>>>> try to restart 4 times since it is a default number and if it still 
>>>>>>> fails
>>>>>>> then it will cause the entire job to fail.
>>>>>>>
>>>>>>> But suppose if I am reading a file that is distributed across nodes
>>>>>>> in partitions. So, what will happen if a partition fails that holds some
>>>>>>> data? Will it re-read the entire file and get that specific subset of 
>>>>>>> data
>>>>>>> since the driver has the complete information? or will it copy the data 
>>>>>>> to
>>>>>>> the other working nodes or tasks and try to run it?
>>>>>>>
>>>>>>

-- 
Best Regards,
Ayan Guha


Re: Naming files while saving a Dataframe

2021-07-17 Thread ayan guha
Hi Eric - yes, that may be the best way to resolve this. I have not seen any
specific way to define the names of the actual files written by Spark.
Finally, make sure you optimize the number of files written.
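A sketch of the subfolder-per-job idea; the base path, job id, day value and coalesce factor are all placeholders:

```python
# Sketch: each hourly job writes under the day partition in its own subfolder,
# and coalesces output to keep the number of files under control.
job_id = "hourly-load-a"     # placeholder job identifier
day = "20210717"             # placeholder day partition value
output_path = f"s3://bucket/data/day={day}/job={job_id}"   # placeholder base path

(df.coalesce(8)              # assumption: 8 files per hourly batch is enough
   .write
   .mode("overwrite")
   .parquet(output_path))
```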

On Sun, Jul 18, 2021 at 2:39 AM Eric Beabes 
wrote:

> Reason we've two jobs writing to the same directory is that the data is
> partitioned by 'day' (mmdd) but the job runs hourly. Maybe the only way
> to do this is to create an hourly partition (/mmdd/hh). Is that the
> only way to solve this?
>
> On Fri, Jul 16, 2021 at 5:45 PM ayan guha  wrote:
>
>> IMHO - this is a bad idea esp in failure scenarios.
>>
>> How about creating a subfolder each for the jobs?
>>
>> On Sat, 17 Jul 2021 at 9:11 am, Eric Beabes 
>> wrote:
>>
>>> We've two (or more) jobs that write data into the same directory via a
>>> Dataframe.save method. We need to be able to figure out which job wrote
>>> which file. Maybe provide a 'prefix' to the file names. I was wondering if
>>> there's any 'option' that allows us to do this. Googling didn't come up
>>> with any solution so thought of asking the Spark experts on this mailing
>>> list.
>>>
>>> Thanks in advance.
>>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>

-- 
Best Regards,
Ayan Guha


Re: Naming files while saving a Dataframe

2021-07-16 Thread ayan guha
IMHO - this is a bad idea, especially in failure scenarios.

How about creating a subfolder for each of the jobs?

On Sat, 17 Jul 2021 at 9:11 am, Eric Beabes 
wrote:

> We've two (or more) jobs that write data into the same directory via a
> Dataframe.save method. We need to be able to figure out which job wrote
> which file. Maybe provide a 'prefix' to the file names. I was wondering if
> there's any 'option' that allows us to do this. Googling didn't come up
> with any solution so thought of asking the Spark experts on this mailing
> list.
>
> Thanks in advance.
>
-- 
Best Regards,
Ayan Guha


Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread ayan guha
t;>>,
>>>>>>>>> col("newtopic_value.status").alias("status")). \
>>>>>>>>>  writeStream. \
>>>>>>>>>  outputMode('append'). \
>>>>>>>>>  option("truncate", "false"). \
>>>>>>>>>   *   foreachBatch(sendToControl). \*
>>>>>>>>>  trigger(processingTime='2 seconds'). \
>>>>>>>>>  queryName(config['MDVariables']['newtopic']).
>>>>>>>>> \
>>>>>>>>>  start()
>>>>>>>>>
>>>>>>>>> result = streamingDataFrame.select( \
>>>>>>>>>  col("parsed_value.rowkey").alias("rowkey") \
>>>>>>>>>, col("parsed_value.ticker").alias("ticker") \
>>>>>>>>>,
>>>>>>>>> col("parsed_value.timeissued").alias("timeissued") \
>>>>>>>>>, col("parsed_value.price").alias("price")). \
>>>>>>>>>  writeStream. \
>>>>>>>>>  outputMode('append'). \
>>>>>>>>>  option("truncate", "false"). \
>>>>>>>>>  *foreachBatch(sendToSink). \*
>>>>>>>>>  trigger(processingTime='30 seconds'). \
>>>>>>>>>  option('checkpointLocation',
>>>>>>>>> checkpoint_path). \
>>>>>>>>>  queryName(config['MDVariables']['topic']). \
>>>>>>>>>  start()
>>>>>>>>> print(result)
>>>>>>>>>
>>>>>>>>> except Exception as e:
>>>>>>>>> print(f"""{e}, quitting""")
>>>>>>>>> sys.exit(1)
>>>>>>>>>
>>>>>>>>> Inside that function say *sendToSink *you can get the df and
>>>>>>>>> batchId
>>>>>>>>>
>>>>>>>>> def sendToSink(df, batchId):
>>>>>>>>> if(len(df.take(1))) > 0:
>>>>>>>>> print(f"""md batchId is {batchId}""")
>>>>>>>>> df.show(100,False)
>>>>>>>>> df. persist()
>>>>>>>>> # write to BigQuery batch table
>>>>>>>>> s.writeTableToBQ(df, "append",
>>>>>>>>> config['MDVariables']['targetDataset'],config['MDVariables']['targetTable'])
>>>>>>>>> df.unpersist()
>>>>>>>>> print(f"""wrote to DB""")
>>>>>>>>> else:
>>>>>>>>> print("DataFrame md is empty")
>>>>>>>>>
>>>>>>>>> And you have created DF from the other topic newtopic
>>>>>>>>>
>>>>>>>>> def sendToControl(dfnewtopic, batchId):
>>>>>>>>> if(len(dfnewtopic.take(1))) > 0:
>>>>>>>>> ..
>>>>>>>>>
>>>>>>>>> Now you have  two dataframe* df* and *dfnewtopic* in the same
>>>>>>>>> session. Will you be able to join these two dataframes through common 
>>>>>>>>> key
>>>>>>>>> value?
>>>>>>>>>
>>>>>>>>> HTH
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>view my Linkedin profile
>>>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>>>> for any loss, damage or destruction of data or any other property 
>>>>>>>>> which may
>>>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>>>> disclaimed. The author will in no case be liable for any monetary 
>>>>>>>>> damages
>>>>>>>>> arising from such loss, damage or destruction.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, 9 Jul 2021 at 17:41, Bruno Oliveira 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hello! Sure thing!
>>>>>>>>>>
>>>>>>>>>> I'm reading them *separately*, both are apps written with Scala
>>>>>>>>>> + Spark Structured Streaming.
>>>>>>>>>>
>>>>>>>>>> I feel like I missed some details on my original thread (sorry it
>>>>>>>>>> was past 4 AM) and it was getting frustrating
>>>>>>>>>> Please let me try to clarify some points:
>>>>>>>>>>
>>>>>>>>>> *Transactions Created Consumer*
>>>>>>>>>> ---
>>>>>>>>>> | Kafka trx-created-topic   |   <--- (Scala + SparkStructured
>>>>>>>>>> Streaming) ConsumerApp --->  Sinks to ---> Postgres DB Table
>>>>>>>>>> (Transactions)
>>>>>>>>>> ---
>>>>>>>>>>
>>>>>>>>>> *Transactions Processed Consumer*
>>>>>>>>>> -
>>>>>>>>>> | Kafka trx-processed-topic |  <---   1) (Scala + SparkStructured
>>>>>>>>>> Streaming) AnotherConsumerApp fetches a Dataset (let's call it "a")
>>>>>>>>>> -   2) Selects the
>>>>>>>>>> Ids
>>>>>>>>>> -
>>>>>>>>>> |   Postgres / Trx table |. <--- 3) Fetches the rows w/
>>>>>>>>>> the matching ids that have status 'created (let's call it "b")
>>>>>>>>>> - 4)  Performs an
>>>>>>>>>> intersection between "a" and "b" resulting in a 
>>>>>>>>>> "b_that_needs_sinking" (but
>>>>>>>>>> now there's some "b_leftovers" that were out of the intersection)
>>>>>>>>>>  5)  Sinks
>>>>>>>>>> "b_that_needs_sinking" to DB, but that leaves the "b_leftovers" as
>>>>>>>>>> unprocessed (not persisted)
>>>>>>>>>>  6) However,
>>>>>>>>>> those "b_leftovers" would, ultimately, be processed at some point 
>>>>>>>>>> (even if
>>>>>>>>>> it takes like 1-3 days) - when their corresponding transaction_id are
>>>>>>>>>>  pushed
>>>>>>>>>> to the "trx-created-topic" Kafka topic, and are then processed by 
>>>>>>>>>> that
>>>>>>>>>> first consumer
>>>>>>>>>>
>>>>>>>>>> So, what I'm trying to accomplish is find a way to reprocess
>>>>>>>>>> those "b_leftovers" *without *having to restart the app
>>>>>>>>>> Does that make sense?
>>>>>>>>>>
>>>>>>>>>> PS: It doesn't necessarily have to be real streaming, if
>>>>>>>>>> micro-batching (legacy Spark Streaming) would allow such a thing, it 
>>>>>>>>>> would
>>>>>>>>>> technically work (although I keep hearing it's not advisable)
>>>>>>>>>>
>>>>>>>>>> Thank you so much!
>>>>>>>>>>
>>>>>>>>>> Kind regards
>>>>>>>>>>
>>>>>>>>>> On Fri, Jul 9, 2021 at 12:13 PM Mich Talebzadeh <
>>>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Can you please clarify if you are reading these two topics
>>>>>>>>>>> separately or within the same scala or python script in Spark 
>>>>>>>>>>> Structured
>>>>>>>>>>> Streaming?
>>>>>>>>>>>
>>>>>>>>>>> HTH
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>view my Linkedin profile
>>>>>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all
>>>>>>>>>>> responsibility for any loss, damage or destruction of data or any 
>>>>>>>>>>> other
>>>>>>>>>>> property which may arise from relying on this email's technical 
>>>>>>>>>>> content is
>>>>>>>>>>> explicitly disclaimed. The author will in no case be liable for any
>>>>>>>>>>> monetary damages arising from such loss, damage or destruction.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, 9 Jul 2021 at 13:44, Bruno Oliveira <
>>>>>>>>>>> bruno.ar...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello guys,
>>>>>>>>>>>>
>>>>>>>>>>>> I've been struggling with this for some days now, without
>>>>>>>>>>>> success, so I would highly appreciate any enlightenment. The 
>>>>>>>>>>>> simplified
>>>>>>>>>>>> scenario is the following:
>>>>>>>>>>>>
>>>>>>>>>>>>- I've got 2 topics in Kafka (it's already like that in
>>>>>>>>>>>>production, can't change it)
>>>>>>>>>>>>   - transactions-created,
>>>>>>>>>>>>   - transaction-processed
>>>>>>>>>>>>- Even though the schema is not exactly the same, they all
>>>>>>>>>>>>share a correlation_id, which is their "transaction_id"
>>>>>>>>>>>>
>>>>>>>>>>>> So, long story short, I've got 2 consumers, one for each topic,
>>>>>>>>>>>> and all I wanna do is sink them in a chain order. I'm writing them 
>>>>>>>>>>>> w/ Spark
>>>>>>>>>>>> Structured Streaming, btw
>>>>>>>>>>>>
>>>>>>>>>>>> So far so good, the caveat here is:
>>>>>>>>>>>>
>>>>>>>>>>>> - I cannot write a given "*processed" *transaction unless
>>>>>>>>>>>> there is an entry of that same transaction with the status "
>>>>>>>>>>>> *created*".
>>>>>>>>>>>>
>>>>>>>>>>>> - There is *no* guarantee that any transactions in the topic
>>>>>>>>>>>> "transaction-*processed*" have a match (same transaction_id)
>>>>>>>>>>>> in the "transaction-*created*" at the moment the messages are
>>>>>>>>>>>> fetched.
>>>>>>>>>>>>
>>>>>>>>>>>> So the workflow so far is:
>>>>>>>>>>>> - Msgs from the "transaction-created" just get synced to
>>>>>>>>>>>> postgres, no questions asked
>>>>>>>>>>>>
>>>>>>>>>>>> - As for the "transaction-processed", it goes as follows:
>>>>>>>>>>>>
>>>>>>>>>>>>- a) Messages are fetched from the Kafka topic
>>>>>>>>>>>>- b) Select the transaction_id of those...
>>>>>>>>>>>>- c) Fetch all the rows w/ the corresponding id from a
>>>>>>>>>>>>Postgres table AND that have the status "CREATED"
>>>>>>>>>>>>- d) Then, I pretty much do an intersection between the two
>>>>>>>>>>>>datasets, and sink only the "processed" ones that have a match from
>>>>>>>>>>>>step c
>>>>>>>>>>>>- e) Persist the resulting dataset
>>>>>>>>>>>>
>>>>>>>>>>>> But the rows (from the 'processed') that were not part of the
>>>>>>>>>>>> intersection get lost afterwards...
>>>>>>>>>>>>
>>>>>>>>>>>> So my question is:
>>>>>>>>>>>> - Is there ANY way to reprocess/replay them at all WITHOUT
>>>>>>>>>>>> restarting the app?
>>>>>>>>>>>> - For this scenario, should I fall back to Spark Streaming,
>>>>>>>>>>>> instead of Structured Streaming?
>>>>>>>>>>>>
>>>>>>>>>>>> PS: I was playing around with Spark Streaming (legacy) and
>>>>>>>>>>>> managed to commit only the ones in the microbatches that were fully
>>>>>>>>>>>> successful (still failed to find a way to "poll" for the 
>>>>>>>>>>>> uncommitted ones
>>>>>>>>>>>> without restarting, though).
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you very much in advance!
>>>>>>>>>>>>
>>>>>>>>>>>> --
Best Regards,
Ayan Guha


Re: Insert into table with one the value is derived from DB function using spark

2021-06-19 Thread ayan guha
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Fri, 18 Jun 2021 at 20:49, Anshul Kala  wrote:
>>>
>>>> Hi All,
>>>>
>>>> I am using spark to ingest data from file to database Oracle table .
>>>> For one of the fields , the value to be populated is generated from a
>>>> function that is written in database .
>>>>
>>>> The input to the function is one of the fields of data frame
>>>>
>>>> I wanted to use spark.dbc.write to perform the operation, which
>>>> generates the insert query at back end .
>>>>
>>>> For example : It can generate the insert query as :
>>>>
>>>> Insert into table values (?,?, getfunctionvalue(?) )
>>>>
>>>> Please advise if it is possible in spark and if yes , how can it be
>>>> done
>>>>
>>>> This is little urgent for me . So any help is appreciated
>>>>
>>>> Thanks
>>>> Anshul
>>>>
>>> --
Best Regards,
Ayan Guha


Re: Spark-sql can replace Hive ?

2021-06-10 Thread ayan guha
Would you mind expanding the ask? Spark SQL can use Hive by itself.

On Thu, 10 Jun 2021 at 8:58 pm, Battula, Brahma Reddy
 wrote:

> Hi
>
>
>
> Would like know any refences/docs to replace hive with spark-sql
> completely like how migrate the existing data in hive.?
>
>
>
> thanks
>
>
>
>
>
-- 
Best Regards,
Ayan Guha


Re: DF blank value fill

2021-05-21 Thread ayan guha
Hi

You can do something like this:

SELECT mainkey, subkey,
  case when val1 is null then newval1 else val1 end val1,
  case when val2 is null then newval2 else val2 end val2,
  case when val3 is null then newval3 else val3 end val3
from (select mainkey, subkey, val1, val2, val3,
        first_value(val1) over (partition by mainkey, subkey order by val1 nulls last) newval1,
        first_value(val2) over (partition by mainkey, subkey order by val2 nulls last) newval2,
        first_value(val3) over (partition by mainkey, subkey order by val3 nulls last) newval3
      from table) x
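
(For completeness, a minimal, untested PySpark sketch of the same fill logic, assuming a dataframe df with the MainKey/SubKey/Val1-Val3 columns shown in the question below:)

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("MainKey", "SubKey")

# first(..., ignorenulls=True) picks the non-null value within each key group
filled = df.select(
    "MainKey", "SubKey",
    *[F.first(c, ignorenulls=True).over(w).alias(c) for c in ["Val1", "Val2", "Val3"]]
)
filled.show()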

On Fri, May 21, 2021 at 9:29 PM Bode, Meikel, NMA-CFD <
meikel.b...@bertelsmann.de> wrote:

> Hi all,
>
>
>
> My df looks like follows:
>
>
>
> Situation:
>
> MainKey, SubKey, Val1, Val2, Val3, …
>
> 1, 2, a, null, c
>
> 1, 2, null, null, c
>
> 1, 3, null, b, null
>
> 1, 3, a, null, c
>
>
>
>
>
> Desired outcome:
>
> 1, 2, a, b, c
>
> 1, 2, a, b, c
>
> 1, 3, a, b, c
>
> 1, 3, a, b, c
>
>
>
>
>
> How could I populate/synchronize empty cells of all records with the same
> combination of MainKey and SubKey with the respective value of other rows
> with the same key combination?
>
> A certain value, if not null, of a col is guaranteed to be unique within
> the df. If a col exists then there is at least one row with a not-null
> value.
>
>
>
> I am using pyspark.
>
>
>
> Thanks for any hint,
>
> Best
>
> Meikel
>


-- 
Best Regards,
Ayan Guha


Re: PySpark Write File Container exited with a non-zero exit code 143

2021-05-19 Thread ayan guha
Hi -- Notice the additional "y" in red (as Mich mentioned)

pyspark --conf queue=default --conf executory-memory=24G

On Thu, May 20, 2021 at 12:02 PM Clay McDonald <
stuart.mcdon...@bateswhite.com> wrote:

> How so?
>
>
>
> *From:* Mich Talebzadeh 
> *Sent:* Wednesday, May 19, 2021 5:45 PM
> *To:* Clay McDonald 
> *Cc:* user@spark.apache.org
> *Subject:* Re: PySpark Write File Container exited with a non-zero exit
> code 143
>
>
>
> *  *** EXTERNAL EMAIL ***   *
>
>
>
>
>
> Hi Clay,
>
>
>
> Those parameters you are passing are not valid
>
>
>
> pyspark --conf queue=default --conf executory-memory=24G
>
>
>
> Python 3.7.3 (default, Apr  3 2021, 20:42:31)
>
> [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
>
> Type "help", "copyright", "credits" or "license" for more information.
>
> Warning: Ignoring non-Spark config property: executory-memory
>
> Warning: Ignoring non-Spark config property: queue
>
> 2021-05-19 22:28:20,521 WARN util.NativeCodeLoader: Unable to load
> native-hadoop library for your platform... using builtin-java classes where
> applicable
>
> Setting default log level to "WARN".
>
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
> setLogLevel(newLevel).
>
> Welcome to
>
>     __
>
>  / __/__  ___ _/ /__
>
> _\ \/ _ \/ _ `/ __/  '_/
>
>/__ / .__/\_,_/_/ /_/\_\   version 3.1.1
>
>   /_/
>
>
>
> Using Python version 3.7.3 (default, Apr  3 2021 20:42:31)
>
> Spark context Web UI available at http://rhes75:4040
>
> Spark context available as 'sc' (master = local[*], app id =
> local-1621459701490).
>
> SparkSession available as 'spark'.
>
>
>
> Also
>
>
>
> pyspark dynamic_ARRAY_generator_parquet.py
>
>
>
> Running python applications through 'pyspark' is not supported as of Spark
> 2.0.
>
> Use ./bin/spark-submit 
>
>
>
>
>
> This works
>
>
>
> $SPARK_HOME/bin/spark-submit --master local[4]
> dynamic_ARRAY_generator_parquet.py
>
>
>
>
>
> See
>
>
>
>  https://spark.apache.org/docs/latest/submitting-applications.html
>
>
>
> HTH
>
>
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Wed, 19 May 2021 at 20:10, Clay McDonald <
> stuart.mcdon...@bateswhite.com> wrote:
>
> Hello all,
>
>
>
> I’m hoping someone can give me some direction for troubleshooting this
> issue, I’m trying to write from Spark on an HortonWorks(Cloudera) HDP
> cluster. I ssh directly to the first datanode and run PySpark with the
> following command; however, it is always failing no matter what size I set
> memory in Yarn Containers and Yarn Queues. Any suggestions?
>
>
>
>
>
>
>
> pyspark --conf queue=default --conf executory-memory=24G
>
>
>
> --
>
>
>
> HDFS_RAW="/HDFS/Data/Test/Original/MyData_data/"
>
> #HDFS_OUT="/ HDFS/Data/Test/Processed/Convert_parquet/Output"
>
> HDFS_OUT="/tmp"
>
> ENCODING="utf-16"
>
>
>
> fileList1=[
>
> 'Test _2003.txt'
>
> ]
>
> from  pyspark.sql.functions import regexp_replace,col
>
> for f in fileList1:
>
> fname=f
>
> fname_noext=fname.split('.')[0]
>
> df =
> spark.read.option("delimiter","|").option("encoding",ENCODING).option("multiLine",True).option('wholeFile',"true").csv('{}/{}'.format(HDFS_RAW,fname),
> header=True)
>
> lastcol=df.columns[-1]
>
> print('showing {}'.format(fname))
>
> if ('\r' in lastcol):
>
> lastcol=lastcol.replace('\r','')
>
> df=df.withColumn(lastcol,
> regexp_replace(col("{}\r".format(lastcol)), "[\r]",
> "")).drop('{}\r'.format(lastcol))
>
>
> df.write.format('parquet').mode('overwrite').save("{}/{}".format(HDFS_OUT,fname_noext))
>
>
>
>
>
>
>
> Caused by: org.apache.spark.SparkException: Job aborted due to stage
> failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task
> 0.3 in stage 1.0 (TID 4, DataNode01.mydomain.com, executor 5):
> ExecutorLostFailure (executor 5 exited caused by one of the running tasks)
> Reason: Container marked as failed:
> container_e331_1621375512548_0021_01_06 on host:
> DataNode01.mydomain.com. Exit status: 143. Diagnostics: [2021-05-19
> 18:09:06.392]Container killed on request. Exit code is 143
> [2021-05-19 18:09:06.413]Container exited with a non-zero exit code 143.
> [2021-05-19 18:09:06.414]Killed by external signal
>
>
>
>
>
> THANKS! CLAY
>
>
>
>

-- 
Best Regards,
Ayan Guha


Re: Merge two dataframes

2021-05-19 Thread ayan guha
Hi Kushagra

 I still think this is a bad idea. By definition, data in a dataframe or rdd
is unordered; you are imposing an order where there is none, and if it works
it will be by chance. For example, a simple repartition may disrupt the row
ordering. It is just too unpredictable.

I would suggest you fix this upstream and add a correct identifier to each of
the streams. That will for sure be a much better solution.
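
(If you really do have to join purely by position, here is a minimal, untested sketch of the row_number() workaround discussed further down the thread; note Mich's warning below that a window without partitionBy() moves all data to a single partition.)

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Order by something stable; here the value columns themselves, as in Mich's example.
w1 = Window.orderBy("amount_6m")
w2 = Window.orderBy("amount_9m")

df1_id = df1.withColumn("row_num", F.row_number().over(w1))
df2_id = df2.withColumn("row_num", F.row_number().over(w2))

df3 = df1_id.join(df2_id, "row_num", "inner").drop("row_num")
df3.show()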

On Wed, 19 May 2021 at 7:21 pm, Mich Talebzadeh 
wrote:

> That generation of row_number() has to be performed through a window call
> and I don't think there is any way around it without orderBy()
>
> df1 =
> df1.select(F.row_number().over(Window.partitionBy().orderBy(df1['amount_6m'])).alias("row_num"),"amount_6m")
>
> The problem is that without partitionBy() clause data will be skewed
> towards one executor.
>
> WARN window.WindowExec: No Partition Defined for Window operation! Moving
> all data to a single partition, this can cause serious performance
> degradation.
>
> Cheers
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 12 May 2021 at 17:33, Andrew Melo  wrote:
>
>> Hi,
>>
>> In the case where the left and right hand side share a common parent like:
>>
>> df = spark.read.someDataframe().withColumn('rownum', row_number())
>> df1 = df.withColumn('c1', expensive_udf1('foo')).select('c1', 'rownum')
>> df2 = df.withColumn('c2', expensive_udf2('bar')).select('c2', 'rownum')
>> df_joined = df1.join(df2, 'rownum', 'inner')
>>
>> (or maybe replacing row_number() with monotonically_increasing_id())
>>
>> Is there some hint/optimization that can be done to let Spark know
>> that the left and right hand-sides of the join share the same
>> ordering, and a sort/hash merge doesn't need to be done?
>>
>> Thanks
>> Andrew
>>
>> On Wed, May 12, 2021 at 11:07 AM Sean Owen  wrote:
>> >
>> > Yeah I don't think that's going to work - you aren't guaranteed to get
>> 1, 2, 3, etc. I think row_number() might be what you need to generate a
>> join ID.
>> >
>> > RDD has a .zip method, but (unless I'm forgetting!) DataFrame does not.
>> You could .zip two RDDs you get from DataFrames and manually convert the
>> Rows back to a single Row and back to DataFrame.
>> >
>> >
>> > On Wed, May 12, 2021 at 10:47 AM kushagra deep <
>> kushagra94d...@gmail.com> wrote:
>> >>
>> >> Thanks Raghvendra
>> >>
>> >> Will the ids for corresponding columns  be same always ? Since
>> monotonic_increasing_id() returns a number based on partitionId and the row
>> number of the partition  ,will it be same for corresponding columns? Also
>> is it guaranteed that the two dataframes will be divided into logical spark
>> partitions with the same cardinality for each partition ?
>> >>
>> >> Reg,
>> >> Kushagra Deep
>> >>
>> >> On Wed, May 12, 2021, 21:00 Raghavendra Ganesh <
>> raghavendr...@gmail.com> wrote:
>> >>>
>> >>> You can add an extra id column and perform an inner join.
>> >>>
>> >>> val df1_with_id = df1.withColumn("id", monotonically_increasing_id())
>> >>>
>> >>> val df2_with_id = df2.withColumn("id", monotonically_increasing_id())
>> >>>
>> >>> df1_with_id.join(df2_with_id, Seq("id"), "inner").drop("id").show()
>> >>>
>> >>> +-+-+
>> >>>
>> >>> |amount_6m|amount_9m|
>> >>>
>> >>> +-+-+
>> >>>
>> >>> |  100|  500|
>> >>>
>> >>> |  200|  600|
>> >>>
>> >>> |  300|  700|
>> >>>
>> >>> |  400|  800|
>> >>>
>> >>> |  500|  900|
>> >>>
>> >>> +-+-+
>> >>>
>> >>>
>> >>> --
>> >>> Raghavendra
>
>
>> >>>
>> >>>
>> >>> On Wed, May 12, 2021 at 6:20 PM kushagra deep <
>> kushagra94d...@gmail.com> wrote:
>> >>>>
>> >>>> Hi All,
>> >>>>
>> >>>> I have two dataframes
>> >>>>
>> >>>> df1
>> >>>>
>> >>>> amount_6m
>> >>>>  100
>> >>>>  200
>> >>>>  300
>> >>>>  400
>> >>>>  500
>> >>>>
>> >>>> And a second data df2 below
>> >>>>
>> >>>>  amount_9m
>> >>>>   500
>> >>>>   600
>> >>>>   700
>> >>>>   800
>> >>>>   900
>> >>>>
>> >>>> The number of rows is same in both dataframes.
>> >>>>
>> >>>> Can I merge the two dataframes to achieve below df
>> >>>>
>> >>>> df3
>> >>>>
>> >>>> amount_6m | amount_9m
>> >>>> 100   500
>> >>>>  200  600
>> >>>>  300  700
>> >>>>  400  800
>> >>>>  500  900
>> >>>>
>> >>>> Thanks in advance
>> >>>>
>> >>>> Reg,
>> >>>> Kushagra Deep
>> >>>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>> --
Best Regards,
Ayan Guha


Re: [Spark Catalog API] Support for metadata Backup/Restore

2021-05-07 Thread ayan guha
Just a consideration:

Is there value in backing up/restoring metadata within Spark? I would strongly
argue that if the metadata is valuable and persistent enough, why not just use
an external metastore? It is a fairly straightforward process. Also,
regardless of whether you are in the cloud or not, database backup is a
routine and established pattern in most organizations.
You can also enhance HA and DR by having replicas across zones and regions,
etc.

Thoughts?
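
(For context, a rough, untested sketch of what the existing listing API can capture today; as the original post notes further down, this drops database properties and non-table entities, which is exactly the gap being discussed. The output path is a placeholder.)

rows = []
for db in spark.catalog.listDatabases():
    for t in spark.catalog.listTables(db.name):
        rows.append((db.name, t.name, t.tableType, t.isTemporary))

(spark.createDataFrame(rows, "db string, tbl string, tbl_type string, is_temp boolean")
      .write.mode("overwrite").parquet("/tmp/catalog_backup"))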




On Sat, 8 May 2021 at 7:02 am, Tianchen Zhang 
wrote:

> For now we are thinking about adding two methods in Catalog API, not SQL
> commands:
> 1. spark.catalog.backup, which backs up the current catalog.
> 2. spark.catalog.restore(file), which reads the DFS file and recreates the
> entities described in that file.
>
> Can you please give an example of exposing client APIs to the end users in
> this approach? The users can only call backup or restore, right?
>
> Thanks,
> Tianchen
>
> On Fri, May 7, 2021 at 12:27 PM Wenchen Fan  wrote:
>
>> If a catalog implements backup/restore, it can easily expose some client
>> APIs to the end-users (e.g. REST API), I don't see a strong reason to
>> expose the APIs to Spark. Do you plan to add new SQL commands in Spark to
>> backup/restore a catalog?
>>
>> On Tue, May 4, 2021 at 2:39 AM Tianchen Zhang 
>> wrote:
>>
>>> Hi all,
>>>
>>> Currently the user-facing Catalog API doesn't support backup/restore
>>> metadata. Our customers are asking for such functionalities. Here is a
>>> usage example:
>>> 1. Read all metadata of one Spark cluster
>>> 2. Save them into a Parquet file on DFS
>>> 3. Read the Parquet file and restore all metadata in another Spark
>>> cluster
>>>
>>> From the current implementation, Catalog API has the list methods
>>> (listDatabases, listFunctions, etc.) but they don't return enough
>>> information in order to restore an entity (for example, listDatabases lose
>>> "properties" of the database and we need "describe database extended" to
>>> get them). And it only supports createTable (not any other entity
>>> creations). The only way we can backup/restore an entity is using Spark SQL.
>>>
>>> We want to introduce the backup and restore from an API level. We are
>>> thinking of doing this simply by adding backup() and restore() in
>>> CatalogImpl, as ExternalCatalog already includes all the methods we need to
>>> retrieve and recreate entities. We are wondering if there is any concern or
>>> drawback of this approach. Please advise.
>>>
>>> Thank you in advance,
>>> Tianchen
>>>
>> --
Best Regards,
Ayan Guha


Re: Graceful shutdown SPARK Structured Streaming

2021-05-06 Thread ayan guha
testream add the following line to be able to identify topic
>> name
>>
>> trigger(processingTime='30 seconds'). \
>> *queryName('md'). *\
>>
>> Next the controlling topic (called newtopic)  has the following
>>
>> foreachBatch(*sendToControl*). \
>> trigger(processingTime='2 seconds'). \
>> queryName('newtopic'). \
>>
>> That method sendToControl does what is needed
>>
>> def sendToControl(dfnewtopic, batchId):
>> if(len(dfnewtopic.take(1))) > 0:
>> #print(f"""newtopic batchId is {batchId}""")
>> #dfnewtopic.show(10,False)
>> queue = dfnewtopic.select(col("queue")).collect()[0][0]
>> status = dfnewtopic.select(col("status")).collect()[0][0]
>>
>> if (queue == 'md') and (status == 'false'):
>>   spark_session = s.spark_session(config['common']['appName'])
>>   active = spark_session.streams.active
>>   for e in active:
>>  #print(e)
>>  name = e.name
>>  if(name == 'md'):
>> print(f"""Terminating streaming process {name}""")
>> e.stop()
>> else:
>> print("DataFrame newtopic is empty")
>>
>> This seems to work as I checked it to ensure that in this case data was
>> written and saved to the target sink (BigQuery table). It will wait until
>> data is written completely meaning the current streaming message is
>> processed and there is a latency there (meaning waiting for graceful
>> completion)
>>
>> This is the output
>>
>> Terminating streaming process md
>> wrote to DB  ## this is the flag  I added to ensure the current
>> micro-bath was completed
>> 2021-04-23 09:59:18,029 ERROR streaming.MicroBatchExecution: Query md [id
>> = 6bbccbfe-e770-4fb0-b83d-0dedd0ee571b, runId =
>> 2ae55673-6bc2-4dbe-af60-9fdc0447bff5] terminated with error
>>
>> The various termination processes are described in
>>
>> Structured Streaming Programming Guide - Spark 3.1.1 Documentation
>> (apache.org)
>> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries>
>>
>> This is the idea I came up with which allows ending the streaming process
>> with least cost.
>>
>> HTH
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Wed, 5 May 2021 at 17:30, Gourav Sengupta <
>> gourav.sengupta.develo...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> just thought of reaching out once again and seeking out your kind help
>>> to find out what is the best way to stop SPARK streaming gracefully. Do we
>>> still use the methods of creating a file as in SPARK 2.4.x which is several
>>> years old method or do we have a better approach in SPARK 3.1?
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> -- Forwarded message -
>>> From: Gourav Sengupta 
>>> Date: Wed, Apr 21, 2021 at 10:06 AM
>>> Subject: Graceful shutdown SPARK Structured Streaming
>>> To: 
>>>
>>>
>>> Dear friends,
>>>
>>> is there any documentation available for gracefully stopping SPARK
>>> Structured Streaming in 3.1.x?
>>>
>>> I am referring to articles which are 4 to 5 years old and was wondering
>>> whether there is a better way available today to gracefully shutdown a
>>> SPARK streaming job.
>>>
>>> Thanks a ton in advance for all your kind help.
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>> --
Best Regards,
Ayan Guha


Spark Streaming with Files

2021-04-23 Thread ayan guha
Hi

In one of the Spark Summit demos, it was suggested that we should think of
batch jobs in a streaming pattern, using "run once" on a schedule.
I find this idea very interesting and I understand how this can be achieved
for sources like Kafka, Kinesis or similar; in fact we have implemented this
model for the Cosmos changefeed.

My question is: can this model extend to file-based sources? I understand it
can for append-only file streams. The use case I have is: a CDC tool like AWS
DMS or SharePlex writing changes to a stream of files, in date-based folders,
so it just goes on like T1, T2 etc. Also, let's assume files are written every
10 minutes, but I want to process them every 4 hours.
Can I use the streaming method so that it manages checkpoints on its own?
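
(For reference, a minimal, untested sketch of a file-source stream run as a scheduled "run once" job; the schema, format and paths below are placeholders, not from the thread.)

from pyspark.sql.types import StructType, StructField, StringType, TimestampType

cdc_schema = StructType([
    StructField("op", StringType()),
    StructField("id", StringType()),
    StructField("changed_at", TimestampType()),
])

stream_df = (spark.readStream
    .schema(cdc_schema)                      # file sources need an explicit schema
    .format("parquet")
    .load("s3://my-bucket/cdc-files/"))      # only files new since the last checkpoint

query = (stream_df.writeStream
    .format("parquet")
    .option("checkpointLocation", "s3://my-bucket/checkpoints/cdc")
    .option("path", "s3://my-bucket/processed/cdc")
    .trigger(once=True)                      # process the backlog, then stop
    .start())
query.awaitTermination()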

Best - Ayan
-- 
Best Regards,
Ayan Guha


Re: Python level of knowledge for Spark and PySpark

2021-04-14 Thread ayan guha
The answer always is "it depends". At the outset it seems you are in pretty
good shape and have all the key skills you need. All I can suggest is try
to take inherent benefits of the language and hone your coding practices


On Thu, 15 Apr 2021 at 2:25 am, ashok34...@yahoo.com.INVALID
 wrote:

> Hi gurus,
>
> I have knowledge of Java, Scala and good enough knowledge of Spark, Spark
> SQL and Spark Functional programing with Scala.
>
> I have started using Python with Spark PySpark.
>
> Wondering, in order to be proficient in PySpark, how much good knowledge
> of Python programing is needed? I know the answer may be very good
> knowledge, but in practice how much is good enough. I can write Python in
> IDE like PyCharm similar to the way Scala works and can run the programs.
> Does expert knowledge of Python is prerequisite for PySpark? I also know
> Pandas and am also familiar with plotting routines like matplotlib.
>
> Warmest
>
> Ashok
>
-- 
Best Regards,
Ayan Guha


Re: [Spark SQL]:to calculate distance between four coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the pysaprk dataframe

2021-04-09 Thread ayan guha
Hi - interesting stuff. My stance has always been to use Spark native
functions, then pandas, then native Python - in that order.

To OP - did you try the code? What kind of perf are you seeing? Just
curious, why do you think UDFs are bad?

On Sat, 10 Apr 2021 at 2:36 am, Sean Owen  wrote:

> Actually, good question, I'm not sure. I don't think that Spark would
> vectorize these operations over rows.
> Whereas in a pandas UDF, given a DataFrame, you can apply operations like
> sin to 1000s of values at once in native code via numpy. It's trivially
> 'vectorizable' and I've seen good wins over, at least, a single-row UDF.
>
> On Fri, Apr 9, 2021 at 9:14 AM ayan guha  wrote:
>
>> Hi Sean - absolutely open to suggestions.
>>
>> My impression was using spark native functions should provide similar
>> perf as scala ones because serialization penalty should not be there,
>> unlike native python udfs.
>>
>> Is it wrong understanding?
>>
>>
>>
>> On Fri, 9 Apr 2021 at 10:55 pm, Rao Bandaru  wrote:
>>
>>> Hi All,
>>>
>>>
>>> yes ,i need to add the below scenario based code to the executing spark
>>> job,while executing this it took lot of time to complete,please suggest
>>> best way to get below requirement without using UDF
>>>
>>>
>>> Thanks,
>>>
>>> Ankamma Rao B
>>> --
>>> *From:* Sean Owen 
>>> *Sent:* Friday, April 9, 2021 6:11 PM
>>> *To:* ayan guha 
>>> *Cc:* Rao Bandaru ; User 
>>> *Subject:* Re: [Spark SQL]:to calculate distance between four
>>> coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the pysaprk
>>> dataframe
>>>
>>> This can be significantly faster with a pandas UDF, note, because you
>>> can vectorize the operations.
>>>
>>> On Fri, Apr 9, 2021, 7:32 AM ayan guha  wrote:
>>>
>>> Hi
>>>
>>> We are using a haversine distance function for this, and wrapping it in
>>> udf.
>>>
>>> from pyspark.sql.functions import acos, cos, sin, lit, toRadians, udf
>>> from pyspark.sql.types import *
>>>
>>> def haversine_distance(long_x, lat_x, long_y, lat_y):
>>> return acos(
>>> sin(toRadians(lat_x)) * sin(toRadians(lat_y)) +
>>> cos(toRadians(lat_x)) * cos(toRadians(lat_y)) *
>>> cos(toRadians(long_x) - toRadians(long_y))
>>> ) * lit(6371.0)
>>>
>>> distudf = udf(haversine_distance, FloatType())
>>>
>>> in case you just want to use just Spark SQL, you can still utilize the
>>> functions shown above to implement in SQL.
>>>
>>> Any reason you do not want to use UDF?
>>>
>>> Credit
>>> <https://stackoverflow.com/questions/38994903/how-to-sum-distances-between-data-points-in-a-dataset-using-pyspark>
>>>
>>>
>>> On Fri, Apr 9, 2021 at 10:19 PM Rao Bandaru 
>>> wrote:
>>>
>>> Hi All,
>>>
>>>
>>>
>>> I have a requirement to calculate distance between four
>>> coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the *pysaprk
>>> dataframe *with the help of from *geopy* import *distance *without
>>> using *UDF* (user defined function)*,*Please help how to achieve this
>>> scenario and do the needful.
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Ankamma Rao B
>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>> --
>> Best Regards,
>> Ayan Guha
>>
> --
Best Regards,
Ayan Guha


Re: [Spark SQL]:to calculate distance between four coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the pysaprk dataframe

2021-04-09 Thread ayan guha
Hi Sean - absolutely open to suggestions.

My impression was that using Spark native functions should provide similar
perf to Scala ones because the serialization penalty should not be there,
unlike native Python UDFs.

Is that understanding wrong?



On Fri, 9 Apr 2021 at 10:55 pm, Rao Bandaru  wrote:

> Hi All,
>
>
> yes ,i need to add the below scenario based code to the executing spark
> job,while executing this it took lot of time to complete,please suggest
> best way to get below requirement without using UDF
>
>
> Thanks,
>
> Ankamma Rao B
> --
> *From:* Sean Owen 
> *Sent:* Friday, April 9, 2021 6:11 PM
> *To:* ayan guha 
> *Cc:* Rao Bandaru ; User 
> *Subject:* Re: [Spark SQL]:to calculate distance between four
> coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the pysaprk
> dataframe
>
> This can be significantly faster with a pandas UDF, note, because you can
> vectorize the operations.
>
> On Fri, Apr 9, 2021, 7:32 AM ayan guha  wrote:
>
> Hi
>
> We are using a haversine distance function for this, and wrapping it in
> udf.
>
> from pyspark.sql.functions import acos, cos, sin, lit, toRadians, udf
> from pyspark.sql.types import *
>
> def haversine_distance(long_x, lat_x, long_y, lat_y):
> return acos(
> sin(toRadians(lat_x)) * sin(toRadians(lat_y)) +
> cos(toRadians(lat_x)) * cos(toRadians(lat_y)) *
> cos(toRadians(long_x) - toRadians(long_y))
> ) * lit(6371.0)
>
> distudf = udf(haversine_distance, FloatType())
>
> in case you just want to use just Spark SQL, you can still utilize the
> functions shown above to implement in SQL.
>
> Any reason you do not want to use UDF?
>
> Credit
> <https://stackoverflow.com/questions/38994903/how-to-sum-distances-between-data-points-in-a-dataset-using-pyspark>
>
>
> On Fri, Apr 9, 2021 at 10:19 PM Rao Bandaru  wrote:
>
> Hi All,
>
>
>
> I have a requirement to calculate distance between four
> coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the *pysaprk
> dataframe *with the help of from *geopy* import *distance *without using
> *UDF* (user defined function)*,*Please help how to achieve this scenario
> and do the needful.
>
>
>
> Thanks,
>
> Ankamma Rao B
>
>
>
> --
> Best Regards,
> Ayan Guha
>
> --
Best Regards,
Ayan Guha


Re: [Spark SQL]:to calculate distance between four coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the pysaprk dataframe

2021-04-09 Thread ayan guha
Hi

We are using a haversine distance function for this, and wrapping it in
udf.

from pyspark.sql.functions import acos, cos, sin, lit, toRadians, udf
from pyspark.sql.types import *

def haversine_distance(long_x, lat_x, long_y, lat_y):
    return acos(
        sin(toRadians(lat_x)) * sin(toRadians(lat_y)) +
        cos(toRadians(lat_x)) * cos(toRadians(lat_y)) *
        cos(toRadians(long_x) - toRadians(long_y))
    ) * lit(6371.0)

distudf = udf(haversine_distance, FloatType())

In case you want to use just Spark SQL, you can still utilize the functions
shown above and implement the same logic in SQL.
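
(In fact, because the helper above only builds a column expression out of Spark functions, it can also be applied directly, with no UDF at all - a small untested sketch, assuming a dataframe df with columns lon1/lat1/lon2/lat2:)

from pyspark.sql.functions import col

df_with_dist = df.withColumn(
    "distance_km",
    haversine_distance(col("lon1"), col("lat1"), col("lon2"), col("lat2"))
)
df_with_dist.show()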

Any reason you do not want to use UDF?

Credit
<https://stackoverflow.com/questions/38994903/how-to-sum-distances-between-data-points-in-a-dataset-using-pyspark>


On Fri, Apr 9, 2021 at 10:19 PM Rao Bandaru  wrote:

> Hi All,
>
>
>
> I have a requirement to calculate distance between four
> coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the *pysaprk
> dataframe *with the help of from *geopy* import *distance *without using
> *UDF* (user defined function)*,*Please help how to achieve this scenario
> and do the needful.
>
>
>
> Thanks,
>
> Ankamma Rao B
>


-- 
Best Regards,
Ayan Guha


Re: Rdd - zip with index

2021-03-24 Thread ayan guha
;> Category (1),Country Incorporated (1),Proprietor (1) Address (1),Proprietor
>>>> (1) Address (2),Proprietor (1) Address (3),Proprietor Name (2),Company
>>>> Registration No. (2),Proprietorship Category (2),Country Incorporated
>>>> (2),Proprietor (2) Address (1),Proprietor (2) Address (2),Proprietor (2)
>>>> Address (3),Proprietor Name (3),Company Registration No. (3),Proprietorship
>>>> Category (3),Country Incorporated (3),Proprietor (3) Address (1),Proprietor
>>>> (3) Address (2),Proprietor (3) Address (3),Proprietor Name (4),Company
>>>> Registration No. (4),Proprietorship Category (4),Country Incorporated
>>>> (4),Proprietor (4) Address (1),Proprietor (4) Address (2),Proprietor (4)
>>>> Address (3),Date Proprietor Added,Additional Proprietor Indicator
>>>>
>>>>
>>>> 10GB is not much of a big CSV file
>>>>
>>>> that will resolve the header anyway.
>>>>
>>>>
>>>> Also how are you running the spark, in a local mode (single jvm) or
>>>> other distributed modes (yarn, standalone) ?
>>>>
>>>>
>>>> HTH
>>>>
>>>

-- 
Best Regards,
Ayan Guha


Re: Rdd - zip with index

2021-03-23 Thread ayan guha
The best option is to use a DataFrame; df.columns will automatically give you
the column names. Are you sure your file is indeed CSV? Maybe it is easier if
you share the code?
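
(A minimal, untested sketch of reading the file as a DataFrame so the header is resolved for you; the path is a placeholder.)

df = (spark.read
      .option("header", True)
      .option("inferSchema", True)
      .csv("/data/big_file.csv"))

print(df.columns)                  # column names straight from the header row
print(df.rdd.getNumPartitions())   # a plain (non-gzipped) 10GB CSV splits into many partitions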

On Wed, 24 Mar 2021 at 2:12 pm, Sean Owen  wrote:

> It would split 10GB of CSV into multiple partitions by default, unless
> it's gzipped. Something else is going on here.
>
> ‪On Tue, Mar 23, 2021 at 10:04 PM ‫"Yuri Oleynikov (‫יורי אולייניקוב‬‎)"‬‎
>  wrote:‬
>
>> I’m not Spark core developer and do not want to confuse you but it seems
>> logical to me that just reading from single file (no matter what format of
>> the file is used) gives no parallelism unless you do repartition by some
>> column just after csv load, but the if you’re telling you’ve already tried
>> repartition with no luck...
>>
>>
>> > On 24 Mar 2021, at 03:47, KhajaAsmath Mohammed 
>> wrote:
>> >
>> > So spark by default doesn’t split the large 10gb file when loaded?
>> >
>> > Sent from my iPhone
>> >
>> >> On Mar 23, 2021, at 8:44 PM, Yuri Oleynikov (‫יורי אולייניקוב‬‎) <
>> yur...@gmail.com> wrote:
>> >>
>> >> Hi, Mohammed
>> >> I think that the reason that only one executor is running and have
>> single partition is because you have single file that might be read/loaded
>> into memory.
>> >>
>> >> In order to achieve better parallelism I’d suggest to split the csv
>> file.
>> >>
>>
>> --
Best Regards,
Ayan Guha


Re: How to apply ranger policies on Spark

2020-11-24 Thread ayan guha
AFAIK, Ranger secures the Hive (JDBC) server only. Unfortunately Spark does
not interact with HS2, but directly interacts with the Metastore. Hence, the
only way to use Ranger policies is to access Hive via JDBC. Another option is
HDFS or storage ACLs, which are coarse-grained controls over file paths etc.
You can use Ranger to manage HDFS ACLs as well; in that scenario Spark will be
bound by those policies.
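
(Illustrative, untested sketch: when Spark resolves tables through the shared Hive metastore and reads the warehouse files directly, access is governed by the HDFS/storage ACLs that Ranger can manage, not by Ranger's Hive policies. The table name is made up.)

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("acl-bound-read")
         .enableHiveSupport()      # resolve tables via the Hive metastore
         .getOrCreate())

df = spark.table("sales.orders")   # read succeeds only if the job user has file-level access
df.show(5)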

On Tue, Nov 24, 2020 at 5:26 PM Dennis Suhari 
wrote:

> Hi Joyan,
>
> Spark uses its own metastore. Using Ranger you need to use the Hive
> Metastore. For this you need to point to Hive Metastore and use HiveContext
> in your Spark Code.
>
> Br,
>
> Dennis
>
> Von meinem iPhone gesendet
>
> Am 23.11.2020 um 19:04 schrieb joyan sil :
>
> 
>
> Hi,
>
> We have ranger policies defined on the hive table and authorization works
> as expected when we use hive cli and beeline. But when we access those hive
> tables using spark-shell or spark-submit it does not work.
>
>  Any suggestions to make Ranger work with Spark?
>
>
> Regards
>
> Joyan
>
>

-- 
Best Regards,
Ayan Guha


Re: Apache Spark Connector for SQL Server and Azure SQL

2020-10-26 Thread ayan guha
I would suggest asking Microsoft and Databricks; this forum is for Apache
Spark.

If you are interested, please drop me a note separately, as I am keen to
understand the issue since we use the same setup.

Ayan

On Mon, 26 Oct 2020 at 11:53 pm,  wrote:

> Hi,
>
>
>
> In a project where I work with Databricks, we use this connector to read /
> write data to Azure SQL Database. Currently with Spark 2.4.5 and Scala 2.11.
>
>
>
> But those setups are getting old. What happens if we update Spark to 3.0.1
> or higher and Scala 2.12.
>
> This connector does not work according to the versions it supports. What
> should we do? Don't use the connector or is there another way to work?
>
>
>
> I appreciate any type of information that helps me.
>
>
>
> Med vänlig hälsning / Best regards
>
> *Alejandra Lemmo*
>
> *Data Engineer *
> Customer Analytic
>
> Address: Evenemangsgatan 13, 169 56 Solna
> <https://www.google.com/maps/search/Evenemangsgatan+13,+169+56+Solna?entry=gmail=g>,
> 16956 Solna
>
> D +46735249832
>
> M +46735249832
>
>
> alejandra.le...@vattenfall.com
> www.vattenfall.se
>
> Please consider the environment before printing this e-mail
>
> Confidentiality: C2 - Internal
>
-- 
Best Regards,
Ayan Guha


Re: Why spark-submit works with package not with jar

2020-10-20 Thread ayan guha
Hi

One way to think of this is: --packages is better when you have third-party
dependencies, and --jars is better when you have custom in-house built jars.

On Wed, 21 Oct 2020 at 3:44 am, Mich Talebzadeh 
wrote:

> Thanks Sean and Russell. Much appreciated.
>
> Just to clarify recently I had issues with different versions of Google
> Guava jar files in building Uber jar file (to evict the unwanted ones).
> These used to work a year and half ago using Google Dataproc compute
> engines (comes with Spark preloaded) and I could create an Uber jar file.
>
> Unfortunately this has become problematic now so tried to use spark-submit
> instead as follows:
>
> ${SPARK_HOME}/bin/spark-submit \
> --master yarn \
> --deploy-mode client \
> --conf spark.executor.memoryOverhead=3000 \
> --class org.apache.spark.repl.Main \
> --name "Spark shell on Yarn" "$@"
> --driver-class-path /home/hduser/jars/ddhybrid.jar \
> --jars /home/hduser/jars/spark-bigquery-latest.jar, \
>/home/hduser/jars/ddhybrid.jar \
> --packages com.github.samelamin:spark-bigquery_2.11:0.2.6
>
> Effectively tailored spark-shell. However, I do not think there is a
> mechanism to resolve jar conflicts without  building an Uber jar file
> through SBT?
>
> Cheers
>
>
>
> On Tue, 20 Oct 2020 at 16:54, Russell Spitzer 
> wrote:
>
>> --jar Adds only that jar
>> --package adds the Jar and a it's dependencies listed in maven
>>
>> On Tue, Oct 20, 2020 at 10:50 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a scenario that I use in Spark submit as follows:
>>>
>>> spark-submit --driver-class-path /home/hduser/jars/ddhybrid.jar --jars
>>> /home/hduser/jars/spark-bigquery-latest.jar,/home/hduser/jars/ddhybrid.jar,
>>> */home/hduser/jars/spark-bigquery_2.11-0.2.6.jar*
>>>
>>> As you can see the jar files needed are added.
>>>
>>>
>>> This comes back with error message as below
>>>
>>>
>>> Creating model test.weights_MODEL
>>>
>>> java.lang.NoClassDefFoundError:
>>> com/google/api/client/http/HttpRequestInitializer
>>>
>>>   at
>>> com.samelamin.spark.bigquery.BigQuerySQLContext.bq$lzycompute(BigQuerySQLContext.scala:19)
>>>
>>>   at
>>> com.samelamin.spark.bigquery.BigQuerySQLContext.bq(BigQuerySQLContext.scala:19)
>>>
>>>   at
>>> com.samelamin.spark.bigquery.BigQuerySQLContext.runDMLQuery(BigQuerySQLContext.scala:105)
>>>
>>>   ... 76 elided
>>>
>>> Caused by: java.lang.ClassNotFoundException:
>>> com.google.api.client.http.HttpRequestInitializer
>>>
>>>   at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>
>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>
>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>
>>>
>>>
>>> So there is an issue with finding the class, although the jar file used
>>>
>>>
>>> /home/hduser/jars/spark-bigquery_2.11-0.2.6.jar
>>>
>>> has it.
>>>
>>>
>>> Now if *I remove the above jar file and replace it with the same
>>> version but package* it works!
>>>
>>>
>>> spark-submit --driver-class-path /home/hduser/jars/ddhybrid.jar --jars
>>> /home/hduser/jars/spark-bigquery-latest.jar,/home/hduser/jars/ddhybrid.jar
>>> *-**-packages com.github.samelamin:spark-bigquery_2.11:0.2.6*
>>>
>>>
>>> I have read the write-ups about packages searching the maven
>>> libraries etc. Not convinced why using the package should make so much
>>> difference between a failure and success. In other words, when to use a
>>> package rather than a jar.
>>>
>>>
>>> Any ideas will be appreciated.
>>>
>>>
>>> Thanks
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>> --
Best Regards,
Ayan Guha


Re: Count distinct and driver memory

2020-10-19 Thread ayan guha
Do not do collect; this brings the results back to the driver. Instead, do
the count distinct and write it out.
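
(A minimal, untested sketch, using the three columns of interest from the thread and a placeholder output path; only a single aggregate row is ever produced, so nothing heavy reaches the driver.)

from pyspark.sql import functions as F

result = df.agg(F.countDistinct("a", "b", "c").alias("n_distinct"))
result.write.mode("overwrite").json("/tmp/distinct_count")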

On Tue, 20 Oct 2020 at 6:43 am, Nicolas Paris 
wrote:

> > I was caching it because I didn't want to re-execute the DAG when I
> > ran the count query. If you have a spark application with multiple
> > actions, Spark reexecutes the entire DAG for each action unless there
> > is a cache in between. I was trying to avoid reloading 1/2 a terabyte
> > of data.  Also, cache should use up executor memory, not driver
> > memory.
> why not counting the parquet file instead? writing/reading a parquet
> files is more efficients than caching in my experience.
> if you really need caching you could choose a better strategy such
> DISK.
>
> Lalwani, Jayesh  writes:
>
> > I was caching it because I didn't want to re-execute the DAG when I ran
> the count query. If you have a spark application with multiple actions,
> Spark reexecutes the entire DAG for each action unless there is a cache in
> between. I was trying to avoid reloading 1/2 a terabyte of data.  Also,
> cache should use up executor memory, not driver memory.
> >
> > As it turns out cache was the problem. I didn't expect cache to take
> Executor memory and spill over to disk. I don't know why it's taking driver
> memory. The input data has millions of partitions which results in millions
> of tasks. Perhaps the high memory usage is a side effect of caching the
> results of lots of tasks.
> >
> > On 10/19/20, 1:27 PM, "Nicolas Paris"  wrote:
> >
> > CAUTION: This email originated from outside of the organization. Do
> not click links or open attachments unless you can confirm the sender and
> know the content is safe.
> >
> >
> >
> > > Before I write the data frame to parquet, I do df.cache. After
> writing
> > > the file out, I do df.countDistinct(“a”, “b”, “c”).collect()
> > if you write the df to parquet, why would you also cache it ?
> caching by
> > default loads the memory. this might affect  later use, such
> > collect. the resulting GC can be explained by both caching and
> collect
> >
> >
> > Lalwani, Jayesh  writes:
> >
> > > I have a Dataframe with around 6 billion rows, and about 20
> columns. First of all, I want to write this dataframe out to parquet. The,
> Out of the 20 columns, I have 3 columns of interest, and I want to find how
> many distinct values of the columns are there in the file. I don’t need the
> actual distinct values. I just need the count. I knoe that there are around
> 10-16million distinct values
> > >
> > > Before I write the data frame to parquet, I do df.cache. After
> writing the file out, I do df.countDistinct(“a”, “b”, “c”).collect()
> > >
> > > When I run this, I see that the memory usage on my driver steadily
> increases until it starts getting future time outs. I guess it’s spending
> time in GC. Does countDistinct cause this behavior? Does Spark try to get
> all 10 million distinct values into the driver? Is countDistinct not
> recommended for data frames with large number of distinct values?
> > >
> > > What’s the solution? Should I use approx._count_distinct?
> >
> >
> > --
> > nicolas paris
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
> >
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
> --
> nicolas paris
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Best Regards,
Ayan Guha


Re: Scala vs Python for ETL with Spark

2020-10-11 Thread ayan guha
But when you have a fairly large volume of data, that is where Spark comes
into the picture. And I assume the requirement of using Spark is already
established in the original question, and the discussion is about using
Python vs Scala/Java.

On Sun, 11 Oct 2020 at 10:51 pm, Sasha Kacanski  wrote:

> If org has folks that can do python seriously why then spark in the first
> place. You can do workflow on your own, streaming or batch or what ever you
> want.
> I would not do anything else aside from python, but that is me.
>
> On Sat, Oct 10, 2020, 9:42 PM ayan guha  wrote:
>
>> I have one observation: is "python udf is slow due to deserialization
>> penulty" still relevant? Even after arrow is used as in memory data mgmt
>> and so heavy investment from spark dev community on making pandas first
>> class citizen including Udfs.
>>
>> As I work with multiple clients, my exp is org culture and available
>> people are most imp driver for this choice regardless the use case. Use
>> case is relevant only when there is a feature imparity
>>
>> On Sun, 11 Oct 2020 at 7:39 am, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Not quite sure how meaningful this discussion is, but in case someone is
>>> really faced with this query the question still is 'what is the use case'?
>>> I am just a bit confused with the one size fits all deterministic
>>> approach here thought that those days were over almost 10 years ago.
>>> Regards
>>> Gourav
>>>
>>> On Sat, 10 Oct 2020, 21:24 Stephen Boesch,  wrote:
>>>
>>>> I agree with Wim's assessment of data engineering / ETL vs Data
>>>> Science.I wrote pipelines/frameworks for large companies and scala was
>>>> a much better choice. But for ad-hoc work interfacing directly with data
>>>> science experiments pyspark presents less friction.
>>>>
>>>> On Sat, 10 Oct 2020 at 13:03, Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Many thanks everyone for their valuable contribution.
>>>>>
>>>>> We all started with Spark a few years ago where Scala was the talk
>>>>> of the town. I agree with the note that as long as Spark stayed nish and
>>>>> elite, then someone with Scala knowledge was attracting premiums. In
>>>>> fairness in 2014-2015, there was not much talk of Data Science input (I 
>>>>> may
>>>>> be wrong). But the world has moved on so to speak. Python itself has been
>>>>> around a long time (long being relative here). Most people either knew 
>>>>> UNIX
>>>>> Shell, C, Python or Perl or a combination of all these. I recall we had a
>>>>> director a few years ago who asked our Hadoop admin for root password to
>>>>> log in to the edge node. Later he became head of machine learning
>>>>> somewhere else and he loved C and Python. So Python was a gift in 
>>>>> disguise.
>>>>> I think Python appeals to those who are very familiar with CLI and shell
>>>>> programming (Not GUI fan). As some members alluded to there are more 
>>>>> people
>>>>> around with Python knowledge. Most managers choose Python as the unifying
>>>>> development tool because they feel comfortable with it. Frankly I have not
>>>>> seen a manager who feels at home with Scala. So in summary it is a bit
>>>>> disappointing to abandon Scala and switch to Python just for the sake of 
>>>>> it.
>>>>>
>>>>> Disclaimer: These are opinions and not facts so to speak :)
>>>>>
>>>>> Cheers,
>>>>>
>>>>>
>>>>> Mich
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, 9 Oct 2020 at 21:56, Mich Talebzadeh <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>> I have come across occasions when the teams use Python with Spark for
>>>>>> ETL, for example processing data from S3 buckets into Snowflake with 
>>>>>> Spark.
>>>>>>
>>>>>> The only reason I think they are choosing Python as opposed to Scala
>>>>>> is because they are more familiar with Python. Since Spark is written in
>>>>>> Scala, itself is an indication of why I think Scala has an edge.
>>>>>>
>>>>>> I have not done one

Re: Scala vs Python for ETL with Spark

2020-10-10 Thread ayan guha
I have one observation: is "Python UDFs are slow due to the deserialization
penalty" still relevant? Especially now that Arrow is used for in-memory data
management and there has been heavy investment from the Spark dev community in
making pandas a first-class citizen, including UDFs.
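
(For readers unfamiliar with the feature referred to above, a minimal, untested sketch of a vectorized pandas UDF in the Spark 3 type-hint style, which needs pyarrow installed; the column name and tax rate are made up. Each call receives a whole pandas Series, so the per-row work happens in pandas/numpy via Arrow.)

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def plus_tax(amount: pd.Series) -> pd.Series:
    return amount * 1.1

df = spark.createDataFrame([(100.0,), (250.0,)], ["net"])
df.withColumn("gross", plus_tax("net")).show()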

As I work with multiple clients, my experience is that org culture and the
available people are the most important drivers for this choice, regardless of
the use case. The use case is relevant only when there is a feature disparity.

On Sun, 11 Oct 2020 at 7:39 am, Gourav Sengupta 
wrote:

> Not quite sure how meaningful this discussion is, but in case someone is
> really faced with this query the question still is 'what is the use case'?
> I am just a bit confused with the one size fits all deterministic approach
> here thought that those days were over almost 10 years ago.
> Regards
> Gourav
>
> On Sat, 10 Oct 2020, 21:24 Stephen Boesch,  wrote:
>
>> I agree with Wim's assessment of data engineering / ETL vs Data Science.
>>   I wrote pipelines/frameworks for large companies and scala was a much
>> better choice. But for ad-hoc work interfacing directly with data science
>> experiments pyspark presents less friction.
>>
>> On Sat, 10 Oct 2020 at 13:03, Mich Talebzadeh 
>> wrote:
>>
>>> Many thanks everyone for their valuable contribution.
>>>
>>> We all started with Spark a few years ago where Scala was the talk
>>> of the town. I agree with the note that as long as Spark stayed nish and
>>> elite, then someone with Scala knowledge was attracting premiums. In
>>> fairness in 2014-2015, there was not much talk of Data Science input (I may
>>> be wrong). But the world has moved on so to speak. Python itself has been
>>> around a long time (long being relative here). Most people either knew UNIX
>>> Shell, C, Python or Perl or a combination of all these. I recall we had a
>>> director a few years ago who asked our Hadoop admin for root password to
>>> log in to the edge node. Later he became head of machine learning
>>> somewhere else and he loved C and Python. So Python was a gift in disguise.
>>> I think Python appeals to those who are very familiar with CLI and shell
>>> programming (Not GUI fan). As some members alluded to there are more people
>>> around with Python knowledge. Most managers choose Python as the unifying
>>> development tool because they feel comfortable with it. Frankly I have not
>>> seen a manager who feels at home with Scala. So in summary it is a bit
>>> disappointing to abandon Scala and switch to Python just for the sake of it.
>>>
>>> Disclaimer: These are opinions and not facts so to speak :)
>>>
>>> Cheers,
>>>
>>>
>>> Mich
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, 9 Oct 2020 at 21:56, Mich Talebzadeh 
>>> wrote:
>>>
>>>> I have come across occasions when the teams use Python with Spark for
>>>> ETL, for example processing data from S3 buckets into Snowflake with Spark.
>>>>
>>>> The only reason I think they are choosing Python as opposed to Scala is
>>>> because they are more familiar with Python. Since Spark is written in
>>>> Scala, itself is an indication of why I think Scala has an edge.
>>>>
>>>> I have not done one to one comparison of Spark with Scala vs Spark with
>>>> Python. I understand for data science purposes most libraries like
>>>> TensorFlow etc. are written in Python but I am at loss to understand the
>>>> validity of using Python with Spark for ETL purposes.
>>>>
>>>> These are my understanding but they are not facts so I would like to
>>>> get some informed views on this if I can?
>>>>
>>>> Many thanks,
>>>>
>>>> Mich
>>>>
>>>>
>>>>
>>>>
>>>> LinkedIn * 
>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>> --
Best Regards,
Ayan Guha


https://issues.apache.org/jira/browse/SPARK-18381

2020-09-25 Thread ayan guha
Anyone aware of any workaround for
https://issues.apache.org/jira/browse/SPARK-18381

Other than upgrading to Spark 3, I mean.

-- 
Best Regards,
Ayan Guha


Re: Edge AI with Spark

2020-09-24 Thread ayan guha
Too broad a question; the short answer is yes and the long answer is it
depends.

Essentially Spark is a compute engine, so it can be wrapped into a
containerized model and deployed at the edge. I believe there are various
implementations available.



On Thu, 24 Sep 2020 at 5:19 pm, Marco Sassarini 
wrote:

> Hi,
>
> I'd like to know if Spark supports edge AI: can Spark
> run on physical device such as mobile devices running Android/iOS?
>
> Best regards,
> Marco Sassarini
>
> *Marco Sassarini - Artificial Intelligence Department*
> office: +39 0434 562 978
> www.overit.it
>
> --
Best Regards,
Ayan Guha


Re: Spark :- Update record in partition.

2020-06-07 Thread ayan guha
Hi

Please look at delta.io which is a companion open source project. It
addresses the exact use case you are after.
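
(A minimal, untested sketch of a single-row update with Delta Lake; it needs the delta-core package on the classpath and a table already stored in Delta format, and the path, column names and values below are made up.)

from delta.tables import DeltaTable

dt = DeltaTable.forPath(spark, "/data/orders_delta")
dt.update(
    condition="order_id = '12345'",
    set={"status": "'PROCESSED'"}   # values are SQL expression strings, hence the quotes
)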

On Mon, Jun 8, 2020 at 2:35 AM Sunil Kalra  wrote:

> Hi All,
>
> If i have to update a record in partition using spark, do i have to read
> the whole partition and update the row and overwrite the partition?
>
> Is there a way to only update 1 row like DBMS. Otherwise 1 row update
> takes a long time to rewrite the whole partition ?
>
> Thanks
> Sunil
>
>
>
>
>

-- 
Best Regards,
Ayan Guha


Re: How to pass a constant value to a partitioned hive table in spark

2020-04-16 Thread ayan guha
;   ocis_party_id AS partyId
>> > , target_mobile_no AS phoneNumber
>> >   FROM tmp
>> >   """
>> >   spark.sql($sqltext)
>> >
>> >
>> > However, this does not work!
>> >
>> >
>> > scala>   sqltext = """
>> >  |   $INSERT INTO TABLE michtest.BroadcastStaging PARTITION
>> > (broadcastId = $broadcastValue, brand = "dummy")
>> >  |   SELECT
>> >  |   ocis_party_id AS partyId
>> >  | , target_mobile_no AS phoneNumber
>> >  |   FROM tmp
>> >  |   """
>> > sqltext: String =
>> >   $INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
>> > $broadcastValue, brand = "dummy")
>> >   SELECT
>> >   ocis_party_id AS partyId
>> > , target_mobile_no AS phoneNumber
>> >   FROM tmp
>> >
>> >
>> > scala>   spark.sql($sqltext)
>> > :41: error: not found: value $sqltext
>> >  spark.sql($sqltext)
>> >
>> >
>> > Any ideas?
>> >
>> >
>> > Thanks
>> >
>> >
>> > Dr Mich Talebzadeh
>> >
>> >
>> >
>> > LinkedIn *
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> > <
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> >*
>> >
>> >
>> >
>> > http://talebzadehmich.wordpress.com
>> >
>> >
>> > *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any
>> > loss, damage or destruction of data or any other property which may
>> arise
>> > from relying on this email's technical content is explicitly disclaimed.
>> > The author will in no case be liable for any monetary damages arising
>> from
>> > such loss, damage or destruction.
>>
>

-- 
Best Regards,
Ayan Guha


Re: OFF TOPIC LIST CRITERIA

2020-03-28 Thread ayan guha
Hi

Though Sean requested a non-reply, this is sincerely crazy.

@Zahid - what Sean said is actually mentioned on Spark's website:


   - u...@spark.incubator.apache.org -- for usage questions, help, and
     announcements (archives:
     http://mail-archives.apache.org/mod_mbox/incubator-spark-user/)
   - d...@spark.incubator.apache.org -- for people who want to contribute code
     to Spark (archives:
     http://mail-archives.apache.org/mod_mbox/incubator-spark-dev/)

Most users will probably want the User list, but individuals interested in
contributing code to the project should also subscribe to the Dev list.

Also, in ASF - https://www.apache.org/dev/contrib-email-tips

There is nothing wrong with using the appropriate mailing lists, and nothing
wrong if someone points that out to you.


Given that I can vouch for @Sean Owen 's contribution in the user group (and
especially around CI/CD), your claims and the tone in which they are made are
both sadly hilarious. Let us refrain from hurting each other in these
otherwise troubled times.

Best

Ayan




On Sat, Mar 28, 2020 at 2:11 PM Zahid Rahman  wrote:

>
> OK *user support. user@ is DONE !!!*
>
> I reported a work around to an existing bug actually to the experienced
> user.
> and "the experienced user" was "not aware" of the
> setting in the log4j.properties so he learned something new too.
> Clearly neither were you.
>
> Also it may surprise some people but  there are people who have been
> formally
> trained in software development.
> We can tell a self trained a mile away.
>
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> <http://www.backbutton.co.uk>
>
>
> On Sat, 28 Mar 2020 at 03:02, Sean Owen  wrote:
>
>> BCC user, dev, and I encourage others to not reply.
>>
>> I said _dev@_ is not for user support. user@ is. You heard that
>> yesterday, too, and not to cross-post.
>> You actually got answers to several questions, despite their tone,
>> from experienced developers of the project.
>>
>> Messages like yours are, I assure you, not useful to _anybody_. If we
>> let people talk like this on big community lists, yes _that_ will put
>> up barriers.
>> So, the answer for you is: you are not using either of these lists
>> appropriately right now. If you can keep it civil and on-topic, use
>> user@.
>> Otherwise we will block you from the lists.
>>
>>
>> Sean
>>
>> On Fri, Mar 27, 2020 at 9:46 PM Zahid Rahman 
>> wrote:
>> >
>> >
>> > Sean Owen says the criteria of these two emailing list is not help to
>> support some body
>> > who is new but for people who have been using the software for a long
>> time.
>> >
>> > He is implying I think that I should only send email when I find bugs
>> so that I can help him in his work.
>> > A one way street.
>> >
>> > He is suggesting the more familiar you are with this software the more
>> important you are.
>> > Some kind of Alpha male type heirachy.
>> >
>> > He wants to put a barrier in place where Apache foundation wants no
>> barriers to free learning and free software.
>> >
>> > He has not reported any bugs while I have reported so many in such a
>> short space of time.
>> > He has warned me as well
>> >
>> > So that Sean Owen does not put a barrier in place for me in my path to
>> free learning and free  Apache software
>> > I would like somebody to clarify the criteria for me.
>> >
>> >
>> > Backbutton.co.uk
>> > ¯\_(ツ)_/¯
>> > ♡۶Java♡۶RMI ♡۶
>> > Make Use Method {MUM}
>> > makeuse.org
>>
>

-- 
Best Regards,
Ayan Guha


Re: Issue with UDF Int Conversion - Str to Int

2020-03-23 Thread ayan guha
Awesome. Did not know about the conv function, so thanks for that.

On Tue, 24 Mar 2020 at 1:23 am, Enrico Minack 
wrote:

> Ayan,
>
> no need for UDFs, the SQL API provides all you need (sha1, substring, conv
> ):
> https://spark.apache.org/docs/2.4.5/api/python/pyspark.sql.html
>
> >>> df.select(conv(substring(sha1(col("value_to_hash")), 33, 8), 16,
> 10).cast("long").alias("sha2long")).show()
> +----------+
> |  sha2long|
> +----------+
> | 478797741|
> |2520346415|
> +----------+
>
> This creates a lean query plan:
>
> >>> df.select(conv(substring(sha1(col("value_to_hash")), 33, 8), 16,
> 10).cast("long").alias("sha2long")).explain()
> == Physical Plan ==
> Union
> :- *(1) Project [478797741 AS sha2long#74L]
> :  +- Scan OneRowRelation[]
> +- *(2) Project [2520346415 AS sha2long#76L]
>+- Scan OneRowRelation[]
>
>
> Enrico
>
>
> Am 23.03.20 um 06:13 schrieb ayan guha:
>
> Hi
>
> I am trying to implement simple hashing/checksum logic. The key logic is -
>
> 1. Generate sha1 hash
> 2. Extract last 8 chars
> 3. Convert 8 chars to Int (using base 16)
>
> Here is the cut down version of the code:
>
>
> ---
>
> from pyspark.sql.functions import *
> from pyspark.sql.types import *
> from hashlib import sha1 as local_sha1
>
> df = spark.sql("select '4104003141' value_to_hash union all select '4102859263'")
> f1 = lambda x: str(int(local_sha1(x.encode('UTF-8')).hexdigest()[32:], 16))
> f2 = lambda x: int(local_sha1(x.encode('UTF-8')).hexdigest()[32:], 16)
> sha2Int1 = udf(f1, StringType())
> sha2Int2 = udf(f2, IntegerType())
> print(f1('4102859263'))
> dfr = df.select(df.value_to_hash,
>                 sha2Int1(df.value_to_hash).alias('1'),
>                 sha2Int2(df.value_to_hash).alias('2'))
> dfr.show(truncate=False)
>
> -
>
> I was expecting both columns to provide exactly the same values, however
> that is not the case "always":
>
> 2520346415
> +-------------+----------+-----------+
> |value_to_hash|1         |2          |
> +-------------+----------+-----------+
> |4104003141   |478797741 |478797741  |
> |4102859263   |2520346415|-1774620881|
> +-------------+----------+-----------+
>
> The function works fine, as shown in the print statement. However the values
> do not match and vary widely.
>
> Any pointer?
>
> --
> Best Regards,
> Ayan Guha
>
>
> --
Best Regards,
Ayan Guha


Re: Issue with UDF Int Conversion - Str to Int

2020-03-23 Thread ayan guha
Thanks a lot. Will try.

On Mon, Mar 23, 2020 at 8:16 PM Jacob Lynn  wrote:

> You are overflowing the integer type, which goes up to a max value
> of 2147483647 (2^31 - 1). Change the return type of `sha2Int2` to
> `LongType()` and it works as expected.
>
> On Mon, Mar 23, 2020 at 6:15 AM ayan guha  wrote:
>
>> Hi
>>
>> I am trying to implement simple hashing/checksum logic. The key logic is
>> -
>>
>> 1. Generate sha1 hash
>> 2. Extract last 8 chars
>> 3. Convert 8 chars to Int (using base 16)
>>
>> Here is the cut down version of the code:
>>
>>
>> ---
>>
>> from pyspark.sql.functions import *
>> from pyspark.sql.types import *
>> from hashlib import sha1 as local_sha1
>>
>> df = spark.sql("select '4104003141' value_to_hash union all select '4102859263'")
>> f1 = lambda x: str(int(local_sha1(x.encode('UTF-8')).hexdigest()[32:], 16))
>> f2 = lambda x: int(local_sha1(x.encode('UTF-8')).hexdigest()[32:], 16)
>> sha2Int1 = udf(f1, StringType())
>> sha2Int2 = udf(f2, IntegerType())
>> print(f1('4102859263'))
>> dfr = df.select(df.value_to_hash,
>>                 sha2Int1(df.value_to_hash).alias('1'),
>>                 sha2Int2(df.value_to_hash).alias('2'))
>> dfr.show(truncate=False)
>>
>> -
>>
>> I was expecting both columns to provide exactly the same values, however
>> that is not the case "always":
>>
>> 2520346415
>> +-------------+----------+-----------+
>> |value_to_hash|1         |2          |
>> +-------------+----------+-----------+
>> |4104003141   |478797741 |478797741  |
>> |4102859263   |2520346415|-1774620881|
>> +-------------+----------+-----------+
>>
>> The function works fine, as shown in the print statement. However the values
>> do not match and vary widely.
>>
>> Any pointer?
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>

-- 
Best Regards,
Ayan Guha
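Following up on the overflow point above, here is a minimal sketch of the corrected UDF, reusing the column and lambda names from this thread; the only change is declaring the return type as LongType so the 8-hex-char value (up to 2^32 - 1, which exceeds a 32-bit signed int) is not truncated:

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
from hashlib import sha1 as local_sha1

# Same logic as f2 above, but with a 64-bit return type.
f2 = lambda x: int(local_sha1(x.encode('UTF-8')).hexdigest()[32:], 16)
sha2Long = udf(f2, LongType())

df = spark.sql("select '4104003141' value_to_hash union all select '4102859263'")
df.select(df.value_to_hash,
          sha2Long(df.value_to_hash).alias('hash_int')).show(truncate=False)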


Issue with UDF Int Conversion - Str to Int

2020-03-22 Thread ayan guha
Hi

I am trying to implement simple hashing/checksum logic. The key logic is -

1. Generate sha1 hash
2. Extract last 8 chars
3. Convert 8 chars to Int (using base 16)

Here is the cut down version of the code:

---

from pyspark.sql.functions import *
from pyspark.sql.types import *
from hashlib import sha1 as local_sha1

df = spark.sql("select '4104003141' value_to_hash union all select '4102859263'")
f1 = lambda x: str(int(local_sha1(x.encode('UTF-8')).hexdigest()[32:], 16))
f2 = lambda x: int(local_sha1(x.encode('UTF-8')).hexdigest()[32:], 16)
sha2Int1 = udf(f1, StringType())
sha2Int2 = udf(f2, IntegerType())
print(f1('4102859263'))
dfr = df.select(df.value_to_hash,
                sha2Int1(df.value_to_hash).alias('1'),
                sha2Int2(df.value_to_hash).alias('2'))
dfr.show(truncate=False)
-

I was expecting both columns to provide exactly the same values, however
that is not the case "always":

2520346415
+-------------+----------+-----------+
|value_to_hash|1         |2          |
+-------------+----------+-----------+
|4104003141   |478797741 |478797741  |
|4102859263   |2520346415|-1774620881|
+-------------+----------+-----------+

The function works fine, as shown in the print statement. However the values
do not match and vary widely.

Any pointer?

-- 
Best Regards,
Ayan Guha


Re: Optimising multiple hive table join and query in spark

2020-03-15 Thread ayan guha
Hi

I would first and foremost try to identify where most of the time is spent
during the query. One possibility is that it just takes ramp-up time for
executors to become available; if that is the case then a dedicated YARN
queue may help, or using the Spark Thrift Server may help.

On Sun, Mar 15, 2020 at 11:02 PM Magnus Nilsson  wrote:

> Been a while but I remember reading on Stack Overflow you can use a UDF as
> a join condition to trick catalyst into not reshuffling the partitions, ie
> use regular equality on the column you partitioned or bucketed by and your
> custom comparer for the other columns. Never got around to trying it out,
> though. I really would like a native way to tell Catalyst not to reshuffle
> just because you use more columns in the join condition.
>
> On Sun, Mar 15, 2020 at 6:04 AM Manjunath Shetty H <
> manjunathshe...@live.com> wrote:
>
>> Hi All,
>>
>> We have 10 tables in data warehouse (hdfs/hive) written using ORC format.
>> We are serving a use case on top of that by joining 4-5 tables using Hive as
>> of now. But it is not as fast as we want it to be, so we are thinking of
>> using spark for this use case.
>>
>> Any suggestion on this ? Is it good idea to use the Spark for this use
>> case ? Can we get better performance by using spark ?
>>
>> Any pointers would be helpful.
>>
>> *Notes*:
>>
>>- Data is partitioned by date (MMdd) as integer.
>>- Query will fetch data for last 7 days from some tables while
>>joining with other tables.
>>
>>
>> *Approach we thought of as now :*
>>
>>- Create dataframe for each table and partition by same column for
>>all tables ( Lets say Country as partition column )
>>- Register all tables as temporary tables
>>- Run the sql query with joins
>>
>> But the problem we are seeing with this approach is, even though we
>> already partitioned using country, it still does hashPartitioning +
>> shuffle during the join. All the table joins contain the `Country` column with
>> some extra columns based on the table.
>>
>> Is there any way to avoid these shuffles ? and improve performance ?
>>
>>
>> Thanks and regards
>> Manjunath
>>
>

-- 
Best Regards,
Ayan Guha
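To make the shuffle question above concrete: one option, assuming the joins can be expressed on a fixed set of key columns, is to persist both sides bucketed on exactly those columns with the same bucket count, so the sort-merge join can reuse the bucketing instead of hash-partitioning again. A hedged sketch with hypothetical table and column names:

# Persist both sides bucketed on the join columns. Spark records the bucketing
# in the metastore, so a later equi-join on exactly these columns (same bucket
# count on both sides) does not need another hashpartitioning exchange.
NUM_BUCKETS = 64  # assumption: tune to data volume

for src, dst in [("db.table_a", "db.table_a_bkt"), ("db.table_b", "db.table_b_bkt")]:
    (spark.table(src)
          .write
          .mode("overwrite")
          .bucketBy(NUM_BUCKETS, "country", "yyyymmdd")  # hypothetical join keys
          .sortBy("country", "yyyymmdd")
          .saveAsTable(dst))

joined = spark.sql("""
    SELECT a.*, b.some_measure
    FROM db.table_a_bkt a
    JOIN db.table_b_bkt b
      ON a.country = b.country AND a.yyyymmdd = b.yyyymmdd
""")
joined.explain()  # verify there is no Exchange before the SortMergeJoin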


Re: Performance tuning on the Databricks pyspark 2.4.4

2020-01-21 Thread ayan guha
For case 1, you can create 3 notebooks and 3 jobs in Databricks, and then
run them in parallel.

On Wed, 22 Jan 2020 at 3:50 am, anbutech  wrote:

> Hi sir,
>
> Could you please help me on the below two cases in the databricks pyspark
> data processing terabytes of json data read from aws s3 bucket.
>
> case 1:
>
> currently I'm reading multiple tables sequentially to get the day count
> from each table
>
> for ex: table_list.csv having one column with multiple table names
>
> year=2019
> month=12
>
> tablesDF =
>
> spark.read.format("csv").option("header",false).load("s3a://bucket//source/table_list.csv")
> tabList = tablesDF.toPandas().values.tolist()
> for table in tabList:
> tab_name = table[0]
>
>  // Snowflake Settings and snowflake  table count()
>
> sfOptions = dict(
>   "sfURL" -> "",
>   "sfAccount" -> "",
>   "sfUser" -> "",
>   "sfPassword" -> "",
>   "sfDatabase" -> "",
>   "sfSchema" -> "",
>   "sfWarehouse" -> "",
> )
>
> // Read data as dataframe
>
> sfxdf = spark.read
>   .format("snowflake")
>   .options(**sfOptions)
>   .option("query", "select y as year,m as month,count(*) as sCount from
> {} where y={} and m={} group by year,month").format(tab_name,year,month)
>   .load()
>
> //databricks delta lake
>
>  dbxDF = spark.sql("select y as year,m as month,count(*) as dCount
> from
> db.{} where y={} and m={}" group by
> year,month).format(tab_name,year,month)
>
> resultDF = dbxDF.join(sfxdf, on=['year', 'month'], how='left_outer'
> ).na.fill(0).withColumn("flag_col", expr("dCount == sCount"))
>
> finalDF = resultDF.withColumn("table_name",
>
> lit(tab_name)).select("table_name","year","month","dCount","sCount","flag_col")
>
>
> finalDF.coalesce(1).write.format('csv').option('header',
> 'true').save("s3a://outputs/reportcsv)
>
> Question:
>
> 1) Instead of sequence based running the count query taking one by
> one
> tables ,how to parallel read all the tables from the csv file from s3 and
> distributed the jobs across the cluster.
>
> 2) Could you please how to optimize the above code in the pyspark
> for
> parallel processing all the count query at the same time.
>
>
>
> Case 2 :
>
> Multiprocessing case:
>   
>
> Could you please help me how to achieve multiprocessing on the
> above
> pyspark query to parallel running in the distributed environment.
>
> By using below snippets is there any way to achieve the parallel
> processing
> pyspark code in the cluster.
>
> # Creating a pool of 20 processes. You can set this as per your
> intended
> parallelism and your available resources.
>
>
>
>
>start = time.time()
> pool = multiprocessing.Pool(20)
> # This will execute get_counts() parallel, on each element inside
> input_paths.
> # result (a list of dictionary) is constructed when all executions are
> completed.
> //result = pool.map(get_counts, input_paths)
>
> end = time.time()
>
> result_df = pd.DataFrame(result)
> # You can use, result_df.to_csv() to store the results in a csv.
> print(result_df)
> print('Time take : {}'.format(end - start))
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Best Regards,
Ayan Guha
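If the goal is simply to run the per-table count queries concurrently inside one Spark application (rather than as separate Databricks jobs), a driver-side thread pool is usually enough, because each spark.sql(...) call just submits work to the shared scheduler. A rough sketch; the path is taken from the thread and the get_counts body only covers the Delta/db side for brevity (the Snowflake comparison from the original code would go inside the same function):

from concurrent.futures import ThreadPoolExecutor

year, month = 2019, 12

def get_counts(tab_name):
    # Runs one count query; the heavy lifting happens on the executors,
    # the driver thread only waits for the result.
    row = spark.sql(
        "select count(*) as dCount from db.{t} where y={y} and m={m}"
        .format(t=tab_name, y=year, m=month)
    ).collect()[0]
    return (tab_name, row["dCount"])

tables_df = spark.read.option("header", "false").csv("s3a://bucket/source/table_list.csv")
tables = [row[0] for row in tables_df.collect()]

with ThreadPoolExecutor(max_workers=8) as pool:   # threads, not processes
    results = list(pool.map(get_counts, tables))

print(results)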


Re: Identify bottleneck

2019-12-20 Thread ayan guha
Cool, thanks! Very helpful

On Fri, 20 Dec 2019 at 6:53 pm, Enrico Minack 
wrote:

> The issue is explained in depth here:
> https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015
>
> Am 19.12.19 um 23:33 schrieb Chris Teoh:
>
> As far as I'm aware it isn't any better. The logic all gets processed by
> the same engine so to confirm, compare the DAGs generated from both
> approaches and see if they're identical.
>
> On Fri, 20 Dec 2019, 8:56 am ayan guha,  wrote:
>
>> Quick question: Why is it better to use one SQL select vs multiple withColumn calls?
>> Isn't everything eventually rewritten by Catalyst?
>>
>> On Wed, 18 Dec 2019 at 9:14 pm, Enrico Minack 
>> wrote:
>>
>>> How many withColumn statements do you have? Note that it is better to
>>> use a single select, rather than lots of withColumn. This also makes drops
>>> redundant.
>>>
>>> Reading 25m CSV lines and writing to Parquet in 5 minutes on 32 cores is
>>> really slow. Can you try this on a single machine, i.e. run with "local[*]".
>>>
>>> Can you rule out the writing part by counting the rows? I presume this
>>> all happens in a single stage.
>>>
>>> Enrico
>>>
>>>
>>> Am 18.12.19 um 10:56 schrieb Antoine DUBOIS:
>>>
>>> Hello
>>>
>>> I'm working on an ETL based on csv describing file systems to transform
>>> it into parquet so I can work on them easily to extract information.
>>> I'm using Mr. Powers' framework Daria to do so. I have quite different
>>> inputs and a lot of transformations, and the framework helps organize the
>>> code.
>>> I have a stand-alone cluster v2.3.2 composed of 4 nodes with 8 cores and
>>> 32GB of memory each.
>>> The storage is handled by a CephFS volume mounted on all nodes.
>>> First, a small description of my algorithm (it's quite simple):
>>>
>>> Use SparkContext to load the csv.bz2 file,
>>> Chain a lot of withColumn() statement,
>>> Drop all unnecessary columns,
>>> Write parquet file to CephFS
>>>
>>> This treatment can take several hours depending on how many lines the
>>> CSV has, and I wanted to identify whether bz2 or the network could be an issue,
>>> so I ran the following test (several times with consistent results):
>>> I tried the following scenario with 20 cores and 2 core per task:
>>>
>>>- Read the csv.bz2 from CephFS with connection with 1Gb/s for each
>>>node: ~5 minutes.
>>>- Read the csv.bz2 from TMPFS(setup to look like a shared storage
>>>space): ~5 minutes.
>>>- From the 2 previous tests I concluded that uncompressing the file
>>>was part of the bottleneck so I decided to uncompress the file and store 
>>> it
>>>in TMPFS as well, result: ~5.9 minutes.
>>>
>>> The test file has 25'833'369 lines and is 370MB compressed and 3700MB
>>> uncompressed. Those results have been reproduced several time each.
>>> My question here is by what am I bottleneck in this case ?
>>>
>>> I thought that the uncompressed file in RAM would be the fastest. Is it
>>> possible that my program is suboptimal in reading the CSV?
>>> In the execution logs on the cluster I have 5 to 10 seconds GC time max,
>>> and timeline shows mainly CPU time (no shuffling, no randomization overload
>>> either).
>>> I also noticed that memory storage is never used during the execution. I
>>> know from several hours of research that bz2 is the only real compression
>>> algorithm usable as an input in spark for parallelization reasons.
>>>
>>> Do you have any idea of why such a behaviour ?
>>> and do you have any idea on how to improve such treatment ?
>>>
>>> Cheers
>>>
>>> Antoine
>>>
>>>
>>> --
>> Best Regards,
>> Ayan Guha
>>
>
> --
Best Regards,
Ayan Guha
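For reference, the difference discussed in this thread is mostly in how the analyzed plan is built: every withColumn call adds a new projection for the analyzer to process, which can get expensive with hundreds of columns, whereas a single select adds one projection and yields the same result. A small illustrative sketch, assuming df is an existing DataFrame of string columns:

from pyspark.sql.functions import col, length

# Many withColumn calls: one new projection per call for the analyzer to process.
df1 = df
for c in df.columns:
    df1 = df1.withColumn(c + "_len", length(col(c)))

# Equivalent single select: one projection, same final columns.
df2 = df.select(
    "*",
    *[length(col(c)).alias(c + "_len") for c in df.columns]
)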


Re: Identify bottleneck

2019-12-19 Thread ayan guha
Quick question: Why is it better to use one SQL select vs multiple withColumn calls?
Isn't everything eventually rewritten by Catalyst?

On Wed, 18 Dec 2019 at 9:14 pm, Enrico Minack 
wrote:

> How many withColumn statements do you have? Note that it is better to use
> a single select, rather than lots of withColumn. This also makes drops
> redundant.
>
> Reading 25m CSV lines and writing to Parquet in 5 minutes on 32 cores is
> really slow. Can you try this on a single machine, i.e. run with "local[*]".
>
> Can you rule out the writing part by counting the rows? I presume this all
> happens in a single stage.
>
> Enrico
>
>
> Am 18.12.19 um 10:56 schrieb Antoine DUBOIS:
>
> Hello
>
> I'm working on an ETL based on csv describing file systems to transform it
> into parquet so I can work on them easily to extract information.
> I'm using Mr. Powers' framework Daria to do so. I have quite different inputs
> and a lot of transformations, and the framework helps organize the code.
> I have a stand-alone cluster v2.3.2 composed of 4 nodes with 8 cores and
> 32GB of memory each.
> The storage is handled by a CephFS volume mounted on all nodes.
> First, a small description of my algorithm (it's quite simple):
>
> Use SparkContext to load the csv.bz2 file,
> Chain a lot of withColumn() statement,
> Drop all unnecessary columns,
> Write parquet file to CephFS
>
> This treatment can take several hours depending on how many lines the CSV
> has, and I wanted to identify whether bz2 or the network could be an issue,
> so I ran the following test (several times with consistent results):
> I tried the following scenario with 20 cores and 2 core per task:
>
>- Read the csv.bz2 from CephFS with connection with 1Gb/s for each
>node: ~5 minutes.
>- Read the csv.bz2 from TMPFS(setup to look like a shared storage
>space): ~5 minutes.
>- From the 2 previous tests I concluded that uncompressing the file
>was part of the bottleneck so I decided to uncompress the file and store it
>in TMPFS as well, result: ~5.9 minutes.
>
> The test file has 25'833'369 lines and is 370MB compressed and 3700MB
> uncompressed. Those results have been reproduced several time each.
> My question here is by what am I bottleneck in this case ?
>
> I thought that the uncompressed file in RAM would be the fastest. Is it
> possible that my program is suboptimal in reading the CSV?
> In the execution logs on the cluster I have 5 to 10 seconds GC time max,
> and timeline shows mainly CPU time (no shuffling, no randomization overload
> either).
> I also noticed that memory storage is never used during the execution. I
> know from several hours of research that bz2 is the only real compression
> algorithm usable as an input in spark for parallelization reasons.
>
> Do you have any idea of why such a behaviour ?
> and do you have any idea on how to improve such treatment ?
>
> Cheers
>
> Antoine
>
>
> --
Best Regards,
Ayan Guha


Re: How more than one spark job can write to same partition in the parquet file

2019-12-11 Thread ayan guha
We partitioned the data logically for the 2 different jobs... in our use case,
based on geography (a sketch of this setup follows at the end of this message).

On Thu, 12 Dec 2019 at 3:39 pm, Chetan Khatri 
wrote:

> Thanks, If you can share alternative change in design. I would love to
> hear from you.
>
> On Wed, Dec 11, 2019 at 9:34 PM ayan guha  wrote:
>
>> No, we faced problems with that setup.
>>
>> On Thu, 12 Dec 2019 at 11:14 am, Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Hi Spark Users,
>>> would it be possible to write to the same partition of a parquet file
>>> through two concurrent Spark jobs with different Spark sessions?
>>>
>>> thanks
>>>
>> --
>> Best Regards,
>> Ayan Guha
>>
> --
Best Regards,
Ayan Guha
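To make the "partitioned logically" workaround concrete: the safest variant is to give each job its own output directory (one per geography), so the jobs never share an output root and therefore never share commit/temporary paths, and then read everything back with a basePath so the partition column is still inferred. A hedged sketch with hypothetical paths and DataFrame names:

# Job 1: writes only the APAC slice, to its own partition directory.
df_apac.write.mode("overwrite").parquet("/data/events/region=APAC")

# Job 2 (running concurrently): writes only the EMEA slice.
df_emea.write.mode("overwrite").parquet("/data/events/region=EMEA")

# Readers still see one partitioned dataset; basePath keeps `region` as a column.
events = (spark.read
               .option("basePath", "/data/events")
               .parquet("/data/events/region=APAC", "/data/events/region=EMEA"))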


Re: How more than one spark job can write to same partition in the parquet file

2019-12-11 Thread ayan guha
No, we faced problems with that setup.

On Thu, 12 Dec 2019 at 11:14 am, Chetan Khatri 
wrote:

> Hi Spark Users,
> would it be possible to write to the same partition of a parquet file
> through two concurrent Spark jobs with different Spark sessions?
>
> thanks
>
-- 
Best Regards,
Ayan Guha


Re: Is there a merge API available for writing DataFrame

2019-11-15 Thread ayan guha
You are probably looking for Spark Delta Lake tables

On Fri, 15 Nov 2019 at 7:48 pm, Sivaprasanna 
wrote:

> Hi,
>
> As the title implies, do we have a way of merging a DataFrame into a sink
> (either Table or a distribute filesystem)? I'm sure we cannot have a full
> fledged equivalent of Hive's MERGE INTO but maybe we can have a way of
> writing (updating) only those rows present in the DF, with the rest of the
> rows/data in the sink untouched.
>
> Sivaprasanna
>
-- 
Best Regards,
Ayan Guha
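For reference, with Delta Lake the upsert pattern looks roughly like the sketch below. The DeltaTable Python API is available from around Delta 0.4 onwards; the table path, key column and source DataFrame name here are hypothetical:

from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "/delta/accounts")      # hypothetical path

(target.alias("t")
       .merge(updates_df.alias("s"), "t.id = s.id")        # updates_df: new/changed rows
       .whenMatchedUpdateAll()                             # update rows already present
       .whenNotMatchedInsertAll()                          # insert rows that are new
       .execute())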


Re: Explode/Flatten Map type Data Using Pyspark

2019-11-14 Thread ayan guha
Hi Anbutech, in that case you have a variable number of columns in the output df
and then in the CSV; that will not be the best way to read the CSV back.

On Fri, 15 Nov 2019 at 2:30 pm, anbutech  wrote:

> Hello Guha,
>
> The number of keys will be different for each event id. For example, if the
> event id is 005 and it has 10 keys, then I have to flatten all those 10 keys in
> the final output. There is no fixed number of keys per event id.
>
> 001 -> 2 keys
>
> 002 -> 4 keys
>
> 003 -> 5 keys
>
> above event id has different key values combinations and different from
> other.i want to dynamically flatten the incoming data
>
> in the ouput s3 csv file(want to write all the flattened keys in the csv
> path)
>
> flatten.csv
>
> eve_id  k1k2  k3
> 001   abc   x  y
>
> eve_id,  k1  k2   k3   k4
> 002, 12  jack 0.01 0998
>
> eve_id,   k1 k2k3  k4  k5
> 003,   aaa     device   endpoint -
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -----
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Best Regards,
Ayan Guha


Re: Explode/Flatten Map type Data Using Pyspark

2019-11-14 Thread ayan guha
Hi

What do you want your final DF to look like? Should it have all 5 value columns?
Do you have a finite set of columns?

On Fri, Nov 15, 2019 at 4:50 AM anbutech  wrote:

> Hello Sir,
>
> I have a scenario to flatten the different combinations of map type(key
> value) in a column called eve_data  like below:
>
> How do we flatten the map type into proper columns using pyspark
>
>
> 1) Source Dataframe having 2 columns(event id,data)
>
> eve_id,eve_data
> 001,  "k1":"abc",
>   "k2":"xyz"
>   "k3":"10091"
>
> eve_id,eve_data
>
> 002,   "k1":"12",
>   "k2":"jack",
>"k3":"0.01",
>"k4":"0998"
>
> eve_id,eve_data
>
> 003,   "k1":"aaa",
>  "k2":"",
>   "k3":"device",
>   "k4":"endpoint",
>   "k5":"-"
>
>
> Final output:
>
> (flatten the output of each  event ids key values).The number of key values
> will be different for each event id.so i want to flatten the records for
> all
> the map type(key values) as below
>
> eve_id  k1  k2  k3
> 001abc xyz 10091
>
> eve_id,  k1  k2   k3   k4
> 002, 12  jack 0.01 0998
>
> eve_id,   k1 k2k3  k4  k5
> 003,   aaa     device endpoint -
>
>
> Thanks
> Anbu
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Best Regards,
Ayan Guha
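If eve_data is a MapType column, one common way to flatten a variable key set is to first collect the distinct keys that actually occur and then project each one into its own column, with missing keys becoming null (this also gives the single rectangular schema needed for a CSV output, per the point above). A sketch, assuming df already has the columns eve_id and eve_data:

from pyspark.sql.functions import col, explode, map_keys

# 1. Discover every key that occurs anywhere in the map column.
keys = [r[0] for r in
        df.select(explode(map_keys(col("eve_data")))).distinct().collect()]

# 2. Project one column per key; rows without that key get null.
flat = df.select(
    "eve_id",
    *[col("eve_data").getItem(k).alias(k) for k in sorted(keys)]
)
flat.show(truncate=False)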


Re: Avro file question

2019-11-04 Thread ayan guha
Assuming you always read the data together, one large file is good and is the
basic HDFS use case.

On Tue, 5 Nov 2019 at 4:28 am, Yaniv Harpaz  wrote:

> It depends on your usage (when and how you read).
> Are the smaller files you were thinking about also larger than the HDFS
> block size?
> I would not go for something smaller than a block.
>
> Usually (if relevant to the way you read the data) the partitioning helps
> determine that.
>
>
> Yaniv Harpaz
> [ yaniv.harpaz at gmail.com ]
>
>
> On Mon, Nov 4, 2019 at 7:03 PM Sam  wrote:
>
>> Hi,
>>
>> How do we choose between single large avro file (size much larger than
>> HDFS block size) vs multiple smaller avro files (close to HDFS block size?
>>
>> Since avro is splittable, is there even a need to split a very large avro
>> file into smaller files?
>>
>> I’m assuming that a single large avro file can also be split into
>> multiple mappers/reducers/executors during processing.
>>
>> Thanks.
>>
> --
Best Regards,
Ayan Guha


Fwd: Delta with intelligent upsett

2019-10-31 Thread ayan guha
Hi

we have a scenario where we have a large table, i.e. 5-6B records. The table
is a repository of data from the past N years. It is possible that some updates
take place on the data and thus we are using a Delta table.

As part of the business process we know updates can happen only within the last M
years of records, where M is much smaller than N. E.g. the table can hold
20 yrs of data but we know updates can happen only for the last year, not before
that.

Is there some way to indicate this additional intelligence to Delta so it
can look into only the last year's data while running a merge or update? It
seems to be an obvious performance booster.

Any thoughts?
-- 
Best Regards,
Ayan Guha
-- 
Best Regards,
Ayan Guha
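One way to pass that hint today is to put the partition predicate directly into the merge condition, so Delta only has to scan and rewrite files in the recent partitions rather than the whole N years. A hedged sketch, assuming the table is partitioned by a year column and changes_df holds the incoming updates:

from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "/delta/big_table")   # hypothetical path

(target.alias("t")
       .merge(
           changes_df.alias("s"),
           # The extra predicate on the partition column lets Delta prune files
           # outside the update window (here: assume only the last year changes).
           "t.id = s.id AND t.year >= 2019"
       )
       .whenMatchedUpdateAll()
       .whenNotMatchedInsertAll()
       .execute())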


Re: Driver vs master

2019-10-07 Thread ayan guha
Hi

I think you are mixing terminologies here. Loosely speaking, the master manages
worker machines. Each worker machine can run one or more processes. A
process can be a driver or an executor. You submit applications to the master.
Each application will have a driver and executors. The master decides where
to place the executors. In cluster mode, the master also schedules the drivers
on the workers, so drivers are distributed across the cluster. In client mode,
the driver runs inside the client process that submitted the application (for
example, the spark-submit JVM on the launching machine), not inside the master.
You can launch multiple master processes as well and use them for a set of
applications - this happens when you use YARN. I am not sure how Mesos or K8s
work in that regard though.

HTH...

Ayan



On Tue, Oct 8, 2019 at 12:11 PM Andrew Melo  wrote:

> Hi
>
> On Mon, Oct 7, 2019 at 19:20 Amit Sharma  wrote:
>
>> Thanks Andrew but I am asking specific to driver memory not about
>> executors memory. We have just one master and if each jobs driver.memory=4g
>> and master nodes total memory is 16gb then we can not execute more than 4
>> jobs at a time.
>
>
> I understand that. I think there's a misunderstanding with the
> terminology, though. Are you running multiple separate spark instances on a
> single machine or one instance with multiple jobs inside.
>
>
>>
>> On Monday, October 7, 2019, Andrew Melo  wrote:
>>
>>> Hi Amit
>>>
>>> On Mon, Oct 7, 2019 at 18:33 Amit Sharma  wrote:
>>>
>>>> Can you please help me understand this. I believe driver programs runs
>>>> on master node
>>>
>>> If we are running 4 spark job and driver memory config is 4g then total
>>>> 16 6b would be used of master node.
>>>
>>>
>>> This depends on what master/deploy mode you're using: if it's "local"
>>> master and "client mode" then yes tasks execute in the same JVM as the
>>> driver. In this case though, the driver JVM uses whatever much space is
>>> allocated for the driver regardless of how many threads you have.
>>>
>>>
>>> So if we will run more jobs then we need more memory on master. Please
>>>> correct me if I am wrong.
>>>>
>>>
>>> This depends on your application, but in general more threads will
>>> require more memory.
>>>
>>>
>>>
>>>>
>>>> Thanks
>>>> Amit
>>>>
>>> --
>>> It's dark in this basement.
>>>
>> --
> It's dark in this basement.
>


-- 
Best Regards,
Ayan Guha


Re: Convert a line of String into column

2019-10-05 Thread ayan guha
Do you know how many columns?

On Sat, Oct 5, 2019 at 6:39 PM Dhaval Modi  wrote:

> Hi,
>
> 1st convert  "lines"  to dataframe. You will get one column with original
> string in one row.
>
> Post this, use string split on this column to convert to Array of String.
>
> After This, you can use explode function to have each element of the array
> as columns.
>
> On Wed 2 Oct, 2019, 03:18 ,  wrote:
>
>> I want to convert a line of String to a table. For instance, I want to
>> convert following line
>>
>>... # this is a line in a text
>> file, separated by a white space
>>
>> to table
>>
>> +----+----+----+-----+----+
>> |col1|col2|col3| ... |col6|
>> +----+----+----+-----+----+
>> |val1|val2|val3| ... |val6|
>> +----+----+----+-----+----+
>>
>> The code looks as below
>>
>> import org.apache.spark.sql.functions._
>> import org.apache.spark.sql.SparkSession
>>
>> val spark = SparkSession
>>   .builder
>>   .master("local")
>>   .appName("MyApp")
>>   .getOrCreate()
>>
>> import spark.implicits._
>>
>> val lines = spark.readStream.textFile("/tmp/data/")
>>
>> val words = lines.as[String].flatMap(_.split(" "))
>> words.printSchema()
>>
>> val query = words.
>>   writeStream.
>>   outputMode("append").
>>   format("console").
>>   start
>> query.awaitTermination()
>>
>> But in fact this code only turns the line into a single column
>>
>> +-------+
>> |  value|
>> +-------+
>> |col1...|
>> |col2...|
>> |col3...|
>> |  ...  |
>> |  col6 |
>> +-------+
>>
>> How to achieve the effect that I want to do?
>>
>> Thanks?
>>
>>

-- 
Best Regards,
Ayan Guha
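If the number of columns is known up front (say six), the usual approach is to split the value column once and then select each array element; this works for both batch and Structured Streaming DataFrames. A PySpark sketch for illustration (the original question uses Scala, and the path is a placeholder):

from pyspark.sql.functions import split, col

lines = spark.readStream.text("/tmp/data/")          # one column named "value"
parts = split(col("value"), " ")

# One output column per element of the split line.
table = lines.select(
    *[parts.getItem(i).alias("col{}".format(i + 1)) for i in range(6)]
)

query = (table.writeStream
              .outputMode("append")
              .format("console")
              .start())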


Re: Learning Spark

2019-07-04 Thread ayan guha
My best advice is to go through the docs and listen to lots of demos/videos
from Spark committers.

On Fri, 5 Jul 2019 at 3:03 pm, Kurt Fehlhauer  wrote:

> Are you a data scientist or data engineer?
>
>
> On Thu, Jul 4, 2019 at 10:34 PM Vikas Garg  wrote:
>
>> Hi,
>>
>> I am new Spark learner. Can someone guide me with the strategy towards
>> getting expertise in PySpark.
>>
>> Thanks!!!
>>
> --
Best Regards,
Ayan Guha


Re: Announcing Delta Lake 0.2.0

2019-06-21 Thread ayan guha
Hi

Thanks for the confirmation. We are using the workaround of creating a separate
Hive external table STORED AS PARQUET at the exact location of the Delta
table. Our use case is batch-driven and we are running VACUUM with 0
retention after every batch is completed. Do you see any potential problem
with this workaround, other than that while the batch is running
the table can provide some wrong information?

Best
Ayan

On Fri, Jun 21, 2019 at 8:03 PM Tathagata Das 
wrote:

> @ayan guha  @Gourav Sengupta
> 
> Delta Lake is OSS currently does not support defining tables in Hive
> metastore using DDL commands. We are hoping to add the necessary
> compatibility fixes in Apache Spark to make Delta Lake work with tables and
> DDL commands. So we will support them in a future release. In the meantime,
> please read/write Delta tables using paths.
>
> TD
>
> On Fri, Jun 21, 2019 at 12:49 AM Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> Hi Ayan,
>>
>> I may be wrong about this, but I think that Delta files are in Parquet
>> format. But I am sure that you have already checked this. Am I missing
>> something?
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Fri, Jun 21, 2019 at 6:39 AM ayan guha  wrote:
>>
>>> Hi
>>> We used spark.sql to create a table using DELTA. We also have a hive
>>> metastore attached to the spark session. Hence, a table gets created in
>>> Hive metastore. We then tried to query the table from Hive. We faced
>>> following issues:
>>>
>>>1. SERDE is SequenceFile, should have been Parquet
>>>2. Scema fields are not passed.
>>>
>>> Essentially the hive DDL looks like:
>>>
>>> CREATE TABLE `TABLE NAME`(
>>>   `col` array COMMENT 'from deserializer')
>>> ROW FORMAT SERDE
>>>   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
>>> WITH SERDEPROPERTIES (
>>>   'path'='WASB PATH')
>>> STORED AS INPUTFORMAT
>>>   'org.apache.hadoop.mapred.SequenceFileInputFormat'
>>> OUTPUTFORMAT
>>>   'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat'
>>> LOCATION
>>>   'WASB PATH'
>>> TBLPROPERTIES (
>>>   'spark.sql.create.version'='2.4.0',
>>>   'spark.sql.sources.provider'='DELTA',
>>>   'spark.sql.sources.schema.numParts'='1',
>>>   'spark.sql.sources.schema.part.0'='{\"type\":\"struct\",\"fields\":[]}',
>>>   'transient_lastDdlTime'='1556544657')
>>>
>>> Is this expected? And will the use case be supported in future releases?
>>>
>>>
>>> We are now experimenting
>>>
>>> Best
>>>
>>> Ayan
>>>
>>> On Fri, Jun 21, 2019 at 11:06 AM Liwen Sun 
>>> wrote:
>>>
>>>> Hi James,
>>>>
>>>> Right now we don't have plans for having a catalog component as part of
>>>> Delta Lake, but we are looking to support Hive metastore and also DDL
>>>> commands in the near future.
>>>>
>>>> Thanks,
>>>> Liwen
>>>>
>>>> On Thu, Jun 20, 2019 at 4:46 AM James Cotrotsios <
>>>> jamescotrots...@gmail.com> wrote:
>>>>
>>>>> Is there a plan to have a business catalog component for the Data
>>>>> Lake? If not how would someone make a proposal to create an open source
>>>>> project related to that. I would be interested in building out an open
>>>>> source data catalog that would use the Hive metadata store as a baseline
>>>>> for technical metadata.
>>>>>
>>>>>
>>>>> On Wed, Jun 19, 2019 at 3:04 PM Liwen Sun 
>>>>> wrote:
>>>>>
>>>>>> We are delighted to announce the availability of Delta Lake 0.2.0!
>>>>>>
>>>>>> To try out Delta Lake 0.2.0, please follow the Delta Lake Quickstart:
>>>>>> https://docs.delta.io/0.2.0/quick-start.html
>>>>>>
>>>>>> To view the release notes:
>>>>>> https://github.com/delta-io/delta/releases/tag/v0.2.0
>>>>>>
>>>>>> This release introduces two main features:
>>>>>>
>>>>>> *Cloud storage support*
>>>>>> In addition to HDFS, you can now configure Delta Lake to read and
>>>>>> write data on cloud storage services such as Amazon S3 and Azure Blob
>>>>>> Storage. For configuration instructions, please see:
>>>>>> https://docs.delta.io/0.2.0/delta-storage.html
>>>>>>
>>>>>> *Improved concurrency*
>>>>>> Delta Lake now allows concurrent append-only writes while still
>>>>>> ensuring serializability. For concurrency control in Delta Lake, please
>>>>>> see: https://docs.delta.io/0.2.0/delta-concurrency.html
>>>>>>
>>>>>> We have also greatly expanded the test coverage as part of this
>>>>>> release.
>>>>>>
>>>>>> We would like to acknowledge all community members for contributing
>>>>>> to this release.
>>>>>>
>>>>>> Best regards,
>>>>>> Liwen Sun
>>>>>>
>>>>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>

-- 
Best Regards,
Ayan Guha


Re: Announcing Delta Lake 0.2.0

2019-06-20 Thread ayan guha
Hi
We used spark.sql to create a table using DELTA. We also have a hive
metastore attached to the spark session. Hence, a table gets created in
Hive metastore. We then tried to query the table from Hive. We faced
following issues:

   1. SERDE is SequenceFile, should have been Parquet
   2. Scema fields are not passed.

Essentially the hive DDL looks like:

CREATE TABLE `TABLE NAME`(
  `col` array COMMENT 'from deserializer')
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'path'='WASB PATH')
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.SequenceFileInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat'
LOCATION
  'WASB PATH'
TBLPROPERTIES (
  'spark.sql.create.version'='2.4.0',
  'spark.sql.sources.provider'='DELTA',
  'spark.sql.sources.schema.numParts'='1',
  'spark.sql.sources.schema.part.0'='{\"type\":\"struct\",\"fields\":[]}',
  'transient_lastDdlTime'='1556544657')

Is this expected? And will the use case be supported in future releases?


We are now experimenting

Best

Ayan

On Fri, Jun 21, 2019 at 11:06 AM Liwen Sun  wrote:

> Hi James,
>
> Right now we don't have plans for having a catalog component as part of
> Delta Lake, but we are looking to support Hive metastore and also DDL
> commands in the near future.
>
> Thanks,
> Liwen
>
> On Thu, Jun 20, 2019 at 4:46 AM James Cotrotsios <
> jamescotrots...@gmail.com> wrote:
>
>> Is there a plan to have a business catalog component for the Data Lake?
>> If not how would someone make a proposal to create an open source project
>> related to that. I would be interested in building out an open source data
>> catalog that would use the Hive metadata store as a baseline for technical
>> metadata.
>>
>>
>> On Wed, Jun 19, 2019 at 3:04 PM Liwen Sun 
>> wrote:
>>
>>> We are delighted to announce the availability of Delta Lake 0.2.0!
>>>
>>> To try out Delta Lake 0.2.0, please follow the Delta Lake Quickstart:
>>> https://docs.delta.io/0.2.0/quick-start.html
>>>
>>> To view the release notes:
>>> https://github.com/delta-io/delta/releases/tag/v0.2.0
>>>
>>> This release introduces two main features:
>>>
>>> *Cloud storage support*
>>> In addition to HDFS, you can now configure Delta Lake to read and write
>>> data on cloud storage services such as Amazon S3 and Azure Blob Storage.
>>> For configuration instructions, please see:
>>> https://docs.delta.io/0.2.0/delta-storage.html
>>>
>>> *Improved concurrency*
>>> Delta Lake now allows concurrent append-only writes while still ensuring
>>> serializability. For concurrency control in Delta Lake, please see:
>>> https://docs.delta.io/0.2.0/delta-concurrency.html
>>>
>>> We have also greatly expanded the test coverage as part of this release.
>>>
>>> We would like to acknowledge all community members for contributing to
>>> this release.
>>>
>>> Best regards,
>>> Liwen Sun
>>>
>>>

-- 
Best Regards,
Ayan Guha


Re: Announcing Delta Lake 0.2.0

2019-06-19 Thread ayan guha
Hi

We are using Delta features. The only problem we have faced so far is that Hive
cannot read Delta outputs by itself (even if the Hive metastore is shared).
However, if we create a Hive external table pointing to the folder (and run
VACUUM), it can read the data.

Other than that, the feature looks good and well thought out. We are doing
volume testing now.

Best
Ayan

On Thu, Jun 20, 2019 at 9:52 AM Liwen Sun  wrote:

> Hi Gourav,
>
> Thanks for the suggestion. Please open a Github issue at
> https://github.com/delta-io/delta/issues to describe your use case and
> requirements for "external tables" so we can better track this feature and
> also get feedback from the community.
>
> Regards,
> Liwen
>
> On Wed, Jun 19, 2019 at 12:11 PM Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> Hi,
>>
>> does Delta support external tables? I think that most users will be
>> needing this.
>>
>>
>> Regards,
>> Gourav
>>
>> On Wed, Jun 19, 2019 at 8:04 PM Liwen Sun 
>> wrote:
>>
>>> We are delighted to announce the availability of Delta Lake 0.2.0!
>>>
>>> To try out Delta Lake 0.2.0, please follow the Delta Lake Quickstart:
>>> https://docs.delta.io/0.2.0/quick-start.html
>>>
>>> To view the release notes:
>>> https://github.com/delta-io/delta/releases/tag/v0.2.0
>>>
>>> This release introduces two main features:
>>>
>>> *Cloud storage support*
>>> In addition to HDFS, you can now configure Delta Lake to read and write
>>> data on cloud storage services such as Amazon S3 and Azure Blob Storage.
>>> For configuration instructions, please see:
>>> https://docs.delta.io/0.2.0/delta-storage.html
>>>
>>> *Improved concurrency*
>>> Delta Lake now allows concurrent append-only writes while still ensuring
>>> serializability. For concurrency control in Delta Lake, please see:
>>> https://docs.delta.io/0.2.0/delta-concurrency.html
>>>
>>> We have also greatly expanded the test coverage as part of this release.
>>>
>>> We would like to acknowledge all community members for contributing to
>>> this release.
>>>
>>> Best regards,
>>> Liwen Sun
>>>
>>> --
>>> You received this message because you are subscribed to the Google
>>> Groups "Delta Lake Users and Developers" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to delta-users+unsubscr...@googlegroups.com.
>>> To view this discussion on the web visit
>>> https://groups.google.com/d/msgid/delta-users/CAE4dWq9g90NkUr_SLs2J6kFPbOpxx4wy6MEgb%3DQ5pBxkUcK%2B-A%40mail.gmail.com
>>> <https://groups.google.com/d/msgid/delta-users/CAE4dWq9g90NkUr_SLs2J6kFPbOpxx4wy6MEgb%3DQ5pBxkUcK%2B-A%40mail.gmail.com?utm_medium=email_source=footer>
>>> .
>>>
>>

-- 
Best Regards,
Ayan Guha


Re: Databricks - number of executors, shuffle.partitions etc

2019-05-15 Thread ayan guha
Well, it's a Databricks question, so it is better asked in their forum.

You can set cluster-level params when you create a new cluster, or add them
later. Go to the cluster page, open a cluster, expand the additional config
section and add your params there as key-value pairs separated by a space.

On Thu, 16 May 2019 at 11:46 am, Rishi Shah 
wrote:

> Hi All,
>
> Any idea?
>
> Thanks,
> -Rishi
>
> On Tue, May 14, 2019 at 11:52 PM Rishi Shah 
> wrote:
>
>> Hi All,
>>
>> How can we set spark conf parameter in databricks notebook? My cluster
>> doesn't take into account any spark.conf.set properties... it creates 8
>> worker nodes (dat executors) but doesn't honor the supplied conf
>> parameters. Any idea?
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>
>
> --
> Regards,
>
> Rishi Shah
>
-- 
Best Regards,
Ayan Guha
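To make the distinction concrete: properties that size the cluster (number of workers, executor memory, and so on) only take effect when set in that cluster-level configuration box at creation or edit time, while runtime SQL properties can still be set from a notebook. A small sketch of the latter; the property used is just an example:

# Runtime SQL conf: can be changed from a notebook and affects new queries.
spark.conf.set("spark.sql.shuffle.partitions", "64")
print(spark.conf.get("spark.sql.shuffle.partitions"))

# Cluster-sizing properties (e.g. spark.executor.memory) belong in the cluster's
# Spark config section as "key value" lines and require a cluster restart.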


Re: How to retrieve multiple columns values (in one row) to variables in Spark Scala method

2019-04-05 Thread ayan guha
Try like this:

val primitiveDS = spark.sql("select 1.2 avg ,2.3 stddev").collect().apply(0)
val arr = Array(primitiveDS.getDecimal(0), primitiveDS.getDecimal(1))

primitiveDS: org.apache.spark.sql.Row = [1.2,2.3]
arr: Array[java.math.BigDecimal] = Array(1.2, 2.3)


Re: Where does the Driver run?

2019-03-29 Thread ayan guha
re many explanations forks from my question.
>
>
>
> In "my-app" I create a new SparkConf, with the following code (slightly
> abbreviated):
>
>
>
>   conf.setAppName(“my-job")
>
>   conf.setMaster(“spark://master-address:7077”)
>
>   conf.set(“deployMode”, “cluster”)
>
>   // other settings like driver and executor memory requests
>
>   // the driver and executor memory requests are for all mem on the
> slaves, more than
>
>   // mem available on the launching machine with “my-app"
>
>   val jars = listJars(“/path/to/lib")
>
>   conf.setJars(jars)
>
>   …
>
>
>
> When I launch the job I see 2 executors running on the 2 workers/slaves.
> Everything seems to run fine and sometimes completes successfully. Frequent
> failures are the reason for this question.
>
>
>
> Where is the Driver running? I don’t see it in the GUI, I see 2 Executors
> taking all cluster resources. With a Yarn cluster I would expect the
> “Driver" to run on/in the Yarn Master but I am using the Spark Standalone
> Master, where is the Drive part of the Job running?
>
>
>
> If is is running in the Master, we are in trouble because I start the
> Master on one of my 2 Workers sharing resources with one of the Executors.
> Executor mem + driver mem is > available mem on a Worker. I can change this
> but need so understand where the Driver part of the Spark Job runs. Is it
> in the Spark Master, or inside and Executor, or ???
>
>
>
> The “Driver” creates and broadcasts some large data structures so the need
> for an answer is more critical than with more typical tiny Drivers.
>
>
>
> Thanks for you help!
>
>
>
>
> --
>
> Cheers!
>
>
>
> --
Best Regards,
Ayan Guha


SparkR issue

2018-10-09 Thread ayan guha
Hi

We are seeing some weird behaviour in Spark R.

We created an R data frame with 600K records and 29 columns. Then we tried to
convert the R DF to a Spark DF using

df <- SparkR::createDataFrame(rdf)

from RStudio. It hung; we had to kill the process after 1-2 hours.

We also tried following:
df <- SparkR::createDataFrame(rdf, numPartition=4000)
df <- SparkR::createDataFrame(rdf, numPartition=300)
df <- SparkR::createDataFrame(rdf, numPartition=10)

Same result. In both scenarios it seems RStudio is working, and there is no trace
of jobs in the Spark Application Master view.

Finally, we used this:

df <- SparkR::createDataFrame(rdf, schema=schema) , schema is a StructType.

This took 25 mins to create the Spark DF. However, the job did show up in
the Application Master view and it shows only 20-30 secs. So where did the rest
of the time go?

Question:
1. Is this expected behavior? (I hope not). How should we speed up this bit?
2. We understand better options would be to read data from external
sources, but we need this data to be generated for simulation purposes.
What's possibly going wrong?


Best
Ayan



-- 
Best Regards,
Ayan Guha


Re: Pyspark Partitioning

2018-09-30 Thread ayan guha
Hi

There is a set of functions which can be used with the construct
OVER (PARTITION BY col ORDER BY col).

Search for rank and the other window functions in the Spark documentation.

On Mon, 1 Oct 2018 at 5:29 am, Riccardo Ferrari  wrote:

> Hi Dimitris,
>
> I believe the methods partitionBy
> <https://spark.apache.org/docs/2.2.0/api/python/pyspark.html#pyspark.RDD.partitionBy>
> and mapPartitions
> <https://spark.apache.org/docs/2.2.0/api/python/pyspark.html#pyspark.RDD.mapPartitions>
> are specific to RDDs while you're talking about DataFrames
> <https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame>.
> I guess you have few options including:
> 1. use the Dataframe.rdd
> <https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.rdd>
> call and process the returned RDD. Please note the return type for this
> call is and RDD of Row
> 2. User the groupBy
> <https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.groupBy>
> from Dataframes and start from there, this may involved defining an udf or
> leverage on the existing GroupedData
> <https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.GroupedData>
> functions.
>
> It really depends on your use-case and your performance requirements.
> HTH
>
> On Sun, Sep 30, 2018 at 8:31 PM dimitris plakas 
> wrote:
>
>> Hello everyone,
>>
>> I am trying to split a dataframe on partitions and i want to apply a
>> custom function on every partition. More precisely i have a dataframe like
>> the one below
>>
>> Group_Id | Id | Points
>> 1| id1| Point1
>> 2| id2| Point2
>>
>> I want to have a partition for every Group_Id and apply on every
>> partition a function defined by me.
>> I have tried with partitionBy('Group_Id').mapPartitions() but i receive
>> error.
>> Could you please advice me how to do it?
>>
> --
Best Regards,
Ayan Guha
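A short PySpark sketch of the OVER (PARTITION BY ... ORDER BY ...) construct mentioned above, using rank as the example function; the DataFrame and column names (df, group_id, points) are hypothetical:

from pyspark.sql.functions import rank, col
from pyspark.sql.window import Window

w = Window.partitionBy("group_id").orderBy(col("points").desc())

# One rank per group, computed without splitting the DataFrame by hand.
ranked = df.withColumn("rank_in_group", rank().over(w))
ranked.show()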


Re: Time-Series Forecasting

2018-09-19 Thread ayan guha
Hi

I work mostly in data engineering and am trying to promote the use of SparkR
within the company I recently joined. Some of the users are working on
forecasting a bunch of things and want to use sparklyr, as they found its
time-series implementation better than SparkR's.

Does anyone have a point of view regarding this? Is sparklyr better than
SparkR in certain use cases?

On Thu, Sep 20, 2018 at 4:07 AM, Mina Aslani  wrote:

> Hi,
>
> Thank you for your quick response, really appreciate it.
>
> I just started learning TimeSeries forecasting, and I may try different
> methods and observe their predictions/forecasting.However, my
> understanding is that below methods are needed:
>
> - Smoothing
> - Decomposing(e.g. remove/separate trend/seasonality)
> - AR Model/MA Model/Combined Model (e.g. ARMA, ARIMA)
> - ACF (Autocorrelation Function)/PACF (Partial Autocorrelation Function)
> - Recurrent Neural Network (LSTM: Long Short Term Memory)
>
> Kindest regards,
> Mina
>
>
>
> On Wed, Sep 19, 2018 at 12:55 PM Jörn Franke  wrote:
>
>> What functionality do you need ? Ie which methods?
>>
>> > On 19. Sep 2018, at 18:01, Mina Aslani  wrote:
>> >
>> > Hi,
>> > I have a question for you. Do we have any Time-Series Forecasting
>> library in Spark?
>> >
>> > Best regards,
>> > Mina
>>
>


-- 
Best Regards,
Ayan Guha


Re: Drawing Big Data tech diagrams using Pen Tablets

2018-09-12 Thread ayan guha
FWIW...I use draw.io and it is pretty neat

On Thu, Sep 13, 2018 at 6:46 AM, Gourav Sengupta 
wrote:

> well, it may be possible to use just excel shapes ( and advanced shapes
> instead of draw.io). I think that free form, hand drawing and changing
> them to solid shapes with two taps in iPad apps and Surface apps render a
> fantastic expressive quality to diagrams. Ofcourse, as with everything, its
> a matter of choice.
>
> Regards,
> Gourav Sengupta
>
> On Wed, Sep 12, 2018 at 7:45 PM Mich Talebzadeh 
> wrote:
>
>> thanks Jorn, pretty neat
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Wed, 12 Sep 2018 at 19:38, Jörn Franke  wrote:
>>
>>> You can try cloud services such as draw.io or  similar.
>>>
>>> On 12. Sep 2018, at 20:31, Mich Talebzadeh 
>>> wrote:
>>>
>>> Hi Gourav,
>>>
>>> I have an IPAD that my son uses it and not me (for games). I don't see
>>> much value in spending $$$ on a Surface. Then I had Montblanc augmented paper,
>>> which was kind of impressive but not really that practical.
>>>
>>> I have Visio 2010 and an upgrade to 2016 will cost good money. So I
>>> decided to get this pen tablet after seeing these diagrams. They are pretty
>>> cool
>>>
>>> 
>>>
>>> In UK it retails at £495 so not cheap but much cheaper than surface pro
>>> which is a premium brand.
>>>
>>> Cheers,
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Wed, 12 Sep 2018 at 18:34, Gourav Sengupta 
>>> wrote:
>>>
>>>> Hi Mich,
>>>>
>>>> does Apple iPad and Surface not have their own tools for this?
>>>>
>>>> Regards,
>>>> Gourav Sengupta
>>>>
>>>> On Tue, Sep 11, 2018 at 8:21 PM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Sorry this question may not be related to Spark but sure many people
>>>>> use MS Visio or other graphics tools for architecture type diagrams with
>>>>> big data including Spark.
>>>>>
>>>>> Visio is time consuming and cumbersome although that is what I use for
>>>>> tech diagrams. I tried Montblanc augmented paper
>>>>> <https://www.montblanc.com/en-gb/discover/specials/augmented-paper.html>but
>>>>> that is clumsy and expensive.
>>>>>
>>>>> I was wondering if anyone has tried some tools like Wacom Intuos Pro
>>>>> Paper Edition Pen Tablet
>>>>> <https://www.wacom.com/en/products/pen-tablets/wacom-intuos-pro-paper>
>>>>> or any similar tools for easier drawing and their recommendation?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> LinkedIn * 
>>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>
>>>>>
>>>>>
>>>>> http://talebzadehmich.wordpress.com
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>


-- 
Best Regards,
Ayan Guha


Big Burst of Streaming Changes

2018-07-29 Thread ayan guha
Hi

We have a situation where we are ingesting a high-volume stream of changes
coming from an Oracle table.
The requirement:
Whenever there is a change in the Oracle table, a CDC process will write out
the change to a Kafka or Event Hub stream, and the stream will be consumed by
a Spark streaming application.

The Problem:
Because of some challenges on the Oracle side, it is observed that commits in
Oracle happen in big bursts, regularly over a couple of million records,
especially delete transactions. Hence, the stream consumed by the Spark app
is not evenly distributed.

The Question:

a) Is there any special care that should be taken to write this kind of Spark
app?
b) Is it better if we rather go with a Spark batch job which can run every hour
or so? In that case we can use the Event Hub archival process to write data to
storage every 5 mins and then consume from HDFS/storage every hour.
c) Other than a CDC tool, is there any Spark package which can actually
listen to the Oracle change stream? So can we use Spark as the CDC tool itself?

-- 
Best Regards,
Ayan Guha
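On question (a): for the Kafka path, Structured Streaming can at least cap how much of a burst each micro-batch takes on via the maxOffsetsPerTrigger source option, which spreads a multi-million-record commit over several triggers instead of one huge batch. A hedged sketch; the brokers, topic and paths are placeholders, and the Event Hubs connector has its own, differently named rate-limit options:

cdc_stream = (spark.readStream
                   .format("kafka")
                   .option("kafka.bootstrap.servers", "broker1:9092")   # placeholder
                   .option("subscribe", "oracle_cdc_topic")             # placeholder
                   .option("startingOffsets", "latest")
                   .option("maxOffsetsPerTrigger", 500000)  # cap records per micro-batch
                   .load())

query = (cdc_stream.selectExpr("CAST(value AS STRING) AS payload")
                   .writeStream
                   .format("parquet")
                   .option("path", "/data/cdc/raw")            # placeholder
                   .option("checkpointLocation", "/chk/cdc")   # placeholder
                   .outputMode("append")
                   .start())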


Re: the best tool to interact with Spark

2018-06-26 Thread ayan guha
Depends on what you are trying to do. I found Zeppelin an excellent option
to interactively run queries and code.

On Tue, Jun 26, 2018 at 10:21 PM, Donni Khan <
prince.don...@googlemail.com.invalid> wrote:

> Hi all,
>
> What is the best tool to interact easly with Spark?
>
> Thank you,
> Donni
>



-- 
Best Regards,
Ayan Guha


Spark-Mongodb connector issue

2018-06-18 Thread ayan guha
Hi Guys

I have a large MongoDB collection with a complex document structure. I am
facing an issue where I am getting an error:

Can not cast Array to Struct. Value:BsonArray([])

The target column is indeed a struct, so the error makes sense.

I am able to successfully read from another collection with exactly the same
structure but a subset of the data.

I suspect some documents are corrupted in MongoDB.

Questions:
1. Is there any way to filter out such documents in the MongoDB connector?
2. I tried to exclude the column with a custom select statement but it did not
work. Is it possible?
3. Is there any way to suppress errors up to a certain threshold? I do not want
to stall the load of 1M records if 1 record is bad.

I know this might be a question for the MongoDB forum, but I started here
as there may be some generic solution I can use. I am going to post to SO
and the Mongo forum shortly.

Best
Ayan

-- 
Best Regards,
Ayan Guha


Re: Append In-Place to S3

2018-06-03 Thread ayan guha
I do not use anti-join semantics, but you can use a left outer join and then
filter out nulls from the right side. Your data may have dups on the columns
individually, but it should not have dups on the composite key, i.e. all columns
put together.

On Mon, 4 Jun 2018 at 6:42 am, Tayler Lawrence Jones 
wrote:

> The issue is not the append vs overwrite - perhaps those responders do not
> know Anti join semantics. Further, Overwrite on s3 is a bad pattern due to
> s3 eventual consistency issues.
>
> First, your sql query is wrong as you don’t close the parenthesis of the
> CTE (“with” part). In fact, it looks like you don’t need that with at all,
> and the query should fail to parse. If that does parse, I would open a bug
> on the spark jira.
>
> Can you provide the query that you are using to detect duplication so I
> can see if your deduplication logic matches the detection query?
>
> -TJ
>
> On Sat, Jun 2, 2018 at 10:22 Aakash Basu 
> wrote:
>
>> As Jay suggested correctly, if you're joining then overwrite otherwise
>> only append as it removes dups.
>>
>> I think, in this scenario, just change it to write.mode('overwrite')
>> because you're already reading the old data and your job would be done.
>>
>>
>> On Sat 2 Jun, 2018, 10:27 PM Benjamin Kim,  wrote:
>>
>>> Hi Jay,
>>>
>>> Thanks for your response. Are you saying to append the new data and then
>>> remove the duplicates to the whole data set afterwards overwriting the
>>> existing data set with new data set with appended values? I will give that
>>> a try.
>>>
>>> Cheers,
>>> Ben
>>>
>>> On Fri, Jun 1, 2018 at 11:49 PM Jay 
>>> wrote:
>>>
>>>> Benjamin,
>>>>
>>>> The append will append the "new" data to the existing data with
>>>> removing the duplicates. You would need to overwrite the file everytime if
>>>> you need unique values.
>>>>
>>>> Thanks,
>>>> Jayadeep
>>>>
>>>> On Fri, Jun 1, 2018 at 9:31 PM Benjamin Kim  wrote:
>>>>
>>>>> I have a situation where I trying to add only new rows to an existing
>>>>> data set that lives in S3 as gzipped parquet files, looping and appending
>>>>> for each hour of the day. First, I create a DF from the existing data, 
>>>>> then
>>>>> I use a query to create another DF with the data that is new. Here is the
>>>>> code snippet.
>>>>>
>>>>> df = spark.read.parquet(existing_data_path)
>>>>> df.createOrReplaceTempView(‘existing_data’)
>>>>> new_df = spark.read.parquet(new_data_path)
>>>>> new_df.createOrReplaceTempView(’new_data’)
>>>>> append_df = spark.sql(
>>>>> """
>>>>> WITH ids AS (
>>>>> SELECT DISTINCT
>>>>> source,
>>>>> source_id,
>>>>> target,
>>>>> target_id
>>>>> FROM new_data i
>>>>> LEFT ANTI JOIN existing_data im
>>>>> ON i.source = im.source
>>>>> AND i.source_id = im.source_id
>>>>> AND i.target = im.target
>>>>> AND i.target = im.target_id
>>>>> """
>>>>> )
>>>>> append_df.coalesce(1).write.parquet(existing_data_path, mode='append',
>>>>> compression='gzip’)
>>>>>
>>>>>
>>>>> I thought this would append new rows and keep the data unique, but I
>>>>> am see many duplicates. Can someone help me with this and tell me what I 
>>>>> am
>>>>> doing wrong?
>>>>>
>>>>> Thanks,
>>>>> Ben
>>>>>
>>>> --
Best Regards,
Ayan Guha
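For completeness, a hedged sketch of what the de-duplicating append could look like with the CTE actually closed and used. It reuses the temp view and column names from the snippet above and assumes the four columns uniquely identify a row; note the original compared i.target to im.target_id, which looks like a typo:

append_df = spark.sql("""
    WITH ids AS (
        SELECT DISTINCT source, source_id, target, target_id
        FROM new_data i
        LEFT ANTI JOIN existing_data im
          ON  i.source    = im.source
          AND i.source_id = im.source_id
          AND i.target    = im.target
          AND i.target_id = im.target_id
    )
    SELECT n.*
    FROM new_data n
    JOIN ids
      ON  n.source    = ids.source
      AND n.source_id = ids.source_id
      AND n.target    = ids.target
      AND n.target_id = ids.target_id
""")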


Re: Bulk / Fast Read and Write with MSSQL Server and Spark

2018-05-23 Thread ayan guha
Curious question: what is the reason for using Spark here? Why not simple
SQL-based ETL?

On Thu, May 24, 2018 at 5:09 AM, Ajay <ajay.ku...@gmail.com> wrote:

> Do you worry about spark overloading the SQL server?  We have had this
> issue in the past where all spark slaves tend to send lots of data at once
> to SQL and that slows down the latency of the rest of the system. We
> overcame this by using sqoop and running it in a controlled environment.
>
> On Wed, May 23, 2018 at 7:32 AM Chetan Khatri <chetan.opensou...@gmail.com>
> wrote:
>
>> Super, just giving high level idea what i want to do. I have one source
>> schema which is MS SQL Server 2008 and target is also MS SQL Server 2008.
>> Currently there is c# based ETL application which does extract transform
>> and load as customer specific schema including indexing etc.
>>
>>
>> Thanks
>>
>> On Wed, May 23, 2018 at 7:11 PM, kedarsdixit <kedarnath_dixit@persistent.
>> com> wrote:
>>
>>> Yes.
>>>
>>> Regards,
>>> Kedar Dixit
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>
> --
> Thanks,
> Ajay
>



-- 
Best Regards,
Ayan Guha
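On Ajay's point about overloading the SQL Server: the JDBC writer at least lets you bound the write parallelism and the batch size, which is often enough to keep the target healthy. A hedged sketch; the URL, table and credentials are placeholders:

(df.repartition(4)                      # at most 4 concurrent connections to the target
   .write
   .format("jdbc")
   .option("url", "jdbc:sqlserver://host:1433;databaseName=target_db")  # placeholder
   .option("dbtable", "dbo.target_table")                               # placeholder
   .option("user", "etl_user")
   .option("password", "***")
   .option("batchsize", 10000)          # rows per JDBC batch insert
   .mode("append")
   .save())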


Re: How to skip nonexistent file when read files with spark?

2018-05-21 Thread ayan guha
A relatively naive solution will be:

0. Create a dummy blank dataframe
1. Loop through the list of paths.
2. Try to create the dataframe from the path. If it succeeds, union it
cumulatively.
3. If it errors, just ignore it or handle it as you wish (a sketch follows
below).

At the end of the loop, just use the unioned df. This should not have any
additional performance overhead as declaring dataframes and union is not
expensive, unless you call any action within the loop.
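
A minimal sketch of that loop in PySpark (paths and the parquet format are
illustrative; catching AnalysisException is the "ignore it" branch):

from pyspark.sql.utils import AnalysisException

paths = ['/data/*/12', '/data/*/13']
unioned = None
for p in paths:
    try:
        part = spark.read.parquet(p)          # or .json()/.csv(), whatever the files are
    except AnalysisException:
        continue                              # the glob matched nothing: skip it
    unioned = part if unioned is None else unioned.union(part)

# `unioned` is None only if every path was missing

Starting from None instead of a dummy blank dataframe avoids having to know the
schema up front; otherwise the idea is exactly as in the steps above.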

Best
Ayan

On Tue, 22 May 2018 at 11:27 am, JF Chen <darou...@gmail.com> wrote:

> Thanks, Thakrar,
>
> I have tried to check the existence of the path before reading it, but the
> HDFSCli python package does not seem to support wildcards. "FileSystem.globStatus"
> is a Java API, while I am using Python via Livy. Do you know of any Python API
> implementing the same function?
>
>
> Regard,
> Junfeng Chen
>
> On Mon, May 21, 2018 at 9:01 PM, Thakrar, Jayesh <
> jthak...@conversantmedia.com> wrote:
>
>> Probably you can do some preprocessing/checking of the paths before you
>> attempt to read it via Spark.
>>
>> Whether it is local or hdfs filesystem, you can try to check for
>> existence and other details by using the "FileSystem.globStatus" method
>> from the Hadoop API.
>>
>>
>>
>> *From: *JF Chen <darou...@gmail.com>
>> *Date: *Sunday, May 20, 2018 at 10:30 PM
>> *To: *user <user@spark.apache.org>
>> *Subject: *How to skip nonexistent file when read files with spark?
>>
>>
>>
>> Hi Everyone
>>
>> I ran into a tricky problem recently. I am trying to read some file paths
>> generated by another method. The file paths are represented by wildcards in
>> a list, like ['/data/*/12', '/data/*/13'].
>>
>> But in practice, if a wildcard cannot match any existing path, it will
>> throw an exception: "pyspark.sql.utils.AnalysisException: 'Path does not
>> exist: ...'", and the program stops after that.
>>
>> I actually want Spark to just ignore and skip these nonexistent file
>> paths and continue to run. I have tried the python HDFSCli API to check the
>> existence of a path, but it cannot handle wildcards.
>>
>>
>>
>> Any good idea to solve my problem? Thanks~
>>
>>
>>
>> Regard,
>> Junfeng Chen
>>
>
> --
Best Regards,
Ayan Guha
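
For the question above about a Python equivalent of FileSystem.globStatus: one
option, sketched below, is to reach the same Hadoop API through PySpark's py4j
gateway. The underscore attributes (_jvm, _jsc) are internal, so treat this as a
pragmatic workaround rather than a stable public API, and the parquet format is
an assumption.

hadoop = spark.sparkContext._jvm.org.apache.hadoop
fs = hadoop.fs.FileSystem.get(spark.sparkContext._jsc.hadoopConfiguration())

def expand_glob(pattern):
    statuses = fs.globStatus(hadoop.fs.Path(pattern))
    return [s.getPath().toString() for s in statuses] if statuses else []

existing = [p for pattern in ['/data/*/12', '/data/*/13'] for p in expand_glob(pattern)]
if existing:
    df = spark.read.parquet(*existing)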


Re: Submit many spark applications

2018-05-16 Thread ayan guha
How about using Livy to submit jobs?
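
A hedged sketch of what that could look like with Livy's batch REST API, so no
local spark-submit JVM is spawned per application. The host, port and the
application jar are placeholders, and it uses the requests package.

import requests

livy_url = "http://livy-host:8998/batches"
payload = {
    "file": "hdfs:///apps/my-spark-app.jar",      # application jar on HDFS
    "className": "com.example.Main",
    "args": ["2018-05-16"],
}
resp = requests.post(livy_url, json=payload,
                     headers={"Content-Type": "application/json"})
print(resp.json()["id"], resp.json()["state"])    # Livy queues and tracks the batch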

On Thu, 17 May 2018 at 7:24 am, Marcelo Vanzin <van...@cloudera.com> wrote:

> You can either:
>
> - set spark.yarn.submit.waitAppCompletion=false, which will make
> spark-submit go away once the app starts in cluster mode.
> - use the (new in 2.3) InProcessLauncher class + some custom Java code
> to submit all the apps from the same "launcher" process.
>
> On Wed, May 16, 2018 at 1:45 PM, Shiyuan <gshy2...@gmail.com> wrote:
> > Hi Spark-users,
> > I want to submit as many spark applications as the resources permit. I am
> > using cluster mode on a yarn cluster. Yarn can queue and launch these
> > applications without problems. The problem lies in spark-submit itself.
> > Spark-submit starts a JVM which could fail due to insufficient memory on the
> > machine where I run spark-submit if many spark-submit JVMs are running. Any
> > suggestions on how to solve this problem? Thank you!
>
>
>
> --
> Marcelo
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Best Regards,
Ayan Guha


Re: native-lzo library not available

2018-05-03 Thread ayan guha
rg.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
>> DD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMap
>> Task.scala:73)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMap
>> Task.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>> Executor.java:1145)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>> lExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>> Driver stacktrace:
>>
>>
>>
>> I see the LZO at GPextras:
>>
>> ll
>> total 104
>> -rw-r--r-- 1 cloudera-scm cloudera-scm 35308 Oct  4  2017
>> COPYING.hadoop-lzo
>> -rw-r--r-- 1 cloudera-scm cloudera-scm 62268 Oct  4  2017
>> hadoop-lzo-0.4.15-cdh5.13.0.jar
>> lrwxrwxrwx 1 cloudera-scm cloudera-scm31 May  3 07:23 hadoop-lzo.jar
>> -> hadoop-lzo-0.4.15-cdh5.13.0.jar
>> drwxr-xr-x 2 cloudera-scm cloudera-scm  4096 Oct  4  2017 native
>>
>>
>>
>>
>> --
>> Take Care
>> Fawze Abujaber
>>
>
>
>
> --
> Take Care
> Fawze Abujaber
>



-- 
Best Regards,
Ayan Guha


Re: Read or save specific blocks of a file

2018-05-03 Thread ayan guha
Is this a recommended way of reading data in the long run? I think it might
be better to write, or look for, an InputFormat which supports the need.

By the way, a block is designed to be an HDFS-internal representation that
enables certain features. It would be interesting to understand the use case
where a client app really needs to know about it. It sounds like a questionable
design without that context.

Best
Ayan

On Fri, 4 May 2018 at 1:46 am, Thodoris Zois <z...@ics.forth.gr> wrote:

> Hello Madhav,
>
> What I did is pretty straight-forward. Let's say that your HDFS block is
> 128 MB and you store a file of 256 MBs in HDFS, named Test.csv.
>
> First use the command: `hdfs fsck Test.csv -locations -blocks -files`. It
> will return you some very useful information including the list of blocks.
> So let's say that you want to read the first block (block 0). On the right
> side of the line that corresponds to block 0 you can find the IP of the
> machine that holds this specific block in the local file system as well as
> the blockName (BP-1737920335-xxx.xxx.x.x-1510660262864) and blockID (e.g:
> blk_1073760915_20091) that will help you later recognize it. So what you
> need from fsck is the blockName, blockID and the IP of the machine that has
> the specific block that you are interested in.
>
> After you get these you got everything you need. All you have to do is to
> connect to the specific IP and execute: `find
> /data/hdfs-data/datanode/current/blockName/current/finalized/subdir0/ -name
> blockID`. That command will return you the full path where you can find the
> contents of your file Test.csv that correspond to one block in HDFS.
>
> What I do after I get the full path is to copy the file, remove the last
> line (because there is a big chance that the last line will be included in
> the next block) and store it again to HDFS with the desired name. Then I
> can access one block of file Test.csv from HDFS. That's all, if you need
> any further information do no hesitate to contact me.
>
> - Thodoris
>
>
> On Thu, 2018-05-03 at 14:47 +0530, Madhav A wrote:
>
> Thodoris,
>
> I certainly would be interested in knowing how you were able to identify
> individual blocks and read from them. My understanding was that the HDFS
> protocol abstracts this from consumers to prevent potential data
> corruption issues. I would appreciate it if you could share some details of
> your approach.
>
> Thanks!
> madhav
>
> On Wed, May 2, 2018 at 3:34 AM, Thodoris Zois <z...@ics.forth.gr> wrote:
>
> That’s what I did :) If you need further information I can post my
> solution..
>
> - Thodoris
>
> On 30 Apr 2018, at 22:23, David Quiroga <quirogadf4w...@gmail.com> wrote:
>
> There might be a better way... but I wonder if it might be possible to
> access the node where the block is stored and read it from the local file
> system rather than from HDFS.
>
> On Mon, Apr 23, 2018 at 11:05 AM, Thodoris Zois <z...@ics.forth.gr> wrote:
>
> Hello list,
>
> I have a file on HDFS that is divided into 10 blocks (partitions).
>
> Is there any way to retrieve data from a specific block? (e.g: using
> the blockID).
>
> Except that, is there any option to write the contents of each block
> (or of one block) into separate files?
>
> Thank you very much,
> Thodoris
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@hadoop.apache.org
> For additional commands, e-mail: user-h...@hadoop.apache.org
>
>
>
> --
Best Regards,
Ayan Guha


Re: How to read the schema of a partitioned dataframe without listing all the partitions ?

2018-04-27 Thread ayan guha
You can specify the first folder directly and read it
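
A sketch of that, assuming day-partitioned parquet under an illustrative path;
note the partition column itself will not show up when you read a single leaf
folder unless you also set the basePath option:

one_day = spark.read.parquet("s3a://bucket/events/day=2018-01-01")
one_day.printSchema()          # schema of the data files, without the partition column
schema = one_day.schema

If you later need the full dataframe, passing this schema via
spark.read.schema(schema) at least skips schema inference over all the footers,
although Spark will still list the files under the root when you read it.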

On Fri, 27 Apr 2018 at 9:42 pm, Walid LEZZAR <walez...@gmail.com> wrote:

> Hi,
>
> I have a parquet on S3 partitioned by day. I have 2 years of data (about
> 1000 partitions). With Spark, when I just want to know the schema of
> this parquet without even asking for a single row of data, Spark tries to
> list all the partitions and the nested partitions of the parquet, which
> makes it very slow just to build the dataframe object in Zeppelin.
>
> Is there a way to avoid that? Is there a way to tell Spark: "hey, just
> read a single partition, give me the schema of that partition and
> consider it the schema of the whole dataframe"? (I don't care about
> schema merging; it's off, by the way.)
>
> Thanks.
> Walid.
>
-- 
Best Regards,
Ayan Guha


Re: How to bulk insert using spark streaming job

2018-04-19 Thread ayan guha
By writing code, I suppose :) Jokes apart, I think you need to articulate
the problem in more detail for others to help.

Do you mean you want to batch up data in memory and then write it as a chunk?
Where do you want to insert? Etc., etc.
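
If the intent is the first reading above (accumulate each micro-batch in memory
and push it to a database as one chunk), a hedged DStream-style sketch could
look like the following; the source, JDBC URL, table and column names are all
placeholders.

from pyspark.streaming import StreamingContext

ssc = StreamingContext(spark.sparkContext, batchDuration=30)
lines = ssc.socketTextStream("localhost", 9999)             # stand-in source
records = lines.map(lambda l: tuple(l.split(",")[:2]))      # assumes comma-separated "id,payload" lines

def write_chunk(rdd):
    # one bulk JDBC write per micro-batch
    if not rdd.isEmpty():
        (spark.createDataFrame(rdd, ["id", "payload"])
              .write.mode("append")
              .jdbc("jdbc:postgresql://dbhost:5432/db", "events",
                    properties={"user": "etl", "password": "secret"}))

records.foreachRDD(write_chunk)
ssc.start()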

On Fri, Apr 20, 2018 at 1:08 PM, amit kumar singh <amitiem...@gmail.com>
wrote:

> How to bulk insert using spark streaming job
>
> Sent from my iPhone
>



-- 
Best Regards,
Ayan Guha


Re: Issue with map function in Spark 2.2.0

2018-04-11 Thread ayan guha
2)
>   at py4j.commands.CallCommand.execute(CallCommand.java:79)
>   at py4j.GatewayConnection.run(GatewayConnection.java:214)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.api.python.PythonException: Traceback (most 
> recent call last):
>   File "C:\spark\spark\python\lib\pyspark.zip\pyspark\worker.py", line 229, 
> in main
>   File "C:\spark\spark\python\lib\pyspark.zip\pyspark\worker.py", line 224, 
> in process
>   File "C:\spark\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 
> 372, in dump_stream
> vs = list(itertools.islice(iterator, batch))
>   File "C:\spark\spark\python\pyspark\rdd.py", line 1354, in takeUpToNumLeft
> yield next(iterator)
>   File "", line 8, in parse
>   File "C:\ProgramData\Anaconda3\lib\_strptime.py", line 565, in 
> _strptime_datetime
> tt, fraction = _strptime(data_string, format)
>   File "C:\ProgramData\Anaconda3\lib\_strptime.py", line 362, in _strptime
> (data_string, format))
> ValueError: time data '"FL_DATE"' does not match format '%y-%m-%d'
>
>   at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
>   at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
>   at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
>   at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at 
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>   at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
>   at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
>   at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
>   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
>   at 
> org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
>   at 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
>   at 
> org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
>   at 
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
>   at 
> org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
>   at 
> org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
>   at 
> org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:109)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   ... 1 more
>
>
>
> Please help me with this. Thanks. Nandan Priyadarshi
>
> --
Best Regards,
Ayan Guha
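
For the ValueError quoted above: the literal header token '"FL_DATE"' reached
strptime, which means the header row is being parsed as data (and '%y' expects a
two-digit year). A hedged sketch that lets the CSV reader consume the header and
does the date conversion in the DataFrame API instead; the column name comes from
the traceback, while the file path and date pattern are assumptions.

from pyspark.sql.functions import to_date

flights = (spark.read
           .option("header", True)                          # consume the header row
           .csv("flights.csv"))
flights = flights.withColumn("FL_DATE", to_date("FL_DATE", "yyyy-MM-dd"))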


Re: Need config params while doing rdd.foreach or map

2018-03-22 Thread ayan guha
The Spark context runs in the driver, whereas the function inside foreach runs
on the executors. Capture the parameter in a local variable and reference that
inside the function, so it is shipped with the closure and available on the
executors.
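
The same pattern in PySpark terms, as a minimal sketch: read the value once on
the driver and let the plain variable be captured by the closure, so nothing on
the executor touches the SparkContext. The config key is the one from the
question; the CSV path is only illustrative.

file_name = spark.sparkContext.getConf().get("INPUT_FILE_NAME", "unknown")

def process(row):
    # file_name is serialized with the closure and shipped to the executors;
    # no SparkContext/SparkConf access happens here
    print(file_name, row)

df = spark.read.option("header", True).csv(file_name)   # the DataFrame being processed
df.foreach(process)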

On Thu, 22 Mar 2018 at 8:18 pm, Kamalanathan Venkatesan <
kamalanatha...@in.ey.com> wrote:

> Hello All,
>
>
>
> I have a custom parameter, say for example a file name, added to the conf of
> the spark context, e.g. SparkConf.set(INPUT_FILE_NAME, fileName).
>
> I need this value inside a foreach performed on an RDD, but when I access the
> spark context inside foreach, I receive a "spark context is null" exception!
>
>
>
> Code sample:
>
>
>
> val conf = new SparkConf().setMaster(appConfig.envOrElseConfig("app.sparkconf.master"))
>   .setAppName(appConfig.envOrElseConfig("app.appName"))
>   .set("INPUT_FILE_NAME", fileName)
>
> var sparkContext = new SparkContext(conf)
>
> sparkContext.addJar(sparkContextParams.jarPath)
>
> var sqlContext = new SQLContext(sparkContext)
>
> var df = sqlContext.read.format("com.databricks.spark.csv")
>   .option("header", "true")
>   .load()
>
> df.foreach( f => {
>   f.split(",")
>   println(sparkContext.getConf.get("INPUT_FILE_NAME"))
> });
>
> The above sparkContext.getConf.get("INPUT_FILE_NAME") throws null
> pointer exception!
>
>
>
> Thanks,
>
> Kamal.
>
-- 
Best Regards,
Ayan Guha


Re: Hive to Oracle using Spark - Type(Date) conversion issue

2018-03-18 Thread ayan guha
Hi

The issue is not with Spark in this case, it is with Oracle. If you do not know
which columns a date-related conversion rule should apply to, then you have a
problem.

You should try one of the following:

a) Define a config file listing table name, date column name and date format at
the source, so that you can apply the appropriate conversion dynamically (a
sketch of this option follows below)
b) Write the data into the Oracle DB as a String data type, but have a view
which translates the date
c) Define the Hive tables with a date data type so that you can apply the
appropriate conversion
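
A hedged sketch of option (a) in PySpark (Spark 2.2+ for to_date with a format):
every table, column and format below is made up for illustration, and the Oracle
URL and credentials are placeholders.

from pyspark.sql.functions import to_date

date_rules = {                                   # which varchar columns are really dates, and their source format
    "sales_daily": {"txn_dt": "yyyyMMdd"},
    "customer":    {"signup_dt": "dd-MM-yyyy"},
}

def apply_date_rules(df, table):
    for column, fmt in date_rules.get(table, {}).items():
        df = df.withColumn(column, to_date(df[column], fmt))
    return df

table_name = "sales_daily"                       # decided at run time in the real job
df = apply_date_rules(spark.table("hive_db." + table_name), table_name)
df.write.mode("append").jdbc("jdbc:oracle:thin:@//dbhost:1521/service", table_name,
                             properties={"user": "etl", "password": "secret",
                                         "driver": "oracle.jdbc.OracleDriver"})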



On Mon, Mar 19, 2018 at 1:36 PM, Deepak Sharma <deepakmc...@gmail.com>
wrote:

> The other approach would be to write to a temp table and then merge the data.
> But this may be an expensive solution.
>
> Thanks
> Deepak
>
> On Mon, Mar 19, 2018, 08:04 Gurusamy Thirupathy <thirug...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am trying to read data from Hive as a DataFrame, then trying to write the
>> DF into the Oracle database. In this case, the date field/column in Hive
>> has type Varchar(20),
>> but the corresponding column type in Oracle is Date. While reading from
>> Hive, the Hive table names are dynamically decided (read from another
>> table) based on some job condition (e.g. Job1). There are multiple tables
>> like this, so the column and table names are decided only at run time. So I
>> can't do the type conversion explicitly when reading from Hive.
>>
>> So is there any utility/api available in Spark to achieve this conversion
>> issue?
>>
>>
>> Thanks,
>> Guru
>>
>


-- 
Best Regards,
Ayan Guha


Re: Pyspark Error: Unable to read a hive table with transactional property set as 'True'

2018-03-02 Thread ayan guha
 |
> |   `def_land_mark` string,
>  |
> |   `repeat_cd` string,
>  |
> |   `mp_incr_cd` string,
>   |
> |   `test_trk_dir` string,
>   |
> |   `eff_dt` string,
>   |
> |   `trk_file` string,
>   |
> |   `dfct_cor_dt` string,
>  |
> |   `dfct_acvt` string,
>  |
> |   `dfct_slw_ord_ind` string,
>   |
> |   `emp_id` string,
>   |
> |   `eff_ts` string,
>   |
> |   `dfct_cor_tm` string,
>  |
> |   `dfct_freight_spd` int,
>  |
> |   `dfct_amtrak_spd` int,
>   |
> |   `mile_post_sfx` string,
>  |
> |   `work_order_id` string,
>  |
> |   `loc_id_beg` string,
>   |
> |   `loc_id_end` string,
>   |
> |   `link_id` string,
>  |
> |   `lst_maint_ts` string,
>   |
> |   `del_ts` string,
>   |
> |   `gps_longitude` double,
>  |
> |   `gps_latitude` double,
>   |
> |   `geo_car_nme` string,
>  |
> |   `rept_gc_nme` string,
>  |
> |   `rept_dfct_tst` string,
>  |
> |   `rept_dfct_nbr` int,
>   |
> |   `restr_trk_cls` string,
>  |
> |   `tst_hist_cd` string,
>  |
> |   `cret_ts` string,
>  |
> |   `ylw_grp_nbr` int,
>   |
> |   `geo_dfct_grp_nme` string,
>   |
> |   `supv_rollup_cd` string,
>   |
> |   `dfct_stat_cd` string,
>   |
> |   `lst_maint_id` string,
>   |
> |   `del_rsn_cd` string,
>   |
> |   `umt_prcs_user_id` string,
>   |
> |   `gdfct_vinsp_srestr` string,
>   |
> |   `gc_opr_init` string)
>  |
> | CLUSTERED BY (
>   |
> |   geo_car_nme)
>   |
> | INTO 2 BUCKETS
>   |
> | ROW FORMAT SERDE
>   |
> |   'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
>|
> | STORED AS INPUTFORMAT
>  |
> |   'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
>|
> | OUTPUTFORMAT
>   |
> |   'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
>   |
> | LOCATION
>   |
> |   'hdfs://HADOOP02/apps/hive/warehouse/load_etl.db/trpt_
> geo_defect_prod_dec07_del_blank'  |
> | TBLPROPERTIES (
>  |
> |   'numFiles'='4',
>  |
> |   'numRows'='0',
>   |
> |   'rawDataSize'='0',
>   |
> |   'totalSize'='2566942',
>   |
> |   'transactional'='true',
>  |
> |   'transient_lastDdlTime'='1518695199')
>|
> +---
> +--+
>
>
> Thanks,
> D
>



-- 
Best Regards,
Ayan Guha


Re: Can spark handle this scenario?

2018-02-16 Thread ayan guha
Hi

Couple of suggestions:

1. Do not use Dataset, use Dataframe in this scenario. There is no benefit
of dataset features here. Using Dataframe, you can write an arbitrary UDF
which can do what you want to do.
2. In fact you do need dataframes here. You would be better off with RDD
here. just create a RDD of symbols and use map to do the processing.

On Sat, Feb 17, 2018 at 12:40 PM, Irving Duran <irving.du...@gmail.com>
wrote:

> Do you only want to use Scala? Because otherwise, I think with pyspark and
> pandas read table you should be able to accomplish what you want to
> accomplish.
>
> Thank you,
>
> Irving Duran
>
> On 02/16/2018 06:10 PM, Lian Jiang wrote:
>
> Hi,
>
> I have a use case:
>
> I want to download S stock data from Yahoo API in parallel using
> Spark. I have got all stock symbols as a Dataset. Then I used below code to
> call Yahoo API for each symbol:
>
>
>
> case class Symbol(symbol: String, sector: String)
>
> case class Tick(symbol: String, sector: String, open: Double, close:
> Double)
>
>
> // symbolDS is Dataset[Symbol], pullSymbolFromYahoo returns Dataset[Tick]
>
>
> symbolDs.map { k =>
>
>   pullSymbolFromYahoo(k.symbol, k.sector)
>
> }
>
>
> This statement cannot compile:
>
>
> Unable to find encoder for type stored in a Dataset.  Primitive types
> (Int, String, etc) and Product types (case classes) are supported by
> importing spark.implicits._  Support for serializing other types will be
> added in future releases.
>
>
> My questions are:
>
>
> 1. As you can see, this scenario is not traditional dataset handling such
> as count, sql query... Instead, it is more like a UDF which apply random
> operation on each record. Is Spark good at handling such scenario?
>
>
> 2. Regarding the compilation error, any fix? I did not find a satisfactory
> solution online.
>
>
> Thanks for help!
>
>
>
>
>
>


-- 
Best Regards,
Ayan Guha


Re: Can spark handle this scenario?

2018-02-16 Thread ayan guha
** You do NOT need dataframes, I mean.

On Sat, Feb 17, 2018 at 3:58 PM, ayan guha <guha.a...@gmail.com> wrote:

> Hi
>
> Couple of suggestions:
>
> 1. Do not use Dataset, use Dataframe in this scenario. There is no benefit
> of dataset features here. Using Dataframe, you can write an arbitrary UDF
> which can do what you want to do.
> 2. In fact you do need dataframes here. You would be better off with RDD
> here. just create a RDD of symbols and use map to do the processing.
>
> On Sat, Feb 17, 2018 at 12:40 PM, Irving Duran <irving.du...@gmail.com>
> wrote:
>
>> Do you only want to use Scala? Because otherwise, I think with pyspark
>> and pandas read table you should be able to accomplish what you want to
>> accomplish.
>>
>> Thank you,
>>
>> Irving Duran
>>
>> On 02/16/2018 06:10 PM, Lian Jiang wrote:
>>
>> Hi,
>>
>> I have a use case:
>>
>> I want to download S stock data from Yahoo API in parallel using
>> Spark. I have got all stock symbols as a Dataset. Then I used below code to
>> call Yahoo API for each symbol:
>>
>>
>>
>> case class Symbol(symbol: String, sector: String)
>>
>> case class Tick(symbol: String, sector: String, open: Double, close:
>> Double)
>>
>>
>> // symbolDS is Dataset[Symbol], pullSymbolFromYahoo returns Dataset[Tick]
>>
>>
>> symbolDs.map { k =>
>>
>>   pullSymbolFromYahoo(k.symbol, k.sector)
>>
>> }
>>
>>
>> This statement cannot compile:
>>
>>
>> Unable to find encoder for type stored in a Dataset.  Primitive types
>> (Int, String, etc) and Product types (case classes) are supported by
>> importing spark.implicits._  Support for serializing other types will be
>> added in future releases.
>>
>>
>> My questions are:
>>
>>
>> 1. As you can see, this scenario is not traditional dataset handling such
>> as count, sql query... Instead, it is more like a UDF which apply random
>> operation on each record. Is Spark good at handling such scenario?
>>
>>
>> 2. Regarding the compilation error, any fix? I did not find a
>> satisfactory solution online.
>>
>>
>> Thanks for help!
>>
>>
>>
>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>



-- 
Best Regards,
Ayan Guha


Re: mapGroupsWithState in Python

2018-01-31 Thread ayan guha
Thanks a lot TD, exactly what I was looking for. And I have seen most of
your talks, really great stuff you guys are doing :)

On Thu, Feb 1, 2018 at 10:38 AM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> Hello Ayan,
>
> From what I understand, mapGroupsWithState (probably the more general
> flatMapGroupsWithState) is the best way forward (not available in python).
> However, you need to figure out your desired semantics of when you want to
> output the deduplicated data from the stremaing query. For example, if
> there is the following sequence of events
>
> (id, last_update_timestamp, attribute)
> 1, 12:00, A  < do you want to output this immediately or wait for
> some time to see if there is new data?
> 1, 11:59, B  < ignored as duplicate
> 1, 12:01, C < do you want to output this?
> 1, 12:02, D
>
> If you want to output something every time there is a newer 
> last_update_timestamp,
> then that's not really a strict "deduplication". It's more like aggregation
> that keeps the latest. In that case, you can try using UDAFs as well.
> However, with UDAFs you won't get any state cleanup. So the
> flatMapGroupsWithState is the best solution as you can do whatever tracking
> you want, output whenever you want, and get state cleanup using timeouts.
>
> FYI: i have elaborated on flatMapGroupsWithState and timeouts in my talk -
> https://databricks.com/session/deep-dive-into-
> stateful-stream-processing-in-structured-streaming
>
>
>
>
>
>
>
> On Tue, Jan 30, 2018 at 5:14 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Any help would be much appreciated :)
>>
>> On Mon, Jan 29, 2018 at 6:25 PM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> I want to write something in Structured streaming:
>>>
>>> 1. I have a dataset which has 3 columns: id, last_update_timestamp,
>>> attribute
>>> 2. I am receiving the data through Kinesis
>>>
>>> I want to deduplicate records based on last_updated. In batch, it looks
>>> like:
>>>
>>> spark.sql("select * from (Select *, row_number() OVER(Partition by id
>>> order by last_updated desc) rank  from table1) tmp where rank =1")
>>>
>>> But now I would like to do it in Structured Stream. I need to maintain
>>> the state of id as per the highest last_updated, across the triggers, for a
>>> certain period (24 hours).
>>>
>>> Questions:
>>>
>>> 1. Should I use mapGroupsWithState or is there any other (SQL?)
>>> solution? Can anyone help me to write it?
>>> 2. Is mapGroupsWithState supported in Python?
>>>
>>>  Just to ensure we cover bases, I have already tried using
>>> dropDuplicates, but it is keeping the 1st record encountered for an Id, not
>>> updating the state:
>>>
>>> unpackedDF = kinesisDF.selectExpr("cast (data as STRING) jsonData")
>>> dataDF = unpackedDF.select(get_json_object(unpackedDF.jsonData, '$.
>>> header.id').alias('id'),
>>>   get_json_object(unpackedDF.jsonData,
>>> '$.header.last_updated').cast('timestamp').alias('last_updated'),
>>>   unpackedDF.jsonData)
>>>
>>> dedupDF = 
>>> dataDF.dropDuplicates(subset=['id']).withWatermark('last_updated','24
>>> hours')
>>>
>>>
>>> So it is not working. Any help is appreciated.
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha


Re: mapGroupsWithState in Python

2018-01-30 Thread ayan guha
Any help would be much appreciated :)

On Mon, Jan 29, 2018 at 6:25 PM, ayan guha <guha.a...@gmail.com> wrote:

> Hi
>
> I want to write something in Structured streaming:
>
> 1. I have a dataset which has 3 columns: id, last_update_timestamp,
> attribute
> 2. I am receiving the data through Kinesis
>
> I want to deduplicate records based on last_updated. In batch, it looks
> like:
>
> spark.sql("select * from (Select *, row_number() OVER(Partition by id
> order by last_updated desc) rank  from table1) tmp where rank =1")
>
> But now I would like to do it in Structured Stream. I need to maintain the
> state of id as per the highest last_updated, across the triggers, for a
> certain period (24 hours).
>
> Questions:
>
> 1. Should I use mapGroupsWithState or is there any other (SQL?) solution?
> Can anyone help me to write it?
> 2. Is mapGroupsWithState supported in Python?
>
>  Just to ensure we cover bases, I have already tried using dropDuplicates,
> but it is keeping the 1st record encountered for an Id, not updating the
> state:
>
> unpackedDF = kinesisDF.selectExpr("cast (data as STRING) jsonData")
> dataDF = unpackedDF.select(get_json_object(unpackedDF.jsonData, '$.
> header.id').alias('id'),
>   get_json_object(unpackedDF.jsonData,
> '$.header.last_updated').cast('timestamp').alias('last_updated'),
>   unpackedDF.jsonData)
>
> dedupDF = 
> dataDF.dropDuplicates(subset=['id']).withWatermark('last_updated','24
> hours')
>
>
> So it is not working. Any help is appreciated.
>
> --
> Best Regards,
> Ayan Guha
>



-- 
Best Regards,
Ayan Guha


mapGroupsWithState in Python

2018-01-28 Thread ayan guha
Hi

I want to write something in Structured streaming:

1. I have a dataset which has 3 columns: id, last_update_timestamp,
attribute
2. I am receiving the data through Kinesis

I want to deduplicate records based on last_updated. In batch, it looks
like:

spark.sql("select * from (Select *, row_number() OVER(Partition by id order
by last_updated desc) rank  from table1) tmp where rank =1")

But now I would like to do it in Structured Stream. I need to maintain the
state of id as per the highest last_updated, across the triggers, for a
certain period (24 hours).

Questions:

1. Should I use mapGroupsWithState or is there any other (SQL?) solution?
Can anyone help me to write it?
2. Is mapGroupsWithState supported in Python?

 Just to ensure we cover bases, I have already tried using dropDuplicates,
but it is keeping the 1st record encountered for an Id, not updating the
state:

unpackedDF = kinesisDF.selectExpr("cast (data as STRING) jsonData")
dataDF = unpackedDF.select(get_json_object(unpackedDF.jsonData, '$.header.id
').alias('id'),
  get_json_object(unpackedDF.jsonData,
'$.header.last_updated').cast('timestamp').alias('last_updated'),
  unpackedDF.jsonData)

dedupDF =
dataDF.dropDuplicates(subset=['id']).withWatermark('last_updated','24
hours')


So it is not working. Any help is appreciated.

-- 
Best Regards,
Ayan Guha


Re: can HDFS be a streaming source like Kafka in Spark 2.2.0?

2018-01-15 Thread ayan guha
http://spark.apache.org/docs/1.0.0/streaming-programming-guide.html#input-sources
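
In Structured Streaming terms, a hedged sketch of "stream2" reading a directory
as a streaming source and writing back to Kafka: the paths, broker and topic are
placeholders, the Kafka sink needs the spark-sql-kafka package, and (as noted
below in the thread) only new files placed atomically in the directory are picked
up, not appends or in-place overwrites.

from pyspark.sql.types import StructType, StructField, StringType, LongType

schema = StructType([StructField("id", LongType()), StructField("payload", StringType())])

hdfs_stream = (spark.readStream
               .schema(schema)                            # file sources need an explicit schema
               .parquet("hdfs:///data/out"))              # directory that stream1 writes to

query = (hdfs_stream.selectExpr("to_json(struct(*)) AS value")
         .writeStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "broker:9092")
         .option("topic", "out-topic")
         .option("checkpointLocation", "hdfs:///checkpoints/stream2")
         .start())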


On Tue, Jan 16, 2018 at 3:50 PM, kant kodali <kanth...@gmail.com> wrote:

> Got it! What about overwriting the same file instead of appending?
>
> On Mon, Jan 15, 2018 at 7:47 PM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> What Gerard means is that if you are adding new files into the same base
>> path (key) then it's fine, but if you are appending lines to the same
>> file then the changes will not be picked up.
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Tue, Jan 16, 2018 at 12:20 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am not sure I understand. any examples ?
>>>
>>> On Mon, Jan 15, 2018 at 3:45 PM, Gerard Maas <gerard.m...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> You can monitor a filesystem directory as streaming source as long as
>>>> the files placed there are atomically copied/moved into the directory.
>>>> Updating the files is not supported.
>>>>
>>>> kr, Gerard.
>>>>
>>>> On Mon, Jan 15, 2018 at 11:41 PM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I am wondering if HDFS can be a streaming source like Kafka in Spark
>>>>> 2.2.0? For example can I have stream1 reading from Kafka and writing to
>>>>> HDFS and stream2 to read from HDFS and write it back to Kakfa ? such that
>>>>> stream2 will be pulling the latest updates written by stream1.
>>>>>
>>>>> Thanks!
>>>>>
>>>>
>>>>
>>>
>>
>


-- 
Best Regards,
Ayan Guha


Re: Passing an array of more than 22 elements in a UDF

2017-12-22 Thread ayan guha
Hi, I think you are on the right track. You can stuff all your params into a
suitable data structure like an array or dict and pass this structure as a
single param to your UDF.
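
Expressed in PySpark for brevity (the 22-parameter ceiling the question hits
comes from the Java UDF1..UDF22 interfaces, but the workaround is the same in
any language): wrap the columns with array() and give the UDF one array
parameter. The toy dataframe below is only for illustration.

from pyspark.sql.functions import array, col, udf
from pyspark.sql.types import IntegerType

df = spark.createDataFrame([tuple(range(25))], ["c%d" % i for i in range(25)])

sum_all = udf(lambda xs: sum(xs), IntegerType())           # a single parameter: a list of ints
df = df.withColumn("total", sum_all(array(*[col(c) for c in df.columns])))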

On Fri, 22 Dec 2017 at 2:55 pm, Aakash Basu <aakash.spark@gmail.com>
wrote:

> Hi,
>
> I am using Spark 2.2 using Java, can anyone please suggest me how to take
> more than 22 parameters in an UDF? I mean, if I want to pass all the
> parameters as an array of integers?
>
> Thanks,
> Aakash.
>
-- 
Best Regards,
Ayan Guha


Re: JDBC to hive batch use case in spark

2017-12-09 Thread ayan guha
Please enable Hive support on the Spark session if using Spark 2. If you are
on Spark 1, use hiveContext instead of sqlContext.
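
A minimal Spark 2.x sketch of the periodic Oracle-to-Hive job with Hive support
enabled; the JDBC URL, credentials and table names are placeholders.

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("oracle-to-hive")
         .enableHiveSupport()                   # without this, saveAsTable cannot see the Hive metastore
         .getOrCreate())

src = (spark.read.format("jdbc")
       .option("url", "jdbc:oracle:thin:@//dbhost:1521/service")
       .option("dbtable", "SRC_SCHEMA.SRC_TABLE")
       .option("user", "etl").option("password", "secret")
       .option("driver", "oracle.jdbc.OracleDriver")
       .load())

src.write.mode("append").saveAsTable("target_db.target_table")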

On Sun, 10 Dec 2017 at 12:20 am, 张万新 <kevinzwx1...@gmail.com> wrote:

> If you don't mind, I think it will help if you post your code
>
> Hokam Singh Chauhan <hokam.1...@gmail.com>于2017年12月9日周六 下午8:02写道:
>
>> Hi,
>> I have a use case in which I want to read data from a JDBC
>> source (Oracle) table and write it to a Hive table on a periodic basis. I tried
>> this using the SQL context to read from Oracle and the Hive context to write
>> the data into Hive. The data read part works fine, but when I run the save
>> call on the Hive context to write the data, it throws an exception saying the
>> table or view does not exist, even though the table is pre-created in Hive.
>>
>> Please help if anyone tried such scenario.
>>
>> Thanks
>>
> --
Best Regards,
Ayan Guha


Re: Json Parsing.

2017-12-06 Thread ayan guha
You can use get_json_object.
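
A hedged sketch with get_json_object in Structured Streaming: pull individual
fields out of the raw Kafka value by JSON path without declaring a full schema
up front. The broker, topic and field paths are assumptions, and the Kafka
source needs the spark-sql-kafka package.

from pyspark.sql.functions import get_json_object, col

kafka_df = (spark.readStream.format("kafka")
            .option("kafka.bootstrap.servers", "broker:9092")
            .option("subscribe", "events")
            .load())

parsed = (kafka_df.selectExpr("CAST(value AS STRING) AS json")
          .withColumn("event_type", get_json_object(col("json"), "$.type"))
          .withColumn("amount", get_json_object(col("json"), "$.payload.amount").cast("double")))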

On Thu, 7 Dec 2017 at 10:39 am, satyajit vegesna <satyajit.apas...@gmail.com>
wrote:

> Does Spark support automatic detection of the schema from a JSON string in a
> dataframe?
>
> I am trying to parse a JSON string and do some transformations on it
> (I would like to append new columns to the dataframe), from the data I stream
> from Kafka.
>
> But I am not very sure how I can parse the JSON in structured streaming.
> And I would not be interested in creating a schema, as the data from Kafka
> is going to have different schema objects in the value column.
>
> Any advice or help would be appreciated.
>
> Regards,
> Satyajit.
>
-- 
Best Regards,
Ayan Guha

