Hi Kenneth,
I am happy to!
Here is the stack trace:
Exception in thread "main" java.lang.NullPointerException: Outputs for non-root node 1m_CalculatePageViews/1m_format_pageviews are null
	at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:864)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:495)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:486)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:486)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$400(TransformHierarchy.java:235)
	at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:210)
	at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:409)
	at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:520)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:294)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
	at com.greta.Start.main(Start.java:85)
Process finished with exit code 1
Here are some excerpts from how I try to set up the pipeline.
It starts from an unbounded PubSub topic, like:
streamData
    .apply(rollupM + "_CalculatePageViews",
        new CalculatePageViews(Duration.standardMinutes(WINDOW_SIZE),
            Duration.standardMinutes(10), rollupM))
    .apply(rollupM + "_Pageviews_toPubSub",
        PubsubIO.writeStrings().to(TOPICPUBSUB));
And CalculatePageViews looks like:
static class CalculatePageViews
    extends PTransform<PCollection<TableRow>, PCollection<String>> {

  private final Duration windowDuration;
  private final Duration allowedLateness;
  private final String rollup;

  CalculatePageViews(Duration windowDuration, Duration allowedLateness, String rollup) {
    this.windowDuration = windowDuration;
    this.allowedLateness = allowedLateness;
    this.rollup = rollup;
  }

  @Override
  public PCollection<String> expand(PCollection<TableRow> infos) {
    return infos
        .apply(rollup + "CalculatePageViewsFixedWindows",
            Window.<TableRow>into(FixedWindows.of(windowDuration))
                .triggering(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(windowDuration))
                .discardingFiredPanes()
                .withAllowedLateness(allowedLateness))
        .apply(rollup + "_byAcc_pageviews", ParDo.of(new MapByAcc()))
        .apply(rollup + "_byAC_pageviews", GroupByKey.<String, TableRow>create())
        .apply(rollup + "_pardo_pageviews",
            Combine.<String, TableRow, JsonDataStructure[]>groupedValues(new Pageviews(rollup)))
        .apply(rollup + "_format_pageviews", MapElements.via(new WriteToPubSub()));
  }
}
The error is thrown on the last line, `.apply(rollup + "_format_pageviews",
MapElements.via(new WriteToPubSub()))`, but if I remove the
Combine.groupedValues (just as a test) it works fine, so the problem looks
to be in the Combine.groupedValues. I use it because I have a PCollection
with very many TableRows for my hour and day windows, so I want to split
the calculations better than a normal ParDo would. But if I change the
Combine.groupedValues to a ParDo and make the needed changes, it also works.
So I will also share the code for Pageviews:
public class Pageviews
    extends Combine.CombineFn<TableRow, Pageviews.Accum, JsonDataStructure[]> {

  private static final Logger LOG = LoggerFactory.getLogger(Pageviews.class);

  public String rollup;

  public Pageviews(String rollup) {
    this.rollup = rollup;
  }

  public static class Accum {
    MapStorage mapStore = new MapStorage();
    String rollup = "";
  }

  @Override
  public Accum createAccumulator() {
    Accum accum = new Accum();
    accum.rollup = this.rollup;
    return accum;
  }

  @Override
  public Accum addInput(Accum accum, TableRow input) {
    String geo = input.get("lat") + "." + input.get("lng");
    String mapKey = "pageviews.geo." + geo;
    String opentsdbKey = input.get("accessToken") + ".user.pageview";
    String time = null;
    try {
      time = parseHelper.parseTime((String) input.get("time"));
    } catch (Exception e) {
      LOG.trace("Cant parse time");
    }
    Map<String, String> tagsMap = null;
    try {
      tagsMap = parseHelper.parseGeo((HashMap) input.get("geo"));
    } catch (Exception e) {
      LOG.trace("Cant parse geo");
    }
    tagsMap.put("geo", geo);
    if (accum.rollup.equals("1m")) {
      accum.mapStore.increment(mapKey, 1f, opentsdbKey, time, tagsMap);
    }
    accum.mapStore.rollup(mapKey, opentsdbKey, time, tagsMap, accum.rollup, 1f, 1f);
    // all pageviews
    String allMapKey = mapKey + ".all";
    String allOpentsdbKey = "all.user.pageview";
    if (accum.rollup.equals("1m")) {
      accum.mapStore.increment(allMapKey, 1f, allOpentsdbKey, time, tagsMap);
    }
    accum.mapStore.rollup(allMapKey, allOpentsdbKey, time, tagsMap, accum.rollup, 1f, 1f);
    return accum;
  }

  @Override
  public Accum mergeAccumulators(Iterable<Accum> accums) {
    Accum merged = createAccumulator();
    for (Accum accum : accums) {
      for (String key : accum.mapStore.map.keySet()) {
        if (accum.rollup.equals("1m")) {
          merged.mapStore.increment(key,
              accum.mapStore.map.get(key).value,
              accum.mapStore.map.get(key).openTSDBKey,
              accum.mapStore.map.get(key).time,
              accum.mapStore.map.get(key).tags);
        }
        merged.mapStore.rollup(key,
            accum.mapStore.map.get(key).openTSDBKey,
            accum.mapStore.map.get(key).time,
            accum.mapStore.map.get(key).tags,
            accum.rollup,
            accum.mapStore.map.get(key).sum,
            accum.mapStore.map.get(key).count);
      }
    }
    return merged;
  }

  @Override
  public JsonDataStructure[] extractOutput(Accum accum) {
    LOG.info(accum.rollup + " pageviews map size " + accum.mapStore.map.size());
    JsonDataStructure[] list = accum.mapStore.map.values()
        .toArray(new JsonDataStructure[accum.mapStore.map.values().size()]);
    LOG.info(accum.rollup + " pageviews list length " + list.length);
    return list;
  }
}
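To make the intent of mergeAccumulators clearer: it is basically a per-key fold of sums and counts across the accumulators. Here is a minimal plain-Java sketch of that same idea, with MapStorage reduced to a HashMap of sum/count entries (the class and method names here are illustrative, not my real classes):

```java
import java.util.HashMap;
import java.util.Map;

public class MergeSketch {
    // Simplified stand-in for one MapStorage entry: just a running sum and count.
    static class Entry {
        final float sum;
        final float count;
        Entry(float sum, float count) { this.sum = sum; this.count = count; }
    }

    // Fold one accumulator's map into the merged map, adding sums and counts
    // per key -- the same shape as mergeAccumulators() above.
    static void mergeInto(Map<String, Entry> merged, Map<String, Entry> accum) {
        for (Map.Entry<String, Entry> e : accum.entrySet()) {
            merged.merge(e.getKey(), e.getValue(),
                (a, b) -> new Entry(a.sum + b.sum, a.count + b.count));
        }
    }

    public static void main(String[] args) {
        Map<String, Entry> merged = new HashMap<>();
        Map<String, Entry> a = new HashMap<>();
        a.put("pageviews.geo.1.2", new Entry(3f, 3f));
        Map<String, Entry> b = new HashMap<>();
        b.put("pageviews.geo.1.2", new Entry(2f, 2f));
        mergeInto(merged, a);
        mergeInto(merged, b);
        System.out.println(merged.get("pageviews.geo.1.2").sum);   // 5.0
        System.out.println(merged.get("pageviews.geo.1.2").count); // 5.0
    }
}
```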
Totally feels like I missed something that I should have gotten...
Thank you Kenneth
Best
Dennis
On Fri, 21 Jul 2017 at 17:35 Kenneth Knowles <[email protected]> wrote:
> Do you mind sharing a full stack trace, and perhaps a code snippet?
>
> On Fri, Jul 21, 2017 at 7:36 AM, Dennis Mårtensson <[email protected]>
> wrote:
>
>> Hi, I have a pipeline that does fixed windows, then creates a KV, then a
>> GroupByKey; all this is fine. Then I do a Combine.groupedValues with my
>> own Combine.CombineFn, and the last two steps are a MapElements.via()
>> with a SimpleFunction that creates a JSON string, and then a
>> PubsubIO.writeStrings().to(). Everything looks fine in the IDE, but when
>> I try to run it I get `Exception in thread "main"
>> java.lang.NullPointerException: Outputs for non-root node
>> 1m_CalculatePageViews/1m_format_pageviews are null`. Anyone have any
>> ideas about what I have missed in my pipeline design?
>>
>>
>> dennis [4:02 PM]
>> where the 1m_format_pageviews is the MapElements.via() that creates the JSON
>>
>
>