Hi,
I am trying to create a simple Trident topology to save tuples to HDFS.
Here is my code:
public class Topology {

    public static void main(String[] args) {
        new Topology().run(args);
    }

    private void run(String[] args) {
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/test")
                .withPrefix("sk")
                .withExtension(".txt");

        RecordFormat recordFormat = new DelimitedRecordFormat()
                // .withFieldDelimiter("\t")
                .withFields(new Fields("word"));

        FileRotationPolicy rotationPolicy =
                new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);

        HdfsState.Options options = new HdfsState.HdfsFileOptions()
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(recordFormat)
                .withRotationPolicy(rotationPolicy)
                .withFsUrl("hdfs://192.168.0.10:9000");

        HdfsStateFactory stateFactory = new HdfsStateFactory().withOptions(options);

        Config conf = new Config();
        TridentTopology topology = new TridentTopology();
        TestWordSpout testWordSpout = new TestWordSpout();
        Stream stream = topology.newStream("teststream", testWordSpout);
        stream.partitionPersist(stateFactory, new Fields("word"), new HdfsUpdater(), new Fields());

        try {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("test_topology", conf, topology.build());
            // StormSubmitter.submitTopology(args[0], conf, topology.build());
        } catch (Exception e) {
            System.err.println(e.getMessage());
        }
    }
}
I use the TestWordSpout to emit some random words. The topology starts
without problems and seems to connect to HDFS fine: it creates a file in
the /test directory, but does not write any content to it. There are no
error messages in the log. I also created a TestHdfsUpdater that extends
HdfsUpdater and just prints some text every time the updateState method
is called, but it seems that the method is never called. Can somebody
help me with this?
Thank you!