Hi all,

    My Storm project calculates PV (page views) and UV (unique visitors), so I wrote two classes.

    class UidCountState implements java.io.Serializable {
        private Set<String> dimens = new ConcurrentSkipListSet<String>();
        private long count = 0;
    }

    The ConcurrentSkipListSet is for the UV, and count is for the PV.



    class RedisState implements IBackingMap<UidCountState>

    This class stores UidCountState objects in Redis.

    Because I need to put UidCountState into Redis, UidCountState has two
methods, serialize and deserialize. The serialize method writes every field
of UidCountState into a byte[], which of course means iterating over the Set.
    On multiPut, I serialize each UidCountState and write the bytes to Redis.
    On multiGet, I read the bytes back and deserialize them.
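
    To make the serialize/deserialize pair concrete, here is a minimal sketch
of the idea, not the actual code. The class name UidCountStateSketch and the
byte layout (count, then the number of uids, then each uid) are assumptions
made only for this illustration; the field names follow the class above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical stand-in for UidCountState; the byte layout is assumed.
class UidCountStateSketch {
    final Set<String> dimens = new ConcurrentSkipListSet<String>(); // uv
    long count = 0;                                                 // pv

    // Assumed layout: count (long), number of uids (int), then each uid.
    byte[] serialize() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            // Copy the set first so the written length always matches
            // the number of elements written after it.
            String[] snapshot = dimens.toArray(new String[0]);
            out.writeLong(count);
            out.writeInt(snapshot.length);
            for (String uid : snapshot) {
                out.writeUTF(uid);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException("in-memory write failed", e);
        }
    }

    // Reads the fields back in the same order serialize() wrote them.
    static UidCountStateSketch deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            UidCountStateSketch state = new UidCountStateSketch();
            state.count = in.readLong();
            int n = in.readInt();
            for (int i = 0; i < n; i++) {
                state.dimens.add(in.readUTF());
            }
            return state;
        } catch (IOException e) {
            throw new IllegalStateException("corrupt or truncated bytes", e);
        }
    }
}
```

    In this sketch, multiPut would call serialize() on each value before
writing to Redis, and multiGet would call deserialize() on each byte[] read.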

    At the beginning, the set in UidCountState was a HashSet, and Storm threw:

    java.lang.RuntimeException: java.util.ConcurrentModificationException
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
        at backtype.storm.disruptor$consume_loop_STAR_$fn__1619.invoke(disruptor.clj:73)
        at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
        at clojure.lang.AFn.run(AFn.java:24)
        at java.lang.Thread.run(Thread.java:662)
Caused by: java.util.ConcurrentModificationException
        at java.util.HashMap$HashIterator.nextEntry(HashMap.java:793)
        at java.util.HashMap$KeyIterator.next(HashMap.java:828)
        at java.util.HashSet.writeObject(HashSet.java:267)
        at sun.reflect.GeneratedMethodAccessor30.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:940)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1469)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
        at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:554)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:77)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:472)
        at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:27)
        at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:27)
        at backtype.storm.daemon.worker$mk_transfer_fn$fn__4152$fn__4156.invoke(worker.clj:99)
        at backtype.storm.util$fast_list_map.invoke(util.clj:771)
        at backtype.storm.daemon.worker$mk_transfer_fn$fn__4152.invoke(worker.clj:99)
        at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3914.invoke(executor.clj:238)
        at backtype.storm.disruptor$clojure_handler$reify__1606.onEvent(disruptor.clj:43)
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
        ... 6 more

    I thought it was because HashSet is not thread-safe, so I changed it to
ConcurrentSkipListSet. But the exception still occurs.
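
    For reference, this is the difference between the two set types that the
change relied on, as a small standalone sketch. The class and method names
are made up for illustration and are not part of my topology. A HashSet
iterator is fail-fast and throws when the set is modified during iteration,
while a ConcurrentSkipListSet iterator is weakly consistent and does not.

```java
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical demo class, single-threaded on purpose so it is repeatable.
class IteratorBehaviorSketch {
    // Returns true if adding an element in the middle of an iteration
    // makes the iterator throw ConcurrentModificationException.
    static boolean failsWhenModifiedDuringIteration(Set<String> set) {
        set.add("a");
        set.add("b");
        try {
            for (String s : set) {
                // Structural change while an iterator is open;
                // after the first pass this add is a no-op.
                set.add("c");
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("HashSet throws: "
                + failsWhenModifiedDuringIteration(new HashSet<String>()));
        System.out.println("ConcurrentSkipListSet throws: "
                + failsWhenModifiedDuringIteration(new ConcurrentSkipListSet<String>()));
    }
}
```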

    Do I need to write a custom serializer for UidCountState?
     
-- 
Best regards,

Ivy Tang
