Re: need workaround around HIVE-11625 / DISTRO-800

2018-08-07 Thread Pranav Agrawal
any help please

On Tue, Aug 7, 2018 at 1:49 PM, Pranav Agrawal 
wrote:

> I am hitting issue,
> https://issues.cloudera.org/browse/DISTRO-800 (related to
> https://issues.apache.org/jira/browse/HIVE-11625)
>
> I am unable to write empty array of types int or string (array of size 0)
> into parquet, please assist or suggest workaround for the same.
>
> spark version: 2.2.1
> AWS EMR: 5.12, 5.13
>
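One workaround that is sometimes suggested for this issue (a sketch only, not
verified on EMR; `events` and its array<int> column "ids" are made-up names):
replace empty arrays with null before writing, provided downstream readers can
treat null and an empty array the same way.

 import org.apache.spark.sql.functions.{col, lit, size, when}

 // Hypothetical DataFrame `events` with an array<int> column "ids" that may
 // contain empty arrays; null them out before writing to Parquet.
 val cleaned = events.withColumn(
   "ids",
   when(size(col("ids")) === 0, lit(null)).otherwise(col("ids")))

 cleaned.write.mode("overwrite").parquet("s3://some-bucket/events")  // illustrative path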


Re: Split a row into multiple rows Java

2018-08-07 Thread Manu Zhang
The following may help, although it's in Scala. The idea is to first
concatenate each value with its column name (which becomes the time), collect
those pairs into an array and explode it, and finally split each pair back
into time and value columns.

 import org.apache.spark.sql.functions.{array, col, concat_ws, explode, lit, split}

 // Pack each value as "value:columnName", explode the array, then split the
 // pair back into separate columns.
 val ndf = df.select(col("name"), col("otherName"),
   explode(array(
     concat_ws(":", col("val1"), lit("val1")),
     concat_ws(":", col("val2"), lit("val2")),
     concat_ws(":", col("val3"), lit("val3"))
   )).alias("temp"))

 val fields = split(col("temp"), ":")
 ndf.select(col("name"), col("otherName"),
   fields.getItem(1).alias("time"),   // the original column name, e.g. "val1"
   fields.getItem(0).alias("value"))  // the value that was in that column
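
An alternative sketch (untested; it assumes Spark 2.0+ and the column names
from your example) is the SQL stack function, which unpivots in a single step
and avoids the string concat/split round-trip:

 df.selectExpr(
   "name", "otherName",
   "stack(3, 'val1', val1, 'val2', val2, 'val3', val3) as (time, value)")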

Regards,
Manu Zhang

On Wed, Aug 8, 2018 at 11:41 AM nookala  wrote:

> +-----+---------+----+----+----+
> | name|otherName|val1|val2|val3|
> +-----+---------+----+----+----+
> |  bob|       b1|   1|   2|   3|
> |alive|       c1|   3|   4|   6|
> |  eve|       e1|   7|   8|   9|
> +-----+---------+----+----+----+
>
> I need this to become
>
> +-----+---------+----+-----+
> | name|otherName|time|value|
> +-----+---------+----+-----+
> |  bob|       b1|val1|    1|
> |  bob|       b1|val2|    2|
> |  bob|       b1|val3|    3|
> |alive|       c1|val1|    3|
> |alive|       c1|val2|    4|
> |alive|       c1|val3|    6|
> |  eve|       e1|val1|    7|
> |  eve|       e1|val2|    8|
> |  eve|       e1|val3|    9|
> +-----+---------+----+-----+
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Unable to see completed application in Spark 2 history web UI

2018-08-07 Thread Fawze Abujaber
Hello Community,

I'm using Spark 2.3 and Spark 1.6.0 in my cluster with Cloudera
distribution 5.13.0.

Both are configured to run on YARN, but I'm unable to see completed
applications in the Spark 2 history server, while in Spark 1.6.0 I can.

1) I checked the HDFS permissions for both folders and both have the same
permissions.

drwxrwxrwt   - cloudera-scm spark  0 2018-08-08 00:46
/user/spark/applicationHistory
drwxrwxrwt   - cloudera-scm spark  0 2018-08-08 00:46
/user/spark/spark2ApplicationHistory

The application files themselves have permissions 770 in both.

-rwxrwx---   3  fawzea spark 4743751 2018-08-07 23:32
/user/spark/spark2ApplicationHistory/application_1527404701551_672816_1
-rwxrwx---   3  fawzea spark   134315 2018-08-08 00:41
/user/spark/applicationHistory/application_1527404701551_673359_1

2) No errors in the Spark 2 history server log.

3) Compared the configurations between Spark 1.6 and Spark 2.3 (system user,
event logging, etc.); everything looks the same.

4) Once I changed the permissions of the above Spark 2 application files to
777, I was able to see the application in the Spark 2 history server UI.

I tried to figure out whether the two history servers run as different users,
but couldn't confirm it.

Has anyone run into this issue and solved it?

Thanks in advance.
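
As a quick comparison, here is a sketch that can be pasted into a Spark 2.3
spark-shell (these are standard Spark property names; the spark.history.* keys
are set on the history server side and may simply print <not set> here):

 // Print the event-log settings this session actually uses, to compare with
 // the Spark 1.6 configuration and the HDFS directories listed above.
 Seq("spark.eventLog.enabled",
     "spark.eventLog.dir",
     "spark.history.fs.logDirectory",
     "spark.history.ui.acls.enable").foreach { k =>
   println(s"$k = ${spark.conf.getOption(k).getOrElse("<not set>")}")
 }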


-- 
Take Care
Fawze Abujaber


Re: Split a row into multiple rows Java

2018-08-07 Thread nookala
+-----+---------+----+----+----+
| name|otherName|val1|val2|val3|
+-----+---------+----+----+----+
|  bob|       b1|   1|   2|   3|
|alive|       c1|   3|   4|   6|
|  eve|       e1|   7|   8|   9|
+-----+---------+----+----+----+

I need this to become

+-----+---------+----+-----+
| name|otherName|time|value|
+-----+---------+----+-----+
|  bob|       b1|val1|    1|
|  bob|       b1|val2|    2|
|  bob|       b1|val3|    3|
|alive|       c1|val1|    3|
|alive|       c1|val2|    4|
|alive|       c1|val3|    6|
|  eve|       e1|val1|    7|
|  eve|       e1|val2|    8|
|  eve|       e1|val3|    9|
+-----+---------+----+-----+



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Insert into dynamic partitioned hive/parquet table throws error - Partition spec contains non-partition columns

2018-08-07 Thread Nirav Patel
FYI, it works with static partitioning:
spark.sql("insert overwrite table mytable PARTITION(P1=1085, P2=164590861)
select c1, c2,..cn, P1, P2 from updateTable")

On Thu, Aug 2, 2018 at 5:01 PM, Nirav Patel  wrote:

> I am trying to insert overwrite multiple partitions into existing
> partitioned hive/parquet table. Table was created using sparkSession.
>
> I have a table 'mytable' with partitions P1 and P2.
>
> I have following set on sparkSession object:
>
> .config("hive.exec.dynamic.partition", true)
>
> .config("hive.exec.dynamic.partition.mode", "nonstrict")
>
> val df = spark.read.csv(pathToNewData)
>
> df.createOrReplaceTempView("updateTable")
>
> here 'df' may contains data from multiple partitions. i.e. multiple values
> for P1 and P2 in data.
>
>
> spark.sql("insert overwrite table mytable PARTITION(P1, P2) select c1,
> c2,..cn, P1, P2 from updateTable") // I made sure that partition columns P1
> and P2 are at the end of projection list.
>
> I am getting following error:
>
> org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.
> Table.ValidationFailureSemanticException: Partition spec {p1=, p2=,
> P1=1085, P2=164590861} contains non-partition columns;
>
> dataframe 'df' have records for P1=1085, P2=164590861 .
>
>



Updating dynamic partitioned hive table throws error - Partition spec contains non-partition columns

2018-08-07 Thread nirav
I am using Spark 2.2.1 and Hive 2.1. I am trying to insert overwrite
multiple partitions into an existing partitioned Hive/Parquet table.

Table was created using sparkSession.

I have a table 'mytable' with partitions P1 and P2.

I have following set on sparkSession object:

"hive.exec.dynamic.partition"=true
"hive.exec.dynamic.partition.mode"="nonstrict"

Code:

val df = spark.read.csv(pathToNewData)
df.createOrReplaceTempView("updateTable") // 'df' may contain data from
// multiple partitions, i.e. multiple values for P1 and P2 in the data

spark.sql("insert overwrite table mytable PARTITION(P1, P2) select c1,
c2,..cn, P1, P2 from updateTable") // I made sure that partition columns P1
and P2 are at the end of projection list.

I am getting following error:

org.apache.spark.sql.AnalysisException:
org.apache.hadoop.hive.ql.metadata.Table.ValidationFailureSemanticException:
Partition spec {p1=, p2=, P1=1085, P2=164590861} contains non-partition
columns;

The dataframe 'df' has records for P1=1085, P2=164590861. It looks like an
issue with casing (lower vs. upper). I tried both cases in my query, but it
still doesn't work.
It works if I use static partitioning:
spark.sql("insert overwrite table mytable PARTITION(P1=1085, P2=164590861)
select c1, c2,..cn, P1, P2 from updateTable where P1=1085 and P2=164590861
")
But this is not what I am looking for. I need to get dynamic partitioning
updates working.

Thanks
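
A possible workaround to try (an untested sketch, not a confirmed fix for
this error): skip the SQL PARTITION clause and let the DataFrame writer
resolve the partitions instead.

 import org.apache.spark.sql.SaveMode

 // Assumes the two hive.exec.dynamic.partition settings above are still in
 // place, and that the columns of `df` are ordered exactly as in `mytable`,
 // with the partition columns P1 and P2 last (insertInto matches by position).
 df.write
   .mode(SaveMode.Overwrite)
   .insertInto("mytable")

Depending on how the table was created, Overwrite here may replace the whole
table rather than only the partitions present in df, so it is safer to test
this on a throwaway copy first.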


Re: Newbie question on how to extract column value

2018-08-07 Thread James Starks
Because of some legacy issues I can't immediately upgrade the Spark version,
but following the suggestion I filter the data before loading it into Spark:

 val df = sparkSession.read.format("jdbc").option(...).option("dbtable",
   "(select .. from ... where url <> '') table_name").load()
 df.createOrReplaceTempView("new_table")

Then performing the custom operation does the trick:

sparkSession.sql("select id, url from new_table").as[(String, String)].map 
{ case (id, url) =>
   val derived_data = ... // operation on url
   (id, derived_data)
}.show()

Thanks for the advice, it's really helpful!

‐‐‐ Original Message ‐‐‐
On August 7, 2018 5:33 PM, Gourav Sengupta  wrote:

> Hi James,
>
> It is always advisable to use the latest SPARK version. That said, can you 
> please giving a try to dataframes and udf if possible. I think, that would be 
> a much scalable way to address the issue.
>
> Also in case possible, it is always advisable to use the filter option before 
> fetching the data to Spark.
>
> Thanks and Regards,
> Gourav
>
> On Tue, Aug 7, 2018 at 4:09 PM, James Starks  
> wrote:
>
>> I am very new to Spark. Just successfully setup Spark SQL connecting to 
>> postgresql database, and am able to display table with code
>>
>> sparkSession.sql("SELECT id, url from table_a where col_b <> '' ").show()
>>
>> Now I want to perform filter and map function on col_b value. In plain scala 
>> it would be something like
>>
>> Seq((1, "http://a.com/a"), (2, "http://b.com/b"), (3, "unknown")).filter
>> { case (_, url) => isValid(url) }.map { case (id, url) => (id, pathOf(url))
>> }
>>
>> where filter will remove invalid url, and then map (id, url) to (id, path of 
>> url).
>>
>> However, when applying this concept to spark sql with code snippet
>>
>> sparkSession.sql("...").filter(isValid($"url"))
>>
>> Compiler complains type mismatch because $"url" is ColumnName type. How can 
>> I extract column value i.e. http://... for the column url in order to 
>> perform filter function?
>>
>> Thanks
>>
>> Java 1.8.0
>> Scala 2.11.8
>> Spark 2.1.0

Re: Newbie question on how to extract column value

2018-08-07 Thread Gourav Sengupta
Hi James,

It is always advisable to use the latest Spark version. That said, can you
please give dataframes and UDFs a try if possible? I think that would be a
much more scalable way to address the issue.

Also, where possible, it is always advisable to filter the data before
fetching it into Spark.


Thanks and Regards,
Gourav

On Tue, Aug 7, 2018 at 4:09 PM, James Starks  wrote:

> I am very new to Spark. Just successfully setup Spark SQL connecting to
> postgresql database, and am able to display table with code
>
> sparkSession.sql("SELECT id, url from table_a where col_b <> ''
> ").show()
>
> Now I want to perform filter and map function on col_b value. In plain
> scala it would be something like
>
> Seq((1, "http://a.com/a"), (2, "http://b.com/b"), (3,
> "unknown")).filter { case (_, url) => isValid(url) }.map { case (id, url)
> => (id, pathOf(url)) }
>
> where filter will remove invalid url, and then map (id, url) to (id, path
> of url).
>
> However, when applying this concept to spark sql with code snippet
>
> sparkSession.sql("...").filter(isValid($"url"))
>
> Compiler complains type mismatch because $"url" is ColumnName type. How
> can I extract column value i.e. http://... for the column url in order to
> perform filter function?
>
> Thanks
>
> Java 1.8.0
> Scala 2.11.8
> Spark 2.1.0
>
>
>
>
>
>
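
For reference, a minimal sketch of the UDF route (assumptions: `sparkSession`
is the existing session, and `isValid`/`pathOf` are James's own String
functions, stubbed here only so the example compiles):

 import org.apache.spark.sql.functions.udf
 import sparkSession.implicits._

 // Stubs standing in for the real logic.
 def isValid(url: String): Boolean = url != null && url.startsWith("http")
 def pathOf(url: String): String   = new java.net.URI(url).getPath

 // Wrap the plain-Scala functions as UDFs so they can be applied to Columns.
 val isValidUdf = udf((url: String) => isValid(url))
 val pathOfUdf  = udf((url: String) => pathOf(url))

 sparkSession.sql("SELECT id, url FROM table_a WHERE col_b <> ''")
   .filter(isValidUdf($"url"))
   .withColumn("path", pathOfUdf($"url"))
   .show()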


Newbie question on how to extract column value

2018-08-07 Thread James Starks
I am very new to Spark. I just successfully set up Spark SQL connecting to a
PostgreSQL database, and am able to display a table with the code

sparkSession.sql("SELECT id, url from table_a where col_b <> '' ").show()

Now I want to perform filter and map functions on the col_b value. In plain
Scala it would be something like

Seq((1, "http://a.com/a"), (2, "http://b.com/b"), (3, "unknown")).filter {
  case (_, url) => isValid(url) }.map { case (id, url) => (id, pathOf(url)) }

where the filter removes invalid urls, and map then turns (id, url) into
(id, path of the url).

However, when applying this concept to Spark SQL with the code snippet

sparkSession.sql("...").filter(isValid($"url"))

the compiler complains about a type mismatch because $"url" is of type
ColumnName. How can I extract the column value, i.e. http://..., for the
column url in order to apply the filter function?

Thanks

Java 1.8.0
Scala 2.11.8
Spark 2.1.0

Dynamic partitioning weird behavior

2018-08-07 Thread Nikolay Skovpin
Hi guys.
I was investigating the Spark property
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic"). It
works perfectly on a local filesystem, but on S3 I stumbled into strange
behavior: if I don't have the Hive table, or the table is empty, Spark won't
save any data into it with SaveMode.Overwrite.
What I did:
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("Test for dynamic partitioning")
  .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
  .getOrCreate()
import spark.implicits._   // needed for .toDF

val users = Seq(
  ("11", "Nikolay", "1900", "1"),
  ("12", "Nikolay", "1900", "1"),
  ("13", "Sergey",  "1901", "1"),
  ("14", "Jone",    "1900", "2"))
  .toDF("user_id", "name", "year", "month")

users.write
  .partitionBy("year", "month")
  .mode(SaveMode.Overwrite)
  .option("path", "s3://dynamicPartitioning/users")
  .saveAsTable("test.users")

I can see from the logs that Spark populates the .spark-staging directory
with the data and then executes the rename. But
AlterTableRecoverPartitionsCommand shows the message "Found 0 partitions,
Finished to gather the fast stats for all 0 partitions". After that the
directory on S3 is empty (except for the _SUCCESS flag).
Is this expected behavior or a bug?
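
One thing that might be worth trying (an untested sketch continuing from the
snippet above, not a confirmed explanation of the behavior): create the empty
partitioned table first, then do the dynamic overwrite through insertInto
rather than saveAsTable.

 // Create the table definition once, with no data.
 users.limit(0).write
   .partitionBy("year", "month")
   .option("path", "s3://dynamicPartitioning/users")
   .saveAsTable("test.users")

 // Now that the table exists, overwrite only the touched partitions.
 // insertInto matches columns by position; the partition columns
 // year and month are already last in `users`.
 users.write
   .mode(SaveMode.Overwrite)
   .insertInto("test.users")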



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



need workaround around HIVE-11625 / DISTRO-800

2018-08-07 Thread Pranav Agrawal
I am hitting an issue,
https://issues.cloudera.org/browse/DISTRO-800 (related to
https://issues.apache.org/jira/browse/HIVE-11625).

I am unable to write an empty array of type int or string (an array of size
0) into Parquet. Please assist or suggest a workaround.

spark version: 2.2.1
AWS EMR: 5.12, 5.13