This works!
But can you explain why it should be used like this?
--
qiaou
Sent with Sparrow (http://www.sparrowmailapp.com/?sig)
On Wednesday, November 12, 2014, at 3:18 PM, Shixiong Zhu wrote:
You need to create a new configuration for each RDD. Therefore,
    val hbaseConf = HBaseConfigUtil.getHBaseConfiguration
should be changed to
    val hbaseConf = new Configuration(HBaseConfigUtil.getHBaseConfiguration)
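To illustrate why the copy matters, here is a minimal sketch. It assumes that HBaseConfigUtil.getHBaseConfiguration hands back the same Configuration instance on every call (the helper itself is not shown in this thread), and it uses a made-up key, example.scan.area, in place of TableInputFormat.SCAN. Only the Hadoop Configuration class is needed on the classpath. RDDs are evaluated lazily, so if two RDDs hold the same mutable object, a plausible reading is that the settings written for the second query replace those written for the first before any job runs.

```scala
import org.apache.hadoop.conf.Configuration

object ConfigCopySketch {
  // Hypothetical key, used only for this illustration.
  val Key = "example.scan.area"

  // Returns (value seen by the first query without a copy,
  //          value seen by the first query with a copy,
  //          value seen by the second query with a copy).
  def demo(): (String, String, String) = {
    // Stand-in for the object assumed to be returned by
    // HBaseConfigUtil.getHBaseConfiguration on every call.
    val shared = new Configuration(false)

    // Without a copy: both "queries" write into the same object.
    val confA = shared
    confA.set(Key, "aa")
    val confB = shared
    confB.set(Key, "bb") // overwrites the value set through confA

    // With a copy per query: each object keeps its own setting.
    val copyA = new Configuration(shared)
    copyA.set(Key, "aa")
    val copyB = new Configuration(shared)
    copyB.set(Key, "bb")

    (confA.get(Key), copyA.get(Key), copyB.get(Key))
  }

  def main(args: Array[String]): Unit = {
    println(demo()) // prints (bb,aa,bb)
  }
}
```

In the shared case the first query ends up with the second query's setting, while the copies stay independent, which is the situation the suggested change avoids.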
Best Regards,
Shixiong Zhu
2014-11-12 14:53 GMT+08:00 qiaou <qiaou8...@gmail.com>:
OK, here is the code:
def hbaseQuery: (String) => RDD[Result] = {
  val generateRdd = (area: String) => {
    val startRowKey = s"$area${RowKeyUtils.convertToHex(startId, 10)}"
    val stopRowKey = s"$area${RowKeyUtils.convertToHex(endId, 10)}"
    println(s"startRowKey:${startRowKey}")
    println(s"stopRowKey :${stopRowKey}")
    val scan = new Scan()
    scan.setStartRow(Bytes.toBytes(startRowKey))
    scan.setStopRow(Bytes.toBytes(stopRowKey))
    val filterList: FilterList = new FilterList()
    if (appKey != null && !appKey.equals("_")) {
      val appKeyFilter: SingleColumnValueFilter =
        new SingleColumnValueFilter(Bytes.toBytes(clientInfo),
          Bytes.toBytes(optKey), CompareOp.EQUAL, Bytes.toBytes(appKey))
      filterList.addFilter(appKeyFilter)
    }
    if (imei != null && !imei.equals("_")) {
      val imeiFilter: SingleColumnValueFilter =
        new SingleColumnValueFilter(Bytes.toBytes(clientInfo),
          Bytes.toBytes(optImei), CompareOp.EQUAL, Bytes.toBytes(imei))
      filterList.addFilter(imeiFilter)
    }
    if (filterList.getFilters != null && filterList.getFilters.size() > 0) {
      scan.setFilter(filterList)
    }
    scan.setCaching(1)
    val hbaseConf = HBaseConfigUtil.getHBaseConfiguration
    hbaseConf.set(TableInputFormat.INPUT_TABLE, asrLogFeedBack)
    hbaseConf.set(TableInputFormat.SCAN,
      Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
    SparkUtil.getSingleSparkContext()
      .newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
        classOf[ImmutableBytesWritable], classOf[Result]).map {
        case (_: ImmutableBytesWritable, result: Result) => {
          result
        }
      }
  }
  return generateRdd
}
--
qiaou
On Wednesday, November 12, 2014, at 2:50 PM, Shixiong Zhu wrote:
Could you provide the code of hbaseQuery? It may not support being
executed in parallel.
Best Regards,
Shixiong Zhu
2014-11-12 14:32 GMT+08:00 qiaou <qiaou8...@gmail.com>:
Hi,
I have a problem using the union method of RDD.
I have a function like
    def hbaseQuery(area: String): RDD[Result] = ???
When I use
    hbaseQuery("aa").union(hbaseQuery("bb")).count()
it returns 0.
However, when I use it like this:
    sc.parallelize(hbaseQuery("aa").collect.toList :::
      hbaseQuery("bb").collect.toList).count()
it returns the right value.
Obviously I call an action after my transformation, so why does it
not work?
FYI
--
qiaou