    private HTable table;

You should declare the table variable within the apply() method.
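In concrete terms, that suggestion looks something like the following minimal sketch (a fuller, serializable version is sketched at the end of this thread):

    public BoxedUnit apply(scala.collection.Iterator<Row> rows) {
        Configuration config = HBaseConfiguration.create();
        // Declared here rather than as a field, so the HTable is never
        // captured when Spark serializes the enclosing function object.
        HTable table;
        try {
            table = new HTable(config, "table_name");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        // ... write the partition's rows, then close the table ...
        return BoxedUnit.UNIT;
    }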
BTW, which HBase release are you using? I see you implement the batching yourself. You can make use of the following HTable method:

    public void setWriteBufferSize(long writeBufferSize) throws IOException

Cheers

On Sun, Jun 21, 2015 at 11:16 PM, Nishant Patel <nishant.k.pa...@gmail.com> wrote:

> Hi,
>
> Please find the code below.
>
>     dataFrame
>         .foreachPartition(new AbstractFunction1<scala.collection.Iterator<Row>, BoxedUnit>() {
>
>             private HTable table;
>
>             private char ROWKEY_SEPERATOR = '\u0000';
>
>             public BoxedUnit apply(scala.collection.Iterator<Row> rows) {
>
>                 Configuration config = HBaseConfiguration.create();
>
>                 config.set("hbase.zookeeper.quorum", "????");
>                 config.set("hbase.zookeeper.property.clientPort", "???");
>                 config.set("zookeeper.znode.parent", "????");
>
>                 try {
>                     table = new HTable(config, "table_name");
>                 } catch (Exception e) {
>                     throw new RuntimeException(e);
>                 }
>
>                 List<Put> puts = new ArrayList<Put>();
>                 try {
>                     while (rows.hasNext()) {
>                         Row row = rows.next();
>                         Map<String, Object> map = new HashMap<String, Object>();
>                         String[] fieldNames = row.schema().fieldNames();
>                         for (int i = 0; i < fieldNames.length; i++) {
>                             map.put(fieldNames[i].toUpperCase(), row.get(i));
>                         }
>                         puts.add(mapToPut(map));
>                         if (puts.size() >= 500) {
>                             table.put(puts);
>                             puts.clear();
>                         }
>                     }
>                     table.put(puts);
>                 } catch (Exception e) {
>                     throw new RuntimeException(e);
>                 }
>                 return BoxedUnit.UNIT;
>             }
>
>             private Put mapToPut(Map<String, Object> map) throws IOException {
>                 try {
>                     Put put = new Put(getRowKey(map));
>                     String value = null;
>                     for (String key : map.keySet()) {
>                         value = (map.get(key) == null ? "" : map.get(key).toString());
>                         put.add(Bytes.toBytes("0"), Bytes.toBytes(key), Bytes.toBytes(value));
>                     }
>                     return put;
>                 } catch (Exception e) {
>                     e.printStackTrace();
>                     throw e;
>                 }
>             }
>
>             private byte[] getRowKey(Map<String, Object> map) {
>
>                 StringBuilder builder = new StringBuilder();
>                 return Bytes.toBytes(builder.toString());
>             }
>
>         });
>
> Regards,
> Nishant
>
> On Mon, Jun 22, 2015 at 11:08 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Can you show us the code for loading Hive into HBase?
>>
>> There shouldn't be a 'return' statement in that code.
>>
>> Cheers
>>
>> On Jun 20, 2015, at 10:10 PM, Nishant Patel <nishant.k.pa...@gmail.com> wrote:
>>
>> Hi,
>>
>> I am loading data from a Hive table into HBase after doing some manipulation.
>>
>> I am getting the error 'Task not serializable'.
>>
>> My code is as below.
>>
>>     public class HiveToHbaseLoader implements Serializable {
>>
>>         public static void main(String[] args) throws Exception {
>>
>>             String hbaseTableName = args[0];
>>             String hiveQuery = args[1];
>>
>>             SparkConf conf = new SparkConf().setAppName("Hive to Hbase Loader")
>>                     .setMaster("????");
>>             JavaSparkContext sc = new JavaSparkContext(conf);
>>
>>             HiveContext hiveContext = new HiveContext(sc.sc());
>>
>>             hiveContext.setConf("hive.metastore.uris", "?????");
>>
>>             DataFrame dataFrame = hiveContext.sql(hiveQuery);
>>
>>             dataFrame
>>                 .foreachPartition(new AbstractFunction1<scala.collection.Iterator<Row>, BoxedUnit>() {
>>
>>                     // Logic to load rows from Hive into HBase.
>>
>>                 });
>>         }
>>     }
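Regarding the write-buffer suggestion at the top of the thread: the manual 500-Put batching in the partition code above can be handed off to the HBase client. A minimal sketch against the HTable API of that era; the buffer size and the rowToMap helper are assumptions, not code from the thread:

    // Inside apply(), after creating the HTable as a local variable:
    table.setAutoFlush(false, true);            // buffer Puts client-side instead of sending each one
    table.setWriteBufferSize(4L * 1024 * 1024); // assumption: 4 MB write buffer
    while (rows.hasNext()) {
        // rowToMap is a hypothetical helper standing in for the
        // field-name/value loop in the code above
        table.put(mapToPut(rowToMap(rows.next())));
    }
    table.flushCommits(); // send whatever is still buffered
    table.close();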
>> Getting error as below.
>>
>> Exception in thread "main" org.apache.spark.SparkException: Task not serializable
>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
>>     at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:805)
>>     at org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:875)
>>     at com.philips.bda.HiveToHbaseLoader.main(HiveToHbaseLoader.java:46)
>> Caused by: java.io.NotSerializableException: com.philips.bda.HiveToHbaseLoader$1
>> Serialization stack:
>>     - object not serializable (class: com.philips.bda.HiveToHbaseLoader$1, value: <function1>)
>>     at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
>>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>>     ... 5 more
>>
>> --
>> Regards,
>> Nishant
>
> --
> Regards,
> Nishant Patel
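For reference, the fix the thread points toward can be sketched end to end. The NotSerializableException names the anonymous class itself (HiveToHbaseLoader$1): AbstractFunction1 is not Serializable, so the partition function has to implement java.io.Serializable, and the HTable belongs inside apply() rather than in a field. This is a hedged sketch, not the poster's final code; the class name, the buffer size, and the choice of the first column as row key are assumptions:

    import java.io.IOException;
    import java.io.Serializable;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.spark.sql.Row;

    import scala.runtime.AbstractFunction1;
    import scala.runtime.BoxedUnit;

    // Hypothetical name. A named Serializable class is what lets Spark
    // ship the function to executors; the anonymous subclass in the
    // thread implements neither Serializable nor Externalizable.
    public class HBasePartitionWriter
            extends AbstractFunction1<scala.collection.Iterator<Row>, BoxedUnit>
            implements Serializable {

        @Override
        public BoxedUnit apply(scala.collection.Iterator<Row> rows) {
            Configuration config = HBaseConfiguration.create();
            config.set("hbase.zookeeper.quorum", "????"); // placeholders as in the thread
            config.set("hbase.zookeeper.property.clientPort", "???");
            config.set("zookeeper.znode.parent", "????");

            HTable table = null; // local variable, never serialized with the function
            try {
                table = new HTable(config, "table_name");
                table.setAutoFlush(false, true);            // buffer Puts client-side
                table.setWriteBufferSize(4L * 1024 * 1024); // assumption: 4 MB

                while (rows.hasNext()) {
                    Row row = rows.next();
                    String[] fieldNames = row.schema().fieldNames();
                    // Assumption: the first column serves as the row key.
                    Put put = new Put(Bytes.toBytes(row.get(0).toString()));
                    for (int i = 0; i < fieldNames.length; i++) {
                        Object v = row.get(i);
                        put.add(Bytes.toBytes("0"),
                                Bytes.toBytes(fieldNames[i].toUpperCase()),
                                Bytes.toBytes(v == null ? "" : v.toString()));
                    }
                    table.put(put); // buffered until writeBufferSize is reached
                }
                table.flushCommits();
            } catch (IOException e) {
                throw new RuntimeException(e);
            } finally {
                if (table != null) {
                    try { table.close(); } catch (IOException ignored) { }
                }
            }
            return BoxedUnit.UNIT;
        }
    }

With this, the call site becomes dataFrame.foreachPartition(new HBasePartitionWriter()); and nothing non-serializable is captured from the driver.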