Hi Tobias,

The specific error looks like you have a forbidden null somewhere deep
inside the output of logLine.toTableRow(): the stack trace shows the
DirectRunner hashing the output value for its immutability check, and that
hashCode call is what trips over the null. Hard to say more with this
information.
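Since a TableRow is essentially a Map<String, Object> (possibly with nested maps for record fields), one way to locate the offending field is to walk the map recursively before calling c.output(). This is only a sketch with hypothetical names — findNull is not a Beam or BigQuery-client API:

```java
import java.util.HashMap;
import java.util.Map;

public class NullFinder {
    // Recursively search a map (e.g. a TableRow's contents) for null values.
    // Returns the slash-separated path to the first null found, or null if none.
    static String findNull(Map<?, ?> map, String path) {
        for (Map.Entry<?, ?> e : map.entrySet()) {
            String p = path + "/" + e.getKey();
            Object v = e.getValue();
            if (v == null) {
                return p;
            }
            if (v instanceof Map) {
                String found = findNull((Map<?, ?>) v, p);
                if (found != null) {
                    return found;
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // A stand-in for a TableRow with a null buried in a nested record.
        Map<String, Object> request = new HashMap<>();
        request.put("status", null);
        Map<String, Object> row = new HashMap<>();
        row.put("ts", "2017-02-17T10:00:00Z");
        row.put("request", request);
        System.out.println(findNull(row, "")); // prints /request/status
    }
}
```

Logging that path inside the DoFn, instead of catching the NPE, would point directly at the field that toTableRow() leaves null.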

Kenn

On Fri, Feb 17, 2017 at 4:46 AM, Tobias Feldhaus <
[email protected]> wrote:

> It seems this is caused by the workaround I am using to write
> daily-partitioned tables in batch mode, which does not work.
>
> My problem is that with more than 1000 days, the date-sharded table in BQ
> is too large to be converted automatically into a partitioned table via a
> simple “bq partition” command, as such a table cannot have more than 1000
> partitions.
>
> So the solution will probably be a divide-and-conquer strategy.
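One way to sketch that divide-and-conquer step — hypothetical names throughout; it assumes the common approach of `bq cp`-ing each daily shard into the matching partition decorator (e.g. `dataset.table$20170101`) rather than any particular `bq partition` option — is to generate the copy commands in bounded batches:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

public class ShardBatcher {
    // Generate "bq cp" commands that copy each daily shard (prefix_YYYYMMDD)
    // into the matching partition decorator (prefix$YYYYMMDD), split into
    // batches so each batch stays under a chosen size limit.
    static List<List<String>> batchedCopyCommands(
            LocalDate start, LocalDate end, String dataset, String prefix, int batchSize) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyyMMdd");
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (LocalDate d = start; !d.isAfter(end); d = d.plusDays(1)) {
            String day = d.format(fmt);
            current.add(String.format("bq cp %s.%s_%s %s.%s$%s",
                    dataset, prefix, day, dataset, prefix, day));
            if (current.size() == batchSize) {
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Running the batches sequentially keeps each chunk well under the 1000-partition ceiling and any per-job copy quota.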
>
> On 17.02.17, 11:36, "Tobias Feldhaus" <[email protected]>
> wrote:
>
>     Hello,
>
>     Could it be that it's no longer possible to run pipelines with a
> BigQuery sink locally on the dev machine? I migrated a "Read JSON from
> GCS, parse and write to BQ" pipeline from the Dataflow SDK to Apache Beam
> 0.5.0. All tests are green, and the pipeline runs successfully on the
> Dataflow service with the test files, but locally with the DirectRunner I
> get an NPE.
>
>     It happens right after I create the TableRow element, which I
> double-checked is not null. Even when I artificially create a LogLine
> element in this step, without taking the one from the input, the NPE is
> thrown:
>
>
>     static class Outputter extends DoFn<LogLine, TableRow> {
>         (...)
>         @ProcessElement
>         public void processElement(ProcessContext c) {
>             LogLine logLine = c.element();
>
>             TableRow tableRow = logLine.toTableRow();
>             tableRow.set("ts", c.timestamp().toString());
>
>             if (tableRow != null) {
>                 try {
>                     c.output(tableRow);
>                 } catch (NullPointerException e) {
>                     LOG.error("caught NPE");
>                     e.printStackTrace();
>                 }
>             }
>         }
>     }
>
>     The corresponding stack trace looks like this:
>
>     ERROR: caught NPE
>     java.lang.NullPointerException
>         at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.java:419)
>         at java.util.AbstractMap.hashCode(AbstractMap.java:530)
>         at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.java:419)
>         at java.util.AbstractMap.hashCode(AbstractMap.java:530)
>         at java.util.Arrays.hashCode(Arrays.java:4146)
>         at java.util.Objects.hash(Objects.java:128)
>         at org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow.hashCode(WindowedValue.java:409)
>         at java.util.HashMap.hash(HashMap.java:338)
>         at java.util.HashMap.get(HashMap.java:556)
>         at org.apache.beam.runners.direct.repackaged.com.google.common.collect.AbstractMapBasedMultimap.put(AbstractMapBasedMultimap.java:193)
>         at org.apache.beam.runners.direct.repackaged.com.google.common.collect.AbstractSetMultimap.put(AbstractSetMultimap.java:128)
>         at org.apache.beam.runners.direct.repackaged.com.google.common.collect.HashMultimap.put(HashMultimap.java:49)
>         at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
>         at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:198)
>         at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.outputWindowedValue(SimpleDoFnRunner.java:352)
>         at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:553)
>         at ch.localsearch.dataintel.logfiles.FrontendPipeline$Outputter.processElement(FrontendPipeline.java:181)
>         at ch.localsearch.dataintel.logfiles.FrontendPipeline$Outputter$auxiliary$sxgOpc6N.invokeProcessElement(Unknown Source)
>         at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:199)
>         at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:161)
>         at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElement(PushbackSideInputDoFnRunner.java:111)
>         at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElementInReadyWindows(PushbackSideInputDoFnRunner.java:77)
>         at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:134)
>         at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:51)
>         at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:139)
>         at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
>     Best,
>     Tobias
>