DataFrameWriter in pyspark ignoring hdfs attributes (using spark-2.2.1-bin-hadoop2.7)?

2018-03-10 Thread Chuan-Heng Hsiao
hi all,

I am using spark-2.2.1-bin-hadoop2.7 with stand-alone mode.
(python version: 3.5.2 from ubuntu 16.04)

I intended to have a DataFrame write to HDFS with a customized block size,
but it failed. However, the corresponding RDD can successfully write with the
customized block size.

Could you help me figure out the issue?

Thanks!

Best regards,
Hsiao


The following is the test code:


##
# init
##
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

import hdfs
from hdfs import InsecureClient
import os

import numpy as np
import pandas as pd
import logging

os.environ['SPARK_HOME'] = '/opt/spark-2.2.1-bin-hadoop2.7'

block_size = 512 * 1024

conf = (
    SparkConf()
    .setAppName("DCSSpark")
    .setMaster("spark://10.7.34.47:7077")
    .set('spark.cores.max', 20)
    .set("spark.executor.cores", 10)
    .set("spark.executor.memory", "10g")
    .set("spark.hadoop.dfs.blocksize", str(block_size))
    .set("spark.hadoop.dfs.block.size", str(block_size))
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext._jsc.hadoopConfiguration().setInt("dfs.blocksize", block_size)
spark.sparkContext._jsc.hadoopConfiguration().setInt("dfs.block.size", block_size)

##
# main
##

# create DataFrame
df_txt = spark.createDataFrame([{'temp': "hello"}, {'temp': "world"}, {'temp': "!"}])

# save using DataFrameWriter
df_txt.write.mode('overwrite').format('parquet').save('hdfs://spark1/tmp/temp_with_df')

# save using rdd
client = InsecureClient('http://spark1:50070')
client.delete('/tmp/temp_with_rrd', recursive=True)
df_txt.rdd.saveAsTextFile('hdfs://spark1/tmp/temp_with_rrd')
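
For reference, here is a minimal sketch (appended to the script above) of how
the resulting block sizes could be checked with the same hdfs client; it
assumes the WebHDFS FileStatus returned by InsecureClient.status() carries
'type' and 'blockSize' fields, and the paths match the two outputs written above:

# Sketch: compare the block size HDFS reports for the two outputs.
for path in ['/tmp/temp_with_df', '/tmp/temp_with_rrd']:
    for name in client.list(path):
        # status() returns the WebHDFS FileStatus of the child path
        status = client.status(path + '/' + name)
        if status['type'] == 'FILE':
            print(path, name, status['blockSize'])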


Re: is there a way to catch exceptions on executor level

2018-03-10 Thread naresh Goud
How about accumulators?


Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/
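
For example, here is a minimal PySpark sketch of the accumulator idea;
parse_row and the sample data are hypothetical, just to show the pattern of
counting failures on the executors and reading the total back on the driver:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("accumulator-demo").getOrCreate()
sc = spark.sparkContext

# Accumulator is updated inside executor tasks and read back on the driver.
error_count = sc.accumulator(0)

def parse_row(value):
    # Hypothetical per-row processing; replace with the real logic.
    try:
        return int(value)
    except (TypeError, ValueError):
        error_count.add(1)  # record the failure instead of failing the task
        return None

rdd = sc.parallelize(["1", "2", "garbage", None, "5"])
parsed = rdd.map(parse_row).filter(lambda x: x is not None).collect()

print("parsed:", parsed)
print("rows that failed to parse:", error_count.value)

Note that accumulator values are only reliable after an action completes, and
task retries can double-count. To collect the actual error messages rather
than a count, a custom AccumulatorParam (or simply writing the bad rows to a
side output) may be a better fit.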



On Thu, Mar 8, 2018 at 12:07 AM Chethan Bhawarlal <cbhawar...@collectivei.com> wrote:

> Hi Dev,
>
> I am doing spark operations on Rdd level for each row like this,
>
>  private def obj(row: org.apache.spark.sql.Row): Put = {
>    row.schema.fields.foreach(x => {
>      x.dataType match {
>        case StringType => // some operation
>
> So, when I get an empty or garbage value my code fails, and I am not able
> to catch the exceptions because these failures occur on the executors.
>
>
> Is there a way I can catch these exceptions, accumulate them, and print
> them to the driver logs?
>
>
> any sample examples provided will be of great help.
>
>
> Thanks,
>
> Chethan.
>
>
>


Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-10 Thread kant kodali
I will give an attempt to answer this.

Since rightValue1 and rightValue2 have the same key "K" (two matches), why
would it ever be the case that *rightValue2* replaces *rightValue1*, which
replaces *null*? Moreover, why does the user need to care?

The result in this case (after getting 2 matches) should be

*(K, leftValue, rightValue1)*
*(K, leftValue, rightValue2)*

This basically means only one of them replaced the earlier null. Which one of
the two? Whichever arrived first. In other words, the nulls will be replaced
by the first matching row; after that, a new matching row simply becomes
another row with the same key in the result table, and a new unmatched row
means the result table should have nulls for the unmatched fields.

From a user perspective, I believe spitting out nulls on every trigger until
there is a match, and spitting out the joined rows once there is a match,
should suffice, shouldn't it?

Sorry if my thoughts are too naive!
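
For anyone looking for the concrete shape of the API being discussed, here is
a minimal PySpark sketch of a 2.3-style stream-stream left outer join in
append mode; the rate sources, column names, watermark durations, and time
bound are placeholders for illustration only:

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.appName("stream-stream-join-sketch").getOrCreate()

# Two rate sources stand in for real streams; each side gets a watermark.
left = (spark.readStream.format("rate").option("rowsPerSecond", 5).load()
        .selectExpr("value AS leftKey", "timestamp AS leftTime")
        .withWatermark("leftTime", "10 seconds"))

right = (spark.readStream.format("rate").option("rowsPerSecond", 5).load()
         .selectExpr("value AS rightKey", "timestamp AS rightTime")
         .withWatermark("rightTime", "20 seconds"))

# Outer joins need an event-time range condition on top of the equi-join.
joined = left.join(
    right,
    expr("""
        leftKey = rightKey AND
        rightTime >= leftTime AND
        rightTime <= leftTime + interval 1 minute
    """),
    "leftOuter")

# Only append output mode is supported for this join in 2.3; an unmatched
# left row is emitted with nulls once the watermark rules out a future match.
query = joined.writeStream.format("console").outputMode("append").start()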

On Thu, Mar 8, 2018 at 6:14 PM, Tathagata Das wrote:

> This doc is unrelated to the stream-stream join we added in Structured
> Streaming. :)
>
> That said, we added append mode first because it is easier to reason about
> the semantics of append mode, especially in the context of outer joins. You
> output a row only when you know it won't ever be changed. The semantics of
> update mode in outer joins are trickier to reason about and expose through
> the APIs. Consider a left outer join. As soon as we get a left-side record
> with a key K that does not have a match, do we output *(K, leftValue,
> null)*? And if we do so, and then later get 2 matches from the right side,
> we have to output *(K, leftValue, rightValue1)* and *(K, leftValue,
> rightValue2)*. But how do we convey that *rightValue1* and *rightValue2*
> together replace the earlier *null*, rather than *rightValue2* replacing
> *rightValue1* replacing *null*?
>
> We will figure these out in future releases. For now, we have released
> append mode, which allows quite a large range of use cases, including
> multiple cascading joins.
>
> TD
>
>
>
> On Thu, Mar 8, 2018 at 9:18 AM, Gourav Sengupta wrote:
>
>> super interesting.
>>
>> On Wed, Mar 7, 2018 at 11:44 AM, kant kodali  wrote:
>>
>>> It looks to me that the StateStore described in this doc
>>> 
>>> actually has full outer join, and every other join is a filter of that.
>>> Also, the doc talks about update mode, but it looks like Spark 2.3 ended
>>> up with append mode? Anyway, the moment it is in master I am ready to
>>> test, so JIRA tickets on this would help to keep track. Please let me know.
>>>
>>> Thanks!
>>>
>>> On Tue, Mar 6, 2018 at 9:16 PM, kant kodali  wrote:
>>>
 Sorry I meant Spark 2.4 in my previous email

 On Tue, Mar 6, 2018 at 9:15 PM, kant kodali  wrote:

> Hi TD,
>
> I agree. I think we are better off either with a full fix or no fix. I
> am OK with the complete fix being available in master or some branch. I
> guess the solution for me is to just build from source.
>
> On a similar note, I am not finding any JIRA tickets related to full outer
> joins and update mode for, say, Spark 2.3. I wonder how hard it is to
> implement both of these? It turns out that update mode and full outer join
> are very useful and required in my case; therefore, I'm just asking.
>
> Thanks!
>
> On Tue, Mar 6, 2018 at 6:25 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>
>> I thought about it.
>> I am not 100% sure whether this fix should go into 2.3.1.
>>
>> There are two parts to this bug fix to enable self-joins.
>>
>> 1. Enabling deduping of leaf logical nodes by extending
>> MultiInstanceRelation
>>   - This is safe to be backported into the 2.3 branch as it does not
>> touch production code paths.
>>
>> 2. Fixing attribute rewriting in MicroBatchExecution, when the
>> micro-batch plan is spliced into the streaming plan.
>>   - This touches core production code paths and, therefore, may not be
>> safe to backport.
>>
>> Part 1 enables self-joins in all but a small fraction of self-join
>> queries. That small fraction can produce incorrect results, and part 2
>> avoids that.
>>
>> So for 2.3.1, we can enable self-joins by merging only part 1, but it
>> can give wrong results in some cases. I think that is strictly worse than
>> no fix.
>>
>> TD
>>
>>
>>
>> On Thu, Feb 22, 2018 at 2:32 PM, kant kodali wrote:
>>
>>> Hi TD,
>>>
>>> I pulled your commit that is listed on this ticket,
>>> https://issues.apache.org/jira/browse/SPARK-23406. Specifically, I
>>> did the following steps and self