[beam] branch master updated: [website] Correct date in release blog post

2019-06-10 Thread goenka
This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 6cb1fd5  [website] Correct date in release blog post
 new bd81d3f  Merge pull request #8806 from mxm/release-blog-post
6cb1fd5 is described below

commit 6cb1fd5ff67c12bdc8c7632d346066a46214877a
Author: Maximilian Michels 
AuthorDate: Mon Jun 10 11:47:59 2019 +0200

[website] Correct date in release blog post

This adds the correct release date but keeps the existing URL to not break 
any
links.
---
 website/src/_posts/2019-05-22-beam-2.13.0.md | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/website/src/_posts/2019-05-22-beam-2.13.0.md 
b/website/src/_posts/2019-05-22-beam-2.13.0.md
index 3017f12..c81aa2d 100644
--- a/website/src/_posts/2019-05-22-beam-2.13.0.md
+++ b/website/src/_posts/2019-05-22-beam-2.13.0.md
@@ -1,7 +1,9 @@
 ---
 layout: post
 title:  "Apache Beam 2.13.0"
-date:   2019-05-22 00:00:01 -0800
+date:   2019-06-07 00:00:01 -0800
+# Date above corrected but keep the old URL:
+permalink: /blog/2019/05/22/beam-2.13.0.html
 excerpt_separator: 
 categories: blog
 authors:



