Re: Need help with String Concat Operation

2017-10-18 Thread
Hi Debu,

First, instead of using '+', you can use concat() to concatenate string
columns, and you should wrap "0" in lit() to make it a column. With '+',
Spark treats the expression as numeric addition, which is why you got 730.0
instead of 0730.
Second, 1440 becomes null because you didn't tell Spark what to do when the
when() condition is not met, so it simply sets the value to null. To fix
this, add .otherwise() right after when().

The code looks like this:

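from pyspark.sql.functions import col, concat, length, lit, when

# Prefix "0" when CTOFF has three characters; otherwise keep the value as is.
ctoff_dedup_prep_temp = ctoff_df.withColumn(
    "CTOFF_NEW",
    when(
        length(col("CTOFF")) == 3,
        concat(lit("0"), col("CTOFF")),
    ).otherwise(col("CTOFF")),
)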
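
To make the when()/otherwise() behaviour concrete, here is a plain-Python sketch of the per-row rule (no Spark needed). The function names when_only and when_otherwise are made up for this illustration, and the sketch models string concatenation as concat() does, not the numeric '+' from the original command:

```python
def when_only(ctoff):
    """Mimic when(cond, value) without otherwise(): unmatched rows become None."""
    text = str(ctoff)
    return "0" + text if len(text) == 3 else None


def when_otherwise(ctoff):
    """Mimic when(cond, value).otherwise(col): unmatched rows keep their value."""
    text = str(ctoff)
    return "0" + text if len(text) == 3 else text


# The four CTOFF values from the sample output in the question.
values = [1440, 730, 600, 900]
print([when_only(v) for v in values])       # [None, '0730', '0600', '0900']
print([when_otherwise(v) for v in values])  # ['1440', '0730', '0600', '0900']
```

The None in the first result corresponds to the null that Spark shows for 1440 when no otherwise() is given.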

Best,

JiaXiang

On Wed, Oct 18, 2017 at 2:17 PM, Debabrata Ghosh wrote:

> Hi,
>  I have a dataframe column (named CTOFF) and I intend to prefix it
> with '0' when the length of the value is 3. Unfortunately, I am unable
> to achieve my goal and wonder whether you can help me here.
>
> Command which I am executing:
>
> ctoff_dedup_prep_temp = ctoff_df.withColumn('CTOFF_NEW',
>     when(length(col('CTOFF')) == 3, '0' + col('CTOFF')))
> ctoff_dedup_prep_temp.show()
>
> +------------+----------+----------+--------------------+---------+-----+---------+
> |EVNT_SRVC_AR|EVNT_FCLTY|EVNT_TP_CD|        NTWRK_PRD_CD| DY_OF_WK|CTOFF|CTOFF_NEW|
> +------------+----------+----------+--------------------+---------+-----+---------+
> |         HKG|       HKC|        AR|2,3,7,8,C,D,J,P,Q...|1,2,3,4,5| 1440|     null|
> |         HKG|       HKC|        AR|             C,Q,T,Y|1,2,3,4,5|  730|    730.0|
> |         HKG|       HKC|        AR|         E,K,C,Q,T,Y|1,2,3,4,5|  600|    600.0|
> |         HKG|       HKC|        AR|         E,K,C,Q,T,Y|1,2,3,4,5|  900|    900.0|
> +------------+----------+----------+--------------------+---------+-----+---------+
>
> The result which I want is:
> +------------+----------+----------+--------------------+---------+-----+---------+
> |EVNT_SRVC_AR|EVNT_FCLTY|EVNT_TP_CD|        NTWRK_PRD_CD| DY_OF_WK|CTOFF|CTOFF_NEW|
> +------------+----------+----------+--------------------+---------+-----+---------+
> |         HKG|       HKC|        AR|2,3,7,8,C,D,J,P,Q...|1,2,3,4,5| 1440|     1440|
> |         HKG|       HKC|        AR|             C,Q,T,Y|1,2,3,4,5|  730|     0730|
> |         HKG|       HKC|        AR|         E,K,C,Q,T,Y|1,2,3,4,5|  600|     0600|
> |         HKG|       HKC|        AR|         E,K,C,Q,T,Y|1,2,3,4,5|  900|     0900|
> +------------+----------+----------+--------------------+---------+-----+---------+
>
> So I want the '0' to be prefixed, but instead '.0' is getting suffixed.
> Any idea why this is happening?
>
> Thanks,
>
> Debu
>



-- 
Gao JiaXiang
Data Analyst, GCBI 


Re: java.io.NotSerializableException about SparkStreaming

2017-10-17 Thread
Hi Shengshan,

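In the first version, 'newAPIJobConfiguration' is a local value that the
foreachRDD closure captures and shares across all RDDs, so Spark has to
serialize it together with the closure. It therefore must be serializable,
and org.apache.hadoop.mapreduce.Job is not.

In the second version, 'newAPIJobConfiguration' lives inside the
'mytest_config' object. The closure refers to it through the object instead
of capturing it, so each JVM initializes its own copy and nothing has to be
serialized. That is why it can be non-serializable.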
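
As a rough analogy only (this is Python with pickle, not Spark or Java serialization), the difference can be sketched as shipping the object itself versus shipping just a name that the receiver resolves locally. The names job, REGISTRY, and ship are made up for this sketch, and a threading.Lock stands in for the non-serializable Job:

```python
import pickle
import threading

# Stand-in for a non-serializable object such as org.apache.hadoop.mapreduce.Job.
# A lock is used only because pickle refuses to serialize it.
job = threading.Lock()

# Plays the role of a singleton object: it exists in whichever process looks
# it up and is never shipped anywhere.
REGISTRY = {"mytest_config": job}


def ship(payload):
    """Serialize and deserialize a payload, as if sending it to another process."""
    return pickle.loads(pickle.dumps(payload))


def try_ship_captured():
    """Case 1: the payload carries the object itself, so serialization fails."""
    try:
        ship({"config": job})
        return "ok"
    except TypeError:
        return "failed"


def ship_by_name():
    """Case 2: the payload carries only a name; the receiver resolves it locally."""
    received = ship({"config_name": "mytest_config"})
    return REGISTRY[received["config_name"]]


print(try_ship_captured())    # failed
print(ship_by_name() is job)  # True
```

The first case corresponds to the closure capturing the local Job; the second corresponds to referring to it through the 'mytest_config' object.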

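If possible, you could also try building the transformed stream first and
saving that, instead of saving each RDD inside foreachRDD. Note that
foreachRDD returns Unit, so the sketch below uses transform to get a DStream
back:

val result = mydata.transform( rdd => {
  rdd.map(Json.parse _).map(_.validate[Scan]).map(Scan.transformScanRestult _)
    .filter(_.nonEmpty)
    .map(_.get).map(Scan.convertForHbase _)
})

// Pseudocode: save `result` once here. A DStream has no `write` method,
// so in practice use one of its own save methods.
result.write.save(...)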

On Tue, Oct 17, 2017 at 7:00 PM, Shengshan Zhang wrote:

> Hello guys!
> java.io.NotSerializableException troubles me a lot when I process data
> with Spark.
> ```
> val hbase_conf = HBaseConfiguration.create()
> hbase_conf.set("hbase.zookeeper.property.clientPort", "2181")
> hbase_conf.set("hbase.zookeeper.quorum",
>   "hadoop-zk0.s.qima-inc.com,hadoop-zk1.s.qima-inc.com,hadoop-zk2.s.qima-inc.com")
> val newAPIJobConfiguration = Job.getInstance(hbase_conf)
> newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "mytest_table")
> newAPIJobConfiguration.setOutputFormatClass(
>   classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
> newAPIJobConfiguration.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp")
> // TODO: this code is not very elegant; consider refactoring this OUTPUT part
> mydata.foreachRDD( rdd => {
>   val json_rdd = rdd.map(Json.parse _).map(_.validate[Scan]).map(Scan.transformScanRestult _)
>     .filter(_.nonEmpty)
>     .map(_.get).map(Scan.convertForHbase _)
>   json_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration.getConfiguration)
> })
> ```
>
> However, it fails because of *java.io.NotSerializableException*. The
> error info follows:
> 17/10/16 18:56:50 ERROR Utils: Exception encountered
> java.io.NotSerializableException: org.apache.hadoop.mapreduce.Job
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
>
> *So I changed my code as follows:*
>
> object mytest_config {
>   val hbase_conf = HBaseConfiguration.create()
>   hbase_conf.set("hbase.zookeeper.property.clientPort", "2181")
>   hbase_conf.set("hbase.zookeeper.quorum", "zk1,zk2")
>   val newAPIJobConfiguration = Job.getInstance(hbase_conf)
>   newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "mytest_table")
>   newAPIJobConfiguration.setOutputFormatClass(
>     classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
>   newAPIJobConfiguration.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp")
> }
> mydata.foreachRDD( rdd => {
>   val json_rdd = rdd.map(Json.parse _).map(_.validate[Scan]).map(Scan.transformScanRestult _)
>     .filter(_.nonEmpty)
>     .map(_.get).map(Scan.convertForHbase _)
>   json_rdd.saveAsNewAPIHadoopDataset(mytest_config.newAPIJobConfiguration.getConfiguration)
> })
>
> And this works!
> Does anybody have any idea why this works, and what the officially
> recommended way is?
>



-- 
Gao JiaXiang
Data Analyst, GCBI