wonook commented on code in PR #314:
URL: https://github.com/apache/incubator-nemo/pull/314#discussion_r934156561
##########
examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java:
##########
@@ -43,36 +45,45 @@ private WindowedWordCount() {
public static final String INPUT_TYPE_BOUNDED = "bounded";
public static final String INPUT_TYPE_UNBOUNDED = "unbounded";
private static final String SPLITTER = "!";
-
+ public static final Random RAND = new Random();
/**
* @param p pipeline.
- * @param args arguments.
+ * @param inputType input type arg.
+ * @param inputFilePath input file path arg.
* @return source.
*/
private static PCollection<KV<String, Long>> getSource(
final Pipeline p,
- final String[] args) {
+ final String inputType,
+ final String inputFilePath) {
- final String inputType = args[2];
if (inputType.compareTo(INPUT_TYPE_BOUNDED) == 0) {
- final String inputFilePath = args[3];
return GenericSourceSink.read(p, inputFilePath)
.apply(ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(@Element final String elem,
final OutputReceiver<String> out) {
final String[] splitt = elem.split(SPLITTER);
- out.outputWithTimestamp(splitt[0], new Instant(Long.valueOf(splitt[1])));
+ if (splitt.length > 1 && splitt[1].matches("[0-9]+")) {
+ out.outputWithTimestamp(splitt[0], new Instant(Long.valueOf(splitt[1])));
+ } else {
+ final long timestamp = System.currentTimeMillis() - RAND.nextInt(1000000);
+ out.outputWithTimestamp(elem, new Instant(timestamp));
Review Comment:
This code produces timestamps up to 1,000 seconds (1,000,000 ms) in the past in order to
demonstrate out-of-order data and similar cases. I've added this as a comment.
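The review comment describes the fallback path in the diff. Below is a minimal standalone sketch of the same per-line logic, assuming plain Java without Beam. The class `TimestampParser`, its `Parsed` holder, and `parse` are hypothetical names. Timestamps are plain epoch-millisecond `long`s instead of Joda `Instant`, and the current time and `Random` are passed in rather than read from `System.currentTimeMillis()` and the static `RAND`, so the behavior can be checked deterministically.

```java
import java.util.Random;

/**
 * Hypothetical standalone sketch of the per-line timestamp logic in the diff,
 * with Beam removed so it runs on its own.
 */
public final class TimestampParser {
  /** Separator between the word and its epoch-millisecond timestamp, as in the diff. */
  private static final String SPLITTER = "!";
  /** Exclusive bound passed to Random.nextInt: fallback timestamps lie at most 999,999 ms in the past. */
  public static final int MAX_SKEW_MILLIS = 1_000_000;

  private TimestampParser() {
  }

  /** An element value paired with its event time in epoch milliseconds. */
  public static final class Parsed {
    public final String value;
    public final long timestampMillis;

    Parsed(final String value, final long timestampMillis) {
      this.value = value;
      this.timestampMillis = timestampMillis;
    }
  }

  /**
   * Parses "word!millis". If the timestamp part is missing or not all digits,
   * keeps the whole line and assigns a random timestamp in (now - 1,000 s, now].
   */
  public static Parsed parse(final String elem, final long nowMillis, final Random rand) {
    final String[] splitt = elem.split(SPLITTER);
    if (splitt.length > 1 && splitt[1].matches("[0-9]+")) {
      // Well-formed input: use the embedded timestamp.
      return new Parsed(splitt[0], Long.parseLong(splitt[1]));
    }
    // Fallback: treat the element as arriving late by up to ~1,000 s.
    return new Parsed(elem, nowMillis - rand.nextInt(MAX_SKEW_MILLIS));
  }

  public static void main(final String[] args) {
    final Random rand = new Random(42);
    final long now = System.currentTimeMillis();
    for (final String line : new String[] {"apple!1659312000000", "banana", "cherry!abc"}) {
      final Parsed p = parse(line, now, rand);
      System.out.println(p.value + " @ " + p.timestampMillis);
    }
  }
}
```

Note that `String.split` drops trailing empty strings, so a line such as `hello!` yields a single field and takes the fallback path. The digit check does not bound the length, so a digit string outside the `long` range would still throw `NumberFormatException`, both here and with `Long.valueOf` in the diff.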