Spark dataset cache vs tempview

2016-11-05 Thread Rohit Verma
I have a parquet file which I read at least 4-5 times within my application.
I was wondering what the most efficient approach would be.

Option 1. While writing the parquet file, immediately read it back into a dataset and
call cache. I am assuming that by doing an immediate read I might reuse some existing
HDFS/Spark cache left over from the write process.

Option 2. The first time my application needs the dataset, call cache at that point.

Option 3. After the parquet file has been written, create a temp view from it and use
the view in all subsequent usages.

I am also not very clear about the relative efficiency of reading from a temp view
versus reading the parquet dataset directly.

FYI, the datasets I am referring to cannot all fit in memory; they are very large.
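
For concreteness, a rough sketch of options 2 and 3 in Scala (the path and view name
below are placeholders; MEMORY_AND_DISK is used because the data does not fit entirely
in memory):

import org.apache.spark.storage.StorageLevel

// Option 2: read the already-written parquet file once, persist it, and reuse the reference.
// MEMORY_AND_DISK spills partitions that do not fit in memory to local disk instead of failing.
val ds = spark.read.parquet("/path/to/output.parquet")   // placeholder path
ds.persist(StorageLevel.MEMORY_AND_DISK)

// Option 3: a temp view only binds a name to the (lazy) logical plan; it does not
// materialize or cache anything by itself, so each query over the view re-reads the
// parquet file unless the underlying dataset is also persisted.
ds.createOrReplaceTempView("my_view")                    // placeholder view name
spark.sql("SELECT COUNT(*) FROM my_view")

// when the dataset is no longer needed:
ds.unpersist()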

Regards..
Rohit



Reading csv files with quoted fields containing embedded commas

2016-11-05 Thread Femi Anthony
Hi, I am trying to process a very large comma-delimited CSV file and I am running into
problems. The main problem is that some fields contain quoted strings with embedded
commas, and PySpark seems unable to parse such lines properly the way, say, Pandas does.

Here is the code I am using to read the file in PySpark:

df_raw=spark.read.option("header","true").csv(csv_path)

Here is an example of a good and 'bad' line in such a file:



col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13,col14,col15,col16,col17,col18,col19
80015360210876000,11.22,X,4076710258,,,sxsw,,"32 YIU ""A""",S5,,"32 XIY
""W""   JK, RE
LK",SOMETHINGLIKEAPHENOMENON#YOUGOTSOUL~BRINGDANOISE,23.0,cyclingstats,2012-25-19,432,2023-05-17,CODERED
6167229561918,137.12,U,8234971771,,,woodstock,,,T4,,,OUTKAST#THROOTS~WUTANG#RUNDMC,0.0,runstats,2013-21-22,1333,2019-11-23,CODEBLUE

Line 0 is the header
Line 1 is the 'problematic' line
Line 2 is a good line.

Pandas can handle this easily:


In [1]: import pandas as pd

In [2]: pdf = pd.read_csv('malformed_data.csv')

In [4]: pdf[['col12','col13','col14']]
Out[4]:
                    col12                                             col13  \
0  32 XIY "W"   JK, RE LK  SOMETHINGLIKEAPHENOMENON#YOUGOTSOUL~BRINGDANOISE
1                     NaN                     OUTKAST#THROOTS~WUTANG#RUNDMC

   col14
0   23.0
1    0.0


while PySpark parses this erroneously:

In [5]: sdf = spark.read.format("org.apache.spark.csv").csv('malformed_data.csv', header=True)

In [6]: sdf.select("col12", "col13", 'col14').show()
+------------------+--------------------+--------------------+
|             col12|               col13|               col14|
+------------------+--------------------+--------------------+
|"32 XIY ""W""   JK|              RE LK"|SOMETHINGLIKEAPHE...|
|              null|OUTKAST#THROOTS~W...|                 0.0|
+------------------+--------------------+--------------------+

Is this a bug, or am I doing something wrong?
I am working with Spark 2.0.
Any help is appreciated.
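
For what it's worth, the workaround I have seen suggested for files like this (shown in
Scala; the same options exist on the PySpark reader) is to tell the CSV reader that a
quote inside a quoted field is escaped by doubling it, since I believe Spark's CSV reader
defaults the escape character to a backslash:

val sdf = spark.read
  .option("header", "true")
  .option("quote", "\"")     // quote character (the default)
  .option("escape", "\"")    // treat "" inside a quoted field as a literal quote
  .csv("malformed_data.csv")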

Thanks,
-- Femi

http://www.nextmatrix.com
"Great spirits have always encountered violent opposition from mediocre
minds." - Albert Einstein.


Re: Optimized way to use spark as db to hdfs etl

2016-11-05 Thread Deepak Sharma
Hi Rohit,
You can use an accumulator and increment it as each record is processed. At the end you
can read the accumulator's value on the driver, which gives you the count. A rough sketch
of the idea follows below.

HTH
Deepak
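
A sketch of that idea in Scala (it assumes the Dataset[Row] named dataset and the output
path from the quoted snippet below, and uses Spark's internal RowEncoder to pass rows
through unchanged; note the accumulator value is only meaningful after the write action
finishes, and retried tasks can over-count):

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder

// Count rows as they stream through the write, so no second pass over the JDBC source is needed.
val rowCount = spark.sparkContext.longAccumulator("rowsWritten")

val counted = dataset.map { row: Row =>
  rowCount.add(1L)
  row
}(RowEncoder(dataset.schema))

counted.write.format("parquet").save("pdfs-path")
println(s"rows written: ${rowCount.value}")   // read on the driver, only after the write has finished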

On Nov 5, 2016 20:09, "Rohit Verma"  wrote:

> I am using Spark to read from a database and write to HDFS as a Parquet file.
> Here is a code snippet.
>
> private long etlFunction(SparkSession spark) {
>     spark.sqlContext().setConf("spark.sql.parquet.compression.codec", "snappy");
>     Properties properties = new Properties();
>     properties.put("driver", "oracle.jdbc.driver.OracleDriver");
>     properties.put("fetchSize", "5000");
>     Dataset<Row> dataset = spark.read().jdbc(jdbcUrl, query, properties);
>     dataset.write().format("parquet").save("pdfs-path");
>     return dataset.count();
> }
>
> When I look at the Spark UI during the write, the number of records written is
> visible in the SQL tab under the query plan.
>
> The count() itself, however, is a heavy task, since the dataset is not cached and
> counting re-reads the JDBC source.
>
> Can someone suggest the best, most optimized way to get the count?
>
> Thanks all..
>


Optimized way to use spark as db to hdfs etl

2016-11-05 Thread Rohit Verma
I am using Spark to read from a database and write to HDFS as a Parquet file. Here is a
code snippet.

private long etlFunction(SparkSession spark) {
    spark.sqlContext().setConf("spark.sql.parquet.compression.codec", "snappy");
    Properties properties = new Properties();
    properties.put("driver", "oracle.jdbc.driver.OracleDriver");
    properties.put("fetchSize", "5000");
    Dataset<Row> dataset = spark.read().jdbc(jdbcUrl, query, properties);
    dataset.write().format("parquet").save("pdfs-path");
    return dataset.count();
}

When I look at the Spark UI during the write, the number of records written is visible
in the SQL tab under the query plan.

The count() itself, however, is a heavy task, since the dataset is not cached and
counting re-reads the JDBC source.

Can someone suggest the best, most optimized way to get the count?

Thanks all..


Why does visitCreateFileFormat not support Hive STORED BY, only STORED AS?

2016-11-05 Thread YDB
Why does visitCreateFileFormat not support Hive STORED BY, only STORED AS? I noticed this
when upgrading from Spark 1.6.2 to Spark 2.0.1.
What I want to ask is: is support for Hive STORED BY planned, or will it never be
supported?
configureOutputJobProperties is quite important for us; is there any other method we can
use instead?

override def visitCreateFileFormat(
    ctx: CreateFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
  (ctx.fileFormat, ctx.storageHandler) match {
    // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format
    case (c: TableFileFormatContext, null) =>
      visitTableFileFormat(c)
    // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO
    case (c: GenericFileFormatContext, null) =>
      visitGenericFileFormat(c)
    case (null, storageHandler) =>
      operationNotAllowed("STORED BY", ctx)
    case _ =>
      throw new ParseException(
        "Expected either STORED AS or STORED BY, not both", ctx)
  }
}
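
For context, a sketch of the two DDL shapes involved (the table names are made up, and
the storage handler class is the standard Hive HBase handler, used purely as an
illustration). The STORED BY form falls into the operationNotAllowed("STORED BY", ctx)
branch above, while STORED AS goes through visitGenericFileFormat:

// Rejected by the Spark 2.x parser: storage handlers (STORED BY) hit the branch above.
spark.sql(
  """CREATE TABLE hbase_backed (key INT, value STRING)
    |STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'""".stripMargin)

// Accepted: a generic file format via STORED AS.
spark.sql("CREATE TABLE orc_backed (key INT, value STRING) STORED AS ORC")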

SparkLauncher 2.0.1 working inconsistently in yarn-client mode

2016-11-05 Thread Elkhan Dadashov
Hi,

I'm running Spark 2.0.1 with Spark Launcher 2.0.1 on a YARN cluster. I launch a map task
which spawns a Spark job via SparkLauncher#startApplication().

The deploy mode is yarn-client. I'm running on a Mac laptop.

I have this snippet of code:

SparkAppHandle appHandle = sparkLauncher.startApplication();
while (appHandle.getState() == null || !appHandle.getState().isFinal()) {
    if (appHandle.getState() != null) {
        log.info("while: Spark job state is : " + appHandle.getState());   // <-- the highlighted line
        if (appHandle.getAppId() != null) {
            log.info("\t App id: " + appHandle.getAppId() + "\tState: " + appHandle.getState());
        }
    }
}

The above snippet of code works fine: both the Spark job and the map task which spawns it
complete successfully.

But if I comment out the highlighted log.info line, then the Spark job launches and
finishes successfully, but the map task hangs for a while (in the RUNNING state) and then
fails with the exception below.

I run exactly the same code in exactly the same environment, except for that one
commented-out line.

When the highlighted line is commented out, I don't even see the second log line in the
stderr; it seems the appHandle hook never returns anything (neither the app id nor the
app state), even though the Spark application starts, runs, and finishes successfully.
Inside the same stderr I can see the Spark-job-related logs, the printed Spark job
results, and the application report indicating the status.
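
In case it helps narrow the problem down, an alternative to the polling loop is to
register a SparkAppHandle.Listener and block until a final state is reported (a sketch
only, written in Scala and assuming the same sparkLauncher instance; it also avoids
spinning on getState() in a tight loop):

import java.util.concurrent.CountDownLatch
import org.apache.spark.launcher.SparkAppHandle

val finished = new CountDownLatch(1)

// The launcher invokes stateChanged() on every state transition reported back
// from the Spark application.
val appHandle = sparkLauncher.startApplication(new SparkAppHandle.Listener {
  override def stateChanged(handle: SparkAppHandle): Unit = {
    if (handle.getState != null && handle.getState.isFinal) finished.countDown()
  }
  override def infoChanged(handle: SparkAppHandle): Unit = ()
})

finished.await()   // returns once the Spark application reaches a final state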

You can see the exception below (this is from the stderr of the mapper
container which launches Spark job):
---

INFO: Communication exception: java.net.ConnectException: Call From
/10.3.8.118 to :53567 failed on connection
exception: java.net.ConnectException: Connection refused;

Caused by: java.net.ConnectException: Connection refused

at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)

at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)

at
org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)

at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)

at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:495)

at
org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:614)

at
org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:712)

at
org.apache.hadoop.ipc.Client$Connection.access$2900(Client.java:375)

at org.apache.hadoop.ipc.Client.getConnection(Client.java:1528)

at org.apache.hadoop.ipc.Client.call(Client.java:1451)

... 5 more

---

Nov 05, 2016 2:41:54 AM org.apache.hadoop.ipc.Client handleConnectionFailure

INFO: Retrying connect to server: /10.3.8.118:53567. Already
tried 9 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)

Nov 05, 2016 2:41:54 AM org.apache.hadoop.mapred.Task run

INFO: Communication exception: java.net.ConnectException: Call From
/10.3.8.118 to :53567 failed on connection
exception: java.net.ConnectException: Connection refused; For more details
see:  http://wiki.apache.org/hadoop/ConnectionRefused

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)

at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)

at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

at java.lang.reflect.Constructor.newInstance(Constructor.java:423)

at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:792)

at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)

at org.apache.hadoop.ipc.Client.call(Client.java:1479)

at org.apache.hadoop.ipc.Client.call(Client.java:1412)

at
org.apache.hadoop.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:242)

at com.sun.proxy.$Proxy9.ping(Unknown Source)

at org.apache.hadoop.mapred.Task$TaskReporter.run(Task.java:767)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.net.ConnectException: Connection refused

at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)

at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)

at
org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)

at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)

at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:495)

at
org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:614)

at
org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:712)

at
org.apache.hadoop.ipc.Client$Connection.access$2900(Client.java:375)

at org.apache.hadoop.ipc.Client.getConnection(Client.java:1528)

at org.apache.hadoop.ipc.Client.call(Client.java:1451)

... 5 more

---

Nov 05, 2016 2:41:54 AM org.apache.hadoop.mapred.Task