Sorry for the dumb question, the problem is the spout. I have to use like FixedBatchSpout which implements IBatchSpout, doh.

On 8/24/20 7:11 PM, Sergio Ryan wrote:

Hi,

I am trying to create a simple Trident topology to save tuples to HDFS. Here is my code:

public class Topology {
     public static void main(String []args) {new Topology().run(args); }

     private void run(String []args) {
         FileNameFormat fileNameFormat =new DefaultFileNameFormat()
                 .withPath("/test")
                 .withPrefix("sk")
                 .withExtension(".txt"); RecordFormat recordFormat =new 
DelimitedRecordFormat()
// .withFieldDelimiter("\t") .withFields(new Fields("word")); 
FileRotationPolicy rotationPolicy =new FileSizeRotationPolicy(5.0f, 
FileSizeRotationPolicy.Units.MB); HdfsState.Options options =new HdfsState.HdfsFileOptions()
                 .withFileNameFormat(fileNameFormat)
                 .withRecordFormat(recordFormat)
                 .withRotationPolicy(rotationPolicy)
                 .withFsUrl("hdfs://192.168.0.10:9000"); HdfsStateFactory stateFactory =new 
HdfsStateFactory().withOptions(options); Config conf =new Config(); TridentTopology topology =new 
TridentTopology(); TestWordSpout testWordSpout =new TestWordSpout(); Stream stream = 
topology.newStream("teststream", testWordSpout); stream.partitionPersist(stateFactory, new 
Fields("word"), new HdfsUpdater(), new Fields()); try {
             LocalCluster localCluster =new LocalCluster(); 
localCluster.submitTopology("test_topology", conf, topology.build()); 
//StormSubmitter.submitTopology(args[0], conf, topology.build()); }catch (Exception e) {
             System.err.println(e.getMessage()); }
     }
}

I use the TestWordSpout to emit some random words. I can start the topology with no problem, and the topology seem to be able to connect to HDFS without any problems. It does create a file in /test directory, but does not write any content to it. There are no error messages in the log. I created TestHdfsUpdater, which extends HdfsUpdater, which basically just prints some text everytime updateState method is called, but it seems that the method is not even called. Can somebody help me with this?

Thank you!

Reply via email to