Sorry for the dumb question, the problem is the spout. I have to use
like FixedBatchSpout which implements IBatchSpout, doh.
On 8/24/20 7:11 PM, Sergio Ryan wrote:
Hi,
I am trying to create a simple Trident topology to save tuples to
HDFS. Here is my code:
public class Topology {
public static void main(String []args) {new Topology().run(args); }
private void run(String []args) {
FileNameFormat fileNameFormat =new DefaultFileNameFormat()
.withPath("/test")
.withPrefix("sk")
.withExtension(".txt"); RecordFormat recordFormat =new
DelimitedRecordFormat()
// .withFieldDelimiter("\t") .withFields(new Fields("word"));
FileRotationPolicy rotationPolicy =new FileSizeRotationPolicy(5.0f,
FileSizeRotationPolicy.Units.MB); HdfsState.Options options =new HdfsState.HdfsFileOptions()
.withFileNameFormat(fileNameFormat)
.withRecordFormat(recordFormat)
.withRotationPolicy(rotationPolicy)
.withFsUrl("hdfs://192.168.0.10:9000"); HdfsStateFactory stateFactory =new
HdfsStateFactory().withOptions(options); Config conf =new Config(); TridentTopology topology =new
TridentTopology(); TestWordSpout testWordSpout =new TestWordSpout(); Stream stream =
topology.newStream("teststream", testWordSpout); stream.partitionPersist(stateFactory, new
Fields("word"), new HdfsUpdater(), new Fields()); try {
LocalCluster localCluster =new LocalCluster();
localCluster.submitTopology("test_topology", conf, topology.build());
//StormSubmitter.submitTopology(args[0], conf, topology.build()); }catch (Exception e) {
System.err.println(e.getMessage()); }
}
}
I use the TestWordSpout to emit some random words. I can start the
topology with no problem, and the topology seem to be able to connect
to HDFS without any problems. It does create a file in /test
directory, but does not write any content to it. There are no error
messages in the log. I created TestHdfsUpdater, which extends
HdfsUpdater, which basically just prints some text everytime
updateState method is called, but it seems that the method is not even
called. Can somebody help me with this?
Thank you!