How did the RDD.union work

2014-11-11 Thread qiaou
Hi,
I have a problem with the union method of RDD.
I have a function like this:
def hbaseQuery(area: String): RDD[Result] = ???
When I call hbaseQuery("aa").union(hbaseQuery("bb")).count(), it returns 0.
However, when I call
sc.parallelize(hbaseQuery("aa").collect.toList ::: hbaseQuery("bb").collect.toList).count()
it returns the right value.
I clearly have an action after my transformations, so why does it not work?
fyi

--  
qiaou
Sent with Sparrow (http://www.sparrowmailapp.com/?sig)



Re: How did the RDD.union work

2014-11-11 Thread Shixiong Zhu
Could you provide the code for hbaseQuery? It may not support being
executed in parallel.

Best Regards,
Shixiong Zhu





Re: How did the RDD.union work

2014-11-11 Thread qiaou
OK, here is the code:

def hbaseQuery: String => RDD[Result] = {
  val generateRdd = (area: String) => {
    val startRowKey = s"$area${RowKeyUtils.convertToHex(startId, 10)}"
    val stopRowKey = s"$area${RowKeyUtils.convertToHex(endId, 10)}"
    println(s"startRowKey:${startRowKey}")
    println(s"stopRowKey :${stopRowKey}")

    val scan = new Scan()
    scan.setStartRow(Bytes.toBytes(startRowKey))
    scan.setStopRow(Bytes.toBytes(stopRowKey))
    val filterList: FilterList = new FilterList()
    if (appKey != null && !appKey.equals("_")) {
      val appKeyFilter: SingleColumnValueFilter =
        new SingleColumnValueFilter(Bytes.toBytes(clientInfo),
          Bytes.toBytes(optKey), CompareOp.EQUAL, Bytes.toBytes(appKey))
      filterList.addFilter(appKeyFilter)
    }
    if (imei != null && !imei.equals("_")) {
      val imeiFilter: SingleColumnValueFilter =
        new SingleColumnValueFilter(Bytes.toBytes(clientInfo),
          Bytes.toBytes(optImei), CompareOp.EQUAL, Bytes.toBytes(imei))
      filterList.addFilter(imeiFilter)
    }
    if (filterList.getFilters != null && filterList.getFilters.size() > 0) {
      scan.setFilter(filterList)
    }
    scan.setCaching(1)

    val hbaseConf = HBaseConfigUtil.getHBaseConfiguration
    hbaseConf.set(TableInputFormat.INPUT_TABLE, asrLogFeedBack)
    hbaseConf.set(TableInputFormat.SCAN,
      Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))

    SparkUtil.getSingleSparkContext()
      .newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
        classOf[ImmutableBytesWritable], classOf[Result]).map {
        case (_: ImmutableBytesWritable, result: Result) => {
          result
        }
      }
  }
  return generateRdd
}


--  
qiaou



Re: How did the RDD.union work

2014-11-11 Thread qiaou
This works!
But can you explain why it has to be done this way?

--  
qiaou


On Wednesday, November 12, 2014 at 3:18 PM, Shixiong Zhu wrote:

 You need to create a new configuration for each RDD. Therefore,
   val hbaseConf = HBaseConfigUtil.getHBaseConfiguration
 should be changed to
   val hbaseConf = new Configuration(HBaseConfigUtil.getHBaseConfiguration)
  
 Best Regards,
 Shixiong Zhu
  
  
  
  



Re: How did the RDD.union work

2014-11-11 Thread Shixiong Zhu
The `conf` object is sent to other nodes via Broadcast.

The scaladoc of Broadcast
(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.broadcast.Broadcast)
says:

"In addition, the object v should not be modified after it is broadcast in
order to ensure that all nodes get the same value of the broadcast variable
(e.g. if the variable is shipped to a new node later)."

Best Regards,
Shixiong Zhu
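
The point about not modifying a shared object can be illustrated without
Spark or HBase. The sketch below is only a simplified model, not Spark's
actual internals: `Conf`, `SharedConfDemo`, `lazyScan`, the `scan.prefix` key,
and the toy rows are all hypothetical names made up for this illustration.
`Conf` stands in for Hadoop's Configuration (including its copy constructor),
and a function value stands in for a lazily evaluated RDD. In this model the
shared variant returns a wrong count rather than exactly 0; the precise wrong
value in a real job depends on Spark and HBase details that are not modeled
here.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Hadoop's Configuration: a mutable key/value
// bag with a copy constructor, like `new Configuration(other)`.
final class Conf {
  private val entries = mutable.Map.empty[String, String]

  def this(other: Conf) = {
    this()
    entries ++= other.entries
  }

  def set(key: String, value: String): Unit = entries.update(key, value)
  def get(key: String): String = entries(key)
}

object SharedConfDemo {
  // Toy "table"; the row keys are made up for this illustration.
  val rows: Seq[String] = Seq("aa1", "aa2", "bb1")

  // One configuration instance handed out to every caller.
  val shared = new Conf

  // Stand-in for a lazy RDD: the conf is read only when the returned
  // function is invoked, not when the query is defined.
  private def lazyScan(conf: Conf): () => Seq[String] =
    () => rows.filter(_.startsWith(conf.get("scan.prefix")))

  // Buggy variant: every call overwrites the same shared object.
  def sharedQuery(area: String): () => Seq[String] = {
    shared.set("scan.prefix", area)
    lazyScan(shared)
  }

  // Fixed variant: each query gets its own copy before mutating it.
  def copiedQuery(area: String): () => Seq[String] = {
    val own = new Conf(shared)
    own.set("scan.prefix", area)
    lazyScan(own)
  }

  def main(args: Array[String]): Unit = {
    val a = sharedQuery("aa")
    val b = sharedQuery("bb")
    println((a() ++ b()).size) // 2: both queries now scan the "bb" prefix

    val c = copiedQuery("aa")
    val d = copiedQuery("bb")
    println((c() ++ d()).size) // 3: each query kept its own prefix
  }
}
```

Because evaluation is deferred, the first query never sees the "aa" prefix it
was defined with. Copying the configuration before setting per-query values
keeps each query's settings independent, which is what the suggested
`new Configuration(...)` change does for each RDD.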
