This snippet from the javadoc[1] is crucial:
"This transform sorts by lexicographic comparison of the byte
representations of the secondary keys and may write secondary key-value
pairs to disk."

It is likely that you're automatically getting the VarLongCoder, which doesn't
have the property that a < b iff encode(a) < encode(b) (with the encoded
bytes compared lexicographically).
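To make the ordering problem concrete, here is a small stdlib-only Java sketch. It re-implements what I understand VarLongCoder's wire format to be (an unsigned base-128 varint: low 7 bits first, high bit set on every byte except the last). It then compares two of your timestamps the way a byte-wise sorter would, treating each byte as unsigned. encodeVarLong and compareUnsigned are hypothetical helpers written for illustration, not Beam APIs.

```java
import java.io.ByteArrayOutputStream;

public class VarLongOrderDemo {

  // Assumed VarLongCoder-style varint layout (illustrative re-implementation,
  // not Beam code): 7 bits per byte, least significant group first,
  // high bit set when more bytes follow.
  static byte[] encodeVarLong(long v) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    while ((v & ~0x7FL) != 0) {
      out.write((int) ((v & 0x7F) | 0x80));
      v >>>= 7;
    }
    out.write((int) v);
    return out.toByteArray();
  }

  // Lexicographic comparison with each byte treated as unsigned (0..255);
  // a shorter array that is a prefix of a longer one sorts first.
  static int compareUnsigned(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  public static void main(String[] args) {
    // Two timestamps one second apart, taken from the output in your message.
    long earlier = 1533046392394L;
    long later = 1533046393394L;
    System.out.println("by value:        " + Integer.signum(Long.compare(earlier, later)));
    System.out.println("by varint bytes: "
        + Integer.signum(compareUnsigned(encodeVarLong(earlier), encodeVarLong(later))));
  }
}
```

With these two values the low 7 bits are 74 for the earlier timestamp and 50 for the later one, so the later timestamp's first varint byte is smaller. main therefore prints -1 for the value comparison but 1 for the byte comparison: the encoded order is the reverse of the numeric order for this pair.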
You'll want to use an order-preserving coder, i.e. one whose lexicographic
byte order matches the sort order you want. In your case the
BigEndianLongCoder[2] should work, since you're only using non-negative values.
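Here is the same comparison with a fixed-width, 8-byte, most-significant-byte-first encoding, which as far as I know is what BigEndianLongCoder writes (via DataOutputStream.writeLong). It also shows why this only holds for non-negative values: a negative long has its sign bit set, so its first byte is 0x80 or higher and it sorts after every non-negative value. encodeBigEndian and compareUnsigned are illustrative helpers, not Beam APIs.

```java
import java.nio.ByteBuffer;

public class BigEndianOrderDemo {

  // 8 bytes, most significant byte first (ByteBuffer's default byte order).
  static byte[] encodeBigEndian(long v) {
    return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
  }

  // Lexicographic comparison with each byte treated as unsigned (0..255).
  static int compareUnsigned(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  public static void main(String[] args) {
    long earlier = 1533046392394L;
    long later = 1533046393394L;
    // Non-negative values: byte order agrees with numeric order (prints -1).
    System.out.println(Integer.signum(
        compareUnsigned(encodeBigEndian(earlier), encodeBigEndian(later))));
    // Negative values: -1 encodes as 0xFF...FF and sorts after 0 (prints 1).
    System.out.println(Integer.signum(
        compareUnsigned(encodeBigEndian(-1L), encodeBigEndian(0L))));
  }
}
```

In your pipeline, one way to apply this is to set the coder explicitly on the output of the first MapElements, so that GroupByKey and SortValues pick it up, e.g. `.setCoder(KvCoder.of(VarLongCoder.of(), KvCoder.of(BigEndianLongCoder.of(), StringUtf8Coder.of())))`. I believe only the secondary key's coder affects the sort order, so the primary key can keep VarLongCoder.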
If you also want to support negative timestamps, you could use the
InstantCoder[3] (or do the same trick it does: shift the value by
Long.MIN_VALUE before encoding it big-endian, so negative values sort first).
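As I read the linked InstantCoder source, subtracting Long.MIN_VALUE with two's-complement wraparound just flips the sign bit, which maps Long.MIN_VALUE..Long.MAX_VALUE onto 0x00...00..0xFF...FF in order. Below is a stdlib-only sketch of that idea. encodeOrderPreserving and sortByEncodedBytes are hypothetical names for illustration, and the in-memory sort stands in for what a byte-wise external sorter would do.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SignFlipOrderDemo {

  // Flip the sign bit (v - Long.MIN_VALUE wraps around), then write 8 bytes
  // big-endian, so unsigned byte order matches signed numeric order.
  static byte[] encodeOrderPreserving(long v) {
    return ByteBuffer.allocate(Long.BYTES).putLong(v - Long.MIN_VALUE).array();
  }

  // Lexicographic comparison with each byte treated as unsigned (0..255).
  static int compareUnsigned(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  // Sorts by comparing encodings only, like a sorter that sees just bytes.
  static List<Long> sortByEncodedBytes(List<Long> values) {
    List<Long> sorted = new ArrayList<>(values);
    sorted.sort((x, y) -> compareUnsigned(encodeOrderPreserving(x), encodeOrderPreserving(y)));
    return sorted;
  }

  public static void main(String[] args) {
    List<Long> values = Arrays.asList(5L, -3L, 0L, -1000L, 1533046392394L, Long.MIN_VALUE);
    // Prints [-9223372036854775808, -1000, -3, 0, 5, 1533046392394]
    System.out.println(sortByEncodedBytes(values));
  }
}
```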

[1] https://beam.apache.org/documentation/sdks/javadoc/2.5.0/org/apache/beam/sdk/extensions/sorter/SortValues.html
[2] https://beam.apache.org/documentation/sdks/javadoc/2.5.0/org/apache/beam/sdk/coders/BigEndianLongCoder.html
[3] https://github.com/apache/beam/blob/abb3d348deca8859f9d15ceb57c86483441cef94/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java#L47

On Tue, Jul 31, 2018 at 7:26 AM Leonardo Campos wrote:

> We have a feature request that demands ordering elements for a given user.
>
> Currently, as an option, I'm investigating SortValues from the Java SDK
> extensions (https://beam.apache.org/documentation/sdks/java-extensions/).
>
> Problem: Our simple test doesn't show the results in order.
>
> I saw that MapElements is internally using a ParDo, which is probably why
> I'm getting results out of order. If so, is there a way to avoid this?
>
> Here is the code (running with Direct Runner):
>
> // Creating the example input messages
> final List<String> testMessages = new ArrayList<>();
> long time = System.currentTimeMillis();
> for (int i = 0; i < 100; i++) {
>     testMessages.add("{\"id\":123456789, \"time\":" + (time) + "\", time_ts\":\"" + new Instant(time) + "\"}");
>     time += TimeUnit.SECONDS.toMillis(1);
> }
>
> TypeDescriptor<KV<Long, KV<Long, String>>> typeDescriptor =
>         TypeDescriptors.kvs(TypeDescriptors.longs(),
>                 TypeDescriptors.kvs(TypeDescriptors.longs(), TypeDescriptors.strings()));
>
> Pipeline pipe = Pipeline.create();
> pipe
>         .apply("Create with test input", Create.of(testMessages))
>         .apply("Converting from Strings to KV<PrimaryKey, KV<SecondaryKey, String>>",
>                 MapElements.into(typeDescriptor).via(eventStr -> {
>                     Any event = JsonIterator.deserialize(eventStr);
>                     return KV.of(event.get("id").toLong(),
>                             KV.of(event.get("time").toLong(), eventStr));
>                 }))
>         .apply(GroupByKey.<Long, KV<Long, String>>create())
>         .apply(SortValues.<Long, Long, String>create(BufferedExternalSorter.options()))
>         .apply(MapElements.into(TypeDescriptors.strings()).via(kvWithIterable -> {
>             // Shouldn't this be sorted by the secondary key, which is the time?
>             StreamSupport.stream(kvWithIterable.getValue().spliterator(), false)
>                     .map(KV::getValue)
>                     .forEach(System.out::println);
>             return "";
>         }));
>
> pipe.run();
>
> Output in console:
> {"id":123456789, "time":1533046491394", time_ts":"2018-07-31T14:14:51.394Z"}
> {"id":123456789, "time":1533046475394", time_ts":"2018-07-31T14:14:35.394Z"}
> {"id":123456789, "time":1533046459394", time_ts":"2018-07-31T14:14:19.394Z"}
> {"id":123456789, "time":1533046443394", time_ts":"2018-07-31T14:14:03.394Z"}
> {"id":123456789, "time":1533046427394", time_ts":"2018-07-31T14:13:47.394Z"}
> {"id":123456789, "time":1533046411394", time_ts":"2018-07-31T14:13:31.394Z"}
> {"id":123456789, "time":1533046395394", time_ts":"2018-07-31T14:13:15.394Z"}
> {"id":123456789, "time":1533046480394", time_ts":"2018-07-31T14:14:40.394Z"}
> {"id":123456789, "time":1533046464394", time_ts":"2018-07-31T14:14:24.394Z"}
> {"id":123456789, "time":1533046448394", time_ts":"2018-07-31T14:14:08.394Z"}
> {"id":123456789, "time":1533046432394", time_ts":"2018-07-31T14:13:52.394Z"}
> {"id":123456789, "time":1533046416394", time_ts":"2018-07-31T14:13:36.394Z"}
> {"id":123456789, "time":1533046400394", time_ts":"2018-07-31T14:13:20.394Z"}
> {"id":123456789, "time":1533046485394", time_ts":"2018-07-31T14:14:45.394Z"}
> {"id":123456789, "time":1533046469394", time_ts":"2018-07-31T14:14:29.394Z"}
> {"id":123456789, "time":1533046453394", time_ts":"2018-07-31T14:14:13.394Z"}
> {"id":123456789, "time":1533046437394", time_ts":"2018-07-31T14:13:57.394Z"}
> {"id":123456789, "time":1533046421394", time_ts":"2018-07-31T14:13:41.394Z"}
> {"id":123456789, "time":1533046405394", time_ts":"2018-07-31T14:13:25.394Z"}
> {"id":123456789, "time":1533046490394", time_ts":"2018-07-31T14:14:50.394Z"}
> {"id":123456789, "time":1533046474394", time_ts":"2018-07-31T14:14:34.394Z"}
> {"id":123456789, "time":1533046458394", time_ts":"2018-07-31T14:14:18.394Z"}
> {"id":123456789, "time":1533046442394", time_ts":"2018-07-31T14:14:02.394Z"}
> {"id":123456789, "time":1533046426394", time_ts":"2018-07-31T14:13:46.394Z"}
> {"id":123456789, "time":1533046410394", time_ts":"2018-07-31T14:13:30.394Z"}
> {"id":123456789, "time":1533046394394", time_ts":"2018-07-31T14:13:14.394Z"}
> {"id":123456789, "time":1533046479394", time_ts":"2018-07-31T14:14:39.394Z"}
> {"id":123456789, "time":1533046463394", time_ts":"2018-07-31T14:14:23.394Z"}
> {"id":123456789, "time":1533046447394", time_ts":"2018-07-31T14:14:07.394Z"}
> {"id":123456789, "time":1533046431394", time_ts":"2018-07-31T14:13:51.394Z"}
> {"id":123456789, "time":1533046415394", time_ts":"2018-07-31T14:13:35.394Z"}
> {"id":123456789, "time":1533046399394", time_ts":"2018-07-31T14:13:19.394Z"}
> {"id":123456789, "time":1533046484394", time_ts":"2018-07-31T14:14:44.394Z"}
> {"id":123456789, "time":1533046468394", time_ts":"2018-07-31T14:14:28.394Z"}
> {"id":123456789, "time":1533046452394", time_ts":"2018-07-31T14:14:12.394Z"}
> {"id":123456789, "time":1533046436394", time_ts":"2018-07-31T14:13:56.394Z"}
> {"id":123456789, "time":1533046420394", time_ts":"2018-07-31T14:13:40.394Z"}
> {"id":123456789, "time":1533046404394", time_ts":"2018-07-31T14:13:24.394Z"}
> {"id":123456789, "time":1533046489394", time_ts":"2018-07-31T14:14:49.394Z"}
> {"id":123456789, "time":1533046473394", time_ts":"2018-07-31T14:14:33.394Z"}
> {"id":123456789, "time":1533046457394", time_ts":"2018-07-31T14:14:17.394Z"}
> {"id":123456789, "time":1533046441394", time_ts":"2018-07-31T14:14:01.394Z"}
> {"id":123456789, "time":1533046425394", time_ts":"2018-07-31T14:13:45.394Z"}
> {"id":123456789, "time":1533046409394", time_ts":"2018-07-31T14:13:29.394Z"}
> {"id":123456789, "time":1533046393394", time_ts":"2018-07-31T14:13:13.394Z"}
> {"id":123456789, "time":1533046478394", time_ts":"2018-07-31T14:14:38.394Z"}
> {"id":123456789, "time":1533046462394", time_ts":"2018-07-31T14:14:22.394Z"}
> {"id":123456789, "time":1533046446394", time_ts":"2018-07-31T14:14:06.394Z"}
> {"id":123456789, "time":1533046430394", time_ts":"2018-07-31T14:13:50.394Z"}
> {"id":123456789, "time":1533046414394", time_ts":"2018-07-31T14:13:34.394Z"}
> {"id":123456789, "time":1533046398394", time_ts":"2018-07-31T14:13:18.394Z"}
> {"id":123456789, "time":1533046483394", time_ts":"2018-07-31T14:14:43.394Z"}
> {"id":123456789, "time":1533046467394", time_ts":"2018-07-31T14:14:27.394Z"}
> {"id":123456789, "time":1533046451394", time_ts":"2018-07-31T14:14:11.394Z"}
> {"id":123456789, "time":1533046435394", time_ts":"2018-07-31T14:13:55.394Z"}
> {"id":123456789, "time":1533046419394", time_ts":"2018-07-31T14:13:39.394Z"}
> {"id":123456789, "time":1533046403394", time_ts":"2018-07-31T14:13:23.394Z"}
> {"id":123456789, "time":1533046456394", time_ts":"2018-07-31T14:14:16.394Z"}
> {"id":123456789, "time":1533046440394", time_ts":"2018-07-31T14:14:00.394Z"}
> {"id":123456789, "time":1533046424394", time_ts":"2018-07-31T14:13:44.394Z"}
> {"id":123456789, "time":1533046408394", time_ts":"2018-07-31T14:13:28.394Z"}
> {"id":123456789, "time":1533046392394", time_ts":"2018-07-31T14:13:12.394Z"}
> {"id":123456789, "time":1533046488394", time_ts":"2018-07-31T14:14:48.394Z"}
> {"id":123456789, "time":1533046472394", time_ts":"2018-07-31T14:14:32.394Z"}
> {"id":123456789, "time":1533046477394", time_ts":"2018-07-31T14:14:37.394Z"}
> {"id":123456789, "time":1533046461394", time_ts":"2018-07-31T14:14:21.394Z"}
> {"id":123456789, "time":1533046445394", time_ts":"2018-07-31T14:14:05.394Z"}
> {"id":123456789, "time":1533046429394", time_ts":"2018-07-31T14:13:49.394Z"}
> {"id":123456789, "time":1533046413394", time_ts":"2018-07-31T14:13:33.394Z"}
> {"id":123456789, "time":1533046397394", time_ts":"2018-07-31T14:13:17.394Z"}
> {"id":123456789, "time":1533046482394", time_ts":"2018-07-31T14:14:42.394Z"}
> {"id":123456789, "time":1533046466394", time_ts":"2018-07-31T14:14:26.394Z"}
> {"id":123456789, "time":1533046450394", time_ts":"2018-07-31T14:14:10.394Z"}
> {"id":123456789, "time":1533046434394", time_ts":"2018-07-31T14:13:54.394Z"}
> {"id":123456789, "time":1533046418394", time_ts":"2018-07-31T14:13:38.394Z"}
> {"id":123456789, "time":1533046402394", time_ts":"2018-07-31T14:13:22.394Z"}
> {"id":123456789, "time":1533046407394", time_ts":"2018-07-31T14:13:27.394Z"}
> {"id":123456789, "time":1533046487394", time_ts":"2018-07-31T14:14:47.394Z"}
> {"id":123456789, "time":1533046471394", time_ts":"2018-07-31T14:14:31.394Z"}
> {"id":123456789, "time":1533046455394", time_ts":"2018-07-31T14:14:15.394Z"}
> {"id":123456789, "time":1533046439394", time_ts":"2018-07-31T14:13:59.394Z"}
> {"id":123456789, "time":1533046423394", time_ts":"2018-07-31T14:13:43.394Z"}
> {"id":123456789, "time":1533046476394", time_ts":"2018-07-31T14:14:36.394Z"}
> {"id":123456789, "time":1533046460394", time_ts":"2018-07-31T14:14:20.394Z"}
> {"id":123456789, "time":1533046444394", time_ts":"2018-07-31T14:14:04.394Z"}
> {"id":123456789, "time":1533046428394", time_ts":"2018-07-31T14:13:48.394Z"}
> {"id":123456789, "time":1533046412394", time_ts":"2018-07-31T14:13:32.394Z"}
> {"id":123456789, "time":1533046396394", time_ts":"2018-07-31T14:13:16.394Z"}
> {"id":123456789, "time":1533046481394", time_ts":"2018-07-31T14:14:41.394Z"}
> {"id":123456789, "time":1533046465394", time_ts":"2018-07-31T14:14:25.394Z"}
> {"id":123456789, "time":1533046449394", time_ts":"2018-07-31T14:14:09.394Z"}
> {"id":123456789, "time":1533046433394", time_ts":"2018-07-31T14:13:53.394Z"}
> {"id":123456789, "time":1533046417394", time_ts":"2018-07-31T14:13:37.394Z"}
> {"id":123456789, "time":1533046401394", time_ts":"2018-07-31T14:13:21.394Z"}
> {"id":123456789, "time":1533046486394", time_ts":"2018-07-31T14:14:46.394Z"}
> {"id":123456789, "time":1533046470394", time_ts":"2018-07-31T14:14:30.394Z"}
> {"id":123456789, "time":1533046454394", time_ts":"2018-07-31T14:14:14.394Z"}
> {"id":123456789, "time":1533046438394", time_ts":"2018-07-31T14:13:58.394Z"}
> {"id":123456789, "time":1533046422394", time_ts":"2018-07-31T14:13:42.394Z"}
> {"id":123456789, "time":1533046406394", time_ts":"2018-07-31T14:13:26.394Z"}
>
> So, the question is: how does SortValues work, so that we can treat each
> message in the order given by the secondary key?
>
