Hi Everyone,
Sorry, fingers slipped on the keyboard and the message was sent unfinished.
I'm setting up a Kafka spout in Clojure like this, and using it in a
Trident topology:
(defn create-scheme []
  (reify backtype.storm.spout.MultiScheme
    ;; called by the spout for every raw Kafka message
    (deserialize [this bytes]
      (avroclj.avro/deserialize bytes))
    (getOutputFields [this]
      (Fields. ["a" "b" "c"]))))
(set! (.scheme spout-config) (create-scheme))
(TransactionalTridentKafkaSpout. spout-config)))
What I get in Storm's worker logs is this:

RuntimeException: java.lang.IllegalStateException:
Attempting to call unbound fn: #'avroclj.avro/deserialize
I am not an expert on the topic of ClassLoaders, but my guess is that the
thread which calls the deserialize method on the reified object has a
different context ClassLoader than the one used when (create-scheme) was
called.
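For what it's worth, the same message can be reproduced in a plain REPL by calling a var that has been interned but never given a value, so another possibility is that the namespace simply has not been loaded in the worker JVM. This is only a minimal sketch of the symptom, not of Storm itself; demo-deserialize and call-unbound are made-up names and not part of avroclj:

```clojure
;; Hypothetical names, used only to illustrate the error message.
(declare demo-deserialize)   ; interns the var but leaves it unbound

(defn call-unbound
  "Calls the unbound var and returns the exception message."
  []
  (try
    (demo-deserialize (byte-array 0))
    (catch IllegalStateException e
      (.getMessage e))))
```

Calling (call-unbound) returns the "Attempting to call unbound fn" message for the declared var, which matches what I see in the worker logs.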
So I found a workaround: I call (require 'myns.myschemenamespace)
at the beginning of the deserialize method, like this:
(deserialize [this bytes]
  ;; make sure the namespace is loaded before using its vars
  (require 'myns.myschemenamespace)
  (avroclj.avro/deserialize bytes))
Of course, it's a hack.
My questions are:
1. Is this a known problem?
2. Or am I doing it just plain wrong?
3. What's the proper way to do this?
Thanks,
Michael