Re: How did the RDD.union work

2014-11-11 Thread Shixiong Zhu
The `conf` object will be sent to other nodes via Broadcast.

Here is the scaladoc of Broadcast:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.broadcast.Broadcast

In addition, the object v should not be modified after it is broadcast in
order to ensure that all nodes get the same value of the broadcast variable
(e.g. if the variable is shipped to a new node later).
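
To illustrate the scaladoc note above, here is a minimal sketch of why a value that is mutated after being shipped can end up different on different nodes. It does not use Spark: plain Java serialization stands in for "shipping" a value, and the names `ShipSnapshotDemo` and `ship` and the key `scan` are hypothetical, chosen only for this illustration. Spark's actual Broadcast internals may differ.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object ShipSnapshotDemo {
  // Serialize and deserialize a value. This is an assumed stand-in for
  // sending it to another node, not Spark's real Broadcast mechanism.
  def ship[T <: java.io.Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    // A mutable, serializable object playing the role of `conf`.
    val conf = new java.util.HashMap[String, String]()
    conf.put("scan", "aa")

    val onNode1 = ship(conf) // snapshot taken before the mutation
    conf.put("scan", "bb")   // the object is modified after it was shipped
    val onNode2 = ship(conf) // a node that receives it later sees the new value

    println(onNode1.get("scan")) // prints "aa"
    println(onNode2.get("scan")) // prints "bb"
  }
}
```

The two "nodes" disagree only because the object changed between the two shipments, which is the situation the scaladoc warns against.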

Best Regards,
Shixiong Zhu

2014-11-12 15:20 GMT+08:00 qiaou :

> This works!
> But can you explain why it has to be done this way?
>
> --
> qiaou
> Sent with Sparrow


Re: How did the RDD.union work

2014-11-11 Thread Shixiong Zhu
You need to create a new configuration for each RDD. Therefore, "val
hbaseConf = HBaseConfigUtil.getHBaseConfiguration" should be changed to "val
hbaseConf = new Configuration(HBaseConfigUtil.getHBaseConfiguration)"
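
A minimal sketch of the difference, assuming `HBaseConfigUtil.getHBaseConfiguration` hands back the same instance on every call (the thread does not show that helper, so this is a guess). `ConfCopyDemo`, `getSharedConfiguration`, `confFor`, and the key `demo.scan.area` are hypothetical names used only for illustration; the only real API relied on is Hadoop's `Configuration`, including its copy constructor.

```scala
import org.apache.hadoop.conf.Configuration

object ConfCopyDemo {
  // Hypothetical stand-in for a helper that returns one shared instance.
  // `false` skips loading the default resources, which keeps the demo small.
  private val shared = new Configuration(false)
  def getSharedConfiguration: Configuration = shared

  // Builds the configuration for one query; `copy` toggles the defensive copy.
  def confFor(area: String, copy: Boolean): Configuration = {
    val base = getSharedConfiguration
    val conf = if (copy) new Configuration(base) else base
    // Hypothetical key, standing in for TableInputFormat.SCAN.
    conf.set("demo.scan.area", area)
    conf
  }

  def main(args: Array[String]): Unit = {
    // Without a copy, both queries mutate and return the same object,
    // so the second call overwrites the setting of the first.
    val a = confFor("aa", copy = false)
    val b = confFor("bb", copy = false)
    println(a eq b)                  // prints "true"
    println(a.get("demo.scan.area")) // prints "bb"

    // With a copy, each query keeps its own setting.
    val c = confFor("aa", copy = true)
    val d = confFor("bb", copy = true)
    println(c.get("demo.scan.area")) // prints "aa"
    println(d.get("demo.scan.area")) // prints "bb"
  }
}
```

If the helper does return a shared object, the two RDDs passed to `union` would both end up describing the last scan that was set, which would be consistent with the unexpected count.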

Best Regards,
Shixiong Zhu

2014-11-12 14:53 GMT+08:00 qiaou :

> OK, here is the code:
>
> def hbaseQuery: (String) => RDD[Result] = {
>   val generateRdd = (area: String) => {
>     val startRowKey = s"$area${RowKeyUtils.convertToHex(startId, 10)}"
>     val stopRowKey = s"$area${RowKeyUtils.convertToHex(endId, 10)}"
>     println(s"startRowKey:${startRowKey}")
>     println(s"stopRowKey :${stopRowKey}")
>
>     val scan = new Scan()
>     scan.setStartRow(Bytes.toBytes(startRowKey))
>     scan.setStopRow(Bytes.toBytes(stopRowKey))
>     val filterList: FilterList = new FilterList()
>     if (appKey != null && !appKey.equals("_")) {
>       val appKeyFilter: SingleColumnValueFilter =
>         new SingleColumnValueFilter(Bytes.toBytes("clientInfo"),
>           Bytes.toBytes("optKey"), CompareOp.EQUAL, Bytes.toBytes(appKey))
>       filterList.addFilter(appKeyFilter)
>     }
>     if (imei != null && !imei.equals("_")) {
>       val imeiFilter: SingleColumnValueFilter =
>         new SingleColumnValueFilter(Bytes.toBytes("clientInfo"),
>           Bytes.toBytes("optImei"), CompareOp.EQUAL, Bytes.toBytes(imei))
>       filterList.addFilter(imeiFilter)
>     }
>     if (filterList.getFilters != null && filterList.getFilters.size() > 0) {
>       scan.setFilter(filterList)
>     }
>     scan.setCaching(1)
>
>     val hbaseConf = HBaseConfigUtil.getHBaseConfiguration
>     hbaseConf.set(TableInputFormat.INPUT_TABLE, "asrLogFeedBack")
>     hbaseConf.set(TableInputFormat.SCAN,
>       Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
>
>     SparkUtil.getSingleSparkContext()
>       .newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
>         classOf[ImmutableBytesWritable], classOf[Result])
>       .map {
>         case (_: ImmutableBytesWritable, result: Result) => result
>       }
>   }
>   return generateRdd
> }
>
> --
> qiaou
> Sent with Sparrow


Re: How did the RDD.union work

2014-11-11 Thread Shixiong Zhu
Could you provide the code of hbaseQuery? It may not support being
executed in parallel.

Best Regards,
Shixiong Zhu

2014-11-12 14:32 GMT+08:00 qiaou :

> Hi,
> I have a problem using the union method of RDD.
> I have a function like
> def hbaseQuery(area: String): RDD[Result] = ???
> When I use hbaseQuery("aa").union(hbaseQuery("bb")).count() it
> returns 0.
> However, when I use sc.parallelize(hbaseQuery("aa").collect.toList
> ::: hbaseQuery("bb").collect.toList).count() it returns the right value.
> Obviously I have an action after my transformation, so why did it
> not work?
> FYI
>
> --
> qiaou
> Sent with Sparrow
>
>