private HTable table;

You should declare the table variable within the apply() method.

BTW, which HBase release are you using?

I see you batch the Puts yourself. You can make use of the following HTable
method instead:

  public void setWriteBufferSize(long writeBufferSize) throws IOException {
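
For example, something along these lines inside apply() (a rough, untested
sketch; the config values, table name and 8 MB buffer size are placeholders,
and rowToMap() just stands for the field-name loop you already have):

  public BoxedUnit apply(scala.collection.Iterator<Row> rows) {
      Configuration config = HBaseConfiguration.create();
      // config.set("hbase.zookeeper.quorum", ...) etc., as in your code

      HTable table = null;   // local variable: nothing non-serializable is kept in a field
      try {
          table = new HTable(config, "table_name");
          table.setAutoFlush(false);                    // buffer Puts on the client side
          table.setWriteBufferSize(8L * 1024 * 1024);   // e.g. 8 MB; tune for your row size
          while (rows.hasNext()) {
              // rowToMap() = your existing loop over row.schema().fieldNames()
              table.put(mapToPut(rowToMap(rows.next())));
          }
          table.flushCommits();                         // flush whatever is still buffered
      } catch (IOException e) {
          throw new RuntimeException(e);
      } finally {
          if (table != null) {
              try { table.close(); } catch (IOException ignore) { }
          }
      }
      return BoxedUnit.UNIT;
  }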

Cheers

On Sun, Jun 21, 2015 at 11:16 PM, Nishant Patel <nishant.k.pa...@gmail.com>
wrote:

> Hi,
>
> Please find code as below.
>
> dataFrame
>                 .foreachPartition(new AbstractFunction1<scala.collection.Iterator<Row>, BoxedUnit>() {
>
>                     private HTable table;
>
>                     private char ROWKEY_SEPERATOR = '\u0000';
>
>                     public BoxedUnit apply(scala.collection.Iterator<Row> rows) {
>
>                         Configuration config = HBaseConfiguration.create();
>
>                         config.set(
>                                 "hbase.zookeeper.quorum",
>                                 "????");
>                         config.set("hbase.zookeeper.property.clientPort",
>                                 "???");
>                         config.set("zookeeper.znode.parent", "????");
>
>                         try {
>                             table = new HTable(config, "table_name");
>                         } catch (Exception e) {
>                             throw new RuntimeException(e);
>                         }
>
>                         List<Put> puts = new ArrayList<Put>();
>                         try {
>                             while (rows.hasNext()) {
>                                 Row row = rows.next();
>                                 Map<String, Object> map = new HashMap<String, Object>();
>                                 String[] fieldNames = row.schema().fieldNames();
>                                 for (int i = 0; i < fieldNames.length; i++) {
>                                     map.put(fieldNames[i].toUpperCase(),
>                                             row.get(i));
>                                 }
>                                 puts.add(mapToPut(map));
>                                 if (puts.size() >= 500) {
>                                     table.put(puts);
>                                     puts.clear();
>                                 }
>
>                             }
>                             table.put(puts);
>                         } catch (Exception e) {
>                             throw new RuntimeException(e);
>                         }
>                         return BoxedUnit.UNIT;
>                     }
>
>                     private Put mapToPut(Map<String, Object> map) throws IOException {
>                         try {
>                             Put put = new Put(getRowKey(map));
>                             String value = null;
>                             for (String key : map.keySet()) {
>                                 value = (map.get(key) == null ? "" : map.get(key).toString());
>                                 put.add(Bytes.toBytes("0"), Bytes.toBytes(key),
>                                         Bytes.toBytes(value));
>                             }
>                             return put;
>                         } catch (Exception e) {
>                             e.printStackTrace();
>                             throw e;
>                         }
>                     }
>
>                     private byte[] getRowKey(Map<String, Object> map) {
>
>                         StringBuilder builder = new StringBuilder();
>                         return Bytes.toBytes(builder.toString());
>                     }
>
>                 });
>
> Regards,
> Nishant
>
> On Mon, Jun 22, 2015 at 11:08 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Can you show us the code that loads data from Hive into HBase?
>>
>> There shouldn't be a 'return' statement in that code.
>>
>> Cheers
>>
>>
>>
>> On Jun 20, 2015, at 10:10 PM, Nishant Patel <nishant.k.pa...@gmail.com>
>> wrote:
>>
>> Hi,
>>
>> I am loading data from a Hive table into HBase after doing some manipulation.
>>
>> I am getting a 'Task not serializable' error.
>>
>> My code is as below.
>>
>> public class HiveToHbaseLoader implements Serializable {
>>
>>     public static void main(String[] args) throws Exception {
>>
>>         String hbaseTableName = args[0];
>>         String hiveQuery = args[1];
>>
>>         SparkConf conf = new SparkConf().setAppName("Hive to Hbase Loader")
>>                 .setMaster("????");
>>         JavaSparkContext sc = new JavaSparkContext(conf);
>>
>>         HiveContext hiveContext = new HiveContext(sc.sc());
>>
>>         hiveContext.setConf("hive.metastore.uris",
>>                 "?????");
>>
>>         DataFrame dataFrame = hiveContext.sql(hiveQuery);
>>
>>         dataFrame
>>                 .foreachPartition(new AbstractFunction1<scala.collection.Iterator<Row>, BoxedUnit>() {
>>
>>                     // Logic to load rows from Hive into HBase.
>>
>>                 });
>>     }
>> }
>>
>>
>> The full error is below.
>>
>>
>> Exception in thread "main" org.apache.spark.SparkException: Task not serializable
>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
>>     at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:805)
>>     at org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:875)
>>     at com.philips.bda.HiveToHbaseLoader.main(HiveToHbaseLoader.java:46)
>> Caused by: java.io.NotSerializableException: com.philips.bda.HiveToHbaseLoader$1
>> Serialization stack:
>>     - object not serializable (class: com.philips.bda.HiveToHbaseLoader$1, value: <function1>)
>>     at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
>>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>>     ... 5 more
>>
>>
>> --
>> Regards,
>> Nishant
>>
>>
>
>
> --
> Regards,
> Nishant Patel
>
>
