We have a feature request that requires ordering the elements for a given user.
One option I'm currently investigating is SortValues from the Java SDK
extensions (https://beam.apache.org/documentation/sdks/java-extensions/).
Problem: our simple test doesn't show the results in order.
I saw that MapElements uses a ParDo internally, which is probably why the results
come out of order. If so, is there a way to avoid this?
Here is the code (running with the DirectRunner):
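import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.StreamSupport;
import com.jsoniter.JsonIterator;
import com.jsoniter.any.Any;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter;
import org.apache.beam.sdk.extensions.sorter.SortValues;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Instant;

// Creating the example input messages: 100 events, one second apart, all with the same id
final List<String> testMessages = new ArrayList<>();
long time = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
    testMessages.add("{\"id\":123456789, \"time\":" + time + "\", time_ts\":\""
        + new Instant(time) + "\"}");
    time += TimeUnit.SECONDS.toMillis(1);
}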
TypeDescriptor<KV<Long, KV<Long, String>>> typeDescriptor =
    TypeDescriptors.kvs(
        TypeDescriptors.longs(),
        TypeDescriptors.kvs(TypeDescriptors.longs(), TypeDescriptors.strings()));
Pipeline pipe = Pipeline.create();
pipe
    .apply("Create with test input", Create.of(testMessages))
    .apply("Converting from Strings to KV<PrimaryKey, KV<SecondaryKey, String>>",
        MapElements.into(typeDescriptor).via(eventStr -> {
            Any event = JsonIterator.deserialize(eventStr);
            // Primary key = id, secondary key = time
            return KV.of(event.get("id").toLong(),
                KV.of(event.get("time").toLong(), eventStr));
        }))
    .apply(GroupByKey.<Long, KV<Long, String>>create())
    .apply(SortValues.<Long, Long, String>create(BufferedExternalSorter.options()))
    .apply(MapElements.into(TypeDescriptors.strings()).via(kvWithIterable -> {
        // Shouldn't this be sorted by the secondary key, which is the time?
        StreamSupport.stream(kvWithIterable.getValue().spliterator(), false)
            .map(KV::getValue)
            .forEach(System.out::println);
        return "";
    }));
pipe.run();
Output in console:
{"id":123456789, "time":1533046491394", time_ts":"2018-07-31T14:14:51.394Z"}
{"id":123456789, "time":1533046475394", time_ts":"2018-07-31T14:14:35.394Z"}
{"id":123456789, "time":1533046459394", time_ts":"2018-07-31T14:14:19.394Z"}
{"id":123456789, "time":1533046443394", time_ts":"2018-07-31T14:14:03.394Z"}
{"id":123456789, "time":1533046427394", time_ts":"2018-07-31T14:13:47.394Z"}
{"id":123456789, "time":1533046411394", time_ts":"2018-07-31T14:13:31.394Z"}
{"id":123456789, "time":1533046395394", time_ts":"2018-07-31T14:13:15.394Z"}
{"id":123456789, "time":1533046480394", time_ts":"2018-07-31T14:14:40.394Z"}
{"id":123456789, "time":1533046464394", time_ts":"2018-07-31T14:14:24.394Z"}
{"id":123456789, "time":1533046448394", time_ts":"2018-07-31T14:14:08.394Z"}
{"id":123456789, "time":1533046432394", time_ts":"2018-07-31T14:13:52.394Z"}
{"id":123456789, "time":1533046416394", time_ts":"2018-07-31T14:13:36.394Z"}
{"id":123456789, "time":1533046400394", time_ts":"2018-07-31T14:13:20.394Z"}
{"id":123456789, "time":1533046485394", time_ts":"2018-07-31T14:14:45.394Z"}
{"id":123456789, "time":1533046469394", time_ts":"2018-07-31T14:14:29.394Z"}
{"id":123456789, "time":1533046453394", time_ts":"2018-07-31T14:14:13.394Z"}
{"id":123456789, "time":1533046437394", time_ts":"2018-07-31T14:13:57.394Z"}
{"id":123456789, "time":1533046421394", time_ts":"2018-07-31T14:13:41.394Z"}
{"id":123456789, "time":1533046405394", time_ts":"2018-07-31T14:13:25.394Z"}
{"id":123456789, "time":1533046490394", time_ts":"2018-07-31T14:14:50.394Z"}
{"id":123456789, "time":1533046474394", time_ts":"2018-07-31T14:14:34.394Z"}
{"id":123456789, "time":1533046458394", time_ts":"2018-07-31T14:14:18.394Z"}
{"id":123456789, "time":1533046442394", time_ts":"2018-07-31T14:14:02.394Z"}
{"id":123456789, "time":1533046426394", time_ts":"2018-07-31T14:13:46.394Z"}
{"id":123456789, "time":1533046410394", time_ts":"2018-07-31T14:13:30.394Z"}
{"id":123456789, "time":1533046394394", time_ts":"2018-07-31T14:13:14.394Z"}
{"id":123456789, "time":1533046479394", time_ts":"2018-07-31T14:14:39.394Z"}
{"id":123456789, "time":1533046463394", time_ts":"2018-07-31T14:14:23.394Z"}
{"id":123456789, "time":1533046447394", time_ts":"2018-07-31T14:14:07.394Z"}
{"id":123456789, "time":1533046431394", time_ts":"2018-07-31T14:13:51.394Z"}
{"id":123456789, "time":1533046415394", time_ts":"2018-07-31T14:13:35.394Z"}
{"id":123456789, "time":1533046399394", time_ts":"2018-07-31T14:13:19.394Z"}
{"id":123456789, "time":1533046484394", time_ts":"2018-07-31T14:14:44.394Z"}
{"id":123456789, "time":1533046468394", time_ts":"2018-07-31T14:14:28.394Z"}
{"id":123456789, "time":1533046452394", time_ts":"2018-07-31T14:14:12.394Z"}
{"id":123456789, "time":1533046436394", time_ts":"2018-07-31T14:13:56.394Z"}
{"id":123456789, "time":1533046420394", time_ts":"2018-07-31T14:13:40.394Z"}
{"id":123456789, "time":1533046404394", time_ts":"2018-07-31T14:13:24.394Z"}
{"id":123456789, "time":1533046489394", time_ts":"2018-07-31T14:14:49.394Z"}
{"id":123456789, "time":1533046473394", time_ts":"2018-07-31T14:14:33.394Z"}
{"id":123456789, "time":1533046457394", time_ts":"2018-07-31T14:14:17.394Z"}
{"id":123456789, "time":1533046441394", time_ts":"2018-07-31T14:14:01.394Z"}
{"id":123456789, "time":1533046425394", time_ts":"2018-07-31T14:13:45.394Z"}
{"id":123456789, "time":1533046409394", time_ts":"2018-07-31T14:13:29.394Z"}
{"id":123456789, "time":1533046393394", time_ts":"2018-07-31T14:13:13.394Z"}
{"id":123456789, "time":1533046478394", time_ts":"2018-07-31T14:14:38.394Z"}
{"id":123456789, "time":1533046462394", time_ts":"2018-07-31T14:14:22.394Z"}
{"id":123456789, "time":1533046446394", time_ts":"2018-07-31T14:14:06.394Z"}
{"id":123456789, "time":1533046430394", time_ts":"2018-07-31T14:13:50.394Z"}
{"id":123456789, "time":1533046414394", time_ts":"2018-07-31T14:13:34.394Z"}
{"id":123456789, "time":1533046398394", time_ts":"2018-07-31T14:13:18.394Z"}
{"id":123456789, "time":1533046483394", time_ts":"2018-07-31T14:14:43.394Z"}
{"id":123456789, "time":1533046467394", time_ts":"2018-07-31T14:14:27.394Z"}
{"id":123456789, "time":1533046451394", time_ts":"2018-07-31T14:14:11.394Z"}
{"id":123456789, "time":1533046435394", time_ts":"2018-07-31T14:13:55.394Z"}
{"id":123456789, "time":1533046419394", time_ts":"2018-07-31T14:13:39.394Z"}
{"id":123456789, "time":1533046403394", time_ts":"2018-07-31T14:13:23.394Z"}
{"id":123456789, "time":1533046456394", time_ts":"2018-07-31T14:14:16.394Z"}
{"id":123456789, "time":1533046440394", time_ts":"2018-07-31T14:14:00.394Z"}
{"id":123456789, "time":1533046424394", time_ts":"2018-07-31T14:13:44.394Z"}
{"id":123456789, "time":1533046408394", time_ts":"2018-07-31T14:13:28.394Z"}
{"id":123456789, "time":1533046392394", time_ts":"2018-07-31T14:13:12.394Z"}
{"id":123456789, "time":1533046488394", time_ts":"2018-07-31T14:14:48.394Z"}
{"id":123456789, "time":1533046472394", time_ts":"2018-07-31T14:14:32.394Z"}
{"id":123456789, "time":1533046477394", time_ts":"2018-07-31T14:14:37.394Z"}
{"id":123456789, "time":1533046461394", time_ts":"2018-07-31T14:14:21.394Z"}
{"id":123456789, "time":1533046445394", time_ts":"2018-07-31T14:14:05.394Z"}
{"id":123456789, "time":1533046429394", time_ts":"2018-07-31T14:13:49.394Z"}
{"id":123456789, "time":1533046413394", time_ts":"2018-07-31T14:13:33.394Z"}
{"id":123456789, "time":1533046397394", time_ts":"2018-07-31T14:13:17.394Z"}
{"id":123456789, "time":1533046482394", time_ts":"2018-07-31T14:14:42.394Z"}
{"id":123456789, "time":1533046466394", time_ts":"2018-07-31T14:14:26.394Z"}
{"id":123456789, "time":1533046450394", time_ts":"2018-07-31T14:14:10.394Z"}
{"id":123456789, "time":1533046434394", time_ts":"2018-07-31T14:13:54.394Z"}
{"id":123456789, "time":1533046418394", time_ts":"2018-07-31T14:13:38.394Z"}
{"id":123456789, "time":1533046402394", time_ts":"2018-07-31T14:13:22.394Z"}
{"id":123456789, "time":1533046407394", time_ts":"2018-07-31T14:13:27.394Z"}
{"id":123456789, "time":1533046487394", time_ts":"2018-07-31T14:14:47.394Z"}
{"id":123456789, "time":1533046471394", time_ts":"2018-07-31T14:14:31.394Z"}
{"id":123456789, "time":1533046455394", time_ts":"2018-07-31T14:14:15.394Z"}
{"id":123456789, "time":1533046439394", time_ts":"2018-07-31T14:13:59.394Z"}
{"id":123456789, "time":1533046423394", time_ts":"2018-07-31T14:13:43.394Z"}
{"id":123456789, "time":1533046476394", time_ts":"2018-07-31T14:14:36.394Z"}
{"id":123456789, "time":1533046460394", time_ts":"2018-07-31T14:14:20.394Z"}
{"id":123456789, "time":1533046444394", time_ts":"2018-07-31T14:14:04.394Z"}
{"id":123456789, "time":1533046428394", time_ts":"2018-07-31T14:13:48.394Z"}
{"id":123456789, "time":1533046412394", time_ts":"2018-07-31T14:13:32.394Z"}
{"id":123456789, "time":1533046396394", time_ts":"2018-07-31T14:13:16.394Z"}
{"id":123456789, "time":1533046481394", time_ts":"2018-07-31T14:14:41.394Z"}
{"id":123456789, "time":1533046465394", time_ts":"2018-07-31T14:14:25.394Z"}
{"id":123456789, "time":1533046449394", time_ts":"2018-07-31T14:14:09.394Z"}
{"id":123456789, "time":1533046433394", time_ts":"2018-07-31T14:13:53.394Z"}
{"id":123456789, "time":1533046417394", time_ts":"2018-07-31T14:13:37.394Z"}
{"id":123456789, "time":1533046401394", time_ts":"2018-07-31T14:13:21.394Z"}
{"id":123456789, "time":1533046486394", time_ts":"2018-07-31T14:14:46.394Z"}
{"id":123456789, "time":1533046470394", time_ts":"2018-07-31T14:14:30.394Z"}
{"id":123456789, "time":1533046454394", time_ts":"2018-07-31T14:14:14.394Z"}
{"id":123456789, "time":1533046438394", time_ts":"2018-07-31T14:13:58.394Z"}
{"id":123456789, "time":1533046422394", time_ts":"2018-07-31T14:13:42.394Z"}
{"id":123456789, "time":1533046406394", time_ts":"2018-07-31T14:13:26.394Z"}
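For comparison, the order I expected is what this plain-Java model produces. It does not use Beam. The Event class and the groupAndSort helper are hypothetical stand-ins for the parsed KV<id, KV<time, json>> elements and for GroupByKey followed by SortValues. The model groups events by id and then sorts each group numerically by time. It is fed the same 100 one-second-apart timestamps newest first, and it prints them with strictly increasing "time" values.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ExpectedOrder {

    // Hypothetical stand-in for one parsed KV<id, KV<time, json>> element.
    static final class Event {
        final long id;
        final long time;
        final String json;

        Event(long id, long time, String json) {
            this.id = id;
            this.time = time;
            this.json = json;
        }
    }

    // Groups events by primary key (id), then sorts each group numerically by the
    // secondary key (time): the per-key order I expected GroupByKey + SortValues to give.
    static Map<Long, List<Event>> groupAndSort(List<Event> events) {
        Map<Long, List<Event>> byId = new TreeMap<>();
        for (Event e : events) {
            byId.computeIfAbsent(e.id, k -> new ArrayList<>()).add(e);
        }
        for (List<Event> group : byId.values()) {
            group.sort(Comparator.comparingLong(e -> e.time));
        }
        return byId;
    }

    public static void main(String[] args) {
        long earliest = 1533046392394L; // smallest "time" value in the console output
        List<Event> events = new ArrayList<>();
        // Add the 100 events newest first; the input order should not matter.
        for (int i = 99; i >= 0; i--) {
            long t = earliest + i * 1000L;
            events.add(new Event(123456789L, t, "{\"id\":123456789, \"time\":" + t + "}"));
        }
        // Prints 100 lines with strictly increasing "time" values.
        for (Event e : groupAndSort(events).get(123456789L)) {
            System.out.println(e.json);
        }
    }
}
```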
So, the question is: how does SortValues work, so that we can process each
message in the order given by the secondary key?
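There is one thing I'm unsure about. My reading of the SortValues documentation is that it sorts by the encoded bytes of the secondary key, not by the key's natural order. If that reading is right, the coder used for the Long secondary key matters.

The stdlib-only sketch below does not use Beam. It hand-rolls two encodings:
- a LEB128-style varint, which I assume matches what VarLongCoder (the default coder for Long) writes;
- a fixed 8-byte big-endian encoding, the layout BigEndianLongCoder uses.

It then sorts keys by unsigned lexicographic comparison of those bytes. The class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class EncodedKeyOrder {

    // LEB128-style varint: 7 bits per byte, least-significant group first,
    // high bit set when more bytes follow. Assumption: mirrors VarLongCoder's output.
    static byte[] varLong(long v) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        do {
            int bits = (int) (v & 0x7F);
            v >>>= 7;
            out.write(bits | (v != 0 ? 0x80 : 0));
        } while (v != 0);
        return out.toByteArray();
    }

    // Fixed 8-byte big-endian two's complement (ByteBuffer's default byte order).
    static byte[] bigEndian(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
    }

    // Sorts keys by the unsigned lexicographic order of their encoded bytes,
    // the comparison a sorter working on encoded keys would apply.
    static List<Long> sortByEncoding(List<Long> keys, Function<Long, byte[]> encode) {
        List<Long> sorted = new ArrayList<>(keys);
        sorted.sort((a, b) -> Arrays.compareUnsigned(encode.apply(a), encode.apply(b)));
        return sorted;
    }

    public static void main(String[] args) {
        List<Long> keys = Arrays.asList(256L, 129L, 1533046392394L, 1533046393394L);
        // prints: big-endian: [129, 256, 1533046392394, 1533046393394]
        System.out.println("big-endian: " + sortByEncoding(keys, EncodedKeyOrder::bigEndian));
        // prints: varint:     [256, 129, 1533046393394, 1533046392394]
        System.out.println("varint:     " + sortByEncoding(keys, EncodedKeyOrder::varLong));
    }
}
```

Under the varint encoding, 256 sorts before 129. Likewise, 1533046393394 sorts before 1533046392394, which is the same relative order those two timestamps have in the console output above.

The big-endian encoding preserves numeric order for non-negative values like these timestamps. Negative longs would sort after positive ones.

If this is the cause, one untested idea would be to call setCoder(KvCoder.of(VarLongCoder.of(), KvCoder.of(BigEndianLongCoder.of(), StringUtf8Coder.of()))) on the output of the first MapElements.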