    // Hive sink configuration.
    //
    // `mapper` maps the tuple fields named in `colNames` onto delimited Hive
    // records and uses the current time, formatted with the given pattern, as
    // the partition value.
    //
    // The pattern is interpreted by java.text.SimpleDateFormat, where letter
    // case matters: `yyyy` = calendar year, `MM` = month, `dd` = day of month.
    // The previous value "YYYY/MM/DD" meant week-based year / month /
    // day-of-year, which produces partitions such as "2015/03/074" and the
    // wrong year in the days around New Year.
    // NOTE(review): partitions already written with the old pattern keep their
    // old names; confirm whether they need to be migrated.
    val mapper: DelimitedRecordHiveMapper =
      new DelimitedRecordHiveMapper()
        .withColumnFields(new Fields(colNames))
        .withTimeAsPartitionField("yyyy/MM/dd")

    // Connection and batching options for the Hive table `dbName`.`tblName`
    // reached through `metastore`: 10 transactions per batch, 1000 as the batch
    // size, and an idle timeout of 10 (unit as defined by HiveOptions --
    // TODO confirm).
    val hiveOptions: HiveOptions =
      new HiveOptions(metastore, dbName, tblName, mapper)
        .withTxnsPerBatch(10)
        .withBatchSize(1000)
        .withIdleTimeout(10)
     
    // Kafka source: an opaque Trident spout reading the "air_traffic" topic
    // via the ZooKeeper hosts in `zkHosts_2`. Messages are deserialized with
    // StringScheme wrapped in a SchemeAsMultiScheme.
    val spoutConf: TridentKafkaConfig = {
      val conf = new TridentKafkaConfig(zkHosts_2, "air_traffic")
      conf.scheme = new SchemeAsMultiScheme(new StringScheme())
      conf
    }
    val kafkaSpout: OpaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(spoutConf)
     
    // Topology wiring:
    //   1. read from `kafkaSpout` as the stream "jsonEmitter";
    //   2. run ParseJSON on the "str" field, emitting the `colNames` fields;
    //   3. persist the `colNames` fields per partition into Hive through
    //      HiveUpdater, emitting no output fields, with a parallelism hint of 8.
    val topology: TridentTopology = new TridentTopology()
    val factory: StateFactory = new HiveStateFactory().withOptions(hiveOptions)
    val stream: trident.Stream =
      topology
        .newStream("jsonEmitter", kafkaSpout)
        .each(new Fields("str"), new ParseJSON(), new Fields(colNames))

    stream
      .partitionPersist(factory, new Fields(colNames), new HiveUpdater(), new Fields())
      .parallelismHint(8)