[beam] branch master updated: [BEAM-6693] replace mmh3 with default hash function (#8799)

2019-06-10 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new a8d0094  [BEAM-6693] replace mmh3 with default hash function (#8799)
a8d0094 is described below

commit a8d0094d9f0cf13516956c7aa9dcc61fce9d6015
Author: Hannah Jiang 
AuthorDate: Mon Jun 10 14:38:39 2019 -0700

[BEAM-6693] replace mmh3 with default hash function (#8799)

* [BEAM-6693] change version upper bound of mmh3 to <3.0.0

* [BEAM-6693 replace mmh3 with default hash function]

* BEAM-6693 skip some tests with py27
---
 sdks/python/apache_beam/transforms/stats.py  |  4 +-
 sdks/python/apache_beam/transforms/stats_test.py | 55 +---
 sdks/python/setup.py |  1 -
 3 files changed, 11 insertions(+), 49 deletions(-)

diff --git a/sdks/python/apache_beam/transforms/stats.py 
b/sdks/python/apache_beam/transforms/stats.py
index 59c1df5..79158f3 100644
--- a/sdks/python/apache_beam/transforms/stats.py
+++ b/sdks/python/apache_beam/transforms/stats.py
@@ -25,8 +25,6 @@ import math
 import sys
 from builtins import round
 
-import mmh3
-
 from apache_beam import coders
 from apache_beam import typehints
 from apache_beam.transforms.core import *
@@ -214,7 +212,7 @@ class ApproximateUniqueCombineFn(CombineFn):
 
   def add_input(self, accumulator, element, *args, **kwargs):
 try:
-  accumulator.add(mmh3.hash64(self._coder.encode(element))[1])
+  accumulator.add(hash(self._coder.encode(element)))
   return accumulator
 except Exception as e:
   raise RuntimeError("Runtime exception: %s", e)
diff --git a/sdks/python/apache_beam/transforms/stats_test.py 
b/sdks/python/apache_beam/transforms/stats_test.py
index 0a6003a..d8760a8 100644
--- a/sdks/python/apache_beam/transforms/stats_test.py
+++ b/sdks/python/apache_beam/transforms/stats_test.py
@@ -21,6 +21,7 @@ from __future__ import division
 
 import math
 import random
+import sys
 import unittest
 from collections import defaultdict
 
@@ -151,11 +152,14 @@ class ApproximateUniqueTest(unittest.TestCase):
 assert beam.ApproximateUnique._get_sample_size_from_est_error(0.05) == 1600
 assert beam.ApproximateUnique._get_sample_size_from_est_error(0.01) == 
4
 
+  @unittest.skipIf(sys.version_info < (3, 0, 0),
+   'Skip with py27 because hash function is not good enough.')
   def test_approximate_unique_global_by_sample_size(self):
 # test if estimation error with a given sample size is not greater than
 # expected max error (sample size = 50% of population).
 sample_size = 50
 max_err = 2 / math.sqrt(sample_size)
+random.seed(1)
 test_input = [random.randint(0, 1000) for _ in range(100)]
 actual_count = len(set(test_input))
 
@@ -210,30 +214,12 @@ class ApproximateUniqueTest(unittest.TestCase):
 label='assert:global_by_sample_size_with_small_population')
 pipeline.run()
 
-  def test_approximate_unique_global_by_sample_size_with_big_population(self):
-# test if estimation error is smaller than expected max error with a small
-# sample and a big population (sample size = 1% of population).
-sample_size = 100
-max_err = 2 / math.sqrt(sample_size)
-test_input = [random.randint(0, 1000) for _ in range(1)]
-actual_count = len(set(test_input))
-
-pipeline = TestPipeline()
-result = (pipeline
-  | 'create' >> beam.Create(test_input)
-  | 'get_estimate'
-  >> beam.ApproximateUnique.Globally(size=sample_size)
-  | 'compare'
-  >> beam.FlatMap(lambda x: [abs(x - actual_count) * 1.0
- / actual_count <= max_err]))
-
-assert_that(result, equal_to([True]),
-label='assert:global_by_sample_size_with_big_population')
-pipeline.run()
-
+  @unittest.skipIf(sys.version_info < (3, 0, 0),
+   'Skip with py27 because hash function is not good enough.')
   def test_approximate_unique_global_by_error(self):
 # test if estimation error from input error is not greater than input 
error.
 est_err = 0.3
+random.seed(1)
 test_input = [random.randint(0, 1000) for _ in range(100)]
 actual_count = len(set(test_input))
 
@@ -246,11 +232,10 @@ class ApproximateUniqueTest(unittest.TestCase):
   >> beam.FlatMap(lambda x: [abs(x - actual_count) * 1.0
  / actual_count <= est_err]))
 
-assert_that(result, equal_to([True]),
-label='assert:global_by_error')
+assert_that(result, equal_to([True]), label='assert:global_by_error')
 pipeline.run()
 
-  def test_approximate_unique_global_by_error_with_samll_population(self):
+  def test_approximate_unique_global_by_error_with_small_population(self):
 # test if 

[beam] branch master updated (f16b689 -> b7b0868)

2019-06-10 Thread anton
This is an automated email from the ASF dual-hosted git repository.

anton pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from f16b689  Merge pull request #8810: [BEAM-7524] Update Python 
dependencies page for 2.13.0
 new 3e0b352  [BEAM-7511] Fixes the bug in KafkaTable Initialization.
 new 550c1cc  [BEAM-7511] Fixes the bug in KafkaTable Initialization, 
returned the deleted methods back.
 new 93d9334  [BEAM-7511] Fixes the bug in KafkaTable Initialization, 
returned the deleted methods back.
 new b7b0868  Merge pull request #8797 from riazela/master

The 21882 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java | 2 ++
 1 file changed, 2 insertions(+)



[beam] branch master updated (b6cb4f4 -> f16b689)

2019-06-10 Thread melap
This is an automated email from the ASF dual-hosted git repository.

melap pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from b6cb4f4  fix minor typo in BatchLoads.java
 add d445287  Update Python dependencies page for 2.13.0
 new f16b689  Merge pull request #8810: [BEAM-7524] Update Python 
dependencies page for 2.13.0

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/documentation/sdks/python-dependencies.md  | 39 ++
 1 file changed, 39 insertions(+)



[beam] 01/01: Merge pull request #8810: [BEAM-7524] Update Python dependencies page for 2.13.0

2019-06-10 Thread melap
This is an automated email from the ASF dual-hosted git repository.

melap pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit f16b68945ce66c708e8f03942899a57005f364aa
Merge: b6cb4f4 d445287
Author: Melissa Pashniak 
AuthorDate: Mon Jun 10 13:49:51 2019 -0700

Merge pull request #8810: [BEAM-7524] Update Python dependencies page for 
2.13.0

 .../src/documentation/sdks/python-dependencies.md  | 39 ++
 1 file changed, 39 insertions(+)



[beam] branch beam7462 deleted (was b6cb4f4)

2019-06-10 Thread lcwik
This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a change to branch beam7462
in repository https://gitbox.apache.org/repos/asf/beam.git.


 was b6cb4f4  fix minor typo in BatchLoads.java

The revisions that were on this branch are still contained in
other references; therefore, this change does not discard any commits
from the repository.



[beam] branch beam7462 created (now b6cb4f4)

2019-06-10 Thread lcwik
This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a change to branch beam7462
in repository https://gitbox.apache.org/repos/asf/beam.git.


  at b6cb4f4  fix minor typo in BatchLoads.java

No new revisions were added by this update.



[beam] branch master updated (f42d92e -> b6cb4f4)

2019-06-10 Thread lcwik
This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from f42d92e  Adding Python samples to the Stateful Processing post. (#8363)
 add d6301f59 fix minor typo
 add b6cb4f4  fix minor typo in BatchLoads.java

No new revisions were added by this update.

Summary of changes:
 .../src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)



[beam] branch master updated: Adding Python samples to the Stateful Processing post. (#8363)

2019-06-10 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new f42d92e  Adding Python samples to the Stateful Processing post. (#8363)
f42d92e is described below

commit f42d92ea5cc51ed56b29df5e872fcb374f4f263e
Author: Pablo 
AuthorDate: Mon Jun 10 10:04:51 2019 -0700

Adding Python samples to the Stateful Processing post. (#8363)

* Adding Python code to the Stateful Processing post.

* Fixing indentation and removing erroneous comment.

* Addressing comments

* Improving samples to match CombiningValueState

* Update userstate_test.py

* Update 2017-02-13-stateful-processing.md

* Addressing some comments

* s/stage/step

* removing prose changes
---
 sdks/python/apache_beam/transforms/trigger.py  |  13 ++-
 .../apache_beam/transforms/userstate_test.py   |   9 +-
 .../src/_posts/2017-02-13-stateful-processing.md   | 110 +
 3 files changed, 99 insertions(+), 33 deletions(-)

diff --git a/sdks/python/apache_beam/transforms/trigger.py 
b/sdks/python/apache_beam/transforms/trigger.py
index ddf4c24..f0d2a1c 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -67,8 +67,7 @@ __all__ = [
 
 
 class AccumulationMode(object):
-  """Controls what to do with data when a trigger fires multiple times.
-  """
+  """Controls what to do with data when a trigger fires multiple times."""
   DISCARDING = beam_runner_api_pb2.AccumulationMode.DISCARDING
   ACCUMULATING = beam_runner_api_pb2.AccumulationMode.ACCUMULATING
   # TODO(robertwb): Provide retractions of previous outputs.
@@ -78,10 +77,7 @@ class AccumulationMode(object):
 class _StateTag(with_metaclass(ABCMeta, object)):
   """An identifier used to store and retrieve typed, combinable state.
 
-  The given tag must be unique for this stage.  If CombineFn is None then
-  all elements will be returned as a list, otherwise the given CombineFn
-  will be applied (possibly incrementally and eagerly) when adding elements.
-  """
+  The given tag must be unique for this step."""
 
   def __init__(self, tag):
 self.tag = tag
@@ -98,7 +94,10 @@ class _ValueStateTag(_StateTag):
 
 
 class _CombiningValueStateTag(_StateTag):
-  """StateTag pointing to an element, accumulated with a combiner."""
+  """StateTag pointing to an element, accumulated with a combiner.
+
+  The given tag must be unique for this step. The given CombineFn will be
+  applied (possibly incrementally and eagerly) when adding elements."""
 
   # TODO(robertwb): Also store the coder (perhaps extracted from the 
combine_fn)
   def __init__(self, tag, combine_fn):
diff --git a/sdks/python/apache_beam/transforms/userstate_test.py 
b/sdks/python/apache_beam/transforms/userstate_test.py
index 53a7e36..7a05c73 100644
--- a/sdks/python/apache_beam/transforms/userstate_test.py
+++ b/sdks/python/apache_beam/transforms/userstate_test.py
@@ -594,14 +594,13 @@ class StatefulDoFnOnDirectRunnerTest(unittest.TestCase):
 
   def test_index_assignment(self):
 class IndexAssigningStatefulDoFn(DoFn):
-  INDEX_STATE = BagStateSpec('index', VarIntCoder())
+  INDEX_STATE = CombiningValueStateSpec('index', sum)
 
   def process(self, element, state=DoFn.StateParam(INDEX_STATE)):
 unused_key, value = element
-next_index, = list(state.read()) or [0]
-yield (value, next_index)
-state.clear()
-state.add(next_index + 1)
+current_index = state.read()
+yield (value, current_index)
+state.add(1)
 
 with TestPipeline() as p:
   test_stream = (TestStream()
diff --git a/website/src/_posts/2017-02-13-stateful-processing.md 
b/website/src/_posts/2017-02-13-stateful-processing.md
index f51b307..a778dde 100644
--- a/website/src/_posts/2017-02-13-stateful-processing.md
+++ b/website/src/_posts/2017-02-13-stateful-processing.md
@@ -28,6 +28,9 @@ efficiencies. In this post, I will guide you through stateful 
processing in
 Beam: how it works, how it fits in with the other features of the Beam model,
 what you might use it for, and what it looks like in code.
 
+**Note: This post has been updated in May of 2019, to include Python
+snippets!**
+
 
 
 > **Warning: new features ahead!**: This is a very new aspect of the Beam
@@ -278,7 +281,7 @@ new DoFn, KV>>() {
 
   // A state cell holding a single Integer per key+window
   @StateId("index")
-  private final StateSpec> indexSpec = 
+  private final StateSpec> indexSpec =
   StateSpecs.value(VarIntCoder.of());
 
   @ProcessElement
@@ -293,8 +296,14 @@ new DoFn, KV>>() {
 ```
 
 ```py
-# State and timers are not yet supported in Beam's Python SDK.
-# Watch this space!
+class IndexAssigningStatefulDoFn(DoFn):
+  INDEX_STATE = CombiningStateSpec('index', sum)
+
+ 

[beam] branch master updated (58634bc -> 0eec015)

2019-06-10 Thread chamikara
This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 58634bc  [BEAM-6620] Remove references to DEFAULT_SHADOW_CLOSURE since 
it does nothing now.
 add b7c2512  Fixes the version number for concat compressed files fix
 new 0eec015  Merge pull request #8784:  [BEAM-6952]Fixes the version 
number for concat compressed files fix

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 sdks/python/apache_beam/io/filesystem_test.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)



[beam] 01/01: Merge pull request #8784: [BEAM-6952]Fixes the version number for concat compressed files fix

2019-06-10 Thread chamikara
This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 0eec0153906dd66bca697a29f9b9937aa4cb82ff
Merge: 58634bc b7c2512
Author: Chamikara Jayalath 
AuthorDate: Mon Jun 10 09:09:43 2019 -0700

Merge pull request #8784:  [BEAM-6952]Fixes the version number for concat 
compressed files fix

 sdks/python/apache_beam/io/filesystem_test.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)



[beam] branch master updated (ce63125 -> 58634bc)

2019-06-10 Thread lcwik
This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from ce63125  BEAM-7475: Update programming guide with stateful and timer 
example (#8740)
 add 25a6874  [BEAM-6620] Remove references to DEFAULT_SHADOW_CLOSURE since 
it does nothing now.
 add 58634bc  [BEAM-6620] Remove references to DEFAULT_SHADOW_CLOSURE since 
it does nothing now.

No new revisions were added by this update.

Summary of changes:
 .../org/apache/beam/gradle/BeamModulePlugin.groovy  | 17 -
 runners/direct-java/build.gradle|  2 +-
 .../worker/legacy-worker/build.gradle   |  2 +-
 sdks/java/core/build.gradle |  2 +-
 sdks/java/extensions/kryo/build.gradle  |  2 +-
 sdks/java/extensions/sql/build.gradle   |  2 +-
 sdks/java/harness/build.gradle  |  2 +-
 vendor/sdks-java-extensions-protobuf/build.gradle   |  2 +-
 8 files changed, 7 insertions(+), 24 deletions(-)



[beam] 01/01: Merge pull request #8787: [release-2.7.1] Flink backports for LTS release

2019-06-10 Thread mxm
This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch release-2.7.1
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 298e54bfeedb45a0ced6fd4a32d80bdbb94e1774
Merge: a882a28 6df381a
Author: Maximilian Michels 
AuthorDate: Mon Jun 10 13:42:28 2019 +0200

Merge pull request #8787: [release-2.7.1] Flink backports for LTS release

 .../beam/runners/core/StateInternalsTest.java  |  92 +-
 runners/flink/build.gradle |   2 +-
 .../runners/flink/DefaultParallelismFactory.java   |  39 ---
 .../runners/flink/FlinkExecutionEnvironments.java  |  76 -
 .../apache/beam/runners/flink/FlinkJobInvoker.java |   5 +-
 .../flink/FlinkPipelineExecutionEnvironment.java   |  10 +
 .../beam/runners/flink/FlinkPipelineOptions.java   |  11 +-
 .../org/apache/beam/runners/flink/FlinkRunner.java |   5 -
 .../flink/PipelineTranslationOptimizer.java|  16 +
 .../translation/functions/FlinkDoFnFunction.java   |   7 +-
 .../functions/FlinkStatefulDoFnFunction.java   |   7 +-
 .../flink/translation/utils/FlinkClassloading.java |  30 ++
 .../wrappers/streaming/DoFnOperator.java   |  45 ++-
 .../streaming/io/UnboundedSourceWrapper.java   |  85 +++--
 .../state/FlinkBroadcastStateInternals.java|  12 +-
 .../streaming/state/FlinkStateInternals.java   | 353 -
 .../flink/FlinkExecutionEnvironmentsTest.java  | 162 ++
 .../FlinkPipelineExecutionEnvironmentTest.java |  44 +++
 .../beam/runners/flink/PipelineOptionsTest.java|  26 ++
 .../flink/streaming/BoundedSourceRestoreTest.java  |   1 +
 .../runners/flink/streaming/DoFnOperatorTest.java  |  49 ++-
 .../streaming/FlinkKeyGroupStateInternalsTest.java |   8 +
 .../streaming/FlinkSplitStateInternalsTest.java|   8 +
 .../flink/streaming/FlinkStateInternalsTest.java   | 108 ++-
 .../flink/streaming/GroupByNullKeyTest.java|   2 +
 .../wrappers/streaming/io}/TestCountingSource.java |  15 +-
 .../streaming/io}/UnboundedSourceWrapperTest.java  | 207 +---
 runners/flink/src/test/resources/flink-conf.yaml   |  19 ++
 .../spark/stateful/SparkStateInternals.java|   3 +-
 29 files changed, 1120 insertions(+), 327 deletions(-)



[beam] branch release-2.7.1 updated (a882a28 -> 298e54b)

2019-06-10 Thread mxm
This is an automated email from the ASF dual-hosted git repository.

mxm pushed a change to branch release-2.7.1
in repository https://gitbox.apache.org/repos/asf/beam.git.


from a882a28  Merge pull request #6818: Fix errorprone to version 2.3.1 on 
release-2.7.0 branch
 add 6d04808  Fix issues related to shutdown logic of UnboundedSourceWrapper
 add 7a66768  [BEAM-6460] Remove cached class references upon start/shutdown
 add a8f9ab6  [BEAM-6533] Let UnboundedSourceWrapper parallel source index 
start with 1
 add 793a32d  Upgrade to Flink 1.5.6
 add 3ec4e76  [BEAM-4520] Warn when UnboundedSources are used without 
checkpointing
 add 74db08e  [BEAM-3863] Ensure correct firing of processing time timers
 add 15658ea  [BEAM-6929] Prevent NullPointerException in Flink's 
CombiningState
 add bca9c6c  [BEAM-3089] Fix default values in FlinkPipelineOptions / Add 
tests
 add 7284956  Replace deprecated StateTag.StateBinder in 
FlinkStateInternals  (#6754)
 add 6df381a  Merge pull request #7849: [BEAM-6678] Persist watermark holds 
view in Flink checkpoints
 new 298e54b  Merge pull request #8787: [release-2.7.1] Flink backports for 
LTS release

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../beam/runners/core/StateInternalsTest.java  |  92 +-
 runners/flink/build.gradle |   2 +-
 .../runners/flink/DefaultParallelismFactory.java   |  39 ---
 .../runners/flink/FlinkExecutionEnvironments.java  |  76 -
 .../apache/beam/runners/flink/FlinkJobInvoker.java |   5 +-
 .../flink/FlinkPipelineExecutionEnvironment.java   |  10 +
 .../beam/runners/flink/FlinkPipelineOptions.java   |  11 +-
 .../org/apache/beam/runners/flink/FlinkRunner.java |   5 -
 .../flink/PipelineTranslationOptimizer.java|  16 +
 .../translation/functions/FlinkDoFnFunction.java   |   7 +-
 .../functions/FlinkStatefulDoFnFunction.java   |   7 +-
 .../{package-info.java => FlinkClassloading.java}  |  14 +-
 .../wrappers/streaming/DoFnOperator.java   |  45 ++-
 .../streaming/io/UnboundedSourceWrapper.java   |  85 +++--
 .../state/FlinkBroadcastStateInternals.java|  12 +-
 .../streaming/state/FlinkStateInternals.java   | 353 -
 .../flink/FlinkExecutionEnvironmentsTest.java  | 162 ++
 .../FlinkPipelineExecutionEnvironmentTest.java |  44 +++
 .../beam/runners/flink/PipelineOptionsTest.java|  26 ++
 .../flink/streaming/BoundedSourceRestoreTest.java  |   1 +
 .../runners/flink/streaming/DoFnOperatorTest.java  |  49 ++-
 .../streaming/FlinkKeyGroupStateInternalsTest.java |   8 +
 .../streaming/FlinkSplitStateInternalsTest.java|   8 +
 .../flink/streaming/FlinkStateInternalsTest.java   | 108 ++-
 .../flink/streaming/GroupByNullKeyTest.java|   2 +
 .../wrappers/streaming/io}/TestCountingSource.java |  15 +-
 .../streaming/io}/UnboundedSourceWrapperTest.java  | 207 +---
 runners/flink/src/test/resources/flink-conf.yaml   |  19 ++
 .../spark/stateful/SparkStateInternals.java|   3 +-
 29 files changed, 1102 insertions(+), 329 deletions(-)
 delete mode 100644 
runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
 copy 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/{package-info.java
 => FlinkClassloading.java} (70%)
 create mode 100644 
runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
 rename runners/flink/src/test/java/org/apache/beam/runners/flink/{streaming => 
translation/wrappers/streaming/io}/TestCountingSource.java (95%)
 rename runners/flink/src/test/java/org/apache/beam/runners/flink/{streaming => 
translation/wrappers/streaming/io}/UnboundedSourceWrapperTest.java (73%)
 create mode 100644 runners/flink/src/test/resources/flink-conf.yaml