RE: Unable to run simple spark-sql

2019-06-18 Thread Nirmal Kumar
Hi Raymond,

I cross-checked hive/conf/hive-site.xml and spark2/conf/hive-site.xml.
The same value is shown by the Ambari Hive config.
The value seems correct here:

  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/apps/hive/warehouse</value>
  </property>

Problem:
Spark is trying to create a local directory under the home directory of the hive user
(/home/hive/).
Why is it referring to the local file system, and where is that path coming from?

Thanks,
Nirmal
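
One thing worth ruling out here (a hedged suggestion, not a confirmed diagnosis): when the warehouse location carries no scheme and the Spark driver does not pick up the intended hive-site.xml, the path can end up resolved against the driver's default (local) filesystem. A minimal sketch that spells the scheme out explicitly, reusing the values from this thread (the thrift host stays elided as in the original):

import org.apache.spark.sql.SparkSession

// hedged sketch: fully qualify the warehouse dir so nothing falls back to file:/
val sparkSession = SparkSession.builder().master("yarn")
  .config("spark.sql.warehouse.dir", "hdfs:///apps/hive/warehouse")
  .config("hive.metastore.uris", "thrift://x:9083")  // elided host, as in the thread
  .enableHiveSupport()
  .getOrCreate()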

From: Raymond Honderdors 
Sent: 19 June 2019 11:18
To: Nirmal Kumar 
Cc: user 
Subject: Re: Unable to run simple spark-sql

Hi Nirmal,
I came across the following article:
https://stackoverflow.com/questions/47497003/why-is-hive-creating-tables-in-the-local-file-system
(and an updated reference link:
https://cwiki.apache.org/confluence/display/Hive/AdminManual+Metastore+Administration)
You should check "hive.metastore.warehouse.dir" in the Hive config files.


On Tue, Jun 18, 2019 at 8:09 PM Nirmal Kumar <nirmal.ku...@impetus.co.in> wrote:
Just an update on the thread that it's kerberized.

I'm trying to execute the query with a different user, xyz, not hive.
It seems like a permission issue: the user xyz is trying to create a directory
under /home/hive.

Do I need some impersonation setting?

Thanks,
Nirmal

Get Outlook for Android
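
For context on the impersonation question above (a hedged sketch only, not a confirmed fix for this cluster): on a kerberized HDP setup, impersonation is normally governed by the standard Hadoop proxyuser properties in core-site.xml (plus hive.server2.enable.doAs on the HiveServer2 side). The property names below are the standard ones; the wildcard values are placeholders and would usually be narrowed down:

<!-- core-site.xml: allow the hive service user to impersonate other users (hedged example) -->
<property>
  <name>hadoop.proxyuser.hive.hosts</name>
  <value>*</value>
</property>
<property>
  <name>hadoop.proxyuser.hive.groups</name>
  <value>*</value>
</property>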


From: Nirmal Kumar
Sent: Tuesday, June 18, 2019 5:56:06 PM
To: Raymond Honderdors; Nirmal Kumar
Cc: user
Subject: RE: Unable to run simple spark-sql

Hi Raymond,

Permission on hdfs is 777
drwxrwxrwx   - impadmin hdfs  0 2019-06-13 16:09 
/home/hive/spark-warehouse


But it’s pointing to a local file system:
Exception in thread "main" java.lang.IllegalStateException: Cannot create 
staging directory  
'file:/home/hive/spark-warehouse/testdb.db/employee_orc/.hive-staging_hive_2019-06-18_16-08-21_448_1691186175028734135-1'

Thanks,
-Nirmal


From: Raymond Honderdors <raymond.honderd...@sizmek.com>
Sent: 18 June 2019 17:52
To: Nirmal Kumar <nirmal.ku...@impetus.co.in.invalid>
Cc: user <user@spark.apache.org>
Subject: Re: Unable to run simple spark-sql

Hi
Can you check the permission of the user running spark
On the hdfs folder where it tries to create the table

On Tue, Jun 18, 2019, 15:05 Nirmal Kumar <nirmal.ku...@impetus.co.in.invalid> wrote:
Hi List,

I tried running the following sample Java code using Spark2 version 2.0.0 on 
YARN (HDP-2.5.0.0)

public class SparkSQLTest {
  public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder().master("yarn")
.config("spark.sql.warehouse.dir", "/apps/hive/warehouse")
.config("hive.metastore.uris", "thrift://x:9083")
.config("spark.driver.extraJavaOptions", "-Dhdp.version=2.5.0.0-1245")

.config("spark.yarn.am

Re: Unable to run simple spark-sql

2019-06-18 Thread Raymond Honderdors
Hi Nirmal,
I came across the following article:
https://stackoverflow.com/questions/47497003/why-is-hive-creating-tables-in-the-local-file-system
(and an updated reference link:
https://cwiki.apache.org/confluence/display/Hive/AdminManual+Metastore+Administration)
You should check "hive.metastore.warehouse.dir" in the Hive config files.


On Tue, Jun 18, 2019 at 8:09 PM Nirmal Kumar 
wrote:

> Just an update on the thread that it's kerberized.
>
> I'm trying to execute the query with a different user xyz not hive.
> Because seems like some permission issue the user xyz trying creating
> directory in /home/hive directory
>
> Do i need some impersonation setting?
>
> Thanks,
> Nirmal
>
> Get Outlook for Android
>
> 
> From: Nirmal Kumar
> Sent: Tuesday, June 18, 2019 5:56:06 PM
> To: Raymond Honderdors; Nirmal Kumar
> Cc: user
> Subject: RE: Unable to run simple spark-sql
>
> Hi Raymond,
>
> Permission on hdfs is 777
> drwxrwxrwx   - impadmin hdfs  0 2019-06-13 16:09
> /home/hive/spark-warehouse
>
>
> But it’s pointing to a local file system:
> Exception in thread "main" java.lang.IllegalStateException: Cannot create
> staging directory
> 'file:/home/hive/spark-warehouse/testdb.db/employee_orc/.hive-staging_hive_2019-06-18_16-08-21_448_1691186175028734135-1'
>
> Thanks,
> -Nirmal
>
>
> From: Raymond Honderdors 
> Sent: 18 June 2019 17:52
> To: Nirmal Kumar 
> Cc: user 
> Subject: Re: Unable to run simple spark-sql
>
> Hi
> Can you check the permission of the user running spark
> On the hdfs folder where it tries to create the table
>
> On Tue, Jun 18, 2019, 15:05 Nirmal Kumar <nirmal.ku...@impetus.co.in.invalid> wrote:
> Hi List,
>
> I tried running the following sample Java code using Spark2 version 2.0.0
> on YARN (HDP-2.5.0.0)
>
> public class SparkSQLTest {
>   public static void main(String[] args) {
> SparkSession sparkSession = SparkSession.builder().master("yarn")
> .config("spark.sql.warehouse.dir", "/apps/hive/warehouse")
> .config("hive.metastore.uris", "thrift://x:9083")
> .config("spark.driver.extraJavaOptions",
> "-Dhdp.version=2.5.0.0-1245")
> .config("spark.yarn.am<
> http://secure-web.cisco.com/1beuiC-aaBQJ0jgI7vONgZiTP5gCokYFEbllyW3ShZVdpQaIuYfuuEuS8iwzhqvwBE8C_E_bBe_7isO-HyPEVX6ZgJajKrQ6oWvTeBQCMjTHVCVImERG2S9qSHrH_mDzf656vrBFxAT1MYZhTZYzXl_3hyZ4BH-XCbKjXrCDyR1OR3tYqqDc7if9NJ1gqHWPwg84tho0__fut2d8y4XxMoMTQNnJzx5367QL6lYV5CFZj055coSLihVVYrh5jBID5jJF40PsrWSvdW7gJ_P6IAN9jTpHFJD7ZrokjlyS7WBAx5Mtnd2KxvNc2O6kKcxk2/http%3A%2F%2Fspark.yarn.am>.extraJavaOptions",
> "-Dhdp.version=2.5.0.0-1245")
> .config("spark.yarn.jars",
> "hdfs:///tmp/lib/spark2/*").enableHiveSupport().getOrCreate();
>
> sparkSession.sql("insert into testdb.employee_orc select * from
> testdb.employee where empid<5");
>   }
> }
>
> I get the following error pointing to a local file system
> (file:/home/hive/spark-warehouse) wondering from where its being picked:
>
> 16:08:21.321 [dispatcher-event-loop-7] INFO
> org.apache.spark.storage.BlockManagerInfo - Added broadcast_0_piece0 in
> memory on 192.168.218.92:40831 (size: 30.6 KB, free: 4.0 GB)
> 16:08:21.322 [main] DEBUG org.apache.spark.storage.BlockManagerMaster -
> Updated info of block broadcast_0_piece0
> 16:08:21.323 [main] DEBUG org.apache.spark.storage.BlockManager - Told
> master about block broadcast_0_piece0
> 16:08:21.323 [main] DEBUG org.apache.spark.storage.BlockManager - Put
> block broadcast_0_piece0 locally took  4 ms
> 16:08:21.323 [main] DEBUG org.apache.spark.storage.BlockManager - Putting
> block broadcast_0_piece0 without replication took  4 ms
> 16:08:21.326 [main] INFO org.apache.spark.SparkContext - Created broadcast
> 0 from sql at SparkSQLTest.java:33
> 16:08:21.449 [main] DEBUG
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable - Created staging
> dir =
> file:/home/hive/spark-warehouse/testdb.db/employee_orc/.hive-staging_hive_2019-06-18_16-08-21_448_1691186175028734135-1
> for path = file:/home/hive/spark-warehouse/testdb.db/employee_orc
> 16:08:21.451 [main] INFO org.apache.hadoop.hive.common.FileUtils -
> Creating directory if it doesn't exist:
> file:/home/hive/spark-warehouse/testdb.db/employee_orc/.hive-staging_hive_2019-06-18_16-08-21_448_1691186175028734135-1
> Exception in thread "main" java.lang.IllegalStateException: Cannot create
> staging directory
> 'file:/home/hive/spark-warehouse/testdb.db/employee_orc/.hive-staging_hive_2019-06-18_16-08-21_448_1691186175028734135-1'
> at
> org.apache.s

Re: Filter cannot be pushed via a Join

2019-06-18 Thread William Wong
Hi Xiao,

Just reported this in JIRA as SPARK-28103.

https://issues.apache.org/jira/browse/SPARK-28103

Thanks and Regards,
William

On Wed, 19 Jun 2019 at 1:35 AM, Xiao Li  wrote:

> Hi, William,
>
> Thanks for reporting it. Could you open a JIRA?
>
> Cheers,
>
> Xiao
>
William Wong wrote on Tue, Jun 18, 2019 at 8:57 AM:
>
>> BTW, I noticed a workaround is creating a custom rule to remove 'empty
>> local relation' from a union table. However, I am not 100% sure if it is
>> the right approach.
>>
>> On Tue, Jun 18, 2019 at 11:53 PM William Wong 
>> wrote:
>>
>>> Dear all,
>>>
>>> I am not sure if it is something expected or not, and should I report it
>>> as a bug.  Basically, the constraints of a union table could be turned
>>> empty if any subtable is turned into an empty local relation. The side
>>> effect is filter cannot be inferred correctly (by
>>> InferFiltersFromConstrains)
>>>
>>> We may reproduce the issue with the following setup:
>>> 1) Prepare two tables:
>>> * spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string)
>>> USING PARQUET");
>>> * spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string)
>>> USING PARQUET");
>>>
>>> 2) Create a union view on table1.
>>> * spark.sql("""
>>>  | CREATE VIEW partitioned_table_1 AS
>>>  | SELECT * FROM table1 WHERE id = 'a'
>>>  | UNION ALL
>>>  | SELECT * FROM table1 WHERE id = 'b'
>>>  | UNION ALL
>>>  | SELECT * FROM table1 WHERE id = 'c'
>>>  | UNION ALL
>>>  | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
>>>  | """.stripMargin)
>>>
>>> 3) View the optimized plan of this SQL. The filter 't2.id = 'a'' cannot
>>> be inferred. We can see that the constraints of the left table are empty.
>>>
>>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>>> t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan
>>> res39: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
>>> Join Inner, (id#0 = id#4)
>>> :- Union
>>> :  :- Filter (isnotnull(id#0) && (id#0 = a))
>>> :  :  +- Relation[id#0,val#1] parquet
>>> :  :- LocalRelation , [id#0, val#1]
>>> :  :- LocalRelation , [id#0, val#1]
>>> :  +- Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
>>> : +- Relation[id#0,val#1] parquet
>>> +- Filter isnotnull(id#4)
>>>+- Relation[id#4,val#5] parquet
>>>
>>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>>> t1.id = t2.id AND t1.id =
>>> 'a'").queryExecution.optimizedPlan.children(0).constraints
>>> res40: org.apache.spark.sql.catalyst.expressions.ExpressionSet = Set()
>>>
>>> 4) Modified the query to avoid empty local relation. The filter 't2.id
>>> in ('a','b','c','d')' is then inferred properly. The constraints of the
>>> left table are not empty as well.
>>>
>>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>>> t1.id = t2.id AND t1.id IN
>>> ('a','b','c','d')").queryExecution.optimizedPlan
>>> res42: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
>>> Join Inner, (id#0 = id#4)
>>> :- Union
>>> :  :- Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
>>> :  :  +- Relation[id#0,val#1] parquet
>>> :  :- Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
>>> :  :  +- Relation[id#0,val#1] parquet
>>> :  :- Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
>>> :  :  +- Relation[id#0,val#1] parquet
>>> :  +- Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) &&
>>> isnotnull(id#0))
>>> : +- Relation[id#0,val#1] parquet
>>> +- Filter ((id#4 IN (a,b,c,d) && ((isnotnull(id#4) && (((id#4 = a) ||
>>> (id#4 = b)) || (id#4 = c))) || NOT id#4 IN (a,b,c))) && isnotnull(id#4))
>>>+- Relation[id#4,val#5] parquet
>>>
>>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>>> t1.id = t2.id AND t1.id IN
>>> ('a','b','c','d')").queryExecution.optimizedPlan.children(0).constraints
>>> res44: org.apache.spark.sql.catalyst.expressions.ExpressionSet =
>>> Set(isnotnull(id#0), id#0 IN (a,b,c,d), id#0 = a) || (id#0 = b)) ||
>>> (id#0 = c)) || NOT id#0 IN (a,b,c)))
>>>
>>>
>>> Thanks and regards,
>>> William
>>>
>>>
>>> On Sat, Jun 15, 2019 at 1:13 AM William Wong 
>>> wrote:
>>>
 Hi all,

 Appreciate any expert may help on this strange behavior..

 It is interesting that... I implemented a custom rule to remove empty
 LocalRelation children under Union and run the same query. The filter 'id =
 'a' is inferred to the table2 and pushed via the Join.

 scala> spark2.sql("SELECT * FROM partitioned_table_1 t1, table2 t2
 WHERE t1.id = t2.id AND t1.id = 'a'").explain
 == Physical Plan ==
 *(4) BroadcastHashJoin [id#0], [id#4], Inner, BuildRight
 :- Union
 :  :- *(1) Project [id#0, val#1]
 :  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
 :  : +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
 true, Format: Parquet, Location:
 InMemoryFileIndex[file:/Users/williamwong/spark-wareho

unsubscribe

2019-06-18 Thread Sheel Pancholi



Re: How to encrypt AK and SK using Livy restAPI to submit sparkJob

2019-06-18 Thread Huizhe Wang
>
> Hi,
>
> I am using the Livy REST API to submit Spark jobs, and I use s3a to replace HDFS.
> I have to write fs.s3a.access.key and fs.s3a.secret.key directly in
> core-site.xml, as there are no such conf params in the Livy API.
> How could I encrypt my AK and SK?
>
> Yours.
> Jane
>
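
A hedged pointer for the question above, rather than a Livy-specific answer: Hadoop's credential provider framework is the usual way to keep S3A keys out of plain-text core-site.xml; S3A looks up aliases such as fs.s3a.access.key in the store referenced by hadoop.security.credential.provider.path. The jceks path below is a placeholder:

# create an encrypted credential store on HDFS; the secret values are prompted for
hadoop credential create fs.s3a.access.key -provider jceks://hdfs/user/example/s3.jceks
hadoop credential create fs.s3a.secret.key -provider jceks://hdfs/user/example/s3.jceks

<!-- core-site.xml: point S3A at the credential store instead of plain-text keys -->
<property>
  <name>hadoop.security.credential.provider.path</name>
  <value>jceks://hdfs/user/example/s3.jceks</value>
</property>

If it has to be set per job, the Livy batch "conf" map should be able to carry it as spark.hadoop.hadoop.security.credential.provider.path (hedged; not verified against a specific Livy version).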


unsubscribe

2019-06-18 Thread SV



Unsubscribe

2019-06-18 Thread SV



Re: tcps oracle connection from spark

2019-06-18 Thread Richard Xin
And by the way, the same connection string works fine when used in SQL Developer.
On Tuesday, June 18, 2019, 03:49:24 PM PDT, Richard Xin 
 wrote:  
 
Hi, I need help with a TCPS Oracle connection from Spark (version:
spark-2.4.0-bin-hadoop2.7).

Properties prop = new Properties();
prop.putAll(sparkOracle);  // username/password
prop.put("javax.net.ssl.trustStore", "path to root.jks");
prop.put("javax.net.ssl.trustStorePassword", "password_here");

df.write().mode(SaveMode.Append)
    .option("driver", "oracle.jdbc.driver.OracleDriver")
    .jdbc("jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=tcps)(HOST=host.mycomapny.com)(PORT=1234)))(CONNECT_DATA=(SERVICE_NAME=service_name)))",
          "tableName", prop);

note "PROTOCOL=tcps" in the connection string. 
The code worked fine for "tcp" hosts, but some of our servers use "tcps" only, 
I got following errors when hitting oracld tcps hosts, can someone shed some 
light? Thanks a lot!
Exception in thread "main" java.sql.SQLRecoverableException: IO Error: Remote 
host terminated the handshake at 
oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:682) at 
oracle.jdbc.driver.PhysicalConnection.(PhysicalConnection.java:715) at 
oracle.jdbc.driver.T4CConnection.(T4CConnection.java:385) at 
oracle.jdbc.driver.T4CDriverExtension.getConnection(T4CDriverExtension.java:30) 
at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:564) at 
org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)
 at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:63)
 at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
 at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:48)
 at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
 at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
 at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
 at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
 at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
 at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
 at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
 at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) 
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
 at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80) at 
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
 at 
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
 at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
 at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
 at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
 at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668) 
at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276) 
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270) at 
org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:506) at 
com.apple.jmet.pallas.data_migration.DirectMigrationWConfig.main(DirectMigrationWConfig.java:103)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:566) at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
 at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167) at 
org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195) at 
org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86) at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924) at 
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933) at 
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)Caused by: 
javax.net.ssl.SSLHandshakeException: Remote host terminated the handshake at 
java.base/sun.security.ssl.SSLSocketImpl.handleEOF(SSLSocke
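
One hedged troubleshooting avenue for the handshake failure above (an assumption to rule out, not a confirmed root cause): javax.net.ssl.* values passed only as JDBC connection properties are not always equivalent to JVM-level settings, and the JDBC write itself runs on executors, so every JVM involved needs to be able to read the truststore. A sketch that ships the jks with --files and sets it at the JVM level; the jar name and paths are placeholders, the main class is the one from the stack trace:

spark-submit \
  --files /path/to/root.jks \
  --conf "spark.driver.extraJavaOptions=-Djavax.net.ssl.trustStore=/path/to/root.jks -Djavax.net.ssl.trustStorePassword=password_here" \
  --conf "spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=root.jks -Djavax.net.ssl.trustStorePassword=password_here" \
  --class com.apple.jmet.pallas.data_migration.DirectMigrationWConfig \
  application.jar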

tcps oracle connection from spark

2019-06-18 Thread Richard Xin
Hi, I need help with a TCPS Oracle connection from Spark (version:
spark-2.4.0-bin-hadoop2.7).

Properties prop = new Properties();
prop.putAll(sparkOracle);  // username/password
prop.put("javax.net.ssl.trustStore", "path to root.jks");
prop.put("javax.net.ssl.trustStorePassword", "password_here");

df.write().mode(SaveMode.Append)
    .option("driver", "oracle.jdbc.driver.OracleDriver")
    .jdbc("jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=tcps)(HOST=host.mycomapny.com)(PORT=1234)))(CONNECT_DATA=(SERVICE_NAME=service_name)))",
          "tableName", prop);

note "PROTOCOL=tcps" in the connection string. 
The code worked fine for "tcp" hosts, but some of our servers use "tcps" only, 
I got following errors when hitting oracld tcps hosts, can someone shed some 
light? Thanks a lot!
Exception in thread "main" java.sql.SQLRecoverableException: IO Error: Remote 
host terminated the handshake at 
oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:682) at 
oracle.jdbc.driver.PhysicalConnection.(PhysicalConnection.java:715) at 
oracle.jdbc.driver.T4CConnection.(T4CConnection.java:385) at 
oracle.jdbc.driver.T4CDriverExtension.getConnection(T4CDriverExtension.java:30) 
at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:564) at 
org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)
 at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:63)
 at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
 at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:48)
 at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
 at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
 at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
 at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
 at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
 at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
 at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
 at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) 
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
 at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80) at 
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
 at 
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
 at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
 at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
 at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
 at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668) 
at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276) 
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270) at 
org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:506) at 
com.apple.jmet.pallas.data_migration.DirectMigrationWConfig.main(DirectMigrationWConfig.java:103)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:566) at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
 at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167) at 
org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195) at 
org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86) at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924) at 
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933) at 
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)Caused by: 
javax.net.ssl.SSLHandshakeException: Remote host terminated the handshake at 
java.base/sun.security.ssl.SSLSocketImpl.handleEOF(SSLSocketImpl.java:1321) at 
java.base/sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1160) at 
java.base/sun.security.ssl.SSLSocketImpl.readHand

Reading JSON RDD in Spark Streaming

2019-06-18 Thread Mich Talebzadeh
Hi,

I have prices coming through Kafka in the following format

key,{JSON data}

The key is needed as part of data post to NoSQL database like Aerospike.

The following is record of topic from Kafka

ba7e6bdc-2a92-4dc3-8e28-a75e1a7d58f2,{"rowkey":"ba7e6bdc-2a92-4dc3-8e28-a75e1a7d58f2","ticker":"SBRY",
"timeissued":"2019-06-18T22:10:26", "price":555.75}

The "key":"value" pairs inside {} are valid JSON as shown below in JSONLint

https://jsonlint.com/

{
 "rowkey": "ba7e6bdc-2a92-4dc3-8e28-a75e1a7d58f2",
 "ticker": "SBRY",
 "timeissued": "2019-06-18T22:10:26",
 "price": 555.75
}

Now I need to extract values from this JSON.

One way would be to go through dstream

dstream.foreachRDD { pricesRDD =>
  if (!pricesRDD.isEmpty)  // data exists in RDD
  {
    for (row <- pricesRDD.collect.toArray)
    {
      println(row)
      println(row._2.split(',').view(0).toString)
      println(row._2.split(',').view(1).split(':').view(1).toString)
      println(row._2.split(',').view(2).split(':').view(1).toString)
      println(row._2.split(',').view(3).split(':').view(1).toString)
    }
  }
}

And I get hit-and-miss results, as shown in the sample below, with incorrect parsing:


(ba7e6bdc-2a92-4dc3-8e28-a75e1a7d58f2,{"rowkey":"ba7e6bdc-2a92-4dc3-8e28-a75e1a7d58f2","ticker":"SBRY",
"timeissued":"2019-06-18T22:10:26", "price":555.75})
{"rowkey":"ba7e6bdc-2a92-4dc3-8e28-a75e1a7d58f2"
"SBRY"  //corrrect
"2019-06-18T22  // missing half
555.75}  // incorrect

Is there any way to read this JSON data systematically?

Thanks

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
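
A hedged sketch of a more systematic way to parse the payload above, assuming Spark 2.1+ (for from_json) and that dstream is a DStream[(String, String)] of (key, json) pairs as in the snippet earlier in this message; the schema fields are taken from the sample record:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val priceSchema = new StructType()
  .add("rowkey", StringType)
  .add("ticker", StringType)
  .add("timeissued", StringType)
  .add("price", DoubleType)

dstream.foreachRDD { pricesRDD =>
  if (!pricesRDD.isEmpty) {
    val spark = SparkSession.builder.getOrCreate()
    import spark.implicits._
    // parse the JSON column with a real JSON parser instead of split(',')
    val df = pricesRDD.toDF("key", "json")
      .select($"key", from_json($"json", priceSchema).as("j"))
      .select($"key", $"j.rowkey", $"j.ticker", $"j.timeissued", $"j.price")
    df.show(false)
  }
}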


GC problem doing fuzzy join

2019-06-18 Thread Arun Luthra
I'm trying to do a brute force fuzzy join where I compare N records against
N other records, for N^2 total comparisons.

The table is medium size and fits in memory, so I collect it and put it
into a broadcast variable.

The other copy of the table is in an RDD. I am basically calling the RDD
map operation, and each record in the RDD takes the broadcasted table and
FILTERS it. There appears to be heavy GC activity, so I suspect that
repeatedly creating and discarding filtered copies of the broadcast table is
causing it.

Is there a way to fix this pattern?

Thanks,
Arun
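
A hedged sketch of one way to reshape the pattern described above (assumptions: the small table fits comfortably in a broadcast, and Rec/fuzzyMatch below are hypothetical stand-ins for the real record type and similarity test). The idea is to read the broadcast value once per partition and loop over it directly, rather than materializing a filtered copy of the whole table for every record:

import org.apache.spark.sql.SparkSession

object FuzzyJoinSketch {
  // hypothetical record type and similarity test, stand-ins for the real ones
  final case class Rec(id: Long, name: String)
  def fuzzyMatch(a: Rec, b: Rec): Boolean =
    a.name.take(3).equalsIgnoreCase(b.name.take(3))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("fuzzy-join-sketch").getOrCreate()
    val sc = spark.sparkContext

    val small = Array(Rec(1L, "alpha"), Rec(2L, "bravo"))   // the collected small table
    val bcast = sc.broadcast(small)
    val big = sc.parallelize(Seq(Rec(10L, "alphonse"), Rec(11L, "charlie")))

    // one read of the broadcast per task; a plain while loop avoids building
    // an intermediate filtered collection for every record
    val matched = big.mapPartitions { iter =>
      val candidates = bcast.value
      iter.map { rec =>
        val hits = scala.collection.mutable.ArrayBuffer.empty[Rec]
        var i = 0
        while (i < candidates.length) {
          if (fuzzyMatch(rec, candidates(i))) hits += candidates(i)
          i += 1
        }
        (rec, hits.toList)
      }
    }
    matched.collect().foreach(println)
    spark.stop()
  }
}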


Re: Filter cannot be pushed via a Join

2019-06-18 Thread Xiao Li
Hi, William,

Thanks for reporting it. Could you open a JIRA?

Cheers,

Xiao

William Wong wrote on Tue, Jun 18, 2019 at 8:57 AM:

> BTW, I noticed a workaround is creating a custom rule to remove 'empty
> local relation' from a union table. However, I am not 100% sure if it is
> the right approach.
>
> On Tue, Jun 18, 2019 at 11:53 PM William Wong 
> wrote:
>
>> Dear all,
>>
>> I am not sure if it is something expected or not, and should I report it
>> as a bug.  Basically, the constraints of a union table could be turned
>> empty if any subtable is turned into an empty local relation. The side
>> effect is filter cannot be inferred correctly (by
>> InferFiltersFromConstrains)
>>
>> We may reproduce the issue with the following setup:
>> 1) Prepare two tables:
>> * spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string)
>> USING PARQUET");
>> * spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string)
>> USING PARQUET");
>>
>> 2) Create a union view on table1.
>> * spark.sql("""
>>  | CREATE VIEW partitioned_table_1 AS
>>  | SELECT * FROM table1 WHERE id = 'a'
>>  | UNION ALL
>>  | SELECT * FROM table1 WHERE id = 'b'
>>  | UNION ALL
>>  | SELECT * FROM table1 WHERE id = 'c'
>>  | UNION ALL
>>  | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
>>  | """.stripMargin)
>>
>> 3) View the optimized plan of this SQL. The filter 't2.id = 'a'' cannot
>> be inferred. We can see that the constraints of the left table are empty.
>>
>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>> t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan
>> res39: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
>> Join Inner, (id#0 = id#4)
>> :- Union
>> :  :- Filter (isnotnull(id#0) && (id#0 = a))
>> :  :  +- Relation[id#0,val#1] parquet
>> :  :- LocalRelation , [id#0, val#1]
>> :  :- LocalRelation , [id#0, val#1]
>> :  +- Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
>> : +- Relation[id#0,val#1] parquet
>> +- Filter isnotnull(id#4)
>>+- Relation[id#4,val#5] parquet
>>
>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>> t1.id = t2.id AND t1.id =
>> 'a'").queryExecution.optimizedPlan.children(0).constraints
>> res40: org.apache.spark.sql.catalyst.expressions.ExpressionSet = Set()
>>
>> 4) Modified the query to avoid empty local relation. The filter 't2.id
>> in ('a','b','c','d')' is then inferred properly. The constraints of the
>> left table are not empty as well.
>>
>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>> t1.id = t2.id AND t1.id IN
>> ('a','b','c','d')").queryExecution.optimizedPlan
>> res42: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
>> Join Inner, (id#0 = id#4)
>> :- Union
>> :  :- Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
>> :  :  +- Relation[id#0,val#1] parquet
>> :  :- Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
>> :  :  +- Relation[id#0,val#1] parquet
>> :  :- Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
>> :  :  +- Relation[id#0,val#1] parquet
>> :  +- Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) &&
>> isnotnull(id#0))
>> : +- Relation[id#0,val#1] parquet
>> +- Filter ((id#4 IN (a,b,c,d) && ((isnotnull(id#4) && (((id#4 = a) ||
>> (id#4 = b)) || (id#4 = c))) || NOT id#4 IN (a,b,c))) && isnotnull(id#4))
>>+- Relation[id#4,val#5] parquet
>>
>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>> t1.id = t2.id AND t1.id IN
>> ('a','b','c','d')").queryExecution.optimizedPlan.children(0).constraints
>> res44: org.apache.spark.sql.catalyst.expressions.ExpressionSet =
>> Set(isnotnull(id#0), id#0 IN (a,b,c,d), id#0 = a) || (id#0 = b)) ||
>> (id#0 = c)) || NOT id#0 IN (a,b,c)))
>>
>>
>> Thanks and regards,
>> William
>>
>>
>> On Sat, Jun 15, 2019 at 1:13 AM William Wong 
>> wrote:
>>
>>> Hi all,
>>>
>>> Appreciate any expert may help on this strange behavior..
>>>
>>> It is interesting that... I implemented a custom rule to remove empty
>>> LocalRelation children under Union and run the same query. The filter 'id =
>>> 'a' is inferred to the table2 and pushed via the Join.
>>>
>>> scala> spark2.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>>> t1.id = t2.id AND t1.id = 'a'").explain
>>> == Physical Plan ==
>>> *(4) BroadcastHashJoin [id#0], [id#4], Inner, BuildRight
>>> :- Union
>>> :  :- *(1) Project [id#0, val#1]
>>> :  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
>>> :  : +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
>>> true, Format: Parquet, Location:
>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],
>>> ReadSchema: struct
>>> :  +- *(2) Project [id#0, val#1]
>>> : +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0
>>> = a))
>>> :+- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
>

Re: Unable to run simple spark-sql

2019-06-18 Thread Nirmal Kumar
Just an update on the thread that it's kerberized.

I'm trying to execute the query with a different user, xyz, not hive.
It seems like a permission issue: the user xyz is trying to create a directory
under /home/hive.

Do I need some impersonation setting?

Thanks,
Nirmal

Get Outlook for Android


From: Nirmal Kumar
Sent: Tuesday, June 18, 2019 5:56:06 PM
To: Raymond Honderdors; Nirmal Kumar
Cc: user
Subject: RE: Unable to run simple spark-sql

Hi Raymond,

Permission on hdfs is 777
drwxrwxrwx   - impadmin hdfs  0 2019-06-13 16:09 
/home/hive/spark-warehouse


But it’s pointing to a local file system:
Exception in thread "main" java.lang.IllegalStateException: Cannot create 
staging directory  
'file:/home/hive/spark-warehouse/testdb.db/employee_orc/.hive-staging_hive_2019-06-18_16-08-21_448_1691186175028734135-1'

Thanks,
-Nirmal


From: Raymond Honderdors 
Sent: 18 June 2019 17:52
To: Nirmal Kumar 
Cc: user 
Subject: Re: Unable to run simple spark-sql

Hi
Can you check the permission of the user running spark
On the hdfs folder where it tries to create the table

On Tue, Jun 18, 2019, 15:05 Nirmal Kumar <nirmal.ku...@impetus.co.in.invalid> wrote:
Hi List,

I tried running the following sample Java code using Spark2 version 2.0.0 on 
YARN (HDP-2.5.0.0)

public class SparkSQLTest {
  public static void main(String[] args) {
    SparkSession sparkSession = SparkSession.builder().master("yarn")
        .config("spark.sql.warehouse.dir", "/apps/hive/warehouse")
        .config("hive.metastore.uris", "thrift://x:9083")
        .config("spark.driver.extraJavaOptions", "-Dhdp.version=2.5.0.0-1245")
        .config("spark.yarn.am.extraJavaOptions", "-Dhdp.version=2.5.0.0-1245")
        .config("spark.yarn.jars", "hdfs:///tmp/lib/spark2/*")
        .enableHiveSupport().getOrCreate();

    sparkSession.sql("insert into testdb.employee_orc select * from testdb.employee where empid<5");
  }
}

I get the following error pointing to a local file system 
(file:/home/hive/spark-warehouse) wondering from where its being picked:

16:08:21.321 [dispatcher-event-loop-7] INFO 
org.apache.spark.storage.BlockManagerInfo - Added broadcast_0_piece0 in memory 
on 192.168.218.92:40831 (size: 30.6 KB, free: 4.0 GB)
16:08:21.322 [main] DEBUG org.apache.spark.storage.BlockManagerMaster - Updated 
info of block broadcast_0_piece0
16:08:21.323 [main] DEBUG org.apache.spark.storage.BlockManager - Told master 
about block broadcast_0_piece0
16:08:21.323 [main] DEBUG org.apache.spark.storage.BlockManager - Put block 
broadcast_0_piece0 locally took  4 ms
16:08:21.323 [main] DEBUG org.apache.spark.storage.BlockManager - Putting block 
broadcast_0_piece0 without replication took  4 ms
16:08:21.326 [main] INFO org.apache.spark.SparkContext - Created broadcast 0 
from sql at SparkSQLTest.java:33
16:08:21.449 [main] DEBUG 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable - Created staging dir = 
file:/home/hive/spark-warehouse/testdb.db/employee_orc/.hive-staging_hive_2019-06-18_16-08-21_448_1691186175028734135-1
 for path = file:/home/hive/spark-warehouse/testdb.db/employee_orc
16:08:21.451 [main] INFO org.apache.hadoop.hive.common.FileUtils - Creating 
directory if it doesn't exist: 
file:/home/hive/spark-warehouse/testdb.db/employee_orc/.hive-staging_hive_2019-06-18_16-08-21_448_1691186175028734135-1
Exception in thread "main" java.lang.IllegalStateException: Cannot create 
staging directory  
'file:/home/hive/spark-warehouse/testdb.db/employee_orc/.hive-staging_hive_2019-06-18_16-08-21_448_1691186175028734135-1'
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.getStagingDir(InsertIntoHiveTable.scala:83)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.getExternalScratchDir(InsertIntoHiveTable.scala:97)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.getExternalTmpPath(InsertIntoHiveTable.scala:105)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:148)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveT

Re: Filter cannot be pushed via a Join

2019-06-18 Thread William Wong
BTW, I noticed a workaround is to create a custom rule that removes 'empty
local relation' children from a union. However, I am not 100% sure it is
the right approach.
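
A hedged sketch of what such a rule could look like (assumptions: Spark 2.4-era catalyst APIs, and that dropping empty children of a Union is safe for the queries involved; this is not the exact rule used in this thread). Note that where the rule is injected matters: spark.experimental.extraOptimizations runs after the built-in optimizer batches, so for the filter to actually be inferred the rule may need to be injected earlier, e.g. through SparkSessionExtensions:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Union}
import org.apache.spark.sql.catalyst.rules.Rule

object PruneEmptyUnionChildren extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case u: Union =>
      // drop children that have been reduced to empty LocalRelations
      val nonEmpty = u.children.filterNot {
        case l: LocalRelation => l.data.isEmpty
        case _                => false
      }
      nonEmpty match {
        case Seq()       => u                      // everything was empty; leave untouched
        case Seq(single) => single                 // a union of one child collapses
        case children    => u.withNewChildren(children)
      }
  }
}

val spark2 = SparkSession.builder.getOrCreate()
spark2.experimental.extraOptimizations ++= Seq(PruneEmptyUnionChildren)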

On Tue, Jun 18, 2019 at 11:53 PM William Wong  wrote:

> Dear all,
>
> I am not sure if it is something expected or not, and should I report it
> as a bug.  Basically, the constraints of a union table could be turned
> empty if any subtable is turned into an empty local relation. The side
> effect is filter cannot be inferred correctly (by
> InferFiltersFromConstrains)
>
> We may reproduce the issue with the following setup:
> 1) Prepare two tables:
> * spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string)
> USING PARQUET");
> * spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string)
> USING PARQUET");
>
> 2) Create a union view on table1.
> * spark.sql("""
>  | CREATE VIEW partitioned_table_1 AS
>  | SELECT * FROM table1 WHERE id = 'a'
>  | UNION ALL
>  | SELECT * FROM table1 WHERE id = 'b'
>  | UNION ALL
>  | SELECT * FROM table1 WHERE id = 'c'
>  | UNION ALL
>  | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
>  | """.stripMargin)
>
> 3) View the optimized plan of this SQL. The filter 't2.id = 'a'' cannot
> be inferred. We can see that the constraints of the left table are empty.
>
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
> t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan
> res39: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
> Join Inner, (id#0 = id#4)
> :- Union
> :  :- Filter (isnotnull(id#0) && (id#0 = a))
> :  :  +- Relation[id#0,val#1] parquet
> :  :- LocalRelation , [id#0, val#1]
> :  :- LocalRelation , [id#0, val#1]
> :  +- Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
> : +- Relation[id#0,val#1] parquet
> +- Filter isnotnull(id#4)
>+- Relation[id#4,val#5] parquet
>
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
> t1.id = t2.id AND t1.id =
> 'a'").queryExecution.optimizedPlan.children(0).constraints
> res40: org.apache.spark.sql.catalyst.expressions.ExpressionSet = Set()
>
> 4) Modified the query to avoid empty local relation. The filter 't2.id in
> ('a','b','c','d')' is then inferred properly. The constraints of the left
> table are not empty as well.
>
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
> t1.id = t2.id AND t1.id IN
> ('a','b','c','d')").queryExecution.optimizedPlan
> res42: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
> Join Inner, (id#0 = id#4)
> :- Union
> :  :- Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
> :  :  +- Relation[id#0,val#1] parquet
> :  :- Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
> :  :  +- Relation[id#0,val#1] parquet
> :  :- Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
> :  :  +- Relation[id#0,val#1] parquet
> :  +- Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) &&
> isnotnull(id#0))
> : +- Relation[id#0,val#1] parquet
> +- Filter ((id#4 IN (a,b,c,d) && ((isnotnull(id#4) && (((id#4 = a) ||
> (id#4 = b)) || (id#4 = c))) || NOT id#4 IN (a,b,c))) && isnotnull(id#4))
>+- Relation[id#4,val#5] parquet
>
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
> t1.id = t2.id AND t1.id IN
> ('a','b','c','d')").queryExecution.optimizedPlan.children(0).constraints
> res44: org.apache.spark.sql.catalyst.expressions.ExpressionSet =
> Set(isnotnull(id#0), id#0 IN (a,b,c,d), id#0 = a) || (id#0 = b)) ||
> (id#0 = c)) || NOT id#0 IN (a,b,c)))
>
>
> Thanks and regards,
> William
>
>
> On Sat, Jun 15, 2019 at 1:13 AM William Wong 
> wrote:
>
>> Hi all,
>>
>> Appreciate any expert may help on this strange behavior..
>>
>> It is interesting that... I implemented a custom rule to remove empty
>> LocalRelation children under Union and run the same query. The filter 'id =
>> 'a' is inferred to the table2 and pushed via the Join.
>>
>> scala> spark2.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>> t1.id = t2.id AND t1.id = 'a'").explain
>> == Physical Plan ==
>> *(4) BroadcastHashJoin [id#0], [id#4], Inner, BuildRight
>> :- Union
>> :  :- *(1) Project [id#0, val#1]
>> :  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
>> :  : +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],
>> ReadSchema: struct
>> :  +- *(2) Project [id#0, val#1]
>> : +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 =
>> a))
>> :+- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>> PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])),
>> EqualTo(id,a)], ReadSchema: struct
>> +- BroadcastExch

Re: Filter cannot be pushed via a Join

2019-06-18 Thread William Wong
Dear all,

I am not sure whether this is expected behavior or whether I should report it
as a bug. Basically, the constraints of a union table can become empty if any
subtable is turned into an empty local relation. The side effect is that
filters cannot be inferred correctly (by InferFiltersFromConstraints).

We may reproduce the issue with the following setup:
1) Prepare two tables:
* spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string) USING
PARQUET");
* spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string) USING
PARQUET");

2) Create a union view on table1.
* spark.sql("""
 | CREATE VIEW partitioned_table_1 AS
 | SELECT * FROM table1 WHERE id = 'a'
 | UNION ALL
 | SELECT * FROM table1 WHERE id = 'b'
 | UNION ALL
 | SELECT * FROM table1 WHERE id = 'c'
 | UNION ALL
 | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
 | """.stripMargin)

3) View the optimized plan of this SQL. The filter 't2.id = 'a'' cannot be
inferred. We can see that the constraints of the left table are empty.

scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan
res39: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Join Inner, (id#0 = id#4)
:- Union
:  :- Filter (isnotnull(id#0) && (id#0 = a))
:  :  +- Relation[id#0,val#1] parquet
:  :- LocalRelation <empty>, [id#0, val#1]
:  :- LocalRelation <empty>, [id#0, val#1]
:  +- Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
: +- Relation[id#0,val#1] parquet
+- Filter isnotnull(id#4)
   +- Relation[id#4,val#5] parquet

scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
t1.id = t2.id AND t1.id =
'a'").queryExecution.optimizedPlan.children(0).constraints
res40: org.apache.spark.sql.catalyst.expressions.ExpressionSet = Set()

4) Modified the query to avoid empty local relation. The filter 't2.id in
('a','b','c','d')' is then inferred properly. The constraints of the left
table are not empty as well.

scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
t1.id = t2.id AND t1.id IN ('a','b','c','d')").queryExecution.optimizedPlan
res42: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Join Inner, (id#0 = id#4)
:- Union
:  :- Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
:  :  +- Relation[id#0,val#1] parquet
:  :- Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
:  :  +- Relation[id#0,val#1] parquet
:  :- Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
:  :  +- Relation[id#0,val#1] parquet
:  +- Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) && isnotnull(id#0))
: +- Relation[id#0,val#1] parquet
+- Filter ((id#4 IN (a,b,c,d) && ((isnotnull(id#4) && (((id#4 = a) || (id#4
= b)) || (id#4 = c))) || NOT id#4 IN (a,b,c))) && isnotnull(id#4))
   +- Relation[id#4,val#5] parquet

scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
t1.id = t2.id AND t1.id IN
('a','b','c','d')").queryExecution.optimizedPlan.children(0).constraints
res44: org.apache.spark.sql.catalyst.expressions.ExpressionSet =
Set(isnotnull(id#0), id#0 IN (a,b,c,d), ((((id#0 = a) || (id#0 = b)) ||
(id#0 = c)) || NOT id#0 IN (a,b,c)))


Thanks and regards,
William


On Sat, Jun 15, 2019 at 1:13 AM William Wong  wrote:

> Hi all,
>
> Appreciate any expert may help on this strange behavior..
>
> It is interesting that... I implemented a custom rule to remove empty
> LocalRelation children under Union and run the same query. The filter 'id =
> 'a' is inferred to the table2 and pushed via the Join.
>
> scala> spark2.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
> t1.id = t2.id AND t1.id = 'a'").explain
> == Physical Plan ==
> *(4) BroadcastHashJoin [id#0], [id#4], Inner, BuildRight
> :- Union
> :  :- *(1) Project [id#0, val#1]
> :  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
> :  : +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],
> ReadSchema: struct
> :  +- *(2) Project [id#0, val#1]
> : +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 =
> a))
> :+- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
> PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])),
> EqualTo(id,a)], ReadSchema: struct
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
> true]))
>+- *(3) Project [id#4, val#5]
>   +- *(3) Filter ((id#4 = a) && isnotnull(id#4))
>  +- *(3) FileScan parquet default.table2[id#4,val#5] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
> PartitionFilters: [], *PushedFilters: [EqualTo(id,a), IsNotNull(id)],*
> Rea

Unsubscribe

2019-06-18 Thread gopal kulkarni



Re: [External Sender] Re: Spark 2.4.1 on Kubernetes - DNS resolution of driver fails

2019-06-18 Thread Prudhvi Chennuru (CONT)
Thanks for the response Oliver.

I am facing this issue intermittently. Once in a while I don't see a service
being created for the respective Spark driver (I don't see the service for
that driver on the Kubernetes dashboard, or even via kubectl, but in the
driver logs I do see the service endpoint). By default the driver requests
executors in batches of 5, and as soon as 5 executors are created they fail
with the error below.


Caused by: java.io.IOException: Failed to connect to
group9990-features-282526d440ab3f12a68746fbef289c95-driver-svc.experimental.svc:7078
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
at
org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.UnknownHostException:
group9990-features-282526d440ab3f12a68746fbef289c95-driver-svc.experimental.svc
at java.net.InetAddress.getAllByName0(InetAddress.java:1280)
at java.net.InetAddress.getAllByName(InetAddress.java:1192)
at java.net.InetAddress.getAllByName(InetAddress.java:1126)
at java.net.InetAddress.getByName(InetAddress.java:1076)

Did you face the same problem or were you able to see the service for the
driver pod on your cluster?


On Tue, Jun 18, 2019 at 8:00 AM Jose Luis Pedrosa <
jose.pedr...@microsoft.com> wrote:

> Hi guys
>
>
>
> There’s also an interesting one that we found in a similar case. In our
> case the service ip ranges takes more time to be reachable, so DNS was
> timing out. The approach that I was suggesting was:
>
>1. Add retries in the connection from the executor to the driver:
>https://github.com/apache/spark/pull/24702
>
> 
>2. Disable negative DNS caching at JVM level, on the entrypoint.sh
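
A hedged sketch of what the second point could look like inside entrypoint.sh (networkaddress.cache.negative.ttl is a standard JVM security property rather than a -D system property; the exact java.security path depends on the JDK in the image and is an assumption here):

# JDK 8 images typically keep the file under $JAVA_HOME/jre/lib/security,
# newer JDKs under $JAVA_HOME/conf/security; the last definition in the file wins
echo "networkaddress.cache.negative.ttl=0" >> "$JAVA_HOME/jre/lib/security/java.security"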
>
>
>
> JL
>
>
>
>
>
> From: Olivier Girardot 
> Date: Tuesday 18 June 2019 at 10:06
> To: "Prudhvi Chennuru (CONT)" 
> Cc: Li Gao , dev , user <user@spark.apache.org>
> Subject: Re: [External Sender] Re: Spark 2.4.1 on Kubernetes - DNS resolution of driver fails
>
>
>
> Hi Prudhvi,
>
> not really but we took a drastic approach mitigating this, modifying the
> bundled launch script to be more resilient.
>
> In the kubernetes/dockerfiles/spark/entrypoint.sh in the executor case we
> added something like that :
>
>
>
>   executor)
>     DRIVER_HOST=$(echo $SPARK_DRIVER_URL | cut -d "@" -f 2 | cut -d ":" -f 1)
>     DRIVER_PORT=$(echo $SPARK_DRIVER_URL | cut -d "@" -f 2 | cut -d ":" -f 2)
>
>     for i in $(seq 1 20);
>     do
>       nc -zvw1 $DRIVER_HOST $DRIVER_PORT
>       status=$?
>       if [ $status -eq 0 ]
>       then
>         echo "Driver is accessible, let's rock'n'roll."
>         break
>       else
>         echo "Driver not accessible :-| napping for a while..."
>         sleep 3
>       fi
>     done
>     CMD=(
>       ${JAVA_HOME}/bin/java
>
> 
>
>
>
> That way the executor will not start before the driver is really
> connectable.
>
> That's kind of a hack but we did not experience the issue anymore, so I
> guess I'll keep it for now.
>
>
>
> Regards,
>
>
>
> Olivier.
>
>
>
> Le mar. 11 juin 2019 à 18:23, Prudhvi Chennuru (CONT) <
> prudhvi.chenn...@capitalone.com> a écrit :
>
> Hey Oliver,
>
>
>
>  I am also facing the same issue on my kubernetes
> cluster(v1.11.5)  on AWS with spark version 2.3.3, any luck in figuring out
> the root cause?
>
>
>
> On Fri, May 3, 2019 at 5:37 AM Olivier Girardot <
> o.girar...@lateral-thoughts.com> wrote:
>
> Hi,
>
> I did not try on another vendor, so I can't say if it's only related to
> gke, and no, I did not notice anything on the kubelet or kube-dns
> processes...
>
>
>
> Regards
>
>
>
> Le ven. 3 mai 2019 à 03:05, Li Gao  a écrit :
>
> hi Olivier,
>
>
>
> This seems a GKE specific issue? have you tried on other vendors ? Also on
> the kubelet nodes did you notice any pressure on the DNS side?
>
>
>
> Li
>
>
>
>
>
> On Mon, Apr 29, 2019, 5:43 AM Olivier Girardot <
> o.girar...@lateral-thoughts.com> wrote:
>
> Hi everyone,
>
> I have ~300 spark job on Kubernetes (GKE) using the cluster auto-scaler,
> and sometimes while running these jobs a pretty bad thing happens, the
> driver (in cluster mode) gets scheduled on Kubern

unsubscribe

2019-06-18 Thread Matteo Bovetti


Matteo Bovetti
Big Data Engineer & DevOps
Mobile: +39 333 290 1242
Email: matteo.bove...@agilelab.it
Site: www.agilelab.it




RE: Unable to run simple spark-sql

2019-06-18 Thread Nirmal Kumar
Hi Raymond,

Permission on hdfs is 777
drwxrwxrwx   - impadmin hdfs  0 2019-06-13 16:09 
/home/hive/spark-warehouse


But it’s pointing to a local file system:
Exception in thread "main" java.lang.IllegalStateException: Cannot create 
staging directory  
'file:/home/hive/spark-warehouse/testdb.db/employee_orc/.hive-staging_hive_2019-06-18_16-08-21_448_1691186175028734135-1'

Thanks,
-Nirmal


From: Raymond Honderdors 
Sent: 18 June 2019 17:52
To: Nirmal Kumar 
Cc: user 
Subject: Re: Unable to run simple spark-sql

Hi
Can you check the permission of the user running spark
On the hdfs folder where it tries to create the table

On Tue, Jun 18, 2019, 15:05 Nirmal Kumar <nirmal.ku...@impetus.co.in.invalid> wrote:
Hi List,

I tried running the following sample Java code using Spark2 version 2.0.0 on 
YARN (HDP-2.5.0.0)

public class SparkSQLTest {
  public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder().master("yarn")
.config("spark.sql.warehouse.dir", "/apps/hive/warehouse")
.config("hive.metastore.uris", "thrift://x:9083")
.config("spark.driver.extraJavaOptions", "-Dhdp.version=2.5.0.0-1245")

.config("spark.yarn.am.extraJavaOptions",
 "-Dhdp.version=2.5.0.0-1245")
.config("spark.yarn.jars", 
"hdfs:///tmp/lib/spark2/*").enableHiveSupport().getOrCreate();

sparkSession.sql("insert into testdb.employee_orc select * from 
testdb.employee where empid<5");
  }
}

I get the following error pointing to a local file system 
(file:/home/hive/spark-warehouse) wondering from where its being picked:

16:08:21.321 [dispatcher-event-loop-7] INFO 
org.apache.spark.storage.BlockManagerInfo - Added broadcast_0_piece0 in memory 
on 192.168.218.92:40831 (size: 30.6 KB, free: 4.0 GB)
16:08:21.322 [main] DEBUG org.apache.spark.storage.BlockManagerMaster - Updated 
info of block broadcast_0_piece0
16:08:21.323 [main] DEBUG org.apache.spark.storage.BlockManager - Told master 
about block broadcast_0_piece0
16:08:21.323 [main] DEBUG org.apache.spark.storage.BlockManager - Put block 
broadcast_0_piece0 locally took  4 ms
16:08:21.323 [main] DEBUG org.apache.spark.storage.BlockManager - Putting block 
broadcast_0_piece0 without replication took  4 ms
16:08:21.326 [main] INFO org.apache.spark.SparkContext - Created broadcast 0 
from sql at SparkSQLTest.java:33
16:08:21.449 [main] DEBUG 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable - Created staging dir = 
file:/home/hive/spark-warehouse/testdb.db/employee_orc/.hive-staging_hive_2019-06-18_16-08-21_448_1691186175028734135-1
 for path = file:/home/hive/spark-warehouse/testdb.db/employee_orc
16:08:21.451 [main] INFO org.apache.hadoop.hive.common.FileUtils - Creating 
directory if it doesn't exist: 
file:/home/hive/spark-warehouse/testdb.db/employee_orc/.hive-staging_hive_2019-06-18_16-08-21_448_1691186175028734135-1
Exception in thread "main" java.lang.IllegalStateException: Cannot create 
staging directory  
'file:/home/hive/spark-warehouse/testdb.db/employee_orc/.hive-staging_hive_2019-06-18_16-08-21_448_1691186175028734135-1'
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.getStagingDir(InsertIntoHiveTable.scala:83)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.getExternalScratchDir(InsertIntoHiveTable.scala:97)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.getExternalTmpPath(InsertIntoHiveTable.scala:105)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:148)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:142)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:313)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala

Re: Unable to run simple spark-sql

2019-06-18 Thread Raymond Honderdors
Hi
Can you check the permission of the user running spark
On the hdfs folder where it tries to create the table

On Tue, Jun 18, 2019, 15:05 Nirmal Kumar 
wrote:

> Hi List,
>
> I tried running the following sample Java code using Spark2 version 2.0.0
> on YARN (HDP-2.5.0.0)
>
> public class SparkSQLTest {
>   public static void main(String[] args) {
> SparkSession sparkSession = SparkSession.builder().master("yarn")
> .config("spark.sql.warehouse.dir", "/apps/hive/warehouse")
> .config("hive.metastore.uris", "thrift://x:9083")
> .config("spark.driver.extraJavaOptions",
> "-Dhdp.version=2.5.0.0-1245")
> .config("spark.yarn.am.extraJavaOptions",
> "-Dhdp.version=2.5.0.0-1245")
> .config("spark.yarn.jars",
> "hdfs:///tmp/lib/spark2/*").enableHiveSupport().getOrCreate();
>
> sparkSession.sql("insert into testdb.employee_orc select * from
> testdb.employee where empid<5");
>   }
> }
>
> I get the following error pointing to a local file system
> (file:/home/hive/spark-warehouse) wondering from where its being picked:
>
> 16:08:21.321 [dispatcher-event-loop-7] INFO
> org.apache.spark.storage.BlockManagerInfo - Added broadcast_0_piece0 in
> memory on 192.168.218.92:40831 (size: 30.6 KB, free: 4.0 GB)
> 16:08:21.322 [main] DEBUG org.apache.spark.storage.BlockManagerMaster -
> Updated info of block broadcast_0_piece0
> 16:08:21.323 [main] DEBUG org.apache.spark.storage.BlockManager - Told
> master about block broadcast_0_piece0
> 16:08:21.323 [main] DEBUG org.apache.spark.storage.BlockManager - Put
> block broadcast_0_piece0 locally took  4 ms
> 16:08:21.323 [main] DEBUG org.apache.spark.storage.BlockManager - Putting
> block broadcast_0_piece0 without replication took  4 ms
> 16:08:21.326 [main] INFO org.apache.spark.SparkContext - Created broadcast
> 0 from sql at SparkSQLTest.java:33
> 16:08:21.449 [main] DEBUG
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable - Created staging
> dir =
> file:/home/hive/spark-warehouse/testdb.db/employee_orc/.hive-staging_hive_2019-06-18_16-08-21_448_1691186175028734135-1
> for path = file:/home/hive/spark-warehouse/testdb.db/employee_orc
> 16:08:21.451 [main] INFO org.apache.hadoop.hive.common.FileUtils -
> Creating directory if it doesn't exist:
> file:/home/hive/spark-warehouse/testdb.db/employee_orc/.hive-staging_hive_2019-06-18_16-08-21_448_1691186175028734135-1
> Exception in thread "main" java.lang.IllegalStateException: Cannot create
> staging directory
> 'file:/home/hive/spark-warehouse/testdb.db/employee_orc/.hive-staging_hive_2019-06-18_16-08-21_448_1691186175028734135-1'
> at
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable.getStagingDir(InsertIntoHiveTable.scala:83)
> at
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable.getExternalScratchDir(InsertIntoHiveTable.scala:97)
> at
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable.getExternalTmpPath(InsertIntoHiveTable.scala:105)
> at
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:148)
> at
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:142)
> at
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:313)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
> at
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:186)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
> at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65)
> at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
> at
> com..xxx.xxx.xxx..SparkSQLTest.main(SparkSQLTest.java:33)
> 16:08:21.454 [pool-8-thread-1] INFO org.apache.spark.SparkContext -
> Invoking stop() from shutdown hook
> 16:08:21.455 [pool-8-thread-1] DEBUG
> org.spark_project.jetty.util.component.AbstractLifeCycle - stopping
> org.spark_project.jetty.server.Server@620aa4ea
> 16:08:21.455 [pool-8-thread-1] DEBUG org.spark_project.jetty.server.Server
> - Graceful shutdown org.spark_project.jetty.server.Server@620aa4ea by
>
> Thanks,
> -Nirmal
>

Unable to run simple spark-sql

2019-06-18 Thread Nirmal Kumar
Hi List,

I tried running the following sample Java code using Spark2 version 2.0.0 on 
YARN (HDP-2.5.0.0)

import org.apache.spark.sql.SparkSession;

public class SparkSQLTest {
  public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder().master("yarn")
.config("spark.sql.warehouse.dir", "/apps/hive/warehouse")
.config("hive.metastore.uris", "thrift://x:9083")
.config("spark.driver.extraJavaOptions", "-Dhdp.version=2.5.0.0-1245")
.config("spark.yarn.am.extraJavaOptions", "-Dhdp.version=2.5.0.0-1245")
.config("spark.yarn.jars", 
"hdfs:///tmp/lib/spark2/*").enableHiveSupport().getOrCreate();

sparkSession.sql("insert into testdb.employee_orc select * from 
testdb.employee where empid<5");
  }
}
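
For contrast, a minimal sketch of the same session with the warehouse location
written as a fully qualified HDFS URI, so it cannot silently resolve against
the local file system (an illustration, not a confirmed fix from this thread;
the hosts are masked placeholders, as above):

import org.apache.spark.sql.SparkSession;

public class SparkSQLTestQualifiedWarehouse {
  public static void main(String[] args) {
    // Same builder as the snippet above, except that spark.sql.warehouse.dir
    // carries an explicit hdfs:// scheme instead of a bare path.
    SparkSession sparkSession = SparkSession.builder().master("yarn")
        .config("spark.sql.warehouse.dir", "hdfs://xxxx:8020/apps/hive/warehouse")
        .config("hive.metastore.uris", "thrift://xxxx:9083")
        .config("spark.driver.extraJavaOptions", "-Dhdp.version=2.5.0.0-1245")
        .config("spark.yarn.am.extraJavaOptions", "-Dhdp.version=2.5.0.0-1245")
        .config("spark.yarn.jars", "hdfs:///tmp/lib/spark2/*")
        .enableHiveSupport()
        .getOrCreate();

    sparkSession.sql("insert into testdb.employee_orc select * from testdb.employee where empid < 5");
  }
}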

I get the following error pointing to a local file system path
(file:/home/hive/spark-warehouse), and I am wondering where it is being picked up from:

16:08:21.321 [dispatcher-event-loop-7] INFO 
org.apache.spark.storage.BlockManagerInfo - Added broadcast_0_piece0 in memory 
on 192.168.218.92:40831 (size: 30.6 KB, free: 4.0 GB)
16:08:21.322 [main] DEBUG org.apache.spark.storage.BlockManagerMaster - Updated 
info of block broadcast_0_piece0
16:08:21.323 [main] DEBUG org.apache.spark.storage.BlockManager - Told master 
about block broadcast_0_piece0
16:08:21.323 [main] DEBUG org.apache.spark.storage.BlockManager - Put block 
broadcast_0_piece0 locally took  4 ms
16:08:21.323 [main] DEBUG org.apache.spark.storage.BlockManager - Putting block 
broadcast_0_piece0 without replication took  4 ms
16:08:21.326 [main] INFO org.apache.spark.SparkContext - Created broadcast 0 
from sql at SparkSQLTest.java:33
16:08:21.449 [main] DEBUG 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable - Created staging dir = 
file:/home/hive/spark-warehouse/testdb.db/employee_orc/.hive-staging_hive_2019-06-18_16-08-21_448_1691186175028734135-1
 for path = file:/home/hive/spark-warehouse/testdb.db/employee_orc
16:08:21.451 [main] INFO org.apache.hadoop.hive.common.FileUtils - Creating 
directory if it doesn't exist: 
file:/home/hive/spark-warehouse/testdb.db/employee_orc/.hive-staging_hive_2019-06-18_16-08-21_448_1691186175028734135-1
Exception in thread "main" java.lang.IllegalStateException: Cannot create 
staging directory  
'file:/home/hive/spark-warehouse/testdb.db/employee_orc/.hive-staging_hive_2019-06-18_16-08-21_448_1691186175028734135-1'
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.getStagingDir(InsertIntoHiveTable.scala:83)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.getExternalScratchDir(InsertIntoHiveTable.scala:97)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.getExternalTmpPath(InsertIntoHiveTable.scala:105)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:148)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:142)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:313)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:186)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
at com..xxx.xxx.xxx..SparkSQLTest.main(SparkSQLTest.java:33)
16:08:21.454 [pool-8-thread-1] INFO org.apache.spark.SparkContext - Invoking 
stop() from shutdown hook
16:08:21.455 [pool-8-thread-1] DEBUG 
org.spark_project.jetty.util.component.AbstractLifeCycle - stopping 
org.spark_project.jetty.server.Server@620aa4ea
16:08:21.455 [pool-8-thread-1] DEBUG org.spark_project.jetty.server.Server - 
Graceful shutdown org.spark_project.jetty.server.Server@620aa4ea by

Thanks,
-Nirmal









How to encrypt AK and SK using Livy restAPI to submit sparkJob

2019-06-18 Thread Huizhe Wang
Hi,

I am using the Livy REST API to submit Spark jobs. I used s3a to replace HDFS. I
have to write fs.s3a.access.key and fs.s3a.secret.key directly in
core-site.xml, as there are no such conf params in the Livy API.
How could I encrypt my AK and SK?
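
A minimal sketch of one commonly used alternative, assuming a Hadoop credential
store (JCEKS) has been created beforehand with "hadoop credential create
fs.s3a.access.key ..." and "hadoop credential create fs.s3a.secret.key ...";
every host, path and bucket name below is a placeholder, not a value from this
thread:

import org.apache.spark.sql.SparkSession;

public class S3aCredentialStoreSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("s3a-credential-store-sketch")
        // Point Hadoop at the JCEKS store instead of keeping plaintext keys in
        // core-site.xml; fs.s3a.access.key and fs.s3a.secret.key are then
        // resolved from the store at runtime.
        .config("spark.hadoop.hadoop.security.credential.provider.path",
                "jceks://hdfs@namenode:8020/user/jane/s3.jceks")
        .getOrCreate();

    // Any s3a read or write then works without keys in the XML config.
    spark.read().text("s3a://some-bucket/some-prefix/").show();
  }
}

The same spark.hadoop.* property can also be passed as part of the job
configuration at submission time instead of being hard-coded.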

Yours.
Jane


Re: [External Sender] Re: Spark 2.4.1 on Kubernetes - DNS resolution of driver fails

2019-06-18 Thread Olivier Girardot
Hi Prudhvi,
Not really, but we took a drastic approach to mitigate this, modifying the
bundled launch script to be more resilient.
In kubernetes/dockerfiles/spark/entrypoint.sh, in the executor case, we
added something like this:

  executor)
    DRIVER_HOST=$(echo $SPARK_DRIVER_URL | cut -d "@" -f 2 | cut -d ":" -f 1)
    DRIVER_PORT=$(echo $SPARK_DRIVER_URL | cut -d "@" -f 2 | cut -d ":" -f 2)

    for i in $(seq 1 20);
    do
      nc -zvw1 $DRIVER_HOST $DRIVER_PORT
      status=$?
      if [ $status -eq 0 ]
      then
        echo "Driver is accessible, let's rock'n'roll."
        break
      else
        echo "Driver not accessible :-| napping for a while..."
        sleep 3
      fi
    done

    CMD=(
      ${JAVA_HOME}/bin/java




That way the executor will not start before the driver is really
connectable.
It's kind of a hack, but we have not seen the issue since, so I guess I'll
keep it for now.

Regards,

Olivier.

On Tue, Jun 11, 2019 at 18:23, Prudhvi Chennuru (CONT) <
prudhvi.chenn...@capitalone.com> wrote:

> Hey Oliver,
>
>  I am also facing the same issue on my kubernetes
> cluster(v1.11.5)  on AWS with spark version 2.3.3, any luck in figuring out
> the root cause?
>
> On Fri, May 3, 2019 at 5:37 AM Olivier Girardot <
> o.girar...@lateral-thoughts.com> wrote:
>
>> Hi,
>> I did not try on another vendor, so I can't say if it's only related to
>> gke, and no, I did not notice anything on the kubelet or kube-dns
>> processes...
>>
>> Regards
>>
>> On Fri, May 3, 2019 at 03:05, Li Gao  wrote:
>>
>>> hi Olivier,
>>>
>>> This seems like a GKE-specific issue? Have you tried other vendors? Also,
>>> on the kubelet nodes, did you notice any pressure on the DNS side?
>>>
>>> Li
>>>
>>>
>>> On Mon, Apr 29, 2019, 5:43 AM Olivier Girardot <
>>> o.girar...@lateral-thoughts.com> wrote:
>>>
 Hi everyone,
 I have ~300 spark job on Kubernetes (GKE) using the cluster
 auto-scaler, and sometimes while running these jobs a pretty bad thing
 happens, the driver (in cluster mode) gets scheduled on Kubernetes and
 launches many executor pods.
 So far so good, but the k8s "Service" associated with the driver does not
 seem to be propagated in terms of DNS resolution, so all the executors fail
 because "spark-application-..cluster.svc.local" does not exist.

 With all executors failing, the driver should fail too, but it
 considers this a "pending" initial allocation and stays stuck forever
 in a loop of "Initial job has not accepted any resources, please check
 Cluster UI".

 Has anyone else observed this kind of behaviour?
 We had it on 2.3.1 and I upgraded to 2.4.1 but this issue still seems
 to exist even after the "big refactoring" in the kubernetes cluster
 scheduler backend.

 I can work on a fix / workaround but I'd like to check with you the
 proper way forward :

- Some processes (like the airflow helm recipe) rely on a "sleep
30s" before launching the dependent pods (that could be added to
/opt/entrypoint.sh used in the kubernetes packaging)
- We can add a simple step to the init container trying to do the
DNS resolution and failing after 60s if it did not work

 But these steps won't change the fact that the driver will stay stuck,
 thinking we're still within the initial allocation delay.

 Thoughts ?

 --
 *Olivier Girardot*
 o.girar...@lateral-thoughts.com

>>>
>
> --
> *Thanks,*
> *Prudhvi Chennuru.*
>
>


-- 
*Olivier Girardot* | Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94