Hi,
My name is Paul. I was previously using Storm 0.10.0 and now have to upgrade to
the latest version, 2.2.0.
I define my topology with Flux, but it isn't working after the upgrade. This is
my config:
name: "AuditLogConsumerTopology"
components:
  - id: "kafkaConfigBuilder"
    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
    constructorArgs:
      # bootstrap servers
      - "localhost:9092"
      # topics
      - ["topic"]
  # Spout config
  - id: "SpoutConfig"
    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig"
    constructorArgs:
      # brokerHosts
      - ref: "kafkaConfigBuilder"
    properties:
      - name: "group.id"
        value: "test-group"
      - name: "key.deserializer"
        value: "org.apache.kafka.common.serialization.StringDeserializer"
      - name: "value.deserializer"
        value: "org.apache.kafka.common.serialization.StringDeserializer"
config:
  topology.workers: 1
  topology.max.spout.pending: 1000
  topology.acker.executors: 1
  topology.executor.send.buffer.size: 16384
  topology.executor.receive.buffer.size: 16384
  topology.transfer.buffer.size: 32
  zookeeperHost: "localhost"
  zookeeperBasePath: "platformaudit"
  kafka.broker.properties:
    #bootstrap.servers: "localhost:9092"
    metadata.broker.list: "localhost:9092"
    request.required.acks: "1"
  topic: "topic"
spouts:
  - id: "TestSpout"
    className: "org.apache.storm.kafka.spout.KafkaSpout"
    parallelism: 1
    constructorArgs:
      - ref: "SpoutConfig"
bolts:
  - id: "TestBolt"
    className: "auditlog.TestBolt"
    parallelism: 1
    constructorArgs:
      # index
      - "auditlog"
      # type
      - "_doc"
      # hostname
      - localhost
      # port
      - 9200
streams:
  - name: "TestSpout --> TestBolt"
    from: "TestSpout"
    to: "TestBolt"
    grouping:
      type: SHUFFLE
By changing the configuration with reference to the actual Storm 2.2.0 consumer
code, I was able to put together this Flux YAML file.
However, I am unable to specify any consumer properties from this YAML file.
I tried setting "group.id" and it gives an error:
Exception in thread "main" java.lang.NoSuchFieldException: groupId
at java.lang.Class.getField(Class.java:1703)
at org.apache.storm.flux.FluxBuilder.findPublicField(FluxBuilder.java:298)
at org.apache.storm.flux.FluxBuilder.applyProperties(FluxBuilder.java:288)
at org.apache.storm.flux.FluxBuilder.buildObject(FluxBuilder.java:390)
at org.apache.storm.flux.FluxBuilder.buildComponents(FluxBuilder.java:428)
at org.apache.storm.flux.FluxBuilder.buildTopology(FluxBuilder.java:102)
at org.apache.storm.flux.Flux.runCli(Flux.java:174)
at org.apache.storm.flux.Flux.main(Flux.java:119)
Without specifying properties the topology does get submitted but it throws an
error in the spout saying:
java.lang.RuntimeException: org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value.
at org.apache.storm.utils.Utils$1.run(Utils.java:409)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:478)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:468)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:607)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)
at org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault.createConsumer(ConsumerFactoryDefault.java:26)
at org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault.createConsumer(ConsumerFactoryDefault.java:22)
at org.apache.storm.kafka.spout.KafkaSpout.open(KafkaSpout.java:147)
at org.apache.storm.executor.spout.SpoutExecutor.init(SpoutExecutor.java:149)
at org.apache.storm.executor.spout.SpoutExecutor.call(SpoutExecutor.java:159)
at org.apache.storm.executor.spout.SpoutExecutor.call(SpoutExecutor.java:56)
at org.apache.storm.utils.Utils$1.run(Utils.java:389)
... 1 more
I went through some of the source code and found the likely cause. The consumer
properties are not fields of the "org.apache.storm.kafka.spout.KafkaSpoutConfig"
class; they are defined in Kafka's "ConsumerConfig" class. So when Flux looks up
the field named in the properties section of the YAML file, the lookup fails.
Previously, in version 0.10.0, the configuration fields were available in the
"storm.kafka.SpoutConfig" class itself.
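To illustrate what I think is happening, here is a minimal Java sketch. It
assumes Flux resolves a property by calling Class.getField on the component, as
the first stack trace above suggests. OldStyleConfig and NewStyleConfig are
hypothetical stand-ins I made up for this example, not the real Storm classes.

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

public class FluxPropertyLookupSketch {

    // Hypothetical stand-in for the old storm.kafka.SpoutConfig:
    // settings are exposed as public fields.
    public static class OldStyleConfig {
        public String groupId;
    }

    // Hypothetical stand-in for KafkaSpoutConfig: consumer settings are
    // kept in a map, so there is no public field per setting.
    public static class NewStyleConfig {
        private final Map<String, Object> kafkaProps = new HashMap<>();

        public Map<String, Object> getKafkaProps() {
            return kafkaProps;
        }
    }

    // Mimics the lookup seen in the stack trace: Class.getField(name)
    // only finds public fields and throws NoSuchFieldException otherwise.
    public static boolean trySetField(Object target, String name, Object value) {
        try {
            Field field = target.getClass().getField(name);
            field.set(target, value);
            return true;
        } catch (NoSuchFieldException e) {
            return false;
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Public field exists, so the property can be applied.
        System.out.println(trySetField(new OldStyleConfig(), "groupId", "test-group"));
        // No such public field, so the lookup fails like it does in Flux.
        System.out.println(trySetField(new NewStyleConfig(), "groupId", "test-group"));
    }
}
```

The first call succeeds and the second one fails, which matches the behaviour I
see: the old config class accepted these properties, the new one does not.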
Can I please get some help on how to resolve this issue? The Flux Kafka spout
example at "https://storm.apache.org/releases/2.2.0/flux.html" is also wrong,
and I am not sure whether it has been updated.
Best Regards,
Paul Jose