[ 
https://issues.apache.org/jira/browse/HBASE-22213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815744#comment-16815744
 ] 

Jean-Marc Spaggiari edited comment on HBASE-22213 at 4/11/19 7:53 PM:
----------------------------------------------------------------------

3 things:
1) I'm unable to build hbase-connectors.
2) It might be doable to call the Scala BulkLoadPartioner directly from the 
Java code. Constructor is not useful but it should work
3) Below is a work Java version of it, with a useful easy to use constructor.

Closing this Jira for now.


{code:java}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE file distributed with this work
 * for additional information regarding copyright ownership. The ASF licenses 
this file to You under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License. You may 
obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 
License for the specific language governing permissions and limitations
 * under the License.
 */

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.spark.ByteArrayWrapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.Partitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BulkLoadPartitioner extends Partitioner {

  /**
   * 
   */
  private static final long serialVersionUID = 1994698119904772184L;
  private byte[][] startKeys;
  private static final Logger LOG = 
LoggerFactory.getLogger(BulkLoadPartitioner.class);

  public BulkLoadPartitioner(TableName tableName, Configuration configuration) {
    try {
      this.startKeys = 
ConnectionFactory.createConnection(HBaseConfiguration.create(HBaseConfiguration.create(configuration))).getRegionLocator(tableName).getStartKeys();
    } catch (IOException e) {
      LOG.error(e.toString(), e);
    }
  }

  @Override
  public int getPartition(Object key) {
    byte[] keyBytes = null;
    if (key instanceof ImmutableBytesWritable) {
      keyBytes = ((ImmutableBytesWritable) key).get();
    }
    if (key instanceof byte[]) {
      keyBytes = (byte[]) key;
    }
    if (key instanceof ByteArrayWrapper) {
      keyBytes = ((ByteArrayWrapper) key).value();
    }
    // Only one region return 0
    if (startKeys.length == 1) {
      return 0;
    }
    for (int i = startKeys.length - 1; i >= 0; i--) {
      if (Bytes.compareTo(startKeys[i], keyBytes) <= 0) {
        return i;
      }
    }
    // if above fails to find start key that match we need to return
    // something
    return 0;

  }

  @Override
  public int numPartitions() {
    // when table not exist, startKeys = Byte[0][]
    return Math.max(1, startKeys.length);
  }

}

{code}



was (Author: jmspaggi):
3 things:
1) I'm unable to build hbase-connectors.
2) It might be doable to call the Scala BulkLoadPartioner directly from the 
Java code. Constructor is not useful but it should work
3) Below is a work Java version of it, with a useful easy to use constructor.

Closing this Jira for now.


{code:java}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE file distributed with this work
 * for additional information regarding copyright ownership. The ASF licenses 
this file to You under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License. You may 
obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 
License for the specific language governing permissions and limitations
 * under the License.
 */

package org.spaggiari.othello.spark;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.spark.ByteArrayWrapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.Partitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BulkLoadPartitioner extends Partitioner {

  /**
   * 
   */
  private static final long serialVersionUID = 1994698119904772184L;
  private byte[][] startKeys;
  private static final Logger LOG = 
LoggerFactory.getLogger(BulkLoadPartitioner.class);

  public BulkLoadPartitioner(TableName tableName, Configuration configuration) {
    try {
      this.startKeys = 
ConnectionFactory.createConnection(HBaseConfiguration.create(HBaseConfiguration.create(configuration))).getRegionLocator(tableName).getStartKeys();
    } catch (IOException e) {
      LOG.error(e.toString(), e);
    }
  }

  @Override
  public int getPartition(Object key) {
    byte[] keyBytes = null;
    if (key instanceof ImmutableBytesWritable) {
      keyBytes = ((ImmutableBytesWritable) key).get();
    }
    if (key instanceof byte[]) {
      keyBytes = (byte[]) key;
    }
    if (key instanceof ByteArrayWrapper) {
      keyBytes = ((ByteArrayWrapper) key).value();
    }
    // Only one region return 0
    if (startKeys.length == 1) {
      return 0;
    }
    for (int i = startKeys.length - 1; i >= 0; i--) {
      if (Bytes.compareTo(startKeys[i], keyBytes) <= 0) {
        return i;
      }
    }
    // if above fails to find start key that match we need to return
    // something
    return 0;

  }

  @Override
  public int numPartitions() {
    // when table not exist, startKeys = Byte[0][]
    return Math.max(1, startKeys.length);
  }

}

{code}


> Create a Java based BulkLoadPartitioner
> ---------------------------------------
>
>                 Key: HBASE-22213
>                 URL: https://issues.apache.org/jira/browse/HBASE-22213
>             Project: HBase
>          Issue Type: New Feature
>    Affects Versions: 2.1.4
>            Reporter: Jean-Marc Spaggiari
>            Assignee: Jean-Marc Spaggiari
>            Priority: Minor
>
> We have a scala based partitionner, but not all projects are build in Scala. 
> We should provide a Java based version of it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to