I'm trying to understand the State API (in 0.6.0/Java). I started with https://s.apache.org/presenting-a-new-dofn in order to understand the syntax, but I'm still not understanding something conceptually. This may be because I learned Beam before Flink/Dataflow/Apex.

Does the long-term vision of the Beam model include this technical contract as part of its semantics: "A DoFn which uses the state API MUST have an input type of KV<K,V>"? If so, does Beam put further requirements on the K type? For example, does K need to implement hashCode or equals in particular ways, or must the serialized bytes of two instances of K be equal if and only if those instances should share the same state cell?

In testValueStateSimple in https://github.com/apache/beam/blob/e31ca8b0d05e47c2588d5db29c92bac49aa410da/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java#L1615, if I change the DoFn signature:
FROM: DoFn<KV<String, Integer>, Integer>
TO:   DoFn<String, Integer>

then I start getting the error below, which is confusing me. Is it ultimately caused by the above technical contract being required but not enforced by any validation, or is it something else silly that I'm doing wrong? :)

java.lang.NullPointerException: Outputs for non-root node Nl/ParDo(Anonymous)/ParMultiDo(Anonymous) are null
    at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:864)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:490)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$400(TransformHierarchy.java:231)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:206)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:321)
    at org.apache.beam.sdk.testing.TestPipeline$PipelineAbandonedNodeEnforcement.recordPipelineNodes(TestPipeline.java:166)
    at org.apache.beam.sdk.testing.TestPipeline$PipelineAbandonedNodeEnforcement.afterPipelineExecution(TestPipeline.java:200)
    at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:314)

Finally, it seems like it would be possible to use the state API when processing any arbitrary non-KV PCollection by simply attaching the constant key "hello" to every value with WithKeys, as the unit tests do. I suspect the answer depends on the runner, but is there a general intuition I could gain for what bad thing will happen if I do this? For example, will the stateful ParDo be stuck running on a single machine, will some lower layer run out of memory, or will network traffic between cluster nodes become much chattier and more synchronized?

---
Wesley Tanaka
http://wtanaka.com/
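To make the constant-key question concrete, here is a toy model in plain Java. It is not Beam or runner source code, and ToyKeyedState, encodeKey and workerFor are hypothetical names. It assumes a runner that identifies each state cell by the encoded bytes of the key (here UTF-8, standing in for a deterministic key coder) and routes each element to a worker by hashing those bytes. Under that assumption, keying every element with "hello" puts all state into one cell owned by one worker, while keying by the value itself spreads cells across workers.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model only (hypothetical, not Beam or runner code): keyed state where each
 * state cell is identified by the encoded bytes of the key, and each element is
 * routed to a worker by hashing those same bytes.
 */
public class ToyKeyedState {
  private final int numWorkers;
  // One "value state" cell per encoded key. ByteBuffer has content-based
  // equals/hashCode (unlike byte[]), so equal bytes always hit the same cell.
  private final Map<ByteBuffer, Integer> cells = new HashMap<>();
  // Number of elements each worker processed.
  private final int[] elementsPerWorker;

  public ToyKeyedState(int numWorkers) {
    this.numWorkers = numWorkers;
    this.elementsPerWorker = new int[numWorkers];
  }

  /** Hypothetical stand-in for a deterministic key coder: UTF-8 bytes of a String key. */
  public static byte[] encodeKey(String key) {
    return key.getBytes(StandardCharsets.UTF_8);
  }

  /** Hypothetical partitioner: the worker that owns a given encoded key. */
  public static int workerFor(byte[] encodedKey, int numWorkers) {
    return Math.floorMod(Arrays.hashCode(encodedKey), numWorkers);
  }

  /** Routes one (key, value) element to its worker, then adds value to that key's cell. */
  public void process(String key, int value) {
    byte[] encoded = encodeKey(key);
    elementsPerWorker[workerFor(encoded, numWorkers)]++;
    cells.merge(ByteBuffer.wrap(encoded), value, Integer::sum);
  }

  /** Number of distinct state cells created so far. */
  public int cellCount() {
    return cells.size();
  }

  /** Number of workers that received at least one element. */
  public int busyWorkers() {
    int busy = 0;
    for (int n : elementsPerWorker) {
      if (n > 0) {
        busy++;
      }
    }
    return busy;
  }

  /** Current contents of the cell for a key (0 if the cell was never written). */
  public int sumFor(String key) {
    return cells.getOrDefault(ByteBuffer.wrap(encodeKey(key)), 0);
  }

  public static void main(String[] args) {
    List<String> values = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h");

    // Analogous to WithKeys with the constant "hello": every element shares one key.
    ToyKeyedState constantKey = new ToyKeyedState(4);
    for (String v : values) {
      constantKey.process("hello", v.length());
    }

    // Keying each element by its own value instead.
    ToyKeyedState perValueKey = new ToyKeyedState(4);
    for (String v : values) {
      perValueKey.process(v, v.length());
    }

    System.out.println("constant key: cells=" + constantKey.cellCount()
        + " busyWorkers=" + constantKey.busyWorkers());
    System.out.println("per-value key: cells=" + perValueKey.cellCount()
        + " busyWorkers=" + perValueKey.busyWorkers());
  }
}
```

With four workers, the constant-key run creates one cell handled by one worker, while keying the eight single-letter values by themselves creates eight cells spread over all four workers. Whether a real runner partitions keyed state this way is the runner-dependent part of the question above.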
