[MINOR] Set task number of each bolt in HdfsAuditLog Application If not set, the task number of all bolts are 8
Author: Zhao, Qingwen <qingwz...@apache.org> Closes #521 from qingwen220/quickFix. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/3d6a29ec Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/3d6a29ec Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/3d6a29ec Branch: refs/heads/master Commit: 3d6a29ec2e67a83742158a08d378cfc5cad59814 Parents: 4734967 Author: Zhao, Qingwen <qingwz...@apache.org> Authored: Mon Oct 17 22:45:19 2016 +0800 Committer: Zhao, Qingwen <qingwz...@apache.org> Committed: Mon Oct 17 22:45:19 2016 +0800 ---------------------------------------------------------------------- .../auditlog/AbstractHdfsAuditLogApplication.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3d6a29ec/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java index b985daf..b9f480b 100644 --- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java @@ -59,11 +59,11 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication { int numOfIPZoneJoinTasks = config.getInt(IPZONE_JOIN_TASK_NUM); int numOfSinkTasks = config.getInt(SINK_TASK_NUM); - builder.setSpout("ingest", spout, numOfSpoutTasks); + builder.setSpout("ingest", spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks); BaseRichBolt parserBolt = getParserBolt(); - BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", parserBolt, numOfParserTasks); + BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", parserBolt, numOfParserTasks).setNumTasks(numOfParserTasks); Boolean useDefaultPartition = !config.hasPath("eagleProps.useDefaultPartition") || config.getBoolean("eagleProps.useDefaultPartition"); if(useDefaultPartition){ @@ -73,15 +73,15 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication { } HdfsSensitivityDataEnrichBolt sensitivityDataJoinBolt = new HdfsSensitivityDataEnrichBolt(config); - BoltDeclarer sensitivityDataJoinBoltDeclarer = builder.setBolt("sensitivityJoin", sensitivityDataJoinBolt, numOfSensitivityJoinTasks); + BoltDeclarer sensitivityDataJoinBoltDeclarer = builder.setBolt("sensitivityJoin", sensitivityDataJoinBolt, numOfSensitivityJoinTasks).setNumTasks(numOfSensitivityJoinTasks); sensitivityDataJoinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1")); IPZoneDataEnrichBolt ipZoneDataJoinBolt = new IPZoneDataEnrichBolt(config); - BoltDeclarer ipZoneDataJoinBoltDeclarer = builder.setBolt("ipZoneJoin", ipZoneDataJoinBolt, numOfIPZoneJoinTasks); + BoltDeclarer ipZoneDataJoinBoltDeclarer = builder.setBolt("ipZoneJoin", ipZoneDataJoinBolt, numOfIPZoneJoinTasks).setNumTasks(numOfIPZoneJoinTasks); ipZoneDataJoinBoltDeclarer.fieldsGrouping("sensitivityJoin", new Fields("user")); StormStreamSink sinkBolt = environment.getStreamSink("hdfs_audit_log_stream",config); - BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks); + BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks).setNumTasks(numOfSinkTasks); kafkaBoltDeclarer.fieldsGrouping("ipZoneJoin", new Fields("user")); return builder.createTopology();