Of course, I could create a connection inside the map function:

val result = rdd.map(line => {
  val conf = HBaseConfiguration.create
  val connection = HConnectionManager.createConnection(conf)
  val table = connection.getTable("user")
  ...
  table.close()
  connection.close()
})
but that would be too slow, which is also the reason I share conf and
connection in the Util object.
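
One middle ground between "a connection per record" and "a table per record" is to do the setup once per partition. Below is a rough sketch of that idea, assuming the function would be passed to rdd.mapPartitions. FakeTable, openTable and processPartition are hypothetical names; FakeTable only stands in for HTableInterface so that the snippet runs without HBase or Spark on the classpath.

```scala
// Hypothetical stand-in for HTableInterface, not a real HBase class.
final class FakeTable {
  var closed = false
  def lookup(key: String): Int = key.length // placeholder for a real Get
  def close(): Unit = { closed = true }
}

object PartitionDemo {
  // Counts how many tables were opened, to show the cost is per partition.
  var opened = 0

  // Assumption: with HBase this would be Util.Connection.getTable("user").
  def openTable(): FakeTable = { opened += 1; new FakeTable }

  // Intended use: rdd.mapPartitions(PartitionDemo.processPartition)
  def processPartition(lines: Iterator[String]): Iterator[Int] = {
    val table = openTable()
    try {
      // Materialize before closing: a lazy iterator would otherwise be
      // consumed after table.close() has already run.
      lines.map(table.lookup).toList.iterator
    } finally {
      table.close()
    }
  }
}
```

Materializing the partition costs memory, so for very large partitions an iterator that closes the table once it is exhausted may be a better fit.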
Maybe I do need a shutdown hook after all, as the Javadoc says.
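
A minimal sketch of what such a hook could look like, assuming the shared connection lives in a singleton object that is initialized once per executor JVM. FakeConnection and SharedConnection are hypothetical names; FakeConnection stands in for HConnection so the snippet runs without HBase, and with HBase the lazy val would presumably wrap HConnectionManager.createConnection(conf) instead.

```scala
// Hypothetical stand-in for HConnection, not a real HBase class.
final class FakeConnection extends AutoCloseable {
  @volatile private var closed = false
  def isClosed: Boolean = closed
  override def close(): Unit = { closed = true }
}

object SharedConnection {
  // Created on first use, once per JVM (so once per Spark executor).
  lazy val connection: FakeConnection = {
    val c = new FakeConnection
    // Register cleanup for JVM exit. The order in which shutdown hooks
    // run is not defined, as the HConnectionManager Javadoc points out.
    sys.addShutdownHook { c.close() }
    c
  }
}
```

Tasks would then call SharedConnection.connection in place of Util.Connection. Note that shutdown hooks run on a normal JVM exit but not when the process is killed forcibly, so this is best-effort cleanup only.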
Thank you!
2014-10-17 12:18 GMT+08:00 Ted Yu <[email protected]>:
> Looking at Apache 0.98 code, you can follow the example in the class
> javadoc (line 144 of HConnectionManager.java):
>
> * HTableInterface table = connection.getTable("table1");
> * try {
> * // Use the table as needed, for a single operation and a single thread
> * } finally {
> * table.close();
> * connection.close();
> * }
>
> Cheers
>
> On Thu, Oct 16, 2014 at 9:03 PM, Fengyun RAO <[email protected]> wrote:
>
>> Thanks, Ted,
>>
>> We use CDH 5.1 and the HBase version is 0.98.1-cdh5.1.0, in which the
>> javadoc of HConnectionManager.java still recommends a shutdown hook.
>>
>> I looked into val table = Util.Connection.getTable("user") and found that
>> it doesn't invoke
>>
>> public HTable(Configuration conf, final byte[] tableName, final
>> ExecutorService pool)
>>
>> but
>>
>> public HTable(TableName tableName, final HConnection connection,
>> final ExecutorService pool) throws IOException {
>> if (connection == null || connection.isClosed()) {
>> throw new IllegalArgumentException("Connection is null or closed.");
>> }
>> this.tableName = tableName;
>> this.cleanupPoolOnClose = this.cleanupConnectionOnClose = false;
>> this.connection = connection;
>> this.configuration = connection.getConfiguration();
>> this.pool = pool;
>>
>> this.finishSetup();
>> }
>>
>> in which cleanupConnectionOnClose is false, so table.close() would not
>> close the underlying connection.
>>
>> 2014-10-16 22:51 GMT+08:00 Ted Yu <[email protected]>:
>>
>>> Which HBase release are you using?
>>>
>>> Let me refer to the HBase 0.94 code.
>>>
>>> Take a look at the following method
>>> in src/main/java/org/apache/hadoop/hbase/client/HTable.java :
>>>
>>> public void close() throws IOException {
>>> ...
>>> if (cleanupConnectionOnClose) {
>>> if (this.connection != null) {
>>> this.connection.close();
>>>
>>> When Connection.getTable() is called, the following is invoked:
>>> public HTable(Configuration conf, final byte[] tableName, final
>>> ExecutorService pool)
>>> which sets cleanupConnectionOnClose to true.
>>>
>>> W.r.t. the javadoc, the paragraph on the shutdown hook is
>>> in HConnectionManager.java of 0.94.
>>> You don't need to use a shutdown hook for 0.94+.
>>>
>>> Cheers
>>>
>>> On Wed, Oct 15, 2014 at 11:41 PM, Fengyun RAO <[email protected]>
>>> wrote:
>>>
>>>> I may have misunderstood your point.
>>>>
>>>> val result = rdd.map(line => {
>>>>   val table = Util.Connection.getTable("user")
>>>>   ...
>>>>   table.close()
>>>> })
>>>>
>>>> Did you mean this is enough, and there’s no need to call
>>>> Util.Connection.close(),
>>>> or HConnectionManager.deleteAllConnections()?
>>>>
>>>> Where is the documentation that states that HConnectionManager would
>>>> release the underlying connection automatically?
>>>> If that’s true, maybe the Javadoc which recommends a shutdown hook
>>>> needs an update.
>>>>
>>>>
>>>> 2014-10-16 14:20 GMT+08:00 Fengyun RAO <[email protected]>:
>>>>
>>>>> Thanks, Ted.
>>>>> Util.Connection.close() should be called only once, so it can NOT be
>>>>> in a map function:
>>>>>
>>>>> val result = rdd.map(line => {
>>>>>   val table = Util.Connection.getTable("user")
>>>>>   ...
>>>>>   Util.Connection.close()
>>>>> })
>>>>>
>>>>> As you mentioned:
>>>>>
>>>>> Calling table.close() is the recommended approach.
>>>>> HConnectionManager does reference counting. When all references to the
>>>>> underlying connection are gone, connection would be released.
>>>>>
>>>>> Yes, we should call table.close(), but it won’t remove the HConnection
>>>>> from HConnectionManager, which is an HConnection pool.
>>>>> Looking at the HConnectionManager Javadoc, it seems I have to
>>>>> implement a shutdown hook:
>>>>>
>>>>> * <p>Cleanup used to be done inside in a shutdown hook. On startup we'd
>>>>> * register a shutdown hook that called {@link #deleteAllConnections()}
>>>>> * on its way out but the order in which shutdown hooks run is not defined so
>>>>> * were problematic for clients of HConnection that wanted to register their
>>>>> * own shutdown hooks so we removed ours though this shifts the onus for
>>>>> * cleanup to the client.
>>>>>
>>>>>
>>>>>
>>>>> 2014-10-15 22:31 GMT+08:00 Ted Yu <[email protected]>:
>>>>>
>>>>>> Pardon me - there was a typo in my previous email.
>>>>>>
>>>>>> Calling table.close() is the recommended approach.
>>>>>> HConnectionManager does reference counting. When all references to
>>>>>> the underlying connection are gone, connection would be released.
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> On Wed, Oct 15, 2014 at 7:13 AM, Ted Yu <[email protected]> wrote:
>>>>>>
>>>>>>> Have you tried the following?
>>>>>>>
>>>>>>> val result = rdd.map(line => {
>>>>>>>   val table = Util.Connection.getTable("user")
>>>>>>>   ...
>>>>>>>   Util.Connection.close()
>>>>>>> })
>>>>>>>
>>>>>>> On Wed, Oct 15, 2014 at 6:09 AM, Fengyun RAO <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> In order to share an HBase connection pool, we create an object
>>>>>>>>
>>>>>>>> object Util {
>>>>>>>>   val HBaseConf = HBaseConfiguration.create
>>>>>>>>   val Connection = HConnectionManager.createConnection(HBaseConf)
>>>>>>>> }
>>>>>>>>
>>>>>>>> which would be shared among tasks on the same executor. e.g.
>>>>>>>>
>>>>>>>> val result = rdd.map(line => {
>>>>>>>>   val table = Util.Connection.getTable("user")
>>>>>>>>   ...
>>>>>>>> })
>>>>>>>>
>>>>>>>> However, we don’t know how to close Util.Connection.
>>>>>>>> If we write Util.Connection.close() in the main function,
>>>>>>>> it’ll only run on the driver, not on the executors.
>>>>>>>>
>>>>>>>> So, how can we make sure every Connection is closed before exit?
>>>>>>>>