Thanks Kenn,
Just curious, do you think there might be any approach that would let the 
validation occur at compile time?
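One possible direction, as a plain-Java sketch: declare stateful functions through a dedicated base class whose only processing method accepts a key-value pair. A function over bare, unkeyed values then cannot be written at all, so the compiler catches the mistake. All names here (`Pair`, `ValueCell`, `KeyedStatefulFn`, `CountPerKey`) are hypothetical and are not Beam's API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Beam's KV, so the sketch compiles without Beam.
final class Pair<K, V> {
  final K key;
  final V value;

  Pair(K key, V value) {
    this.key = key;
    this.value = value;
  }
}

// Hypothetical mutable state cell holding one Integer (null until first write).
final class ValueCell {
  Integer value;
}

// Hypothetical base class: the only entry point takes a Pair<K, V>, so a
// stateful function over bare, unkeyed values cannot be declared at all.
abstract class KeyedStatefulFn<K, V, OutT> {
  private final Map<K, ValueCell> cells = new HashMap<>();

  // The base class picks the state cell from the element's key, so user code
  // only ever sees the cell belonging to the current key.
  final OutT processElement(Pair<K, V> element) {
    ValueCell cell = cells.computeIfAbsent(element.key, k -> new ValueCell());
    return process(element, cell);
  }

  protected abstract OutT process(Pair<K, V> element, ValueCell cell);
}

// Example user function: counts the elements seen so far for each key.
final class CountPerKey extends KeyedStatefulFn<String, Integer, Integer> {
  @Override
  protected Integer process(Pair<String, Integer> element, ValueCell cell) {
    cell.value = (cell.value == null ? 0 : cell.value) + 1;
    return cell.value;
  }
}
```

Calling `processElement` twice with key "a" returns 1 and then 2, and a first call with key "b" returns 1. The types can require a key, but they cannot require a well-distributed one: nothing stops a caller from using a constant key.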


---
Wesley Tanaka
http://wtanaka.com/

On Sunday, April 2, 2017, 5:18:26 AM HST, Kenneth Knowles <[email protected]> wrote:
Hi Wesley,

On Apr 2, 2017 14:56, "Wesley Tanaka" <[email protected]> wrote:

I'm trying to understand the State API (in 0.6.0/Java). I started with 
https://s.apache.org/presenting-a-new-dofn to understand the syntax, but I am 
still missing something conceptually. This may be because I learned Beam before 
Flink/Dataflow/Apex.
Does the long-term vision of the Beam model include this technical contract as 
part of its semantics:
"A DoFn which uses the State API MUST have an input type of KV<K,V>"

Yes, this is required. The state is partitioned by key and window, so without a 
key we wouldn't have a well-defined partitioning.
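To make the partitioning concrete, here is a toy plain-Java model in which every state cell is looked up by the pair of key and window. It is not Beam's API: `CellId`, `PartitionedSums`, and the one-minute fixed windows are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Toy model, not Beam's API: a state cell is identified by (key, window).
final class CellId {
  final String key;
  final long windowStart; // fixed windows are identified by their start time

  CellId(String key, long windowStart) {
    this.key = key;
    this.windowStart = windowStart;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof CellId)) {
      return false;
    }
    CellId other = (CellId) o;
    return key.equals(other.key) && windowStart == other.windowStart;
  }

  @Override
  public int hashCode() {
    return Objects.hash(key, windowStart);
  }
}

// Keeps a running sum per (key, window) cell.
final class PartitionedSums {
  static final long WINDOW_MILLIS = 60_000L; // hypothetical one-minute fixed windows

  private final Map<CellId, Integer> sums = new HashMap<>();

  // Adds value to the cell for this key and the window containing the timestamp,
  // and returns the updated sum for that cell only.
  int add(String key, long timestampMillis, int value) {
    long windowStart = timestampMillis - Math.floorMod(timestampMillis, WINDOW_MILLIS);
    CellId id = new CellId(key, windowStart);
    int updated = sums.getOrDefault(id, 0) + value;
    sums.put(id, updated);
    return updated;
  }

  int cellCount() {
    return sums.size();
  }
}
```

Two elements share state only when both key and window match. Without a key there would be nothing but the window to split the map on, so every element in a window would land in the same cell.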
You are correct that adding a key like "hello" to every value in a collection 
would suffice, but this is generally not a good idea, for exactly the reason you 
surmised. (This is also why we don't support state without a key. Technically, 
the model also allows stateful processing to run in parallel across windows, but 
today no runner does this.)
Stateful computation occurs sequentially by definition: whatever computation 
reads a previously written value happens strictly afterwards. So by putting one 
key throughout your collection, you eliminate parallelism. This can sometimes be 
OK at special places in your pipeline, but for big data it is not going to work.
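A toy plain-Java model of why a single key eliminates parallelism, assuming (hypothetically, not describing any particular runner) that each key's state is pinned to one worker chosen by hashing the key. `KeyRouting` and its methods are illustrative names only.

```java
import java.util.List;

// Toy model, not any particular runner's scheduler: each key's state lives on
// exactly one worker, chosen by hashing the key, so every element with that
// key is processed there, one after another.
final class KeyRouting {
  static int workerFor(String key, int numWorkers) {
    return Math.floorMod(key.hashCode(), numWorkers);
  }

  // Counts how many elements each worker receives.
  static int[] loadPerWorker(List<String> keys, int numWorkers) {
    int[] load = new int[numWorkers];
    for (String key : keys) {
      load[workerFor(key, numWorkers)]++;
    }
    return load;
  }

  // Counts the workers that receive at least one element.
  static int busyWorkers(int[] load) {
    int busy = 0;
    for (int n : load) {
      if (n > 0) {
        busy++;
      }
    }
    return busy;
  }
}
```

With 1000 elements all keyed by "hello" and four workers, one worker receives all 1000 elements and the other three sit idle. The keys "k0" through "k3" happen to land on four different workers under `String.hashCode`, so distinct keys spread the same work out.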
The particular error you encountered should instead be clear and actionable, 
rather than a NullPointerException. I will follow up with a JIRA issue.
Kenn
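A hedged sketch of the kind of clear, actionable check described above, run when the pipeline is built instead of surfacing later as a NullPointerException. Everything here is hypothetical: `StatefulInputCheck`, its parameters, and the `KV` stand-in are illustrative and are not Beam's actual validation code.

```java
// Hypothetical stand-in for Beam's KV type, so the sketch compiles without Beam.
final class KV<K, V> {
  final K key;
  final V value;

  KV(K key, V value) {
    this.key = key;
    this.value = value;
  }
}

// Hypothetical construction-time check: fail early with an actionable message
// instead of letting a later graph traversal hit a NullPointerException.
final class StatefulInputCheck {
  static void checkKeyedInput(String transformName, boolean usesState, Class<?> inputElementClass) {
    if (usesState && !KV.class.isAssignableFrom(inputElementClass)) {
      throw new IllegalArgumentException(String.format(
          "%s uses the State API, but its input elements are %s. State is partitioned by key"
              + " and window, so a stateful DoFn needs input of type KV<K, V>; key the elements"
              + " before this step.",
          transformName, inputElementClass.getSimpleName()));
    }
  }
}
```

A stateless step or a keyed stateful step passes silently; a stateful step over `String` elements fails immediately with a message naming the transform, the offending element type, and the fix.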

(If so, does Beam put further requirements on the K type? For example, does it 
need to implement hashCode or equals in particular ways, or must the serialized 
bytes of two instances of K be equal if and only if those instances should share 
the same state cell?)
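One way to see why the question about K matters, as a plain-Java sketch. Assume (an assumption of this sketch, not something stated in this thread) that a runner identifies a key's state cell by the key's encoded bytes. A key type whose equals() is looser than its encoding then splits what the user considers a single key across several cells. `CaseInsensitiveKey` and `KeyIdentity` are hypothetical, and `encode()` stands in for a coder.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical key type whose equals()/hashCode() ignore case, while its
// stand-in encoding keeps the name exactly as written.
final class CaseInsensitiveKey {
  final String name;
  private final String folded; // lower-cased form used for equality

  CaseInsensitiveKey(String name) {
    this.name = name;
    this.folded = name.toLowerCase(Locale.ROOT);
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof CaseInsensitiveKey && folded.equals(((CaseInsensitiveKey) o).folded);
  }

  @Override
  public int hashCode() {
    return folded.hashCode();
  }

  // Stand-in for a coder: serializes the original spelling as UTF-8.
  byte[] encode() {
    return name.getBytes(StandardCharsets.UTF_8);
  }
}

final class KeyIdentity {
  // Distinct state cells if cells were identified via equals()/hashCode().
  static int cellsByEquals(CaseInsensitiveKey... keys) {
    return new HashSet<>(Arrays.asList(keys)).size();
  }

  // Distinct state cells if cells were identified by encoded bytes.
  static int cellsByEncodedBytes(CaseInsensitiveKey... keys) {
    Set<String> distinct = new HashSet<>();
    for (CaseInsensitiveKey key : keys) {
      // Arrays.toString gives a value-based form of the byte array.
      distinct.add(Arrays.toString(key.encode()));
    }
    return distinct.size();
  }
}
```

With keys "Alice", "ALICE", and "alice", equality-based identity yields one cell while byte-based identity yields three. Keeping equals() consistent with the encoding avoids the mismatch either way.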
In testValueStateSimple in 
https://github.com/apache/beam/blob/e31ca8b0d05e47c2588d5db29c92bac49aa410da/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java#L1615, 
if I change the DoFn signature:

FROM: DoFn<KV<String, Integer>, Integer>
TO:   DoFn<String, Integer>
Then I start getting this error, which confuses me. Is this ultimately caused by 
the above technical contract being required but not enforced by any validation, 
or is it something else silly that I'm doing wrong? :)

java.lang.NullPointerException: Outputs for non-root node Nl/ParDo(Anonymous)/ParMultiDo(Anonymous) are null
 at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:864)
 at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:490)
 at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
 at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
 at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
 at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$400(TransformHierarchy.java:231)
 at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:206)
 at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:321)
 at org.apache.beam.sdk.testing.TestPipeline$PipelineAbandonedNodeEnforcement.recordPipelineNodes(TestPipeline.java:166)
 at org.apache.beam.sdk.testing.TestPipeline$PipelineAbandonedNodeEnforcement.afterPipelineExecution(TestPipeline.java:200)
 at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:314)

Finally, it seems it would be possible to use the State API when processing any 
arbitrary non-KV PCollection by simply attaching the string "hello" as a key to 
every value using WithKeys, as the unit tests do. I suspect the answer depends 
on the runner, but is there a general intuition for what bad thing would happen 
if I did this? For example, would the stateful ParDo be stuck running on a 
single machine, would some lower layer run out of memory, or would network 
traffic between cluster nodes become much more chatty and synchronized?



