It looks to me like the NPE comes from the Google API client library. You
are probably creating an invalid TableRow (a null key or a null value
somewhere). Note that the DirectRunner hashes each output element as part of
its immutability checks (see the ImmutabilityCheckingBundleFactory frame in
your trace), which is likely why this fails locally but not on the service.

        at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.java:419)
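
For what it's worth, something like this should reproduce that frame
directly (a minimal sketch; the field names are made up):

    import com.google.api.services.bigquery.model.TableRow;

    public class NpeRepro {
        public static void main(String[] args) {
            TableRow row = new TableRow();
            row.set("name", "ok");   // fine
            row.set("bad", null);    // null value lands in the row's backing ArrayMap
            row.hashCode();          // NPE at ArrayMap$Entry.hashCode, as in your trace
        }
    }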

Dan

On Fri, Feb 17, 2017 at 3:19 PM, Kenneth Knowles <[email protected]> wrote:

> Hi Tobias,
>
> The specific error there looks like you have a forbidden null somewhere
> deep inside the output of logLine.toTableRow(). Hard to say more with this
> information.
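>
> One quick way to find it is to walk the row recursively before you output
> it. A rough sketch (findNulls is a hypothetical helper, not part of Beam):
>
>     import java.util.List;
>     import java.util.Map;
>
>     static void findNulls(String path, Object value) {
>         if (value == null) {
>             System.out.println("null at " + path);  // report the offending path
>         } else if (value instanceof Map) {
>             // TableRow is a Map, so this descends into nested rows/records
>             for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
>                 findNulls(path + "." + e.getKey(), e.getValue());
>             }
>         } else if (value instanceof List) {
>             List<?> list = (List<?>) value;
>             for (int i = 0; i < list.size(); i++) {
>                 findNulls(path + "[" + i + "]", list.get(i));
>             }
>         }
>     }
>
> Calling findNulls("row", logLine.toTableRow()) right before the output
> should print the path of any forbidden null.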
>
> Kenn
>
> On Fri, Feb 17, 2017 at 4:46 AM, Tobias Feldhaus <[email protected]> wrote:
>
>> It seems this is caused by the workaround I am using to write
>> daily-partitioned tables in batch mode, which does not work.
>>
>> My problem is that with more than 1000 days, the date-sharded table in BQ
>> is too large to be converted automatically into a partitioned table via a
>> simple “bq partition” command, as such a table cannot have more than 1000
>> days.
>>
>> So the solution will be a divide-and-conquer strategy, I guess; a rough
>> sketch of the divide step is below.
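>>
>> For the divide step, something like this (a rough sketch; it assumes the
>> shard-date suffixes have already been listed and sorted):
>>
>>     import java.util.ArrayList;
>>     import java.util.List;
>>
>>     // split the shard dates into batches of at most 1000 days, so each
>>     // batch stays under the 1000-day limit
>>     static List<List<String>> splitIntoBatches(List<String> shardDates) {
>>         List<List<String>> batches = new ArrayList<>();
>>         for (int i = 0; i < shardDates.size(); i += 1000) {
>>             batches.add(shardDates.subList(i, Math.min(i + 1000, shardDates.size())));
>>         }
>>         return batches;
>>     }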
>>
>> On 17.02.17, 11:36, "Tobias Feldhaus" <[email protected]> wrote:
>>
>>     Hello,
>>
>>     Could it be that it's no longer possible to run pipelines with a
>>     BigQuery sink locally on the dev machine? I migrated a "read JSON
>>     from GCS, parse, and write to BQ" pipeline from the Dataflow SDK to
>>     Apache Beam 0.5.0. All tests are green and the pipeline runs
>>     successfully on the Dataflow service with the test files, but locally
>>     with the DirectRunner I get an NPE.
>>
>>     It happens right after I create the TableRow element, which I have
>>     double-checked is not null. Even when I artificially create a LogLine
>>     element in this step, instead of taking the one from the input, the
>>     NPE is thrown:
>>
>>
>>     static class Outputter extends DoFn<LogLine, TableRow> {
>>     (...)
>>         LogLine logLine = c.element();
>>
>>         TableRow tableRow = logLine.toTableRow();
>>         tableRow.set("ts", c.timestamp().toString());
>>
>>         if (c != null && tableRow != null) {
>>             try {
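>>                 // note: c and tableRow are both non-null here; the NPE is
>>                 // thrown from inside c.output, by the runner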
>>                 c.output(tableRow);
>>             } catch (NullPointerException e) {
>>                 LOG.error("caught NPE");
>>                 e.printStackTrace();
>>             }
>>         }
>>
>>     The corresponding stack trace looks like this:
>>
>>     ERROR: caught NPE
>>     java.lang.NullPointerException
>>         at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.java:419)
>>         at java.util.AbstractMap.hashCode(AbstractMap.java:530)
>>         at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.java:419)
>>         at java.util.AbstractMap.hashCode(AbstractMap.java:530)
>>         at java.util.Arrays.hashCode(Arrays.java:4146)
>>         at java.util.Objects.hash(Objects.java:128)
>>         at org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow.hashCode(WindowedValue.java:409)
>>         at java.util.HashMap.hash(HashMap.java:338)
>>         at java.util.HashMap.get(HashMap.java:556)
>>         at org.apache.beam.runners.direct.repackaged.com.google.common.collect.AbstractMapBasedMultimap.put(AbstractMapBasedMultimap.java:193)
>>         at org.apache.beam.runners.direct.repackaged.com.google.common.collect.AbstractSetMultimap.put(AbstractSetMultimap.java:128)
>>         at org.apache.beam.runners.direct.repackaged.com.google.common.collect.HashMultimap.put(HashMultimap.java:49)
>>         at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
>>         at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:198)
>>         at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.outputWindowedValue(SimpleDoFnRunner.java:352)
>>         at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:553)
>>         at ch.localsearch.dataintel.logfiles.FrontendPipeline$Outputter.processElement(FrontendPipeline.java:181)
>>         at ch.localsearch.dataintel.logfiles.FrontendPipeline$Outputter$auxiliary$sxgOpc6N.invokeProcessElement(Unknown Source)
>>         at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:199)
>>         at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:161)
>>         at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElement(PushbackSideInputDoFnRunner.java:111)
>>         at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElementInReadyWindows(PushbackSideInputDoFnRunner.java:77)
>>         at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:134)
>>         at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:51)
>>         at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:139)
>>         at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107)
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>>     Best,
>>     Tobias
>>
>>
>>
>>
>
