It seems this is caused by the fact that the workaround I am using to write
daily-partitioned tables in batch mode does not work.

My problem is that with more than 1000 days, the date-sharded table in BQ is
too large to be converted automatically into a partitioned table via a simple
“bq partition” command, since a partitioned table cannot hold more than 1000
days.

So the solution will be a divide-and-conquer strategy, I guess.
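
One way such a split could look, as a minimal sketch only: cut the overall
date range into consecutive chunks of at most 1000 days, so that each chunk
can be handled separately. The class name DateRangeChunker, the Range helper
and the start date in main are hypothetical and only serve as illustration;
how each chunk is then loaded or converted in BQ is not covered here.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Beam or the BigQuery tooling.
class DateRangeChunker {

    /** Inclusive date range [start, end]. */
    static final class Range {
        final LocalDate start;
        final LocalDate end;

        Range(LocalDate start, LocalDate end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return start + ".." + end;
        }
    }

    /** Splits [start, end] into consecutive ranges of at most maxDays days each. */
    static List<Range> split(LocalDate start, LocalDate end, int maxDays) {
        if (maxDays < 1) {
            throw new IllegalArgumentException("maxDays must be >= 1");
        }
        if (end.isBefore(start)) {
            throw new IllegalArgumentException("end must not be before start");
        }
        List<Range> chunks = new ArrayList<>();
        LocalDate chunkStart = start;
        while (!chunkStart.isAfter(end)) {
            // Both bounds are inclusive, so a chunk of maxDays days ends
            // maxDays - 1 days after its start.
            LocalDate chunkEnd = chunkStart.plusDays(maxDays - 1L);
            if (chunkEnd.isAfter(end)) {
                chunkEnd = end;
            }
            chunks.add(new Range(chunkStart, chunkEnd));
            chunkStart = chunkEnd.plusDays(1);
        }
        return chunks;
    }

    public static void main(String[] args) {
        // Assumed example range; the 1000-day limit is the one mentioned above.
        LocalDate first = LocalDate.of(2014, 1, 1);
        LocalDate last = LocalDate.of(2017, 2, 17);
        for (Range r : split(first, last, 1000)) {
            System.out.println(r);
        }
    }
}
```

Each printed range would then correspond to one batch of date shards that
stays within the limit.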

On 17.02.17, 11:36, "Tobias Feldhaus" <[email protected]> wrote:

    Hello,
    
    Could it be that it is no longer possible to run pipelines with a BigQuery
    sink locally on the dev machine? I migrated a "Read JSON from GCS, parse and
    write to BQ" pipeline from the Dataflow SDK to Apache Beam 0.5.0.
    All tests are green and the pipeline runs successfully on the Dataflow
    service with the test files, but locally with the DirectRunner I get an NPE.
    
    It happens right after I create the TableRow element, which I even
    double-checked is not null. Even when I artificially create a LogLine
    element in this step, without taking the one from the input, the NPE is
    thrown:
    
    
    static class Outputter extends DoFn<LogLine, TableRow> {
    (...)
        LogLine logLine = c.element();
    
        TableRow tableRow = logLine.toTableRow();
        tableRow.set("ts", c.timestamp().toString());
    
        if (c != null && tableRow != null){
            try {
    
                c.output(tableRow);
            }
            catch(NullPointerException e){
                LOG.error("catched NPE");
                e.printStackTrace();
            }
        }
    
    The corresponding stack trace looks like this:
    
    ERROR: catched NPE
    java.lang.NullPointerException
        at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.java:419)
        at java.util.AbstractMap.hashCode(AbstractMap.java:530)
        at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.java:419)
        at java.util.AbstractMap.hashCode(AbstractMap.java:530)
        at java.util.Arrays.hashCode(Arrays.java:4146)
        at java.util.Objects.hash(Objects.java:128)
        at org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow.hashCode(WindowedValue.java:409)
        at java.util.HashMap.hash(HashMap.java:338)
        at java.util.HashMap.get(HashMap.java:556)
        at org.apache.beam.runners.direct.repackaged.com.google.common.collect.AbstractMapBasedMultimap.put(AbstractMapBasedMultimap.java:193)
        at org.apache.beam.runners.direct.repackaged.com.google.common.collect.AbstractSetMultimap.put(AbstractSetMultimap.java:128)
        at org.apache.beam.runners.direct.repackaged.com.google.common.collect.HashMultimap.put(HashMultimap.java:49)
        at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
        at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:198)
        at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.outputWindowedValue(SimpleDoFnRunner.java:352)
        at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:553)
        at ch.localsearch.dataintel.logfiles.FrontendPipeline$Outputter.processElement(FrontendPipeline.java:181)
        at ch.localsearch.dataintel.logfiles.FrontendPipeline$Outputter$auxiliary$sxgOpc6N.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:199)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:161)
        at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElement(PushbackSideInputDoFnRunner.java:111)
        at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElementInReadyWindows(PushbackSideInputDoFnRunner.java:77)
        at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:134)
        at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:51)
        at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:139)
        at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    
    Best,
    Tobias
    
    
