Limit on the number of Jobs per Application

2018-05-30 Thread Jeremy Davis


I have an application that runs many thousands of univariate GLM regressions, and it
seems to break down after completing around 25K jobs.
Plenty of resources (disk, network, memory, CPU) are free, but eventually it is
scheduling on only a few threads (out of 400+ available on the cluster).
No task failures or blacklisting have occurred.
Also, bringing up the Spark UI Jobs tab gets slower and slower to refresh
(eventually it doesn't return at all).




Spark 2.3.0 on EMR.


Example:
val features = getFeatures(dataFrame)
val responses = getResponses(dataFrame)

// Build every (feature, response) pair and fit one GLM per pair, in parallel.
val cross = features.flatMap(x => responses.map(y => (x, y))).toSeq.par

cross.map(c => (c._1, c._2, glm(sparkSession, dataFrame, Array(c._1), c._2, true))).toList
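
For context, by default .par runs its closures on a shared pool roughly sized to the driver's cores, which bounds how many glm() calls are in flight at once. A hypothetical sketch (Scala 2.11-era imports, as shipped with Spark 2.3) of making that pool explicit:

import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

// Hypothetical: cap the number of concurrent glm() fits driven from this collection at 16.
cross.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(16))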



Re: help needed in performance improvement of spark structured streaming

2018-05-30 Thread amit kumar singh
Hi team,

Any help with this?



I have a use case where I need to call a stored procedure through Structured
Streaming.

I am able to send the Kafka message and call the stored procedure, but since the
foreach sink executes the stored procedure once per message, I want to combine
all the messages into a single dataframe and then call the stored procedure
once.

Is it possible to do this?

Current code:

select('value cast "string",'topic)
  .select('topic,concat_ws(",", 'value cast "string") as 'value1)
 .groupBy('topic cast "string").count()
.coalesce(1)
.as[String]
.writeStream
.trigger(ProcessingTime("60 seconds"))
.option("checkpointLocation", checkpointUrl)
.foreach(new SimpleSqlServerSink(jdbcUrl, connectionProperties))
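
One possible shape for this, as an untested sketch rather than a drop-in replacement for SimpleSqlServerSink: buffer the rows inside the ForeachWriter and call the stored procedure once per partition when the trigger finishes. The class, the procedure name (my_stored_proc) and the assumption of a string "value" column are placeholders.

import java.sql.DriverManager
import org.apache.spark.sql.{ForeachWriter, Row}

class BufferedProcSink(jdbcUrl: String, connectionProperties: java.util.Properties)
    extends ForeachWriter[Row] {

  private val buffer = scala.collection.mutable.ArrayBuffer.empty[String]

  // Called once per partition per trigger; start with an empty buffer.
  override def open(partitionId: Long, version: Long): Boolean = {
    buffer.clear()
    true
  }

  // Collect each message instead of calling the procedure per row.
  override def process(row: Row): Unit = {
    buffer += row.getAs[String]("value")
  }

  // Call the stored procedure once, with all messages of this partition/trigger.
  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null && buffer.nonEmpty) {
      val conn = DriverManager.getConnection(jdbcUrl, connectionProperties)
      try {
        val stmt = conn.prepareCall("{call my_stored_proc(?)}") // hypothetical procedure
        stmt.setString(1, buffer.mkString(","))                 // all messages, comma separated
        stmt.execute()
        stmt.close()
      } finally {
        conn.close()
      }
    }
  }
}

Since the query above already coalesces to a single partition, this amounts to one procedure call per trigger. Newer Spark versions (2.4+) also add DataStreamWriter.foreachBatch, which hands each micro-batch to you as a DataFrame and makes this kind of batching more direct.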







On Sat, May 5, 2018 at 12:20 PM, amit kumar singh 
wrote:

> Hi Community,
>
> I have a use case where i need to call stored procedure through structured
> streaming.
>
> I am able to send kafka message and call stored procedure ,
>
> but since foreach sink keeps on executing stored procedure per message
>
> i want to combine all the messages in single dtaframe and then call
> stored procedure at once
>
> is it possible to do
>
>
> current code
>
> select('value cast "string",'topic)
>   .select('topic,concat_ws(",", 'value cast "string") as 'value1)
>  .groupBy('topic cast "string").count()
> .coalesce(1)
> .as[String]
> .writeStream
> .trigger(ProcessingTime("60 seconds"))
> .option("checkpointLocation", checkpointUrl)
> .foreach(new SimpleSqlServerSink(jdbcUrl, connectionProperties))
>
>
>
>
> thanks
> rohit
>


How can we group messages coming in per batch of Structured Streaming

2018-05-30 Thread amit kumar singh
Hi Team,

I have a requirement where I need to combine all the JSON messages arriving in a
batch of Structured Streaming into one single JSON message, separated by a comma
or any other delimiter, and store it.

I have tried grouping by Kafka partition and I tried using concat, but it's not
working.

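An untested sketch of one way to do this, assuming a streaming dataframe read from Kafka (called kafkaDf here) with the usual value/topic/timestamp columns:

import org.apache.spark.sql.functions._

// Gather every message value seen in a one-minute event-time window, per topic,
// into a single comma-delimited string column.
val combined = kafkaDf
  .selectExpr("CAST(value AS STRING) AS value", "topic", "timestamp")
  .withWatermark("timestamp", "1 minute")
  .groupBy(window(col("timestamp"), "1 minute"), col("topic"))
  .agg(concat_ws(",", collect_list(col("value"))) as "payload")

With the watermark in place the aggregation can be written out in append mode once each window closes; payload then holds all messages of that window joined by commas.
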
thanks
Amit


Re: trying to understand structured streaming aggregation with watermark and append outputmode

2018-05-30 Thread Koert Kuipers
Thanks, that's helpful.


On Wed, May 30, 2018 at 5:05 PM, Lalwani, Jayesh <
jayesh.lalw...@capitalone.com> wrote:

> Few things
>
>
>
>1. Append mode is going to output data that falls out of the watermark
>2. Structured streaming isn’t time based. It reacts only when it sees
>input data. If no data appears in the input it will not move the
>aggregation window
>3. Clock time is irrelevant to structured streaming. As far as
>structured streaming is concerned, “current time” is max time of timestamp
>column used by the window
>
>
>
> SO, what is happening in our case is that you posted 1 and 2 within 2
> seconds of each other. Since, none of them fell outside of watermark, it
> didn’t output anything. Now, until the point you posted 3, time was frozen
> for Structured streaming. The max time of the timestamp column was the
> timestamp of message 2. So, current time was the timestamp of message 2.
> When you posted 3, the time advanced to the timestamp of 3, which caused 1
> to fall out, so it output 1.
>
>
>
> Note that, it will not output 1 exactly 1 second after 1 arrives. The
> clock time means nothing.
>
>
>
> *From: *Koert Kuipers 
> *Date: *Monday, May 28, 2018 at 6:17 PM
> *To: *user 
> *Subject: *trying to understand structured streaming aggregation with
> watermark and append outputmode
>
>
>
> hello all,
>
> just playing with structured streaming aggregations for the first time.
> this is my little program i run inside sbt:
>
>
>
> import org.apache.spark.sql.functions._
>
> val lines = spark.readStream
>   .format("socket")
>   .option("host", "localhost")
>   .option("port", )
>   .load()
>
> val query = lines
>   .withColumn("time", current_timestamp)
>   .withWatermark("time", "1 second")
>   .groupBy(window($"time", "1 second")).agg(collect_list("value") as
> "value")
>   .withColumn("windowstring", $"window" as "string")
>   .writeStream
>   .format("console")
>   .outputMode(OutputMode.Append)
>   .start()
>
> query.awaitTermination()
>
>
>
> before i start it i create a little server with nc:
>
> $ nc -lk 
>
>
>
> after it starts i simply type in a single character every 20 seconds or so
> inside nc and hit enter. my characters are 1, 2, 3, etc.
>
>
>
> the thing i dont understand is it comes back with the correct responses,
> but with delays in terms of entries (not time). after the first 2
> characters it comes back with empty aggregations, and then for every next
> character it comes back with the response for 2 characters ago. so when i
> hit 3 it comes back with the response for 1.
>
>
>
> not very realtime :(
>
>
>
> any idea why?
>
>
>
> i would like it to respond to my input 1 with the relevant response
> for that input (after the window and watermark has expired, of course, so
> within 2 seconds).
>
>
>
> i tried adding a trigger of 1 second but that didnt help either.
>
>
>
> below is the output with my inputs inserted using '<= ', so '<= 1'
> means i hit 1 and then enter.
>
>
>
>
>
> <= 1
> ---
> Batch: 0
> ---
> +--+-++
> |window|value|windowstring|
> +--+-++
> +--+-++
>
> <= 2
> ---
> Batch: 1
> ---
> +--+-++
> |window|value|windowstring|
> +--+-++
> +--+-++
>
> <= 3
> Batch: 2
> ---
> ++-++
> |  window|value|windowstring|
> ++-++
> |[2018-05-28 18:00...|  [1]|[2018-05-28 18:00...|
> ++-++
>
> <= 4
> ---
> Batch: 3
> ---
> ++-++
> |  window|value|windowstring|
> ++-++
> |[2018-05-28 18:00...|  [2]|[2018-05-28 18:00...|
> ++-++
>
> <= 5
> ---
> Batch: 4
> ---
> ++-++
> |  window|value|windowstring|
> ++-++
> |[2018-05-28 18:01...|  [3]|[2018-05-28 18:01...|
> ++-++
>
>
>

Re: Unable to alter partition. The transaction for alter partition did not commit successfully.

2018-05-30 Thread naresh Goud
What are you doing? Please give more details on what you are doing.

On Wed, May 30, 2018 at 12:58 PM Arun Hive 
wrote:

>
> Hi
>
> While running my spark job component i am getting the following exception.
> Requesting for your help on this:
> Spark core version -
> spark-core_2.10-2.1.1
>
> Spark streaming version -
> spark-streaming_2.10-2.1.1
>
> Spark hive version -
> spark-hive_2.10-2.1.1
>
>
> 2018-05-28 00:08:04,317  [streaming-job-executor-2] ERROR (Hive.java:1883)
> - org.apache.hadoop.hive.ql.metadata.HiveException: Unable to alter
> partition. The transaction for alter partition did not commit successfully.
> at org.apache.hadoop.hive.ql.metadata.Hive.alterPartition(Hive.java:573)
> at org.apache.hadoop.hive.ql.metadata.Hive.alterPartition(Hive.java:546)
> at
> org.apache.hadoop.hive.ql.metadata.Hive.alterPartitionSpec(Hive.java:1915)
> at org.apache.hadoop.hive.ql.metadata.Hive.getPartition(Hive.java:1875)
> at org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1407)
> at
> org.apache.hadoop.hive.ql.metadata.Hive.loadDynamicPartitions(Hive.java:1593)
> at sun.reflect.GeneratedMethodAccessor123.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.spark.sql.hive.client.Shim_v1_2.loadDynamicPartitions(HiveShim.scala:831)
> at
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadDynamicPartitions$1.apply$mcV$sp(HiveClientImpl.scala:693)
> at
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadDynamicPartitions$1.apply(HiveClientImpl.scala:691)
> at
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadDynamicPartitions$1.apply(HiveClientImpl.scala:691)
> at
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:279)
> at
> org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:226)
> at
> org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:225)
> at
> org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:268)
> at
> org.apache.spark.sql.hive.client.HiveClientImpl.loadDynamicPartitions(HiveClientImpl.scala:691)
> at
> org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadDynamicPartitions$1.apply$mcV$sp(HiveExternalCatalog.scala:823)
> at
> org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadDynamicPartitions$1.apply(HiveExternalCatalog.scala:811)
> at
> org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadDynamicPartitions$1.apply(HiveExternalCatalog.scala:811)
> at
> org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
> at
> org.apache.spark.sql.hive.HiveExternalCatalog.loadDynamicPartitions(HiveExternalCatalog.scala:811)
> at
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:319)
> at
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:221)
> at
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:407)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
> at
> org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:263)
> at
> org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:243)
>
> 
> -
>  -
> 
> -
> 
> at
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
> at
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
> at
> 

Re: trying to understand structured streaming aggregation with watermark and append outputmode

2018-05-30 Thread Lalwani, Jayesh
Few things


  1.  Append mode is going to output data that falls out of the watermark
  2.  Structured streaming isn’t time based. It reacts only when it sees input 
data. If no data appears in the input it will not move the aggregation window
  3.  Clock time is irrelevant to structured streaming. As far as structured 
streaming is concerned, “current time” is max time of timestamp column used by 
the window

So, what is happening in your case is that you posted 1 and 2 within 2 seconds
of each other. Since neither of them fell outside the watermark, it didn't output
anything. Until the point you posted 3, time was frozen for Structured Streaming:
the max time of the timestamp column was the timestamp of message 2, so the
current time was the timestamp of message 2. When you posted 3, time advanced to
the timestamp of 3, which caused 1 to fall out, so it output 1.

Note that it will not output 1 exactly 1 second after 1 arrives; the clock time
means nothing.
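
A tiny illustrative sketch of that rule (plain Scala, not Spark API; the names and the exact boundary condition are approximations):

case class TimeWindow(startMs: Long, endMs: Long)

// "Current time" is the max event time seen so far; the watermark trails it by the
// configured delay, and an append-mode window is emitted only once the watermark has
// passed the window's end. No new input means no new max, so nothing is emitted.
def watermark(maxEventTimeMs: Long, delayMs: Long): Long = maxEventTimeMs - delayMs

def canEmit(w: TimeWindow, maxEventTimeMs: Long, delayMs: Long): Boolean =
  watermark(maxEventTimeMs, delayMs) >= w.endMs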

From: Koert Kuipers 
Date: Monday, May 28, 2018 at 6:17 PM
To: user 
Subject: trying to understand structured streaming aggregation with watermark 
and append outputmode

hello all,
just playing with structured streaming aggregations for the first time. this is 
my little program i run inside sbt:

import org.apache.spark.sql.functions._

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", )
  .load()

val query = lines
  .withColumn("time", current_timestamp)
  .withWatermark("time", "1 second")
  .groupBy(window($"time", "1 second")).agg(collect_list("value") as 
"value")
  .withColumn("windowstring", $"window" as "string")
  .writeStream
  .format("console")
  .outputMode(OutputMode.Append)
  .start()

query.awaitTermination()

before i start it i create a little server with nc:
$ nc -lk 

after it starts i simply type in a single character every 20 seconds or so 
inside nc and hit enter. my characters are 1, 2, 3, etc.

the thing i dont understand is it comes back with the correct responses, but 
with delays in terms of entries (not time). after the first 2 characters it 
comes back with empty aggregations, and then for every next character it comes 
back with the response for 2 characters ago. so when i hit 3 it comes 
back with the response for 1.

not very realtime :(

any idea why?

i would like it to respond to my input 1 with the relevant response for 
that input (after the window and watermark has expired, of course, so within 2 
seconds).

i tried adding a trigger of 1 second but that didnt help either.

below is the output with my inputs inserted using '<= ', so '<= 1' means 
i hit 1 and then enter.


<= 1
---
Batch: 0
---
+--+-++
|window|value|windowstring|
+--+-++
+--+-++

<= 2
---
Batch: 1
---
+--+-++
|window|value|windowstring|
+--+-++
+--+-++

<= 3
Batch: 2
---
++-++
|  window|value|windowstring|
++-++
|[2018-05-28 18:00...|  [1]|[2018-05-28 18:00...|
++-++

<= 4
---
Batch: 3
---
++-++
|  window|value|windowstring|
++-++
|[2018-05-28 18:00...|  [2]|[2018-05-28 18:00...|
++-++

<= 5
---
Batch: 4
---
++-++
|  window|value|windowstring|
++-++
|[2018-05-28 18:01...|  [3]|[2018-05-28 18:01...|
++-++





Re: Data is not getting written in sorted format on target oracle table through SPARK

2018-05-30 Thread Lalwani, Jayesh
No. There is no way to control the order except for the option that you have
already tried (repartition = 1). When you insert in parallel from multiple nodes,
the order of inserts cannot be guaranteed; that is the very nature of doing
things in parallel. The only way order can be controlled is by running the
inserts sequentially on one node, which is what repartition = 1 does.

Why does insertion order matter? Aren’t RDBMS databases designed so insertion 
order shouldn’t matter?
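
For reference, a minimal sketch of that single-partition variant (written in Scala; df, url and connectionProperties stand in for the objects in the post quoted below, where the PySpark call is the equivalent):

// Force the write through one task so rows reach Oracle in sorted order.
// This serializes the insert and will not scale to large tables.
df.repartition(1)
  .sortWithinPartitions("emp_id")
  .write
  .mode("overwrite")
  .jdbc(url, "target_table", connectionProperties)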

From: abhijeet bedagkar 
Date: Wednesday, May 30, 2018 at 5:29 AM
To: "user@spark.apache.org" 
Subject: Data is not getting written in sorted format on target oracle table 
through SPARK

Hi,

I have a table in hive with below schema
emp_id:int
emp_name:string

I have created data frame from above hive table

df = sql_context.sql('SELECT * FROM employee ORDER by emp_id')
df.show()

After above code is run I see that data is sorted properly on emp_id

After this I am trying to write the data to Oracle table through below code
df.write.jdbc(url=url, table='target_table', properties=properties,  
mode="overwrite")

When I see the Oracle table I see that ordering is not preserved and data is 
populated in random order

As per my understanding, This is happening because of multiple executor 
processes running at the same time on every data partitions and sorting applied 
through query is been applied on specific partition and when multiple processes 
writing data to Oracle at the same time the result table ordering is distorted

I further tried to repartition the data to just one partition(Which is not 
ideal solution) and post writing the data to oracle the sorting worked properly

Is there any way to write sorted data to RDBMS from SPARK

Thanks,
Abhijeet




Re: Unable to alter partition. The transaction for alter partition did not commit successfully.

2018-05-30 Thread Arun Hive
 
Hi,

While running my Spark job component I am getting the following exception.
Requesting your help on this:

Spark core version - spark-core_2.10-2.1.1
Spark streaming version - spark-streaming_2.10-2.1.1
Spark hive version - spark-hive_2.10-2.1.1

2018-05-28 00:08:04,317  [streaming-job-executor-2] ERROR (Hive.java:1883) - 
org.apache.hadoop.hive.ql.metadata.HiveException: Unable to alter partition. 
The transaction for alter partition did not commit successfully.
 at org.apache.hadoop.hive.ql.metadata.Hive.alterPartition(Hive.java:573)
 at org.apache.hadoop.hive.ql.metadata.Hive.alterPartition(Hive.java:546)
 at org.apache.hadoop.hive.ql.metadata.Hive.alterPartitionSpec(Hive.java:1915)
 at org.apache.hadoop.hive.ql.metadata.Hive.getPartition(Hive.java:1875)
 at org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1407)
 at 
org.apache.hadoop.hive.ql.metadata.Hive.loadDynamicPartitions(Hive.java:1593)
 at sun.reflect.GeneratedMethodAccessor123.invoke(Unknown Source)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.spark.sql.hive.client.Shim_v1_2.loadDynamicPartitions(HiveShim.scala:831)
 at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadDynamicPartitions$1.apply$mcV$sp(HiveClientImpl.scala:693)
 at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadDynamicPartitions$1.apply(HiveClientImpl.scala:691)
 at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadDynamicPartitions$1.apply(HiveClientImpl.scala:691)
 at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:279)
 at 
org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:226)
 at 
org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:225)
 at 
org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:268)
 at 
org.apache.spark.sql.hive.client.HiveClientImpl.loadDynamicPartitions(HiveClientImpl.scala:691)
 at 
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadDynamicPartitions$1.apply$mcV$sp(HiveExternalCatalog.scala:823)
 at 
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadDynamicPartitions$1.apply(HiveExternalCatalog.scala:811)
 at 
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadDynamicPartitions$1.apply(HiveExternalCatalog.scala:811)
 at 
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
 at 
org.apache.spark.sql.hive.HiveExternalCatalog.loadDynamicPartitions(HiveExternalCatalog.scala:811)
 at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:319)
 at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:221)
 at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:407)
 at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
 at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
 at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
 at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
 at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
 at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
 at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
 at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
 at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:263)
 at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:243)

-
 
-
 
-
 at 
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
 at 
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
 at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
 at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
 at 

Closing IPC connection

2018-05-30 Thread Arun Hive
Hi,

While running my Spark job component I am getting the following exception.
Requesting your help on this:

Spark core version - spark-core_2.10-2.1.1
Spark streaming version - spark-streaming_2.10-2.1.1
Spark hive version - spark-hive_2.10-2.1.1

[streaming-job-executor-0] DEBUG (Client.java:428) - The ping interval is 6 ms.
2018-05-28 00:08:10,187  [streaming-job-executor-0] DEBUG (Client.java:698) - Connecting to /:8020
2018-05-28 00:08:10,188  [streaming-job-executor-0] DEBUG (Client.java:1176) - closing ipc connection to /:8020: null
java.nio.channels.ClosedByInterruptException
at 
java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
 at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:659) at 
org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192) 
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:530) at 
org.apache.hadoop.net.NetUtils.connect(NetUtils.java:494) at 
org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:608) at 
org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:706) at 
org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:369) at 
org.apache.hadoop.ipc.Client.getConnection(Client.java:1522) at 
org.apache.hadoop.ipc.Client.call(Client.java:1439) at 
org.apache.hadoop.ipc.Client.call(Client.java:1400) at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
 at com.sun.proxy.$Proxy11.getListing(Unknown Source) at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:554)
 at sun.reflect.GeneratedMethodAccessor86.invoke(Unknown Source) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
 at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
 at com.sun.proxy.$Proxy12.getListing(Unknown Source) at 
org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1958) at 
org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1941) at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:693)
 at 
org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105)
 at 
org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:755)
 at 
org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:751)
 at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:751)
 at org.apache.hadoop.fs.Globber.listStatus(Globber.java:69) at 
org.apache.hadoop.fs.Globber.glob(Globber.java:217) at 
org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1644) at 
org.apache.hadoop.hive.common.HiveStatsUtils.getFileStatusRecurse(HiveStatsUtils.java:73)
 at 
org.apache.hadoop.hive.ql.metadata.Hive.loadDynamicPartitions(Hive.java:1549) 
at sun.reflect.GeneratedMethodAccessor123.invoke(Unknown Source) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.apache.spark.sql.hive.client.Shim_v1_2.loadDynamicPartitions(HiveShim.scala:831)
 at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadDynamicPartitions$1.apply$mcV$sp(HiveClientImpl.scala:693)
 at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadDynamicPartitions$1.apply(HiveClientImpl.scala:691)
 at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadDynamicPartitions$1.apply(HiveClientImpl.scala:691)
 at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:279)
 at 
org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:226)
 at 
org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:225)
 at 
org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:268)
 at 
org.apache.spark.sql.hive.client.HiveClientImpl.loadDynamicPartitions(HiveClientImpl.scala:691)
 at 
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadDynamicPartitions$1.apply$mcV$sp(HiveExternalCatalog.scala:823)
 at 
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadDynamicPartitions$1.apply(HiveExternalCatalog.scala:811)
 at 
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadDynamicPartitions$1.apply(HiveExternalCatalog.scala:811)
 at 
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
 at 
org.apache.spark.sql.hive.HiveExternalCatalog.loadDynamicPartitions(HiveExternalCatalog.scala:811)
 at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:319)
 at 

Error while creating table with space, with/without partition

2018-05-30 Thread abhijeet bedagkar
I am facing a weird situation wherein the insert overwrite query does not
give any error on being executed against a table which contains a column
with a space in its name. Following are the queries which give no error:


CREATE TABLE TEST_PART (`col1 ` STRING) PARTITIONED BY (`col2` STRING) STORED AS PARQUET;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions = 2500;
INSERT OVERWRITE TABLE TEST_PART PARTITION (col2) select 'test' as col1, 'test2' as col2;


A similar table without partitions results in the AnalysisException thrown
saying 'Attribute name "col1 " contains invalid character(s) among "
,;{}()\n\t=". Please use alias to rename it.;'. Below are the set of
queries:


CREATE TABLE TEST (`col1 ` STRING, `col2` STRING) STORED AS PARQUET;
INSERT OVERWRITE TABLE TEST select 'test' as col1, 'test2' as col2;


However, I do get the AnalysisException when running a SELECT * against the
TEST_PART table. For TEST, the INSERT OVERWRITE query itself errors out. Any
idea why this would happen?


We are using Spark 2.0.0. The same behavior has been noticed in Spark 2.2.1
as well.


Thrift server not exposing temp tables (spark.sql.hive.thriftServer.singleSession=true)

2018-05-30 Thread Daniel Haviv
Hi,
I would like to expose a DF through the Thrift server, but even though I
enable spark.sql.hive.thriftServer.singleSession I still can't see the temp
table.

I'm using Spark 2.2.0:


spark-shell --conf spark.sql.hive.thriftServer.singleSession=true

 import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
 import org.apache.spark.sql.hive.thriftserver._
 val df = spark.read.orc( "s3://sparkdemoday/crimes_orc")
 df.createOrReplaceTempView("cached_df")
 df.cache
 df.count
 val sql = spark.sqlContext
 HiveThriftServer2.startWithContext(sql)


Thank you.
Daniel


Re: Blockmgr directories intermittently not being cleaned up

2018-05-30 Thread Jeff Frylings
The logs are not the problem; it is the shuffle files that are not being 
cleaned up.  We do have the configs for log rolling and that is working just 
fine.

ex: /mnt/blockmgr-d65d4a74-d59a-4a06-af93-ba29232f7c5b/31/shuffle_1_46_0.data

> On May 30, 2018, at 9:54 AM, Ajay  wrote:
> 
> I have used these configs in the paths to clean up the executor logs.
> 
>   .set("spark.executor.logs.rolling.time.interval", "minutely")
>   .set("spark.executor.logs.rolling.strategy", "time")
>   .set("spark.executor.logs.rolling.maxRetainedFiles", "1")
> 
> On Wed, May 30, 2018 at 8:49 AM Jeff Frylings  > wrote:
> Intermittently on spark executors we are seeing blockmgr directories not 
> being cleaned up after execution and is filling up disk.  These executors are 
> using Mesos dynamic resource allocation and no single app using an executor 
> seems to be the culprit.  Sometimes an app will run and be cleaned up and 
> then on a subsequent run that same AppExecId will run and not be cleaned up.  
> The runs that have left behind folders did not have any obvious task failures 
> in the SparkUI during that time frame.  
> 
> The Spark shuffle service in the ami is version 2.1.1
> The code is running on spark 2.0.2 in the mesos sandbox.
> 
> In a case where files are cleaned up the spark.log looks like the following
> 18/05/28 14:47:24 INFO ExternalShuffleBlockResolver: Registered executor 
> AppExecId{appId=33d8fe79-a670-4277-b6f3-ee1049724204-8310, execId=95} with 
> ExecutorShuffleInfo{localDirs=[/mnt/blockmgr-b2c7ff97-481e-4482-b9ca-92a5f8d4b25e],
>  subDirsPerLocalDir=64, 
> shuffleManager=org.apache.spark.shuffle.sort.SortShuffleManager}
> ...
> 18/05/29 02:54:09 INFO MesosExternalShuffleBlockHandler: Application 
> 33d8fe79-a670-4277-b6f3-ee1049724204-8310 timed out. Removing shuffle files.
> 18/05/29 02:54:09 INFO ExternalShuffleBlockResolver: Application 
> 33d8fe79-a670-4277-b6f3-ee1049724204-8310 removed, cleanupLocalDirs = true
> 
> 
> In a case where files are not cleaned up we do not see the 
> "MesosExternalShuffleBlockHandler: Application  timed out. Removing 
> shuffle files."
> 
> We are using this config when starting the job "--conf 
> spark.worker.cleanup.enabled=true" but I believe this only pertains to 
> standalone mode and we are using the mesos deployment mode. So I don't think 
> this flag actually does anything. 
> 
> 
> Thanks,
> Jeff
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> 
> 
> 
> 
> -- 
> Thanks,
> Ajay



Re: Blockmgr directories intermittently not being cleaned up

2018-05-30 Thread Ajay
I have used these configs in the paths to clean up the executor logs.

  .set("spark.executor.logs.rolling.time.interval", "minutely")
  .set("spark.executor.logs.rolling.strategy", "time")
  .set("spark.executor.logs.rolling.maxRetainedFiles", "1")

On Wed, May 30, 2018 at 8:49 AM Jeff Frylings 
wrote:

> Intermittently on spark executors we are seeing blockmgr directories not
> being cleaned up after execution and is filling up disk.  These executors
> are using Mesos dynamic resource allocation and no single app using an
> executor seems to be the culprit.  Sometimes an app will run and be cleaned
> up and then on a subsequent run that same AppExecId will run and not be
> cleaned up.  The runs that have left behind folders did not have any
> obvious task failures in the SparkUI during that time frame.
>
> The Spark shuffle service in the ami is version 2.1.1
> The code is running on spark 2.0.2 in the mesos sandbox.
>
> In a case where files are cleaned up the spark.log looks like the following
> 18/05/28 14:47:24 INFO ExternalShuffleBlockResolver: Registered executor
> AppExecId{appId=33d8fe79-a670-4277-b6f3-ee1049724204-8310, execId=95} with
> ExecutorShuffleInfo{localDirs=[/mnt/blockmgr-b2c7ff97-481e-4482-b9ca-92a5f8d4b25e],
> subDirsPerLocalDir=64,
> shuffleManager=org.apache.spark.shuffle.sort.SortShuffleManager}
> ...
> 18/05/29 02:54:09 INFO MesosExternalShuffleBlockHandler: Application
> 33d8fe79-a670-4277-b6f3-ee1049724204-8310 timed out. Removing shuffle files.
> 18/05/29 02:54:09 INFO ExternalShuffleBlockResolver: Application
> 33d8fe79-a670-4277-b6f3-ee1049724204-8310 removed, cleanupLocalDirs = true
>
>
> In a case where files are not cleaned up we do not see the
> "MesosExternalShuffleBlockHandler: Application  timed out. Removing
> shuffle files."
>
> We are using this config when starting the job "--conf
> spark.worker.cleanup.enabled=true" but I believe this only pertains to
> standalone mode and we are using the mesos deployment mode. So I don't
> think this flag actually does anything.
>
>
> Thanks,
> Jeff
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Thanks,
Ajay


Blockmgr directories intermittently not being cleaned up

2018-05-30 Thread Jeff Frylings
Intermittently on Spark executors we are seeing blockmgr directories not being 
cleaned up after execution, which is filling up disk.  These executors are using 
Mesos dynamic resource allocation, and no single app using an executor seems to 
be the culprit.  Sometimes an app will run and be cleaned up, and then on a 
subsequent run that same AppExecId will run and not be cleaned up.  The runs 
that left folders behind did not have any obvious task failures in the Spark UI 
during that time frame.

The Spark shuffle service in the ami is version 2.1.1
The code is running on spark 2.0.2 in the mesos sandbox.

In a case where files are cleaned up the spark.log looks like the following
18/05/28 14:47:24 INFO ExternalShuffleBlockResolver: Registered executor 
AppExecId{appId=33d8fe79-a670-4277-b6f3-ee1049724204-8310, execId=95} with 
ExecutorShuffleInfo{localDirs=[/mnt/blockmgr-b2c7ff97-481e-4482-b9ca-92a5f8d4b25e],
 subDirsPerLocalDir=64, 
shuffleManager=org.apache.spark.shuffle.sort.SortShuffleManager}
...
18/05/29 02:54:09 INFO MesosExternalShuffleBlockHandler: Application 
33d8fe79-a670-4277-b6f3-ee1049724204-8310 timed out. Removing shuffle files.
18/05/29 02:54:09 INFO ExternalShuffleBlockResolver: Application 
33d8fe79-a670-4277-b6f3-ee1049724204-8310 removed, cleanupLocalDirs = true


In a case where files are not cleaned up we do not see the 
"MesosExternalShuffleBlockHandler: Application  timed out. Removing 
shuffle files."

We are using this config when starting the job "--conf 
spark.worker.cleanup.enabled=true" but I believe this only pertains to 
standalone mode and we are using the mesos deployment mode. So I don't think 
this flag actually does anything. 


Thanks,
Jeff
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Apache Spark is not working as expected

2018-05-30 Thread remil
hadoopuser@sherin-VirtualBox:/usr/lib/spark/bin$ spark-shell
spark-shell: command not found
hadoopuser@sherin-VirtualBox:/usr/lib/spark/bin$
[attachment: Spark.odt]



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: testing frameworks

2018-05-30 Thread Holden Karau
So Jesse has an excellent blog post on how to use it with Java
applications:
http://www.jesse-anderson.com/2016/04/unit-testing-spark-with-java/
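
For comparison, the Scala-side usage of spark-testing-base looks roughly like this (a minimal sketch assuming ScalaTest and the library's SharedSparkContext trait):

import com.holdenkarau.spark.testing.SharedSparkContext
import org.scalatest.FunSuite

// SharedSparkContext provides a SparkContext (sc) shared across the suite's tests.
class WordCountSuite extends FunSuite with SharedSparkContext {
  test("counts words") {
    val counts = sc.parallelize(Seq("a", "b", "a")).countByValue()
    assert(counts("a") === 2L)
  }
}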

On Wed, May 30, 2018 at 4:14 AM Spico Florin  wrote:

> Hello!
>   I'm also looking for unit testing spark Java application. I've seen the
> great work done in  spark-testing-base but it seemed to me that I could
> not use for Spark Java applications.
> Only spark scala applications are supported?
> Thanks.
> Regards,
>  Florin
>
> On Wed, May 23, 2018 at 8:07 AM, umargeek 
> wrote:
>
>> Hi Steve,
>>
>> you can try out pytest-spark plugin if your writing programs using pyspark
>> ,please find below link for reference.
>>
>> https://github.com/malexer/pytest-spark
>> 
>>
>> Thanks,
>> Umar
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
> --
Twitter: https://twitter.com/holdenkarau


Re: testing frameworks

2018-05-30 Thread Spico Florin
Hello!
I'm also looking into unit testing a Spark Java application. I've seen the
great work done in spark-testing-base, but it seemed to me that I could not
use it for Spark Java applications.
Are only Spark Scala applications supported?
Thanks.
Regards,
 Florin

On Wed, May 23, 2018 at 8:07 AM, umargeek 
wrote:

> Hi Steve,
>
> you can try out pytest-spark plugin if your writing programs using pyspark
> ,please find below link for reference.
>
> https://github.com/malexer/pytest-spark
> 
>
> Thanks,
> Umar
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Positive log-likelihood with Gaussian mixture

2018-05-30 Thread Simon Dirmeier
I see, thanks for clearing that up. I was aware of this for uniform 
distributions, but not for normal ones.
So that would mean some of the components have such a small variance 
that the log-likelihood is positive in the end?


Cheers,
Simon

On 30.05.18 at 11:22, robin.e...@xense.co.uk wrote:
Positive log likelihoods for continuous distributions are not unusual. 
You are evaluating a pdf, not a probability. For example, a univariate 
Gaussian pdf returns greater than 1 at the mean when the standard deviation 
goes below about 0.4, at which point the log pdf is positive.


On Tue, 29 May 2018 at 12:08 Simon Dirmeier wrote:


Hey,

sorry for the late reply. I cannot share the data but the problem
can be reproduced easily, like below.
I wanted to check with sklearn and observe a similar behaviour,
i.e. a positive per-sample average log-likelihood

(http://scikit-learn.org/stable/modules/generated/sklearn.mixture.GaussianMixture.html#sklearn.mixture.GaussianMixture.score).

I don't think it is necessarily an issue with the implementation,
but maybe due to parameter identifiability or so?
As far as I can tell, the variances seem to be ok.

Thanks for looking into this.

Best,
Simon
import scipy
import sklearn.mixture
from scipy.stats import multivariate_normal
from sklearn.mixture import GaussianMixture

scipy.random.seed(23)
X = multivariate_normal.rvs(mean=scipy.ones(10), size=100)

dff = map(lambda x: (int(x[0]), Vectors.dense(x[0:])), X)
df = spark.createDataFrame(dff, schema=["label", "features"])

for i in [100, 90, 80, 70, 60, 50]:
    km = pyspark.ml.clustering.GaussianMixture(k=10, seed=23).fit(df.limit(i))
    sk_gmm = sklearn.mixture.GaussianMixture(10, random_state=23).fit(X[:i, :])
    print(df.limit(i).count(), X[:i, :].shape[0], km.summary.logLikelihood, sk_gmm.score(X[:i, :]))

100 100 368.37475644171036 -1.54949312502
90 90 1026.084529101155 1.16196607062
80 80 2245.427539835042 4.25769131857
70 70 1940.0122633489268 10.0949992881
60 60 2255.002313247103 14.0497823725
50 50 -140.82605873444814 21.2423016046






Data is not getting written in sorted format on target oracle table through SPARK

2018-05-30 Thread abhijeet bedagkar
Hi,

I have a table in hive with below schema
emp_id:int
emp_name:string

I have created data frame from above hive table

df = sql_context.sql('SELECT * FROM employee ORDER by emp_id')
df.show()

After above code is run I see that data is sorted properly on emp_id

After this I am trying to write the data to Oracle table through below code
df.write.jdbc(url=url, table='target_table', properties=properties,
mode="overwrite")

When I see the Oracle table I see that ordering is not preserved and data
is populated in random order

As per my understanding, this is happening because multiple executor processes
run at the same time on the data partitions: the sort applied through the query
holds within each partition, but when multiple processes write data to Oracle at
the same time the ordering of the resulting table is distorted.

I further tried repartitioning the data to just one partition (which is not an
ideal solution), and after writing the data to Oracle the sorting worked
properly.

Is there any way to write sorted data to an RDBMS from Spark?

Thanks,
Abhijeet


Re: Positive log-likelihood with Gaussian mixture

2018-05-30 Thread robin . east
Positive log likelihoods for continuous distributions are not unusual. You are 
evaluating a pdf, not a probability. For example, a univariate Gaussian pdf 
returns greater than 1 at the mean when the standard deviation goes below about 
0.4, at which point the log pdf is positive.
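
For reference, the threshold comes from evaluating the Gaussian pdf at its mean:

\[
f(\mu \mid \mu, \sigma^2) = \frac{1}{\sigma\sqrt{2\pi}} > 1
\iff \sigma < \frac{1}{\sqrt{2\pi}} \approx 0.399
\iff \sigma^2 < \frac{1}{2\pi} \approx 0.159
\]

So a mixture component whose fitted variance is small enough can, on its own, push the per-sample log-likelihood positive, which is consistent with the numbers in the reproduction quoted below.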


On Tue, 29 May 2018 at 12:08 Simon Dirmeier wrote:

> 
> 
> Hey,
> 
> sorry for the late reply. I cannot share the data but the problem can be
> reproduced easily, like below.
> I wanted to check with sklearn and observe a similar behaviour, i.e. a
> positive per-sample average log-likelihood ( 
> http://scikit-learn.org/stable/modules/generated/sklearn.mixture.GaussianMixture.html#sklearn.mixture.GaussianMixture.score
> ).
> 
> I don't think it is necessarily an issue with the implementation, but
> maybe due to parameter identifiability or so?
> As far as I can tell, the variances seem to be ok.
> 
> Thanks for looking into this.
> 
> Best,
> Simon
> 
> 
> 
> import scipy
> import sklearn.mixture
> from scipy.stats import multivariate_normal
> from sklearn.mixture import GaussianMixture
> 
> scipy.random.seed(23)
> X = multivariate_normal.rvs(mean=scipy.ones(10), size=100)
> 
> dff = map(lambda x: (int(x[0]), Vectors.dense(x[0:])), X)
> df = spark.createDataFrame(dff, schema=["label", "features"])
> 
> for i in [100, 90, 80, 70, 60, 50]:
>     km = pyspark.ml.clustering.GaussianMixture(k=10,
> seed=23).fit(df.limit(i))
>     sk_gmm = sklearn.mixture.GaussianMixture(10,
> random_state=23).fit(X[:i, :])
>     print(df.limit(i).count(), X[:i, :].shape[0],
> km.summary.logLikelihood, sk_gmm.score(X[:i, :]))
> 
> 100 100 368.37475644171036 -1.54949312502 90 90 1026.084529101155
> 1.16196607062 80 80 2245.427539835042 4.25769131857 70 70
> 1940.0122633489268 10.0949992881 60 60 2255.002313247103 14.0497823725 50
> 50 -140.82605873444814 21.2423016046
>

Re: Dataframe save as table operation is failing when the child column name contains special characters

2018-05-30 Thread abhijeet bedagkar
I dug further into this issue:

1. The issue seems to originate from the Hive metastore, since a query against a
sub-column containing special characters did not work for me even after adding
backticks.
2. I solved the issue by explicitly passing a SQL cast expression to the data
frame, with the special characters removed from the sub-column names.

Example:

Source data:
{
  "address": {
"lane-one": "mark street",
"lane:two": "sub stree"
 }
}
Python code:

schema = 'struct<lane_one:string,lane_two:string>'
data_frame_from_json.select(col('address').cast(schema))

I have verified this on much more complex JSON and XML structures and it looks
good.

Thanks,
Abhijeet

On Wed, May 16, 2018 at 6:13 PM, abhijeet bedagkar 
wrote:

> Hi,
>
> I am using SPARK to read the XML / JSON files to create a dataframe and
> save it as a hive table
>
> Sample XML file:
> 
> 101
> 
> 45
> COMMAND
> 
> 
>
> Note field 'validation-timeout' under testexecutioncontroller.
>
> Below is the schema populated by DF after reading the XML file
>
> |-- id: long (nullable = true)
> |-- testexecutioncontroller: struct (nullable = true)
> ||-- execution-timeout: long (nullable = true)
> ||-- execution-method: string (nullable = true)
>
> While saving this dataframe to hive table I am getting below exception
>
> Caused by: java.lang.IllegalArgumentException: Error: : expected at the
> position 24 of 
> 'bigint:struct'
> but '-' is found.at org.apache.hadoop.hive.serde2.
> typeinfo.TypeInfoUtils$TypeInfoParser.expect(TypeInfoUtils.java:360)
>   at org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$
> TypeInfoParser.expect(TypeInfoUtils.java:331)at
> org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$
> TypeInfoParser.parseType(TypeInfoUtils.java:483)at
> org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$
> TypeInfoParser.parseTypeInfos(TypeInfoUtils.java:305)at
> org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.
> getTypeInfosFromTypeString(TypeInfoUtils.java:765)at
> org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe.initialize(ParquetHiveSerDe.java:111)
>   at 
> org.apache.hadoop.hive.serde2.AbstractSerDe.initialize(AbstractSerDe.java:53)
>   at 
> org.apache.hadoop.hive.serde2.SerDeUtils.initializeSerDe(SerDeUtils.java:521)
>   at 
> org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:391)
>   at org.apache.hadoop.hive.ql.metadata.Table.
> getDeserializerFromMetaStore(Table.java:276)at
> org.apache.hadoop.hive.ql.metadata.Table.checkValidity(Table.java:197)
> at org.apache
>
> It looks like the issue is happening due to special character '-' in the
> field. As after removing the special character it iw working properly.
>
> Could you please let me know if there is way to replaces all child column
> names so that it can be saved as table without any issue.
>
> Creating the STRUCT FIELD from df.schema and recursively creating another
> STRUCTFIELD with renamed column is one solution I am aware of. But still
> wanted to check if there is easy way to do this.
>
> Thanks,
> Abhijeet
>