Hi,

My name is Paul. Previously I was using storm 0.10.0 and now have to upgrade to 
the latest 2.2.0.
I am using flux. But it isn't working. This is my config:


name: "AuditLogConsumerTopology"

components:

  - id: "kafkaConfigBuilder"
    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
    constructorArgs:
      # bootstrap servers
      - "localhost:9092"
      # topics
      - ["topic"]

  #Spout Config
  - id: "SpoutConfig"
    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig"
    constructorArgs:
      # brokerHosts
      - ref: "kafkaConfigBuilder"
    properties:
      - name: "group.id"
        value: "test-group"
      - name: "key.deserializer"
        value: "org.apache.kafka.common.serialization.StringDeserializer"
      - name: "value.deserializer"
        value: "org.apache.kafka.common.serialization.StringDeserializer"


config:
  topology.workers: 1
  topology.max.spout.pending: 1000
  topology.acker.executors: 1
  topology.executor.send.buffer.size: 16384
  topology.executor.receive.buffer.size: 16384
  topology.transfer.buffer.size: 32
  zookeeperHost: "localhost"
  zookeeperBasePath: "platformaudit"
  kafka.broker.properties:
    #bootstrap.servers: "localhost:9092"
    metadata.broker.list: "localhost:9092"
    request.required.acks: "1"
  topic: "topic"


spouts:
  - id: "TestSpout"
    className: "org.apache.storm.kafka.spout.KafkaSpout"
    parallelism: 1
    constructorArgs:
      - ref: "SpoutConfig"


bolts:
  - id: "TestBolt"
    className: "auditlog.TestBolt"
    parallelism: 1
    constructorArgs:
        #index
      - "auditlog"
        #type
      - "_doc"
        #hostname
      - localhost
        #port
      - 9200


streams:
  - name: "TestSpout --> TestBolt"
    from: "TesstSpout"
    to: "TestBolt"
    grouping:
      type: SHUFFLE


My changing the configuration by referring to the actual storm consumer 2.2.0 
code I was able to make this yaml flux file.


However I am unable to specify any properties from this yaml file for the 
consumer.

I tried giving "group.id" and it gives an errror:

Exception in thread "main" java.lang.NoSuchFieldException: groupId

at java.lang.Class.getField(Class.java:1703)
at org.apache.storm.flux.FluxBuilder.findPublicField(FluxBuilder.java:298)
at org.apache.storm.flux.FluxBuilder.applyProperties(FluxBuilder.java:288)
at org.apache.storm.flux.FluxBuilder.buildObject(FluxBuilder.java:390)
at org.apache.storm.flux.FluxBuilder.buildComponents(FluxBuilder.java:428)
at org.apache.storm.flux.FluxBuilder.buildTopology(FluxBuilder.java:102)
at org.apache.storm.flux.Flux.runCli(Flux.java:174)
at org.apache.storm.flux.Flux.main(Flux.java:119)

Without specifying properties the topology does get submitted but it throws an 
error in the spout saying:


java.lang.RuntimeException: org.apache.kafka.common.config.ConfigException: 
Missing required configuration "key.deserializer" which has no default value. 
at org.apache.storm.utils.Utils$1.run(Utils.java:409) at 
java.lang.Thread.run(Thread.java:748) Caused by: 
org.apache.kafka.common.config.ConfigException: Missing required configuration 
"key.deserializer" which has no default value. at 
org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:478) at 
org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:468) at 
org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108) 
at 
org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129) 
at 
org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:607)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632) 
at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613) 
at 
org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault.createConsumer(ConsumerFactoryDefault.java:26)
 at 
org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault.createConsumer(ConsumerFactoryDefault.java:22)
 at org.apache.storm.kafka.spout.KafkaSpout.open(KafkaSpout.java:147) at 
org.apache.storm.executor.spout.SpoutExecutor.init(SpoutExecutor.java:149) at 
org.apache.storm.executor.spout.SpoutExecutor.call(SpoutExecutor.java:159) at 
org.apache.storm.executor.spout.SpoutExecutor.call(SpoutExecutor.java:56) at 
org.apache.storm.utils.Utils$1.run(Utils.java:389) ... 1 more

I went through a bit of the source code and found this. The properties aren't 
there in the "org.apache.storm.kafka.spout.KafkaSpoutConfig" file but in the 
"ConsumerConfig" file. So when flux tries to get the field name given in the 
properties file it fails.

Previously in version 0.10.0 the configuration fields were available in the 
"storm.kafka.SpoutConfig" file itself.


Can I please get some help on how to resolve this issue? The flux kafka spout 
example at "https://storm.apache.org/releases/2.2.0/flux.html"; is also wrong. I 
am not sure if it has been updated.


Best Regards,

Paul Jose
---------------------------------------------------------------------------------------Disclaimer----------------------------------------------------------------------------------------------
 

****Views and opinions expressed in this e-mail belong to  their author and do 
not necessarily represent views and opinions  of Ugam. 
Our employees are obliged not to make any defamatory statement or infringe any 
legal right. 
Therefore, Ugam does not accept any responsibility or liability for such 
statements. The content of this email is confidential and intended for the 
recipient specified in message only. It is strictly forbidden to share any part 
of this message with any third party, without a written consent of the sender.
If you have received this message by mistake, please reply to this message and 
follow with its deletion, so that we can ensure such a mistake does not occur 
in the future. 
Warning: Sufficient measures have been taken to scan any presence of viruses 
however the recipient should check this email and any attachments for the 
presence of viruses as full security of the email cannot be ensured despite our 
best efforts.
Therefore, Ugam accepts no liability for any damage inflicted by viewing the 
content of this email.. ****

Please do not print this email unless it is necessary. Every unprinted email 
helps the environment. 

Reply via email to