Hi,
I'm not familiar with PhoenixBolt, but a Connection object is not thread
safe (the internal MutationState object is owned by a single connection). If
you ensure that the same connection is never used by multiple threads, the
ConcurrentModificationException will not occur. One thing to keep in mind
with Phoenix is that connections are "cheap", as it's an embedded driver.
When you open a new connection, you're merely instantiating a few objects -
the underlying HConnection to your HBase cluster is shared across all
connections for the same client JVM. Thus, a PhoenixConnection does not need
to be pooled, but can be created as needed with each access.
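
A minimal sketch of that create-as-needed pattern, using only plain JDBC. The
table name, column names, and the ZooKeeper quorum passed in are hypothetical
placeholders, not taken from your application; the only Phoenix-specific part
is the "jdbc:phoenix:<zookeeper quorum>" URL form. Each call opens its own
connection, so no connection is ever visible to more than one thread:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

final class PerCallConnectionSketch {
    // Hypothetical table and columns, for illustration only.
    static final String UPSERT_SQL =
        "UPSERT INTO REQUESTS_LOG (ID, PAYLOAD) VALUES (?, ?)";

    // Phoenix JDBC URLs have the form jdbc:phoenix:<zookeeper quorum>.
    static String jdbcUrl(String zkQuorum) {
        return "jdbc:phoenix:" + zkQuorum;
    }

    // Opens a fresh connection for this batch and closes it afterwards,
    // so the connection is confined to the calling thread.
    static void upsertRecords(String zkQuorum, List<String[]> rows) throws SQLException {
        try (Connection conn = DriverManager.getConnection(jdbcUrl(zkQuorum));
             PreparedStatement ps = conn.prepareStatement(UPSERT_SQL)) {
            conn.setAutoCommit(false);
            for (String[] row : rows) {
                ps.setString(1, row[0]);
                ps.setString(2, row[1]);
                ps.executeUpdate();
            }
            conn.commit(); // one commit per batch
        }
    }

    public static void main(String[] args) {
        // Prints the URL only; an actual upsert needs a reachable cluster
        // and the Phoenix client jar on the classpath.
        System.out.println(jdbcUrl(args.length > 0 ? args[0] : "localhost"));
    }
}
```

In a bolt, something like upsertRecords(...) would be called from execute(),
with no connection held in a field that other executor threads can reach.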
Thanks,
James
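
P.S. For anyone wondering why a shared connection fails in exactly this way:
the innermost frames of the trace quoted below are
HashMap$HashIterator.remove called from MutationState.commit. The sketch
below is not Phoenix code; it uses a plain HashMap as a stand-in for the
pending mutations, and a single-threaded put() as a stand-in for a second
thread writing through the same connection in the middle of a commit. That is
enough to trigger the same exception deterministically:

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

final class SharedMapFailureSketch {
    // Returns true if iterator.remove() fails after the map was changed
    // behind the iterator's back.
    static boolean removeFailsAfterForeignWrite() {
        Map<String, String> pending = new HashMap<String, String>();
        pending.put("row1", "a");
        pending.put("row2", "b");

        // The "committing" side starts walking the pending entries.
        Iterator<Map.Entry<String, String>> it = pending.entrySet().iterator();
        it.next();

        // Stand-in for another thread adding a mutation mid-commit.
        pending.put("row3", "c");

        try {
            it.remove(); // HashMap iterators are fail-fast
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("remove failed: " + removeFailsAfterForeignWrite());
    }
}
```

With real threads the failure is timing-dependent, which would explain why a
standalone test can pass while the Storm topology fails.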


On Wed, Apr 16, 2014 at 11:07 AM, Justin Workman
<[email protected]> wrote:

> I work with Karthik, and just realized this was posted to the old user
> group. Forwarding to the new Apache user group. Any input is appreciated!
>
> Thanks
>
>
> On Wednesday, April 16, 2014 11:13:44 AM UTC-6, nandanavanam karthik wrote:
>>
>> Hi ,
>>
>> We are building a Storm application that reads records from a Kafka queue
>> and writes them to a Phoenix database. It works perfectly fine if the number
>> of threads for PhoenixBolt is 1. If we increase the number, we get the
>> following error:
>>
>> 21993 [Thread-35-requestsLogBolt] ERROR backtype.storm.util - Async loop died!
>> java.lang.RuntimeException: java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: java.util.ConcurrentModificationException
>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
>>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
>>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
>>     at backtype.storm.daemon.executor$eval5170$fn__5171$fn__5183$fn__5230.invoke(executor.clj:745) ~[na:na]
>>     at backtype.storm.util$async_loop$fn__390.invoke(util.clj:433) ~[na:na]
>>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
>>     at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
>> Caused by: java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: java.util.ConcurrentModificationException
>>     at com.overstock.storm.app.RequestsLogBolt.upsertRecords(RequestsLogBolt.java:51) ~[classes/:na]
>>     at com.overstock.storm.app.RequestsLogBolt.execute(RequestsLogBolt.java:33) ~[classes/:na]
>>     at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
>>     at backtype.storm.daemon.executor$eval5170$fn__5171$tuple_action_fn__5173.invoke(executor.clj:630) ~[na:na]
>>     at backtype.storm.daemon.executor$mk_task_receiver$fn__5091.invoke(executor.clj:398) ~[na:na]
>>     at backtype.storm.disruptor$clojure_handler$reify__1894.onEvent(disruptor.clj:58) ~[na:na]
>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:104) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
>>     ... 6 common frames omitted
>> Caused by: java.lang.IllegalArgumentException: java.util.ConcurrentModificationException
>>     at com.overstock.hadoop.phoenix.utils.PhoenixConnection.rollback(PhoenixConnection.java:115) ~[phoenix-utils-0.2-SNAPSHOT.jar:na]
>>     at com.overstock.hadoop.phoenix.utils.PhoenixConnection.access$100(PhoenixConnection.java:22) ~[phoenix-utils-0.2-SNAPSHOT.jar:na]
>>     at com.overstock.hadoop.phoenix.utils.PhoenixConnection$5.run(PhoenixConnection.java:214) ~[phoenix-utils-0.2-SNAPSHOT.jar:na]
>>     at com.overstock.hadoop.phoenix.utils.PhoenixConnection$5.run(PhoenixConnection.java:205) ~[phoenix-utils-0.2-SNAPSHOT.jar:na]
>>     at java.security.AccessController.doPrivileged(Native Method) ~[na:1.7.0_51]
>>     at javax.security.auth.Subject.doAs(Subject.java:356) ~[na:1.7.0_51]
>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1388) ~[hadoop-common-2.0.0-cdh4.3.0.jar:na]
>>     at com.overstock.hadoop.phoenix.utils.PhoenixConnection.executeBatch(PhoenixConnection.java:205) ~[phoenix-utils-0.2-SNAPSHOT.jar:na]
>>     at com.overstock.storm.app.RequestsLogBolt.upsertRecords(RequestsLogBolt.java:46) ~[classes/:na]
>>     ... 12 common frames omitted
>> Caused by: java.util.ConcurrentModificationException: null
>>     at java.util.HashMap$HashIterator.remove(HashMap.java:944) ~[na:1.7.0_51]
>>     at org.apache.phoenix.execute.MutationState.commit(MutationState.java:438) ~[phoenix-3.0.0-incubating-incubating.jar:3.0.0-incubating]
>>     at org.apache.phoenix.jdbc.PhoenixConnection.commit(PhoenixConnection.java:351) ~[phoenix-3.0.0-incubating-incubating.jar:3.0.0-incubating]
>>     at org.apache.phoenix.jdbc.PhoenixStatement.executeMutation(PhoenixStatement.java:229) ~[phoenix-3.0.0-incubating-incubating.jar:3.0.0-incubating]
>>     at org.apache.phoenix.jdbc.PhoenixStatement.execute(PhoenixStatement.java:185) ~[phoenix-3.0.0-incubating-incubating.jar:3.0.0-incubating]
>>     at org.apache.phoenix.jdbc.PhoenixPreparedStatement.execute(PhoenixPreparedStatement.java:146) ~[phoenix-3.0.0-incubating-incubating.jar:3.0.0-incubating]
>>     at org.apache.phoenix.jdbc.PhoenixPreparedStatement.execute(PhoenixPreparedStatement.java:151) ~[phoenix-3.0.0-incubating-incubating.jar:3.0.0-incubating]
>>     at com.jolbox.bonecp.PreparedStatementHandle.execute(PreparedStatementHandle.java:140) ~[bonecp-0.8.0.RELEASE.jar:na]
>>     at com.overstock.hadoop.phoenix.utils.PhoenixConnection$5.run(PhoenixConnection.java:210) ~[phoenix-utils-0.2-SNAPSHOT.jar:na]
>>     ... 18 common frames omitted
>>
>> We are using BoneCP for the connection pool. Initially, we thought it was a
>> problem with BoneCP/PhoenixConnection. To test that, I wrote an executor
>> service that writes records to a Phoenix table with 10 threads. It works
>> without any error. So we are seeing this error only when we use the BoneCP
>> Phoenix connection pool in a Storm cluster. This is very close to the
>> email thread quoted below. We have the same use case as mentioned there. The
>> versions we are using:
>>
>> phoenix-3.0.0-incubating-incubating.jar:3.0.0-incubating
>> bonecp-0.8.0.RELEASE.jar:na
>> storm-core-0.9.1-incubating.jar:0.9.1-incubating
>>
>> The solution James Taylor mentioned is to remove the iterator.remove() call
>> from the loop. I don't think we should rebuild the entire Phoenix jar for
>> that change. Is there any other way to get past this issue?
>>
>> -Nandanavanam Karthik
>>
>> On Thursday, August 1, 2013 7:43:31 PM UTC-6, James Taylor wrote:
>>>
>>> Try removing the iterator.remove call at
>>> com.salesforce.phoenix.execute.MutationState.commit(MutationState.java:232).
>>> Instead just clear the mutations Map at the end of the loop. It's not
>>> particularly important that an entry is removed as you're submitting the
>>> batch to the HBase server - it's just meant to provide a way for the caller
>>> to know (based on the state in the exception) what was successfully
>>> committed and what wasn't.
>>>
>>> Also, if you can create a standalone test that demonstrates this issue,
>>> we can get to the root cause and provide you a fix.
>>>
>>> Thanks,
>>> James
>>>
>>>
>>> On Thu, Aug 1, 2013 at 6:05 PM, 张晓燕 <[email protected]> wrote:
>>>
>>>> Hi, we use Phoenix in the bolts of a Storm topology. We get a lot of data
>>>> from Kafka, process it, then write it to HBase with Phoenix. We create a
>>>> new connection in a bolt, and select from and upsert into one table every
>>>> 10 seconds. When we run this in a Storm cluster, the problem
>>>> "com.salesforce.phoenix.execute.CommitException:
>>>> java.util.ConcurrentModificationException" comes up.
>>>>
>>>> On Monday, July 29, 2013 11:47:29 PM UTC+8, James Taylor wrote:
>>>>>
>>>>> Looks like it might be a bug - can you put together a small unit test
>>>>> that consistently repros?
>>>>>
>>>>> Thanks,
>>>>> James
>>>>>
>>>>>
>>>>> On Jul 29, 2013, at 5:02 AM, panfei <[email protected]> wrote:
>>>>>
>>>>> com.salesforce.phoenix.execute.CommitException: java.util.ConcurrentModificationException
>>>>>         at com.salesforce.phoenix.execute.MutationState.commit(MutationState.java:236)
>>>>>         at com.salesforce.phoenix.jdbc.PhoenixConnection.commit(PhoenixConnection.java:244)
>>>>>         at com.salesforce.phoenix.jdbc.PhoenixStatement.executeMutation(PhoenixStatement.java:180)
>>>>>         at com.salesforce.phoenix.jdbc.PhoenixStatement.access$600(PhoenixStatement.java:73)
>>>>>         at com.salesforce.phoenix.jdbc.PhoenixStatement$ExecutableUpsertStatement.executeUpdate(PhoenixStatement.java:210)
>>>>>         at com.salesforce.phoenix.jdbc.PhoenixStatement.executeUpdate(PhoenixStatement.java:741)
>>>>>         at com.cy.kafka.hbase.PhoenixHBaseHelper.ExcuteNonQuery(PhoenixHBaseHelper.java:39)
>>>>>         at com.cy.storm.bolt.dau.task.DauTask.saveData(DauTask.java:138)
>>>>>         at com.cy.storm.bolt.dau.task.DauTask.run(DauTask.java:34)
>>>>>         at java.util.TimerThread.mainLoop(Timer.java:512)
>>>>>         at java.util.TimerThread.run(Timer.java:462)
>>>>> Caused by: java.util.ConcurrentModificationException
>>>>>         at java.util.HashMap$HashIterator.remove(HashMap.java:811)
>>>>>         at com.salesforce.phoenix.execute.MutationState.commit(MutationState.java:232)
>>>>>         ... 10 more
>>>>> com.salesforce.phoenix.execute.CommitException: java.util.ConcurrentModificationException
>>>>>         at com.salesforce.phoenix.execute.MutationState.commit(MutationState.java:236)
>>>>>         at com.salesforce.phoenix.jdbc.PhoenixConnection.commit(PhoenixConnection.java:244)
>>>>>         at com.salesforce.phoenix.jdbc.PhoenixStatement.executeMutation(PhoenixStatement.java:180)
>>>>>         at com.salesforce.phoenix.jdbc.PhoenixStatement.access$600(PhoenixStatement.java:73)
>>>>>         at com.salesforce.phoenix.jdbc.PhoenixStatement$ExecutableUpsertStatement.executeUpdate(PhoenixStatement.java:210)
>>>>>         at com.salesforce.phoenix.jdbc.PhoenixStatement.executeUpdate(PhoenixStatement.java:741)
>>>>>         at com.cy.kafka.hbase.PhoenixHBaseHelper.ExcuteNonQuery(PhoenixHBaseHelper.java:39)
>>>>>         at com.cy.storm.bolt.dau.DauFilterBolt.filterDAUTuple(DauFilterBolt.java:105)
>>>>>         at com.cy.storm.bolt.dau.DauFilterBolt.execute(DauFilterBolt.java:59)
>>>>>         at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:32)
>>>>>         at backtype.storm.daemon.executor$fn__4050$tuple_action_fn__4052.invoke(executor.clj:566)
>>>>>         at backtype.storm.daemon.executor$mk_task_receiver$fn__3976.invoke(executor.clj:345)
>>>>>         at backtype.storm.disruptor$clojure_handler$reify__1606.onEvent(disruptor.clj:43)
>>>>>         at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
>>>>>         at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
>>>>>         at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
>>>>>         at backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658)
>>>>>         at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
>>>>>         at clojure.lang.AFn.run(AFn.java:24)
>>>>>         at java.lang.Thread.run(Thread.java:662)
>>>>> Caused by: java.util.ConcurrentModificationException
>>>>>         at java.util.HashMap$HashIterator.remove(HashMap.java:811)
>>>>>         at com.salesforce.phoenix.execute.MutationState.commit(MutationState.java:
>>>>>
>>>>>
>>>>> --
>>>>> Without learning, one does not know
>>>>>
>>>>> --
>>>>> You received this message because you are subscribed to the Google
>>>>> Groups "Phoenix HBase User" group.
>>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>>> an email to [email protected].
>>>>> For more options, visit https://groups.google.com/groups/opt_out.
>>>>>
