Hi all,
My Storm project computes PV and UV, so I wrote two classes.
    class UidCountState implements java.io.Serializable {
        private Set<String> dimens = new ConcurrentSkipListSet<String>();
        private long count = 0;
    }
The ConcurrentSkipListSet tracks the UV (distinct uids), and count tracks the PV.
    class RedisState implements IBackingMap<UidCountState>
This class stores UidCountState objects in Redis.
Because I need to put UidCountState into Redis, UidCountState has two
methods, serialize and deserialize. serialize writes every field of
UidCountState into a byte[], which of course requires iterating the Set.
On multiPut I serialize each UidCountState and write the bytes to Redis;
on multiGet I read the bytes back and deserialize them.
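Roughly, the class and its serialize/deserialize look like this (simplified sketch; the add/getPv/getUv helpers are just for illustration, not my exact code):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;

class UidCountState implements Serializable {
    private Set<String> dimens = new ConcurrentSkipListSet<String>(); // distinct uids -> UV
    private long count = 0;                                           // every hit    -> PV

    // Record one page view from the given uid.
    public void add(String uid) {
        count++;
        dimens.add(uid);
    }

    public long getPv() { return count; }
    public long getUv() { return dimens.size(); }

    // Write count, then the set size, then each uid (this is the iteration step).
    public byte[] serialize() throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeLong(count);
        out.writeInt(dimens.size());
        for (String uid : dimens) {
            out.writeUTF(uid);
        }
        out.flush();
        return bos.toByteArray();
    }

    // Reverse of serialize(): rebuild the state from the stored bytes.
    public static UidCountState deserialize(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        UidCountState state = new UidCountState();
        state.count = in.readLong();
        int n = in.readInt();
        for (int i = 0; i < n; i++) {
            state.dimens.add(in.readUTF());
        }
        return state;
    }
}
```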
At the beginning, the set in UidCountState was a HashSet, and Storm threw:
java.lang.RuntimeException: java.util.ConcurrentModificationException
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
    at backtype.storm.disruptor$consume_loop_STAR_$fn__1619.invoke(disruptor.clj:73)
    at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextEntry(HashMap.java:793)
    at java.util.HashMap$KeyIterator.next(HashMap.java:828)
    at java.util.HashSet.writeObject(HashSet.java:267)
    at sun.reflect.GeneratedMethodAccessor30.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:940)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1469)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
    at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:554)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:77)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:472)
    at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:27)
    at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:27)
    at backtype.storm.daemon.worker$mk_transfer_fn$fn__4152$fn__4156.invoke(worker.clj:99)
    at backtype.storm.util$fast_list_map.invoke(util.clj:771)
    at backtype.storm.daemon.worker$mk_transfer_fn$fn__4152.invoke(worker.clj:99)
    at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3914.invoke(executor.clj:238)
    at backtype.storm.disruptor$clojure_handler$reify__1606.onEvent(disruptor.clj:43)
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
    ... 6 more
I thought this was because HashSet is not thread-safe, so I changed it to
ConcurrentSkipListSet, but the exception persists.
Do I need to custom-serialize UidCountState?
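If custom serialization is the way to go, one idea I had (not sure it is the right fix) is to hand the serializer an immutable snapshot instead of the live set, since ConcurrentSkipListSet's iterator is weakly consistent, so copying it never throws ConcurrentModificationException. A rough sketch (the helper class and TreeSet choice are just my guesses):

```java
import java.util.Set;
import java.util.TreeSet;

class SetSnapshot {
    // Copy the live concurrent set into a plain set that no other thread
    // can reach; only the copy would ever be handed to a serializer.
    static Set<String> snapshot(Set<String> live) {
        return new TreeSet<String>(live);
    }
}
```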
--
Best regards,
Ivy Tang