Hi all,

I am using TridentKafkaConfig to consume data. When I do not set

spoutConf.forceFromStart = true;

I assume I am consuming the data from the latest offset, and it works.
However, if I set spoutConf.forceFromStart = true;, the spout consumes
nothing at all. I expected it to consume the data from the beginning of
the Kafka topic.
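To spell out what I expected, here is a small self-contained model of how I understand the starting offset is chosen for a newly submitted topology. It is my own simplified sketch, not storm-kafka's actual code, so it may be wrong: the class and method names are made up, the model ignores timestamp lookups, and only the -2/-1 sentinel values mirror Kafka 0.8's kafka.api.OffsetRequest.EarliestTime() and LatestTime(). I also believe spoutConf.startOffsetTime defaults to EarliestTime(), which is why I thought forceFromStart alone should rewind to the beginning.

```java
import java.util.OptionalLong;

// Hypothetical model (my understanding, NOT storm-kafka's real code) of where
// the first batch of a freshly submitted Trident Kafka topology starts reading
// one partition. Names are invented; only the sentinels mirror Kafka 0.8.
public class StartOffsetModel {
    static final long EARLIEST_TIME = -2L; // like OffsetRequest.EarliestTime()
    static final long LATEST_TIME = -1L;   // like OffsetRequest.LatestTime()

    /**
     * @param forceFromStart  mirrors spoutConf.forceFromStart
     * @param startOffsetTime mirrors spoutConf.startOffsetTime
     * @param storedOffset    offset committed to ZooKeeper by an earlier run, if any
     * @param earliest        first offset the broker still retains
     * @param latest          offset the next produced message will receive
     */
    static long firstOffset(boolean forceFromStart, long startOffsetTime,
                            OptionalLong storedOffset, long earliest, long latest) {
        if (forceFromStart) {
            // Ignore any stored offset and resolve startOffsetTime instead.
            return resolve(startOffsetTime, earliest, latest);
        }
        if (storedOffset.isPresent()) {
            // Resume where the previous run left off.
            return storedOffset.getAsLong();
        }
        // No stored state and no forced start: begin at the tail of the log.
        return latest;
    }

    // Map a sentinel "time" to a concrete offset; real timestamps are not modelled.
    static long resolve(long time, long earliest, long latest) {
        if (time == EARLIEST_TIME) return earliest;
        if (time == LATEST_TIME) return latest;
        throw new IllegalArgumentException("timestamp lookups not modelled: " + time);
    }

    public static void main(String[] args) {
        // A partition that currently retains offsets 0..999 (next offset is 1000).
        System.out.println(firstOffset(false, EARLIEST_TIME, OptionalLong.empty(), 0, 1000)); // 1000
        System.out.println(firstOffset(true, EARLIEST_TIME, OptionalLong.empty(), 0, 1000));  // 0
        System.out.println(firstOffset(true, EARLIEST_TIME, OptionalLong.of(500), 0, 1000));  // 0
    }
}
```

The first call matches what I see today (start at the tail, so only new messages arrive); the second and third are what I expected from forceFromStart = true (start at offset 0).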

Here is the code:

    BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
//  TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "PofApiTest");
    TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());

    spoutConf.fetchSizeBytes = 50 * 1024 * 1024;
    spoutConf.bufferSizeBytes = 50 * 1024 * 1024;
//  spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
//  spoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
//  spoutConf.socketTimeoutMs = 1000;
//  spoutConf.fetchMaxWait = 1000;
    spoutConf.forceFromStart = true;
//  spoutConf.maxOffsetBehind = Long.MAX_VALUE;
//  spoutConf.useStartOffsetTimeIfOffsetOutOfRange = true;
//  spoutConf.metricsTimeBucketSizeInSecs = 600;

    OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);
//  TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(spoutConf);

    TridentTopology topology = new TridentTopology();

    topology.newStream("topictestspout", kafkaSpout)
//  topology.newStream("test", new RandomTupleSpout())
    // this test shows that the kafkaSpout's overhead causes the latency
            .parallelismHint(4)
//          .shuffle()
//          .each(new Fields("batchid", "word"),
            .each(new Fields("str"),
                  new JsonObjectParse(),
                  new Fields("userid", "event"))
            .groupBy(new Fields("userid"))
            .persistentAggregate(PostgresqlState.newFactory(config),
                                 new Fields("userid", "event"),
                                 new EventUpdater(),
                                 new Fields("eventword"));
//          .parallelismHint(6);


Can anyone tell me why I can't consume the data from the beginning?

Thanks,


Alec
