Hi, SpoutConfig extends KafkaConfig which has public field fetchSizeBytes , fetchMaxWait and so on,
you can directly set value in code > 在 2016年3月22日,19:48,sujitha chinnu <[email protected]> 写道: > > hai., > > I am getting the message stream of 3MB from kafka topic but the > default value is 1MB. Now I have changed the kafka properties from 1MB to 3MB > by adding the below lines in kafa consumer.properties and server.properties > file. > fetch.message.max.bytes=2048576 ( consumer.properties ) > filemessage.max.bytes=2048576 ( server.properties ) > replica.fetch.max.bytes=2048576 ( server.properties )Now after adding the > above lines in Kafka, 3MB message data is going into kafka data logs. But > STORM is unable to process that 3MB data and it is able to read only default > size i.e.,1MB data. So how to change those configurations inorder to > process/read the 3MB data. Here is my topology class. String argument > = args[0]; > Config conf = new Config(); > conf.put(JDBC_CONF, map); > conf.setDebug(true); > conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); > //set the number of workers > conf.setNumWorkers(3); > > TopologyBuilder builder = new TopologyBuilder(); //Setup Kafka > spout > BrokerHosts hosts = new ZkHosts("localhost:2181"); > String topic = "year1234"; > String zkRoot = ""; > String consumerGroupId = "group1"; > SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, > consumerGroupId); > > spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); > KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); > builder.setSpout("KafkaSpout", kafkaSpout,1); > builder.setBolt("user_details", new > Parserspout(),1).shuffleGrouping("KafkaSpout"); > builder.setBolt("bolts_user", new > bolts_user(cp),1).shuffleGrouping("user_details");
