I'm not really familiar with Storm lifecycle. I'm writing a Storm bolt to
write file to S3 when the buffer is around 10MB. Here's what I've come up
with. I just want to make sure that if I instantiate the buffer like that
it would be ok.
public class S3Appender implements Serializable {
/*
* This is about 10 Megabytes
*/
private static final long BUFFER_SIZE = 1024 * 1000;
private final AmazonS3Client s3Client;
private Map config;
private Clock currentClock;
private StringBuilder buffer = new StringBuilder();
public S3Appender(AmazonS3Client s3Client, Map config, Clock
currentClock) {
this.s3Client = s3Client;
this.config = config;
this.currentClock = currentClock;
}
public boolean append(String status) {
buffer.append(status);
buffer.append(System.getProperty("line.separator"));
if (buffer.length() >= BUFFER_SIZE) {
try {
LocalDateTime now = currentClock.getLocalDateTime();
int year = now.getYear();
int month = now.getMonthOfYear();
int day = now.getDayOfMonth();
long millisOfSecond = now.toDateTime().getMillis();
byte bytes[] = String.valueOf(buffer).getBytes("UTF-8");
ByteArrayInputStream is = new ByteArrayInputStream(bytes);
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(bytes.length);
metadata.setContentType(APPLICATION_JSON.getMimeType());
String key = String.format("%s/%s/%s/%s.json", year, month,
day, millisOfSecond);
String bucket = (String) this.config.get(S3Bolt.BUCKET);
s3Client.putObject(new PutObjectRequest(bucket, key, is,
metadata));
buffer.setLength(0);
return true;
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return false;
}
}
return true;
}
}
Thanks a lot