Hi Nathan, and thank you for responding, I appreciate it!
No, it wasn't run more than once; I just ran this topology for the first time.
Here is my topology:
public class Ex {

    public static class ExclamationBolt extends BaseRichBolt {
        OutputCollector _collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            // Anchor the new tuple to the incoming one, then ack the input.
            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
            _collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        TryRead T = new TryRead();
        PrinterBolty P = new PrinterBolty();
        // The third argument is the parallelism hint for each component.
        builder.setSpout("word", T, 10);
        builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
        builder.setBolt("exclaim2", P, 2).shuffleGrouping("exclaim1");
-------------------------------------------------------------------------------------
My spout, TryRead:
public class TryRead extends BaseRichSpout {
    SpoutOutputCollector _collector;
    Random _rand;
    BufferedReader fileReader;
    FileSystem f;
    WatchService watcher;
    String S;
    Path dir;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        try {
            // Watch the directory for newly created files.
            f = FileSystems.getDefault();
            watcher = f.newWatchService();
            S = "/root/src/storm-starter/src/jvm/storm/starter";
            dir = f.getPath(S);
            dir.register(watcher, ENTRY_CREATE);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void nextTuple() {
        Utils.sleep(2000);
        for (;;) {
            // wait for key to be signaled
            WatchKey key;
            try {
                key = watcher.take();
            } catch (InterruptedException x) {
                return;
            }
            for (WatchEvent<?> event : key.pollEvents()) {
                WatchEvent.Kind kind = event.kind();
                if (kind == OVERFLOW) {
                    continue;
                }
                WatchEvent<Path> ev = (WatchEvent<Path>) event;
                Path filename = ev.context();
                System.out.format("Emailing file %s%n", filename);
                try {
                    fileReader = new BufferedReader(new FileReader(new File(S + "/" + filename)));
                    RandomAccessFile access = null;
                    String line = null;
                    try {
                        // Emit every line of the new file as one tuple.
                        while ((line = fileReader.readLine()) != null) {
                            if (line != null) {
                                _collector.emit(new Values(line));
                            }
                        }
                    } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                } catch (FileNotFoundException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            // Reset the key so further events are delivered; stop if it is no longer valid.
            boolean valid = key.reset();
            if (!valid) {
                break;
            }
        }
    }

    @Override
    public void ack(Object id) {
    }

    @Override
    public void fail(Object id) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
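
To look at the append question on its own, here is a minimal standalone sketch with no Storm involved. The class name AppendDemo, its two helper methods, and the temp file are made up for illustration; only the write pattern (FileWriter opened with append set to true, newLine, then the word) is taken from PrinterBolty. It shows that in append mode every write adds another copy, so the file ends up with one line per tuple that reaches the bolt, whatever the reason those tuples arrived.

```java
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class AppendDemo {

    // Same write pattern as PrinterBolty.execute: append mode, new line, then the word.
    static void appendWord(Path file, String word) throws IOException {
        try (BufferedWriter output = new BufferedWriter(new FileWriter(file.toFile(), true))) {
            output.newLine();
            output.append(word);
        }
    }

    // Counts how many lines of the file are exactly equal to the given word.
    static long countWord(Path file, String word) throws IOException {
        List<String> lines = Files.readAllLines(file);
        return lines.stream().filter(word::equals).count();
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical temp file standing in for hh.txt.
        Path file = Files.createTempFile("hh", ".txt");
        for (int i = 0; i < 4; i++) {
            appendWord(file, "hello!!!");
        }
        System.out.println(countWord(file, "hello!!!")); // prints 4
        Files.delete(file);
    }
}
```

Counting the copies of one known word in hh.txt after a single run, and comparing that with how many times the word was emitted, would show whether the duplicates come from the writer or from upstream.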
2014-05-06 12:35 GMT+01:00 Nathan Leung <[email protected]>:
> You are creating your file writer with append set to true. Is it
> possible your topology was run more than once?
> On May 6, 2014 6:39 AM, "Bilal Al Fartakh" <[email protected]>
> wrote:
>
>> I'm using a bolt that receives tuples from another bolt (the exclamation
>> bolt) and writes them to a file. The problem is that my results are
>> duplicated four times: when I emit a word, I find it written four
>> times. Where could the problem be?
>>
>>
>>
>>
>> public class PrinterBolty extends BaseBasicBolt {
>>
>>     @Override
>>     public void execute(Tuple tuple, BasicOutputCollector collector) {
>>         try {
>>             BufferedWriter output;
>>             output = new BufferedWriter(new FileWriter("/root/src/storm-starter/hh.txt", true));
>>             output.newLine();
>>             output.append(tuple.getString(0));
>>             output.close();
>>         } catch (IOException e) {
>>             // TODO Auto-generated catch block
>>             e.printStackTrace();
>>         }
>>     }
>>
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>     }
>> }
>>
>>
>> --
>> *Al Fartakh Bilal*
>>
>
--
*Al Fartakh Bilal*