Hi Tobias,

The specific error there looks like you have a forbidden null somewhere deep
inside the output of logLine.toTableRow(). Hard to say more with this
information.
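If it helps, one way to track the null down is to walk the row recursively and report the path of the first null value. A rough sketch, using a plain Map in place of TableRow (which is a Map<String, Object> underneath); the class and method names here are just illustrative:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NullFinder {
    // Recursively search a nested map/list structure for null values.
    // Returns the path to the first null found, or null if there is none.
    static String findNull(Object value, String path) {
        if (value == null) {
            return path;
        }
        if (value instanceof Map) {
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                String hit = findNull(e.getValue(), path + "." + e.getKey());
                if (hit != null) {
                    return hit;
                }
            }
        } else if (value instanceof List) {
            List<?> list = (List<?>) value;
            for (int i = 0; i < list.size(); i++) {
                String hit = findNull(list.get(i), path + "[" + i + "]");
                if (hit != null) {
                    return hit;
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Stand-in for the output of logLine.toTableRow().
        Map<String, Object> row = new LinkedHashMap<>();
        Map<String, Object> nested = new LinkedHashMap<>();
        nested.put("ok", 1);
        nested.put("bad", null);
        row.put("ts", "2017-02-17T00:00:00Z");
        row.put("payload", nested);
        System.out.println(findNull(row, "row")); // prints "row.payload.bad"
    }
}
```

Logging that path from inside your DoFn (before calling c.output) should point at the offending field.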
Kenn

On Fri, Feb 17, 2017 at 4:46 AM, Tobias Feldhaus <[email protected]> wrote:

> It seems like this is caused by the fact that the workaround I am using to
> write daily-partitioned tables in batch mode does not work.
>
> My problem is that with more than 1000 days, the date-sharded table in BQ
> will be too large to be converted automatically via a simple “bq partition”
> command into a partitioned table, as such a table cannot have more than
> 1000 days.
>
> So the solution will be a divide-and-conquer strategy, I guess.
>
> On 17.02.17, 11:36, "Tobias Feldhaus" <[email protected]> wrote:
>
> Hello,
>
> could it be that it's no longer possible to run pipelines with a BigQuery
> sink locally on the dev machine? I migrated a "Read JSON from GCS, parse
> and write to BQ" pipeline to Apache Beam 0.5.0 from the Dataflow SDK.
> All tests are green, and the pipeline runs successfully on the Dataflow
> service with the test files, but locally with the DirectRunner I get an
> NPE.
>
> It happens right after I create the TableRow element, which I even
> double-checked not to be null. Even when I artificially create a LogLine
> element in this step without taking the one from the input, the NPE is
> thrown:
>
> static class Outputter extends DoFn<LogLine, TableRow> {
>     (...)
>     LogLine logLine = c.element();
>     TableRow tableRow = logLine.toTableRow();
>     tableRow.set("ts", c.timestamp().toString());
>
>     if (c != null && tableRow != null) {
>         try {
>             c.output(tableRow);
>         } catch (NullPointerException e) {
>             LOG.error("caught NPE");
>             e.printStackTrace();
>         }
>     }
>
> The corresponding stack trace looks like this:
>
> ERROR: caught NPE
> java.lang.NullPointerException
>     at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.java:419)
>     at java.util.AbstractMap.hashCode(AbstractMap.java:530)
>     at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.java:419)
>     at java.util.AbstractMap.hashCode(AbstractMap.java:530)
>     at java.util.Arrays.hashCode(Arrays.java:4146)
>     at java.util.Objects.hash(Objects.java:128)
>     at org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow.hashCode(WindowedValue.java:409)
>     at java.util.HashMap.hash(HashMap.java:338)
>     at java.util.HashMap.get(HashMap.java:556)
>     at org.apache.beam.runners.direct.repackaged.com.google.common.collect.AbstractMapBasedMultimap.put(AbstractMapBasedMultimap.java:193)
>     at org.apache.beam.runners.direct.repackaged.com.google.common.collect.AbstractSetMultimap.put(AbstractSetMultimap.java:128)
>     at org.apache.beam.runners.direct.repackaged.com.google.common.collect.HashMultimap.put(HashMultimap.java:49)
>     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
>     at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:198)
>     at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.outputWindowedValue(SimpleDoFnRunner.java:352)
>     at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:553)
>     at ch.localsearch.dataintel.logfiles.FrontendPipeline$Outputter.processElement(FrontendPipeline.java:181)
>     at ch.localsearch.dataintel.logfiles.FrontendPipeline$Outputter$auxiliary$sxgOpc6N.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:199)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:161)
>     at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElement(PushbackSideInputDoFnRunner.java:111)
>     at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElementInReadyWindows(PushbackSideInputDoFnRunner.java:77)
>     at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:134)
>     at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:51)
>     at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:139)
>     at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> Best,
> Tobias
