[jira] (BEAM-843) Use New DoFn Directly in Flink Runner

2017-01-30 Thread Aljoscha Krettek (JIRA)

Aljoscha Krettek assigned an issue to Jingsong Lee

Beam / BEAM-843: Use New DoFn Directly in Flink Runner

Change By: Aljoscha Krettek
Assignee: Jingsong Lee

This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)


[jira] (BEAM-843) Use New DoFn Directly in Flink Runner

2017-01-30 Thread Aljoscha Krettek (JIRA)

Aljoscha Krettek commented on BEAM-843

Re: Use New DoFn Directly in Flink Runner

Jingsong Lee: I assigned the issue to you and am now merging your PR. In the future, it would be good to comment on an issue before you start implementing, to avoid duplicate work.



[jira] (BEAM-1346) Drop Late Data in ReduceFnRunner

2017-01-30 Thread Aljoscha Krettek (JIRA)

Aljoscha Krettek commented on BEAM-1346

Re: Drop Late Data in ReduceFnRunner

Kenneth Knowles, this is related to BEAM-241.



[jira] (BEAM-1346) Drop Late Data in ReduceFnRunner

2017-01-30 Thread Aljoscha Krettek (JIRA)

Aljoscha Krettek created an issue

Beam / BEAM-1346: Drop Late Data in ReduceFnRunner

Issue Type: Bug
Affects Versions: 0.5.0
Assignee: Kenneth Knowles
Components: runner-core
Created: 30/Jan/17 11:39
Priority: Major
Reporter: Aljoscha Krettek

I think these two commits recently broke late-data dropping for the Flink runner (and maybe for other runners as well):

https://github.com/apache/beam/commit/2b26ec8
https://github.com/apache/beam/commit/8989473

It boils down to the LateDataDroppingDoFnRunner no longer being used: DoFnRunners.lateDataDroppingRunner() is no longer called when a DoFn is a ReduceFnExecutor, because that interface was removed.

Maybe we should think about dropping late data in another place; my suggestion is ReduceFnRunner, but that's open for discussion.
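To make the regression concrete, the check a late-data-dropping runner performs can be sketched as follows. This is an illustrative sketch only, not the actual Beam API: the class and method names here are invented for the example; the real LateDataDroppingDoFnRunner works against watermarks and windowing strategies, but the core comparison is the same.

```java
import java.time.Instant;

/**
 * Conceptual sketch (not the actual Beam API): an element is droppably late
 * when its timestamp is earlier than the input watermark minus the allowed
 * lateness. This is the check that no longer runs once the wrapping runner
 * is skipped.
 */
public class LateDataCheck {

    /** Returns true if the element should be dropped as late data. */
    static boolean isDroppablyLate(long elementTimestampMillis,
                                   long inputWatermarkMillis,
                                   long allowedLatenessMillis) {
        return elementTimestampMillis < inputWatermarkMillis - allowedLatenessMillis;
    }

    public static void main(String[] args) {
        long watermark = Instant.parse("2017-01-30T12:00:00Z").toEpochMilli();
        long allowed = 60_000; // one minute of allowed lateness

        // An element two minutes behind the watermark is dropped...
        System.out.println(isDroppablyLate(watermark - 120_000, watermark, allowed));
        // ...but one 30 seconds behind is still within the allowed lateness.
        System.out.println(isDroppablyLate(watermark - 30_000, watermark, allowed));
    }
}
```

If this check is never invoked, late elements flow straight into the window state instead of being dropped and counted.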
 
 
   

[jira] (BEAM-1346) Drop Late Data in ReduceFnRunner

2017-01-30 Thread Aljoscha Krettek (JIRA)

Aljoscha Krettek assigned an issue to Unassigned

Beam / BEAM-1346: Drop Late Data in ReduceFnRunner

Change By: Aljoscha Krettek
Assignee: Kenneth Knowles



[2/2] beam git commit: This closes #1787

2017-01-30 Thread aljoscha
This closes #1787


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/582c4a8a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/582c4a8a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/582c4a8a

Branch: refs/heads/master
Commit: 582c4a8a4dd33d698beae2990d682848b593de21
Parents: 34b4a6d 1dcda72
Author: Aljoscha Krettek 
Authored: Mon Jan 30 13:48:19 2017 +0100
Committer: Aljoscha Krettek 
Committed: Mon Jan 30 13:48:19 2017 +0100

--
 .../GroupAlsoByWindowViaWindowSetNewDoFn.java   | 156 +++
 .../wrappers/streaming/DoFnOperator.java|  69 
 .../wrappers/streaming/WindowDoFnOperator.java  | 143 +
 3 files changed, 264 insertions(+), 104 deletions(-)
--




[jira] (BEAM-843) Use New DoFn Directly in Flink Runner

2017-01-30 Thread ASF GitHub Bot (JIRA)

ASF GitHub Bot commented on BEAM-843

Re: Use New DoFn Directly in Flink Runner

Github user asfgit closed the pull request at:
https://github.com/apache/beam/pull/1787



[1/2] beam git commit: [BEAM-843] Use New DoFn Directly in Flink Runner

2017-01-30 Thread aljoscha
Repository: beam
Updated Branches:
  refs/heads/master 34b4a6d9d -> 582c4a8a4


[BEAM-843] Use New DoFn Directly in Flink Runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1dcda726
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1dcda726
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1dcda726

Branch: refs/heads/master
Commit: 1dcda72686fa0a9a6e2033939aa53f7a7a31d548
Parents: 34b4a6d
Author: JingsongLi 
Authored: Wed Jan 18 11:34:06 2017 +0800
Committer: Aljoscha Krettek 
Committed: Mon Jan 30 12:04:09 2017 +0100

--
 .../GroupAlsoByWindowViaWindowSetNewDoFn.java   | 156 +++
 .../wrappers/streaming/DoFnOperator.java|  69 
 .../wrappers/streaming/WindowDoFnOperator.java  | 143 +
 3 files changed, 264 insertions(+), 104 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/1dcda726/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
new file mode 100644
index 000..cff6e00
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.util.Collection;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.util.state.TimerInternalsFactory;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/**
+ * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic 
to the
+ * {@link ReduceFnRunner}.
+ */
+@SystemDoFnInternal
+public class GroupAlsoByWindowViaWindowSetNewDoFn<
+K, InputT, OutputT, W extends BoundedWindow, RinT extends 
KeyedWorkItem>
+extends DoFn> {
+
+  private static final long serialVersionUID = 1L;
+
+  public static 
+  DoFn, KV> create(
+  WindowingStrategy strategy,
+  StateInternalsFactory stateInternalsFactory,
+  TimerInternalsFactory timerInternalsFactory,
+  SideInputReader sideInputReader,
+  SystemReduceFn reduceFn,
+  DoFnRunners.OutputManager outputManager,
+  TupleTag> mainTag) {
+return new GroupAlsoByWindowViaWindowSetNewDoFn<>(
+strategy, stateInternalsFactory, timerInternalsFactory, 
sideInputReader,
+reduceFn, outputManager, mainTag);
+  }
+
+  protected final Aggregator droppedDueToClosedWindow =
+  createAggregator(
+  GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, 
Sum.ofLongs());
+  protected final Aggregator droppedDueToLateness =
+  createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, 
Sum.ofLongs());
+  private final WindowingStrategy windowingStrategy;
+  private SystemReduceFn reduceFn;
+  private transient StateInternalsFactory stateInternalsFactory;
+  private transient TimerInternalsFactory timerInternalsFactory;
+  private transient SideInput

[GitHub] beam pull request #1787: [BEAM-843] Flink DoFnOpeartor use new DoFn

2017-01-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/1787


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] (BEAM-843) Use New DoFn Directly in Flink Runner

2017-01-30 Thread Aljoscha Krettek (JIRA)

Aljoscha Krettek closed an issue as Fixed

Beam / BEAM-843: Use New DoFn Directly in Flink Runner

Change By: Aljoscha Krettek
Resolution: Fixed
Fix Version/s: 0.5.0
Status: Open -> Closed
   



[jira] (BEAM-1335) ValueState could use an initial/default value

2017-01-30 Thread Aljoscha Krettek (JIRA)

Aljoscha Krettek commented on BEAM-1335

Re: ValueState could use an initial/default value

In Flink, we actually have default values for StateDescriptors (roughly the equivalent of StateSpec), but we recently decided to deprecate that. My argument was that having a default value complicates the StateSpec: we would have to keep a user object in there and make sure it is serialized/deserialized using the correct Coder when the StateSpec itself is serialized using Java serialization.

Just a data point to consider. I'm not against adding this for Beam if we think it makes sense.
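The usual alternative to baking a default into the state descriptor is to let the caller supply the default at read time, so the state cell itself never has to serialize a user-provided object. A minimal sketch of that pattern, with purely illustrative names (this is neither Beam's nor Flink's actual state API):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of read-time defaults: the default value lives at the call site,
 * not inside the state descriptor, so nothing user-provided needs to be
 * serialized along with the descriptor.
 */
public class ReadWithDefault {

    /** A toy keyed value-state backend; absence means "never written". */
    static final Map<String, Long> CELLS = new HashMap<>();

    static long read(String cell, long defaultValue) {
        Long v = CELLS.get(cell);
        return v != null ? v : defaultValue; // default applied at the call site
    }

    public static void main(String[] args) {
        System.out.println(read("counter", 0L));  // unwritten: falls back to 0
        CELLS.put("counter", 42L);
        System.out.println(read("counter", 0L));  // written: stored value wins
    }
}
```

The trade-off discussed above is exactly this: read-time defaults keep the descriptor serialization simple, at the cost of each reader having to pass (and agree on) the default.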
   



[jira] (BEAM-1346) Drop Late Data in ReduceFnRunner

2017-01-30 Thread Kenneth Knowles (JIRA)

Kenneth Knowles commented on BEAM-1346

Re: Drop Late Data in ReduceFnRunner

Yikes, that is a troubling lack of test coverage, that this could go away without being caught. I probably reviewed it.

Previously, because GroupAlsoByWindowViaWindowSets followed the same code path as a user-defined DoFn, the way we would distinguish it was by checking for ReduceFnExecutor (technically, I think ReduceFnExecutor only applied if the GABW took KeyedWorkItem as input, which was always true for streaming runners; kind of a random collection of dependencies).

The right way to fix this, IMO, is for the Flink WindowDoFnOperator to explicitly create a lateDataDroppingRunner(), since that code already knows it is using GABW.
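The suggested fix is a decorator that the windowed operator opts into explicitly, rather than an interface check on the DoFn. A sketch of that shape, with invented names (this is not the Beam DoFnRunner API; it only illustrates the explicit-wrapping idea):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

/**
 * Sketch of explicit wrapping: the operator that knows it runs a
 * GroupAlsoByWindow wraps its element consumer in a late-data-dropping
 * decorator, instead of the framework detecting a marker interface.
 */
public class ExplicitWrapping {

    /** Decorates a downstream consumer, filtering droppably late timestamps. */
    static LongConsumer lateDataDroppingRunner(
            long inputWatermark, long allowedLateness, LongConsumer downstream) {
        return ts -> {
            if (ts >= inputWatermark - allowedLateness) {
                downstream.accept(ts); // on time, or within allowed lateness
            }                          // otherwise silently dropped
        };
    }

    public static void main(String[] args) {
        List<Long> delivered = new ArrayList<>();
        // The windowed operator opts in explicitly at construction time.
        LongConsumer runner = lateDataDroppingRunner(1000, 100, delivered::add);
        runner.accept(950); // within allowed lateness: kept
        runner.accept(500); // too late: dropped
        System.out.println(delivered);
    }
}
```

Opting in at the call site keeps the dropping behavior where the knowledge lives, which is why removing the ReduceFnExecutor check silently disabled it.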



[jira] (BEAM-1347) Basic Java harness capable of understanding process bundle tasks and sending data over the Fn Api

2017-01-30 Thread Luke Cwik (JIRA)

Luke Cwik created an issue

Beam / BEAM-1347: Basic Java harness capable of understanding process bundle tasks and sending data over the Fn Api

Issue Type: Improvement
Assignee: Luke Cwik
Components: beam-model-fn-api
Created: 30/Jan/17 16:25
Priority: Major
Reporter: Luke Cwik

Create a basic Java harness capable of understanding process bundle requests and able to stream data over the Fn Api.

Overview: https://s.apache.org/beam-fn-api

[jira] (BEAM-1348) Model the Fn Api

2017-01-30 Thread Luke Cwik (JIRA)

Luke Cwik created an issue

Beam / BEAM-1348: Model the Fn Api

Issue Type: Improvement
Assignee: Luke Cwik
Components: beam-model-fn-api
Created: 30/Jan/17 16:26
Priority: Major
Reporter: Luke Cwik

Create a proto representation of the services and data types required to execute the Fn Api.

[jira] (BEAM-1346) Drop Late Data in ReduceFnRunner

2017-01-30 Thread Daniel Halperin (JIRA)

Daniel Halperin commented on BEAM-1346

Re: Drop Late Data in ReduceFnRunner

The commits referenced are in the 0.5.0-RC1 candidate, so I'm guessing this should be release-blocking. Aljoscha Krettek, can you please confirm and add this information to the vote thread?



[jira] (BEAM-843) Use New DoFn Directly in Flink Runner

2017-01-30 Thread Daniel Halperin (JIRA)

Daniel Halperin updated an issue

Beam / BEAM-843: Use New DoFn Directly in Flink Runner

Change By: Daniel Halperin
Fix Version/s: 0.5.0
Fix Version/s: 0.6.0
   



[jira] (BEAM-843) Use New DoFn Directly in Flink Runner

2017-01-30 Thread Daniel Halperin (JIRA)

Daniel Halperin commented on BEAM-843

Re: Use New DoFn Directly in Flink Runner

Aljoscha Krettek, is this needed for 0.5.0? You marked it as "FV 0.5.0", although if you merge it today it will not be on the 0.5.0 release branch, and so not in that release.



[jira] (BEAM-843) Use New DoFn Directly in Flink Runner

2017-01-30 Thread Daniel Halperin (JIRA)

Daniel Halperin reopened an issue

Reopened in order to set the fix version.

Beam / BEAM-843: Use New DoFn Directly in Flink Runner

Change By: Daniel Halperin
Status: Closed -> Reopened
Resolution: Fixed



[GitHub] beam-site pull request #135: Use PMC instead of PPMC

2017-01-30 Thread jbonofre
GitHub user jbonofre opened a pull request:

https://github.com/apache/beam-site/pull/135

Use PMC instead of PPMC

Minor change to reflect the graduation: we don't have PPMC anymore, but PMC.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jbonofre/beam-site PPMC_REMOVE

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam-site/pull/135.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #135


commit f9110658c09958012bba1caf7d3e94d89988f548
Author: Jean-Baptiste Onofré 
Date:   2017-01-30T17:02:48Z

Use PMC instead of PPMC




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/3] beam-site git commit: Regenerate website

2017-01-30 Thread davor
Regenerate website


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/38c9a1e6
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/38c9a1e6
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/38c9a1e6

Branch: refs/heads/asf-site
Commit: 38c9a1e6f06e86bb0b8406c7b2688b4598ad29a7
Parents: f911065
Author: Davor Bonaci 
Authored: Mon Jan 30 09:14:01 2017 -0800
Committer: Davor Bonaci 
Committed: Mon Jan 30 09:14:01 2017 -0800

--
 content/contribute/release-guide/index.html | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam-site/blob/38c9a1e6/content/contribute/release-guide/index.html
--
diff --git a/content/contribute/release-guide/index.html 
b/content/contribute/release-guide/index.html
index 624df88..5ef3dfd 100644
--- a/content/contribute/release-guide/index.html
+++ b/content/contribute/release-guide/index.html
@@ -580,7 +580,7 @@ The complete staging area is available for your review, 
which includes:
 * source code tag "v1.2.3-RC3" [5],
 * website pull request listing the release and publishing the API reference 
manual [6].
 
-The vote will be open for at least 72 hours. It is adopted by majority 
approval, with at least 3 PPMC affirmative votes.
+The vote will be open for at least 72 hours. It is adopted by majority 
approval, with at least 3 PMC affirmative votes.
 
 Thanks,
 Release Manager



[3/3] beam-site git commit: This closes #135

2017-01-30 Thread davor
This closes #135


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/fedd4ab9
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/fedd4ab9
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/fedd4ab9

Branch: refs/heads/asf-site
Commit: fedd4ab9d0294de50f4a47cc91cebda9daf8db75
Parents: f1d931e 38c9a1e
Author: Davor Bonaci 
Authored: Mon Jan 30 09:14:01 2017 -0800
Committer: Davor Bonaci 
Committed: Mon Jan 30 09:14:01 2017 -0800

--
 content/contribute/release-guide/index.html | 2 +-
 src/contribute/release-guide.md | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--




[GitHub] beam-site pull request #135: Use PMC instead of PPMC

2017-01-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam-site/pull/135


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/3] beam-site git commit: Use PMC instead of PPMC

2017-01-30 Thread davor
Repository: beam-site
Updated Branches:
  refs/heads/asf-site f1d931ea9 -> fedd4ab9d


Use PMC instead of PPMC


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/f9110658
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/f9110658
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/f9110658

Branch: refs/heads/asf-site
Commit: f9110658c09958012bba1caf7d3e94d89988f548
Parents: f1d931e
Author: Jean-Baptiste Onofré 
Authored: Mon Jan 30 18:02:48 2017 +0100
Committer: Jean-Baptiste Onofré 
Committed: Mon Jan 30 18:02:48 2017 +0100

--
 src/contribute/release-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam-site/blob/f9110658/src/contribute/release-guide.md
--
diff --git a/src/contribute/release-guide.md b/src/contribute/release-guide.md
index bb6a95f..c2fea8f 100644
--- a/src/contribute/release-guide.md
+++ b/src/contribute/release-guide.md
@@ -319,7 +319,7 @@ Start the review-and-vote thread on the dev@ mailing list. 
Here’s an email tem
 * source code tag "v1.2.3-RC3" [5],
 * website pull request listing the release and publishing the API 
reference manual [6].
 
-The vote will be open for at least 72 hours. It is adopted by majority 
approval, with at least 3 PPMC affirmative votes.
+The vote will be open for at least 72 hours. It is adopted by majority 
approval, with at least 3 PMC affirmative votes.
 
 Thanks,
 Release Manager



[GitHub] beam pull request #1869: [BEAM-886] Some performance improvements to NewDoFn

2017-01-30 Thread sb2nov
GitHub user sb2nov opened a pull request:

https://github.com/apache/beam/pull/1869

[BEAM-886] Some performance improvements to NewDoFn

- Add types to some of the variables for performance
- Do minimal work in the process function by stashing the placeholders to
be replaced.

R: @robertwb PTAL
CC: @aaltay

---

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [x] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [x] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sb2nov/incubator-beam 
BEAM-886-Add-NewDoFn-class-1-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/1869.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1869


commit 3427ee1da41ec1d42418615c1eac99be7ffe22a7
Author: Sourabh Bajaj 
Date:   2017-01-30T19:16:16Z

Some performance improvements to NewDoFn




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] (BEAM-886) Support new DoFn in Python SDK

2017-01-30 Thread ASF GitHub Bot (JIRA)

ASF GitHub Bot commented on BEAM-886

Re: Support new DoFn in Python SDK

GitHub user sb2nov opened a pull request:
https://github.com/apache/beam/pull/1869
[BEAM-886] Some performance improvements to NewDoFn

- Add types to some of the variables for performance.
- Do minimal work in the process function by stashing the placeholders to be replaced.

R: @robertwb PTAL
CC: @aaltay

commit 3427ee1da41ec1d42418615c1eac99be7ffe22a7
Author: Sourabh Bajaj
Date: 2017-01-30T19:16:16Z

Some performance improvements to NewDoFn
 

[GitHub] beam pull request #1870: Merge master into python-sdk

2017-01-30 Thread aaltay
GitHub user aaltay opened a pull request:

https://github.com/apache/beam/pull/1870

Merge master into python-sdk



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aaltay/incubator-beam merge2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/1870.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1870


commit 1dcda72686fa0a9a6e2033939aa53f7a7a31d548
Author: JingsongLi 
Date:   2017-01-18T03:34:06Z

[BEAM-843] Use New DoFn Directly in Flink Runner

commit 582c4a8a4dd33d698beae2990d682848b593de21
Author: Aljoscha Krettek 
Date:   2017-01-30T12:48:19Z

This closes #1787

commit c97dd6cbf05e5670eba2c7180c9b6e059418a2fd
Author: Ahmet Altay 
Date:   2017-01-30T20:14:34Z

Merge remote-tracking branch 'origin/master' into merge2

commit 408d9b6b8e8fe22637f88c7553980b068468b5f8
Author: Ahmet Altay 
Date:   2017-01-30T20:21:28Z

Update the version.py file to match the latest beam version.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/3] beam git commit: [BEAM-843] Use New DoFn Directly in Flink Runner

2017-01-30 Thread davor
[BEAM-843] Use New DoFn Directly in Flink Runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4aaaf8fb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4aaaf8fb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4aaaf8fb

Branch: refs/heads/python-sdk
Commit: 4aaaf8fb94222e6e283fa79d9c7dc8d9a730d278
Parents: 27cf68e
Author: JingsongLi 
Authored: Wed Jan 18 11:34:06 2017 +0800
Committer: Davor Bonaci 
Committed: Mon Jan 30 12:38:38 2017 -0800

--
 .../GroupAlsoByWindowViaWindowSetNewDoFn.java   | 156 +++
 .../wrappers/streaming/DoFnOperator.java|  69 
 .../wrappers/streaming/WindowDoFnOperator.java  | 143 +
 3 files changed, 264 insertions(+), 104 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/4aaaf8fb/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
new file mode 100644
index 000..cff6e00
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.util.Collection;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.util.state.TimerInternalsFactory;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/**
+ * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the
+ * {@link ReduceFnRunner}.
+ */
+@SystemDoFnInternal
+public class GroupAlsoByWindowViaWindowSetNewDoFn<
+        K, InputT, OutputT, W extends BoundedWindow, RinT extends KeyedWorkItem<K, InputT>>
+    extends DoFn<RinT, KV<K, OutputT>> {
+
+  private static final long serialVersionUID = 1L;
+
+  public static <K, InputT, OutputT, W extends BoundedWindow>
+      DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
+          WindowingStrategy<?, W> strategy,
+          StateInternalsFactory<K> stateInternalsFactory,
+          TimerInternalsFactory<K> timerInternalsFactory,
+          SideInputReader sideInputReader,
+          SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn,
+          DoFnRunners.OutputManager outputManager,
+          TupleTag<KV<K, OutputT>> mainTag) {
+    return new GroupAlsoByWindowViaWindowSetNewDoFn<>(
+        strategy, stateInternalsFactory, timerInternalsFactory, sideInputReader,
+        reduceFn, outputManager, mainTag);
+  }
+
+  protected final Aggregator<Long, Long> droppedDueToClosedWindow =
+      createAggregator(
+          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
+  protected final Aggregator<Long, Long> droppedDueToLateness =
+      createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
+  private final WindowingStrategy<Object, W> windowingStrategy;
+  private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
+  private transient StateInternalsFactory<K> stateInternalsFactory;
+  private transient TimerInternalsFactory<K> timerInternalsFactory;
+  private transient SideInputReader sideInputReader;
+  private transient DoFnRunners.OutputManager outputManager;

[3/3] beam git commit: This closes #1870

2017-01-30 Thread davor
This closes #1870


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f29527f6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f29527f6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f29527f6

Branch: refs/heads/python-sdk
Commit: f29527f68b8de92caf18b183e3a7e97eb190f67e
Parents: 27cf68e 38575a1
Author: Davor Bonaci 
Authored: Mon Jan 30 12:38:53 2017 -0800
Committer: Davor Bonaci 
Committed: Mon Jan 30 12:38:53 2017 -0800

--
 .../GroupAlsoByWindowViaWindowSetNewDoFn.java   | 156 +++
 .../wrappers/streaming/DoFnOperator.java|  69 
 .../wrappers/streaming/WindowDoFnOperator.java  | 143 +
 sdks/python/apache_beam/version.py  |   3 +-
 4 files changed, 265 insertions(+), 106 deletions(-)
--




[1/3] beam git commit: Update the version.py file to match the latest beam version.

2017-01-30 Thread davor
Repository: beam
Updated Branches:
  refs/heads/python-sdk 27cf68ee7 -> f29527f68


Update the version.py file to match the latest beam version.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/38575a14
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/38575a14
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/38575a14

Branch: refs/heads/python-sdk
Commit: 38575a14e2b17c93de2d0e27fe6213daa7101695
Parents: 4aaaf8f
Author: Ahmet Altay 
Authored: Mon Jan 30 12:21:28 2017 -0800
Committer: Davor Bonaci 
Committed: Mon Jan 30 12:38:38 2017 -0800

--
 sdks/python/apache_beam/version.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/38575a14/sdks/python/apache_beam/version.py
--
diff --git a/sdks/python/apache_beam/version.py 
b/sdks/python/apache_beam/version.py
index 60d9634..12509fb 100644
--- a/sdks/python/apache_beam/version.py
+++ b/sdks/python/apache_beam/version.py
@@ -21,7 +21,7 @@
 import re
 
 
-__version__ = '0.3.0-incubating.dev'  # TODO: PEP 440 and incubating suffix
+__version__ = '0.6.0.dev'
 
 
 # The following utilities are legacy code from the Maven integration;
@@ -40,7 +40,6 @@ def get_version_from_pom():
 search = pattern.search(pom)
 version = search.group(1)
 version = version.replace("-SNAPSHOT", ".dev")
-# TODO: PEP 440 and incubating suffix
 return version
 
 



[1/2] beam git commit: Updates places in the SDK that create thread pools.

2017-01-30 Thread davor
Repository: beam
Updated Branches:
  refs/heads/python-sdk f29527f68 -> 475707f0f


Updates places in the SDK that create thread pools.

Moves ThreadPool creation to a util function.
Records and restores the logging level, since apitools resets it when used
with a ThreadPool.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/51afc1cc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/51afc1cc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/51afc1cc

Branch: refs/heads/python-sdk
Commit: 51afc1ccfe78a0657b5f9bc139d1d4e7938ed672
Parents: f29527f
Author: Chamikara Jayalath 
Authored: Sat Jan 28 08:54:33 2017 -0800
Committer: Davor Bonaci 
Committed: Mon Jan 30 12:43:37 2017 -0800

--
 sdks/python/apache_beam/internal/util.py  | 33 ++
 sdks/python/apache_beam/io/filebasedsource.py | 17 +++
 sdks/python/apache_beam/io/fileio.py  | 11 ++--
 3 files changed, 40 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/51afc1cc/sdks/python/apache_beam/internal/util.py
--
diff --git a/sdks/python/apache_beam/internal/util.py 
b/sdks/python/apache_beam/internal/util.py
index 2d12d49..5b31e88 100644
--- a/sdks/python/apache_beam/internal/util.py
+++ b/sdks/python/apache_beam/internal/util.py
@@ -17,6 +17,11 @@
 
 """Utility functions used throughout the package."""
 
+import logging
+from multiprocessing.pool import ThreadPool
+import threading
+import weakref
+
 
 class ArgumentPlaceholder(object):
   """A place holder object replacing PValues in argument lists.
@@ -92,3 +97,31 @@ def insert_values_in_args(args, kwargs, values):
       (k, v_iter.next()) if isinstance(v, ArgumentPlaceholder) else (k, v)
       for k, v in sorted(kwargs.iteritems()))
   return (new_args, new_kwargs)
+
+
+def run_using_threadpool(fn_to_execute, inputs, pool_size):
+  """Runs the given function on given inputs using a thread pool.
+
+  Args:
+    fn_to_execute: Function to execute
+    inputs: Inputs on which given function will be executed in parallel.
+    pool_size: Size of thread pool.
+  Returns:
+    Results retrieved after executing the given function on given inputs.
+  """
+
+  # ThreadPool crashes in old versions of Python (< 2.7.5) if created
+  # from a child thread. (http://bugs.python.org/issue10015)
+  if not hasattr(threading.current_thread(), '_children'):
+    threading.current_thread()._children = weakref.WeakKeyDictionary()
+  pool = ThreadPool(min(pool_size, len(inputs)))
+  try:
+    # We record and reset the logging level here since the 'apitools' library
+    # that Beam depends on updates the logging level when used with a
+    # threadpool - https://github.com/google/apitools/issues/141
+    # TODO: Remove this once the above issue in 'apitools' is fixed.
+    old_level = logging.getLogger().level
+    return pool.map(fn_to_execute, inputs)
+  finally:
+    pool.terminate()
+    logging.getLogger().setLevel(old_level)
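The helper added in the diff above can be exercised standalone. The sketch below is a minimal, self-contained version of the same pattern (it drops the old-Python workaround and records the level slightly earlier; it is illustrative, not Beam's actual module):

```python
import logging
from multiprocessing.pool import ThreadPool


def run_using_threadpool(fn_to_execute, inputs, pool_size):
    # Record the root logger's level up front: a library invoked inside the
    # pool (apitools, per the commit above) may reset it as a side effect.
    old_level = logging.getLogger().level
    pool = ThreadPool(min(pool_size, len(inputs)))
    try:
        return pool.map(fn_to_execute, inputs)
    finally:
        pool.terminate()
        logging.getLogger().setLevel(old_level)


print(run_using_threadpool(lambda x: x * x, [1, 2, 3], 4))  # [1, 4, 9]
```

Centralizing the pool creation this way also gives every caller the crash workaround and the logging-level restore for free, which is the point of the refactoring.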

http://git-wip-us.apache.org/repos/asf/beam/blob/51afc1cc/sdks/python/apache_beam/io/filebasedsource.py
--
diff --git a/sdks/python/apache_beam/io/filebasedsource.py 
b/sdks/python/apache_beam/io/filebasedsource.py
index 1bfde25..582d673 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -26,11 +26,9 @@ For an example implementation of ``FileBasedSource`` see 
``avroio.AvroSource``.
 """
 
 import random
-import threading
-import weakref
-from multiprocessing.pool import ThreadPool
 
 from apache_beam.internal import pickler
+from apache_beam.internal import util
 from apache_beam.io import concat_source
 from apache_beam.io import fileio
 from apache_beam.io import iobase
@@ -158,16 +156,9 @@ class FileBasedSource(iobase.BoundedSource):
       return [fileio.ChannelFactory.size_in_bytes(file_names[0])]
     else:
       if pattern is None:
-        # ThreadPool crashes in old versions of Python (< 2.7.5) if created
-        # from a child thread. (http://bugs.python.org/issue10015)
-        if not hasattr(threading.current_thread(), '_children'):
-          threading.current_thread()._children = weakref.WeakKeyDictionary()
-        pool = ThreadPool(
-            min(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION, len(file_names)))
-        try:
-          return pool.map(fileio.ChannelFactory.size_in_bytes, file_names)
-        finally:
-          pool.terminate()
+        return util.run_using_threadpool(
+            fileio.ChannelFactory.size_in_bytes, file_names,
+            MAX_NUM_THREADS_FOR_SIZE_ESTIMATION)
       else:
         file_sizes = fileio.ChannelFactory.size_of_files_in_glob(pattern,
   

[2/2] beam git commit: This closes #1866

2017-01-30 Thread davor
This closes #1866


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/475707f0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/475707f0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/475707f0

Branch: refs/heads/python-sdk
Commit: 475707f0ffd7bc82ca78fa3f3c9e78f661478b99
Parents: f29527f 51afc1c
Author: Davor Bonaci 
Authored: Mon Jan 30 12:43:48 2017 -0800
Committer: Davor Bonaci 
Committed: Mon Jan 30 12:43:48 2017 -0800

--
 sdks/python/apache_beam/internal/util.py  | 33 ++
 sdks/python/apache_beam/io/filebasedsource.py | 17 +++
 sdks/python/apache_beam/io/fileio.py  | 11 ++--
 3 files changed, 40 insertions(+), 21 deletions(-)
--




[2/2] beam git commit: This closes #1863

2017-01-30 Thread davor
This closes #1863


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1390699c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1390699c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1390699c

Branch: refs/heads/python-sdk
Commit: 1390699c37596ebe34a773627660b6c496375a8e
Parents: 475707f e02ddac
Author: Davor Bonaci 
Authored: Mon Jan 30 12:45:03 2017 -0800
Committer: Davor Bonaci 
Committed: Mon Jan 30 12:45:03 2017 -0800

--
 sdks/python/apache_beam/io/bigquery_test.py | 26 
 1 file changed, 18 insertions(+), 8 deletions(-)
--




[1/2] beam git commit: Add mock time to slow bigquery unit tests.

2017-01-30 Thread davor
Repository: beam
Updated Branches:
  refs/heads/python-sdk 475707f0f -> 1390699c3


Add mock time to slow bigquery unit tests.

Unit tests that exercise retries do not need to use real time. This change
reduces the total tox time for unit tests from 235 seconds to 73 seconds
locally.
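The mechanism is the `@mock.patch('time.sleep', ...)` decorator visible in the diff below: it swaps out `time.sleep` for the duration of the test, so retry back-off waits return instantly. A minimal illustration (the test class and loop here are hypothetical, not Beam's actual tests):

```python
import time
import unittest
from unittest import mock


class RetryLoopTest(unittest.TestCase):
    # Patching time.sleep turns back-off waits into no-ops; the patched
    # MagicMock is injected as an extra argument to the test method.
    @mock.patch('time.sleep', return_value=None)
    def test_retries_without_real_waiting(self, patched_time_sleep):
        for attempt in range(3):  # stand-in for a retrying client call
            time.sleep(10)        # would block 30 seconds if not patched
        self.assertEqual(patched_time_sleep.call_count, 3)
```

Because the patch is scoped to the decorated method, other tests in the same module still see the real `time.sleep`.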


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e02ddac3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e02ddac3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e02ddac3

Branch: refs/heads/python-sdk
Commit: e02ddac308b8b1ea0bd0cb0ae4f9ba4908a50595
Parents: 475707f
Author: Ahmet Altay 
Authored: Fri Jan 27 17:35:24 2017 -0800
Committer: Davor Bonaci 
Committed: Mon Jan 30 12:44:55 2017 -0800

--
 sdks/python/apache_beam/io/bigquery_test.py | 26 
 1 file changed, 18 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/e02ddac3/sdks/python/apache_beam/io/bigquery_test.py
--
diff --git a/sdks/python/apache_beam/io/bigquery_test.py 
b/sdks/python/apache_beam/io/bigquery_test.py
index b8682d1..14eb035 100644
--- a/sdks/python/apache_beam/io/bigquery_test.py
+++ b/sdks/python/apache_beam/io/bigquery_test.py
@@ -539,7 +539,8 @@ class TestBigQueryReader(unittest.TestCase):
 
 class TestBigQueryWriter(unittest.TestCase):
 
-  def test_no_table_and_create_never(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_no_table_and_create_never(self, patched_time_sleep):
     client = mock.Mock()
     client.tables.Get.side_effect = HttpError(
         response={'status': '404'}, url='', content='')
@@ -572,7 +573,9 @@ class TestBigQueryWriter(unittest.TestCase):
     self.assertTrue(client.tables.Get.called)
     self.assertTrue(client.tables.Insert.called)
 
-  def test_no_table_and_create_if_needed_and_no_schema(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_no_table_and_create_if_needed_and_no_schema(
+      self, patched_time_sleep):
     client = mock.Mock()
     client.tables.Get.side_effect = HttpError(
         response={'status': '404'}, url='', content='')
@@ -587,7 +590,9 @@ class TestBigQueryWriter(unittest.TestCase):
         'Table project:dataset.table requires a schema. None can be inferred '
         'because the table does not exist.')
 
-  def test_table_not_empty_and_write_disposition_empty(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_table_not_empty_and_write_disposition_empty(
+      self, patched_time_sleep):
     client = mock.Mock()
     client.tables.Get.return_value = bigquery.Table(
         tableReference=bigquery.TableReference(
@@ -712,7 +717,8 @@ class TestBigQueryWrapper(unittest.TestCase):
     wrapper._delete_dataset('', '')
     self.assertTrue(client.datasets.Delete.called)
 
-  def test_delete_dataset_retries_fail(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_delete_dataset_retries_fail(self, patched_time_sleep):
     client = mock.Mock()
     client.datasets.Delete.side_effect = ValueError("Cannot delete")
     wrapper = beam.io.bigquery.BigQueryWrapper(client)
@@ -730,7 +736,8 @@ class TestBigQueryWrapper(unittest.TestCase):
     wrapper._delete_table('', '', '')
     self.assertTrue(client.tables.Delete.called)
 
-  def test_delete_table_retries_fail(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_delete_table_retries_fail(self, patched_time_sleep):
     client = mock.Mock()
     client.tables.Delete.side_effect = ValueError("Cannot delete")
     wrapper = beam.io.bigquery.BigQueryWrapper(client)
@@ -738,7 +745,8 @@ class TestBigQueryWrapper(unittest.TestCase):
       wrapper._delete_table('', '', '')
     self.assertTrue(client.tables.Delete.called)
 
-  def test_delete_dataset_retries_for_timeouts(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_delete_dataset_retries_for_timeouts(self, patched_time_sleep):
     client = mock.Mock()
     client.datasets.Delete.side_effect = [
         HttpError(
@@ -749,7 +757,8 @@ class TestBigQueryWrapper(unittest.TestCase):
     wrapper._delete_dataset('', '')
     self.assertTrue(client.datasets.Delete.called)
 
-  def test_delete_table_retries_for_timeouts(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_delete_table_retries_for_timeouts(self, patched_time_sleep):
     client = mock.Mock()
     client.tables.Delete.side_effect = [
         HttpError(
@@ -760,7 +769,8 @@ class TestBigQueryWrapper(unittest.TestCase):
     wrapper._delete_table('', '', '')
     self.assertTrue(client.tables.Delete.called)
 
-  def test_temporary_dataset_is_unique(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_temporary_dataset_is_unique(self, patched_time_sleep):
 client = mock.Mock()
 c

[3/6] beam git commit: A proposal for a portability framework to execute user definable functions.

2017-01-30 Thread lcwik
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
--
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
new file mode 100644
index 000..92042d0
--- /dev/null
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.BytesValue;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import org.apache.beam.fn.harness.data.BeamFnDataClient;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.Serializer;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Registers as a consumer for data over the Beam Fn API. Multiplexes any received data
+ * to all consumers in the specified output map.
+ *
+ * <p>Can be re-used serially across {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s.
+ * For each request, call {@link #registerInputLocation()} to start and call
+ * {@link #blockTillReadFinishes()} to finish.
+ */
+public class BeamFnDataReadRunner<OutputT> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BeamFnDataReadRunner.class);
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
+  private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers;
+  private final Supplier<String> processBundleInstructionIdSupplier;
+  private final BeamFnDataClient beamFnDataClientFactory;
+  private final Coder<WindowedValue<OutputT>> coder;
+  private final BeamFnApi.Target inputTarget;
+
+  private CompletableFuture<Void> readFuture;
+
+  public BeamFnDataReadRunner(
+      BeamFnApi.FunctionSpec functionSpec,
+      Supplier<String> processBundleInstructionIdSupplier,
+      BeamFnApi.Target inputTarget,
+      BeamFnApi.Coder coderSpec,
+      BeamFnDataClient beamFnDataClientFactory,
+      Map<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> outputMap)
+      throws IOException {
+    this.apiServiceDescriptor = functionSpec.getData().unpack(BeamFnApi.RemoteGrpcPort.class)
+        .getApiServiceDescriptor();
+    this.inputTarget = inputTarget;
+    this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
+    this.beamFnDataClientFactory = beamFnDataClientFactory;
+    this.consumers = ImmutableList.copyOf(FluentIterable.concat(outputMap.values()));
+
+    @SuppressWarnings("unchecked")
+    Coder<WindowedValue<OutputT>> coder = Serializer.deserialize(
+        OBJECT_MAPPER.readValue(
+            coderSpec.getFunctionSpec().getData().unpack(BytesValue.class).getValue().newInput(),
+            Map.class),
+        Coder.class);
+    this.coder = coder;
+  }
+
+  public void registerInputLocation() {
+    this.readFuture = beamFnDataClientFactory.forInboundConsumer(
+        apiServiceDescriptor,
+        KV.of(processBundleInstructionIdSupplier.get(), inputTarget),
+        coder,
+        this::multiplexToConsumers);
+  }
+
+  public void blockTillReadFinishes() throws Exception {
+    LOGGER.debug("Waiting for process bundle instruction {} and target {} to close.",
+        processBundleInstructionIdSupplier.get(), inputTarget);
+    readFuture.get();
+  }
+
+  private void multiplexToConsumers(WindowedValue<OutputT> value) throws Exception {
+    for (ThrowingConsumer<WindowedValue<OutputT>> consumer : consumers) {
+      consumer.accept(value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
--

[4/6] beam git commit: A proposal for a portability framework to execute user definable functions.

2017-01-30 Thread lcwik
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
--
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
new file mode 100644
index 000..14e26f0
--- /dev/null
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.control;
+
+import com.google.protobuf.Message;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.fn.v1.BeamFnApi.RegisterResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A handler and datastore for types that can be registered via the Fn API.
+ *
+ * <p>Allows for {@link org.apache.beam.fn.v1.BeamFnApi.RegisterRequest}s to occur in parallel with
+ * subsequent requests that may lookup registered values by blocking lookups until registration
+ * occurs.
+ */
+public class RegisterHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RegisterHandler.class);
+  private final ConcurrentMap<Long, CompletableFuture<Message>> idToObject;
+
+  public RegisterHandler() {
+    idToObject = new ConcurrentHashMap<>();
+  }
+
+  public <T extends Message> T getById(long id) {
+    try {
+      @SuppressWarnings("unchecked")
+      CompletableFuture<T> returnValue = (CompletableFuture<T>) computeIfAbsent(id);
+      /*
+       * TODO: Even though the register request instruction occurs before the process bundle
+       * instruction in the control stream, the instructions are being processed in parallel
+       * in the Java harness causing a data race which is why we use a future. This will block
+       * forever in the case of a runner having a bug sending the wrong ids. Instead of blocking
+       * forever, we could do a timed wait or come up with another way of ordering the instruction
+       * processing to remove the data race.
+       */
+      return returnValue.get();
+    } catch (ExecutionException e) {
+      throw new RuntimeException(String.format("Failed to load %s", id), e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(String.format("Failed to load %s", id), e);
+    }
+  }
+
+  public BeamFnApi.InstructionResponse.Builder register(BeamFnApi.InstructionRequest request) {
+    BeamFnApi.InstructionResponse.Builder response = BeamFnApi.InstructionResponse.newBuilder()
+        .setRegister(RegisterResponse.getDefaultInstance());
+
+    BeamFnApi.RegisterRequest registerRequest = request.getRegister();
+    for (BeamFnApi.ProcessBundleDescriptor processBundleDescriptor
+        : registerRequest.getProcessBundleDescriptorList()) {
+      LOGGER.debug("Registering {} with type {}",
+          processBundleDescriptor.getId(),
+          processBundleDescriptor.getClass());
+      computeIfAbsent(processBundleDescriptor.getId()).complete(processBundleDescriptor);
+      for (BeamFnApi.Coder coder : processBundleDescriptor.getCodersList()) {
+        LOGGER.debug("Registering {} with type {}",
+            coder.getFunctionSpec().getId(),
+            coder.getClass());
+        computeIfAbsent(coder.getFunctionSpec().getId()).complete(coder);
+      }
+    }
+
+    return response;
+  }
+
+  private CompletableFuture<Message> computeIfAbsent(long id) {
+    return idToObject.computeIfAbsent(id, (Long ignored) -> new CompletableFuture<>());
+  }
+}
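The core idea of `RegisterHandler` — lookups block on a future that registration later completes, so register and lookup instructions may race safely — is independent of Java. A compact Python analogue (illustrative only, not Beam's API; `RegisterHandler`, `register`, and `get_by_id` here are hypothetical names):

```python
import threading
from concurrent.futures import Future


class RegisterHandler(object):
    """Blocking-lookup-until-registration via per-id futures."""

    def __init__(self):
        self._lock = threading.Lock()
        self._futures = {}

    def _future(self, obj_id):
        # Plays the role of ConcurrentMap.computeIfAbsent in the Java code:
        # whichever of register/lookup arrives first creates the future.
        with self._lock:
            return self._futures.setdefault(obj_id, Future())

    def register(self, obj_id, value):
        self._future(obj_id).set_result(value)

    def get_by_id(self, obj_id, timeout=None):
        # A timeout avoids blocking forever if the other side sends a wrong
        # id -- the hazard the TODO in the Java version calls out.
        return self._future(obj_id).result(timeout)
```

A lookup issued before the corresponding `register` call simply parks on `Future.result()` until the value arrives.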

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/package-info.java
--
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/package-info.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control

[5/6] beam git commit: A proposal for a portability framework to execute user definable functions.

2017-01-30 Thread lcwik
A proposal for a portability framework to execute user definable functions.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0b4b2bec
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0b4b2bec
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0b4b2bec

Branch: refs/heads/master
Commit: 0b4b2becb45b9f637ba31f599ebe8be0331bd633
Parents: 582c4a8
Author: Luke Cwik 
Authored: Thu Jan 19 15:16:55 2017 -0800
Committer: Luke Cwik 
Committed: Mon Jan 30 12:47:55 2017 -0800

--
 pom.xml |  36 +-
 runners/apex/pom.xml|   2 +-
 sdks/common/fn-api/pom.xml  | 111 +++
 .../fn-api/src/main/proto/beam_fn_api.proto | 771 +++
 sdks/common/pom.xml |  38 +
 .../src/main/resources/beam/findbugs-filter.xml |  32 +-
 sdks/java/harness/pom.xml   | 167 
 .../org/apache/beam/fn/harness/FnHarness.java   | 131 
 .../harness/channel/ManagedChannelFactory.java  |  80 ++
 .../harness/channel/SocketAddressFactory.java   |  64 ++
 .../beam/fn/harness/channel/package-info.java   |  22 +
 .../fn/harness/control/BeamFnControlClient.java | 165 
 .../harness/control/ProcessBundleHandler.java   | 334 
 .../fn/harness/control/RegisterHandler.java |  92 +++
 .../beam/fn/harness/control/package-info.java   |  22 +
 .../BeamFnDataBufferingOutboundObserver.java| 135 
 .../beam/fn/harness/data/BeamFnDataClient.java  |  64 ++
 .../fn/harness/data/BeamFnDataGrpcClient.java   | 122 +++
 .../harness/data/BeamFnDataGrpcMultiplexer.java | 140 
 .../harness/data/BeamFnDataInboundObserver.java |  81 ++
 .../beam/fn/harness/data/package-info.java  |  22 +
 .../fn/harness/fake/FakeAggregatorFactory.java  |  52 ++
 .../beam/fn/harness/fake/FakeStepContext.java   |  70 ++
 .../beam/fn/harness/fake/package-info.java  |  22 +
 .../harness/fn/CloseableThrowingConsumer.java   |  23 +
 .../beam/fn/harness/fn/ThrowingBiFunction.java  |  32 +
 .../beam/fn/harness/fn/ThrowingConsumer.java|  32 +
 .../beam/fn/harness/fn/ThrowingFunction.java|  32 +
 .../beam/fn/harness/fn/ThrowingRunnable.java|  30 +
 .../apache/beam/fn/harness/fn/package-info.java |  22 +
 .../fn/harness/logging/BeamFnLoggingClient.java | 308 
 .../beam/fn/harness/logging/package-info.java   |  22 +
 .../apache/beam/fn/harness/package-info.java|  22 +
 .../beam/fn/harness/stream/AdvancingPhaser.java |  36 +
 .../harness/stream/BufferingStreamObserver.java | 166 
 .../fn/harness/stream/DirectStreamObserver.java |  71 ++
 .../ForwardingClientResponseObserver.java   |  63 ++
 .../harness/stream/StreamObserverFactory.java   |  91 +++
 .../beam/fn/harness/stream/package-info.java|  22 +
 .../beam/runners/core/BeamFnDataReadRunner.java | 104 +++
 .../runners/core/BeamFnDataWriteRunner.java |  87 +++
 .../beam/runners/core/BoundedSourceRunner.java  | 105 +++
 .../apache/beam/runners/core/package-info.java  |  22 +
 .../apache/beam/fn/harness/FnHarnessTest.java   | 130 
 .../channel/ManagedChannelFactoryTest.java  |  74 ++
 .../channel/SocketAddressFactoryTest.java   |  56 ++
 .../control/BeamFnControlClientTest.java| 182 +
 .../control/ProcessBundleHandlerTest.java   | 674 
 .../fn/harness/control/RegisterHandlerTest.java |  80 ++
 ...BeamFnDataBufferingOutboundObserverTest.java | 142 
 .../harness/data/BeamFnDataGrpcClientTest.java  | 309 
 .../data/BeamFnDataGrpcMultiplexerTest.java |  96 +++
 .../data/BeamFnDataInboundObserverTest.java | 116 +++
 .../logging/BeamFnLoggingClientTest.java| 169 
 .../fn/harness/stream/AdvancingPhaserTest.java  |  48 ++
 .../stream/BufferingStreamObserverTest.java | 146 
 .../stream/DirectStreamObserverTest.java| 139 
 .../ForwardingClientResponseObserverTest.java   |  60 ++
 .../stream/StreamObserverFactoryTest.java   |  84 ++
 .../beam/fn/harness/test/TestExecutors.java |  85 ++
 .../beam/fn/harness/test/TestExecutorsTest.java | 160 
 .../beam/fn/harness/test/TestStreams.java   | 162 
 .../beam/fn/harness/test/TestStreamsTest.java   |  84 ++
 .../runners/core/BeamFnDataReadRunnerTest.java  | 187 +
 .../runners/core/BeamFnDataWriteRunnerTest.java | 155 
 .../runners/core/BoundedSourceRunnerTest.java   | 113 +++
 sdks/java/pom.xml   |   1 +
 sdks/pom.xml|   1 +
 68 files changed, 7514 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/pom.xml
--
diff --git a/pom.xml b/pom.xml
index d09bf59..a53453b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11

[GitHub] beam pull request #1801: [BEAM-1347, BEAM-1348] Beam Fn API Basic Java Harne...

2017-01-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/1801


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[6/6] beam git commit: [BEAM-1347, BEAM-1348] Beam Fn API Basic Java Harness and Proto Model

2017-01-30 Thread lcwik
[BEAM-1347, BEAM-1348] Beam Fn API Basic Java Harness and Proto Model

This closes #1801


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/343176c0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/343176c0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/343176c0

Branch: refs/heads/master
Commit: 343176c008832f4d3776a6e591d36be84dfb022a
Parents: 582c4a8 0b4b2be
Author: Luke Cwik 
Authored: Mon Jan 30 12:48:35 2017 -0800
Committer: Luke Cwik 
Committed: Mon Jan 30 12:48:35 2017 -0800

--
 pom.xml |  36 +-
 runners/apex/pom.xml|   2 +-
 sdks/common/fn-api/pom.xml  | 111 +++
 .../fn-api/src/main/proto/beam_fn_api.proto | 771 +++
 sdks/common/pom.xml |  38 +
 .../src/main/resources/beam/findbugs-filter.xml |  32 +-
 sdks/java/harness/pom.xml   | 167 
 .../org/apache/beam/fn/harness/FnHarness.java   | 131 
 .../harness/channel/ManagedChannelFactory.java  |  80 ++
 .../harness/channel/SocketAddressFactory.java   |  64 ++
 .../beam/fn/harness/channel/package-info.java   |  22 +
 .../fn/harness/control/BeamFnControlClient.java | 165 
 .../harness/control/ProcessBundleHandler.java   | 334 
 .../fn/harness/control/RegisterHandler.java |  92 +++
 .../beam/fn/harness/control/package-info.java   |  22 +
 .../BeamFnDataBufferingOutboundObserver.java| 135 
 .../beam/fn/harness/data/BeamFnDataClient.java  |  64 ++
 .../fn/harness/data/BeamFnDataGrpcClient.java   | 122 +++
 .../harness/data/BeamFnDataGrpcMultiplexer.java | 140 
 .../harness/data/BeamFnDataInboundObserver.java |  81 ++
 .../beam/fn/harness/data/package-info.java  |  22 +
 .../fn/harness/fake/FakeAggregatorFactory.java  |  52 ++
 .../beam/fn/harness/fake/FakeStepContext.java   |  70 ++
 .../beam/fn/harness/fake/package-info.java  |  22 +
 .../harness/fn/CloseableThrowingConsumer.java   |  23 +
 .../beam/fn/harness/fn/ThrowingBiFunction.java  |  32 +
 .../beam/fn/harness/fn/ThrowingConsumer.java|  32 +
 .../beam/fn/harness/fn/ThrowingFunction.java|  32 +
 .../beam/fn/harness/fn/ThrowingRunnable.java|  30 +
 .../apache/beam/fn/harness/fn/package-info.java |  22 +
 .../fn/harness/logging/BeamFnLoggingClient.java | 308 
 .../beam/fn/harness/logging/package-info.java   |  22 +
 .../apache/beam/fn/harness/package-info.java|  22 +
 .../beam/fn/harness/stream/AdvancingPhaser.java |  36 +
 .../harness/stream/BufferingStreamObserver.java | 166 
 .../fn/harness/stream/DirectStreamObserver.java |  71 ++
 .../ForwardingClientResponseObserver.java   |  63 ++
 .../harness/stream/StreamObserverFactory.java   |  91 +++
 .../beam/fn/harness/stream/package-info.java|  22 +
 .../beam/runners/core/BeamFnDataReadRunner.java | 104 +++
 .../runners/core/BeamFnDataWriteRunner.java |  87 +++
 .../beam/runners/core/BoundedSourceRunner.java  | 105 +++
 .../apache/beam/runners/core/package-info.java  |  22 +
 .../apache/beam/fn/harness/FnHarnessTest.java   | 130 
 .../channel/ManagedChannelFactoryTest.java  |  74 ++
 .../channel/SocketAddressFactoryTest.java   |  56 ++
 .../control/BeamFnControlClientTest.java| 182 +
 .../control/ProcessBundleHandlerTest.java   | 674 
 .../fn/harness/control/RegisterHandlerTest.java |  80 ++
 ...BeamFnDataBufferingOutboundObserverTest.java | 142 
 .../harness/data/BeamFnDataGrpcClientTest.java  | 309 
 .../data/BeamFnDataGrpcMultiplexerTest.java |  96 +++
 .../data/BeamFnDataInboundObserverTest.java | 116 +++
 .../logging/BeamFnLoggingClientTest.java| 169 
 .../fn/harness/stream/AdvancingPhaserTest.java  |  48 ++
 .../stream/BufferingStreamObserverTest.java | 146 
 .../stream/DirectStreamObserverTest.java| 139 
 .../ForwardingClientResponseObserverTest.java   |  60 ++
 .../stream/StreamObserverFactoryTest.java   |  84 ++
 .../beam/fn/harness/test/TestExecutors.java |  85 ++
 .../beam/fn/harness/test/TestExecutorsTest.java | 160 
 .../beam/fn/harness/test/TestStreams.java   | 162 
 .../beam/fn/harness/test/TestStreamsTest.java   |  84 ++
 .../runners/core/BeamFnDataReadRunnerTest.java  | 187 +
 .../runners/core/BeamFnDataWriteRunnerTest.java | 155 
 .../runners/core/BoundedSourceRunnerTest.java   | 113 +++
 sdks/java/pom.xml   |   1 +
 sdks/pom.xml|   1 +
 68 files changed, 7514 insertions(+), 4 deletions(-)
--




[1/6] beam git commit: A proposal for a portability framework to execute user definable functions.

2017-01-30 Thread lcwik
Repository: beam
Updated Branches:
  refs/heads/master 582c4a8a4 -> 343176c00


http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java
--
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java
new file mode 100644
index 000..73860ef
--- /dev/null
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core;
+
+import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.collection.IsEmptyCollection.empty;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BoundedSourceRunner}. */
+@RunWith(JUnit4.class)
+public class BoundedSourceRunnerTest {
+  @Test
+  public void testRunReadLoopWithMultipleSources() throws Exception {
+    List<WindowedValue<Long>> out1ValuesA = new ArrayList<>();
+    List<WindowedValue<Long>> out1ValuesB = new ArrayList<>();
+    List<WindowedValue<Long>> out2Values = new ArrayList<>();
+    Map<String, Collection<ThrowingConsumer<WindowedValue<Long>>>> outputMap = ImmutableMap.of(
+        "out1", ImmutableList.of(out1ValuesA::add, out1ValuesB::add),
+        "out2", ImmutableList.of(out2Values::add));
+
+    BoundedSourceRunner<BoundedSource<Long>, Long> runner =
+        new BoundedSourceRunner<>(
+            PipelineOptionsFactory.create(),
+            BeamFnApi.FunctionSpec.getDefaultInstance(),
+            outputMap);
+
+    runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(2)));
+    runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(1)));
+
+    assertThat(out1ValuesA,
+        contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(0L)));
+    assertThat(out1ValuesB,
+        contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(0L)));
+    assertThat(out2Values,
+        contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(0L)));
+  }
+
+  @Test
+  public void testRunReadLoopWithEmptySource() throws Exception {
+    List<WindowedValue<Long>> out1Values = new ArrayList<>();
+    Map<String, Collection<ThrowingConsumer<WindowedValue<Long>>>> outputMap = ImmutableMap.of(
+        "out1", ImmutableList.of(out1Values::add));
+
+    BoundedSourceRunner<BoundedSource<Long>, Long> runner =
+        new BoundedSourceRunner<>(
+            PipelineOptionsFactory.create(),
+            BeamFnApi.FunctionSpec.getDefaultInstance(),
+            outputMap);
+
+    runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(0)));
+
+    assertThat(out1Values, empty());
+  }
+
+  @Test
+  public void testStart() throws Exception {
+    List<WindowedValue<Long>> outValues = new ArrayList<>();
+    Map<String, Collection<ThrowingConsumer<WindowedValue<Long>>>> outputMap = ImmutableMap.of(
+        "out", ImmutableList.of(outValues::add));
+
+    ByteString encodedSource =
+        ByteString.copyFrom(SerializableUtils.serializeToByteArray(CountingSource.upTo(3)));
+
+    BoundedSourceRunner<BoundedSource<Long>, Long> runner =
+        new BoundedSourceRunner<>(
+            PipelineOptionsFactory.create(),
+            BeamFnApi.FunctionSpec.newBuilder().setData(
+                Any.pack(BytesValue.newBuilder().setValue(encodedSource).build())).build(),
+            outputMap);
+
+    runner.start();
+
+    assertThat(outValues,
+        contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueIn
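
Pattern note: the outputMap in these tests maps each named output to a collection of consumers, and the read loop hands every produced element to every registered consumer. A minimal standalone sketch of that fan-out (illustrative only; plain java.util types, not the actual BoundedSourceRunner/ThrowingConsumer API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class FanOutSketch {
  public static void main(String[] args) {
    // Two sinks registered under "out1", one under "out2".
    List<Long> out1ValuesA = new ArrayList<>();
    List<Long> out1ValuesB = new ArrayList<>();
    List<Long> out2Values = new ArrayList<>();
    Map<String, Collection<Consumer<Long>>> outputMap = new LinkedHashMap<>();
    outputMap.put("out1", Arrays.asList(out1ValuesA::add, out1ValuesB::add));
    outputMap.put("out2", Arrays.asList(out2Values::add));

    // Read loop: each produced element is fanned out to every consumer.
    for (long value = 0; value < 2; value++) {
      for (Collection<Consumer<Long>> consumers : outputMap.values()) {
        for (Consumer<Long> consumer : consumers) {
          consumer.accept(value);
        }
      }
    }

    if (!out1ValuesA.equals(Arrays.asList(0L, 1L))
        || !out1ValuesB.equals(Arrays.asList(0L, 1L))
        || !out2Values.equals(Arrays.asList(0L, 1L))) {
      throw new IllegalStateException("fan-out mismatch");
    }
    System.out.println("ok");
  }
}
```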

[2/6] beam git commit: A proposal for a portability framework to execute user definable functions.

2017-01-30 Thread lcwik
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
--
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
new file mode 100644
index 000..20566ea
--- /dev/null
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.data;
+
+import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
+import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.collection.IsEmptyCollection.empty;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import com.google.protobuf.ByteString;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.harness.test.TestStreams;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.fn.v1.BeamFnDataGrpc;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BeamFnDataGrpcClient}. */
+@RunWith(JUnit4.class)
+public class BeamFnDataGrpcClientTest {
+  private static final Coder<WindowedValue<String>> CODER =
+      LengthPrefixCoder.of(
+          WindowedValue.getFullCoder(StringUtf8Coder.of(),
+              GlobalWindow.Coder.INSTANCE));
+  private static final KV<Long, BeamFnApi.Target> KEY_A = KV.of(
+      12L,
+      BeamFnApi.Target.newBuilder().setPrimitiveTransformReference(34L).setName("targetA").build());
+  private static final KV<Long, BeamFnApi.Target> KEY_B = KV.of(
+      56L,
+      BeamFnApi.Target.newBuilder().setPrimitiveTransformReference(78L).setName("targetB").build());
+
+  private static final BeamFnApi.Elements ELEMENTS_A_1;
+  private static final BeamFnApi.Elements ELEMENTS_A_2;
+  private static final BeamFnApi.Elements ELEMENTS_B_1;
+  static {
+    try {
+      ELEMENTS_A_1 = BeamFnApi.Elements.newBuilder()
+          .addData(BeamFnApi.Elements.Data.newBuilder()
+              .setInstructionReference(KEY_A.getKey())
+              .setTarget(KEY_A.getValue())
+              .setData(ByteString.copyFrom(encodeToByteArray(CODER, valueInGlobalWindow("ABC")))
+                  .concat(ByteString.copyFrom(encodeToByteArray(CODER, valueInGlobalWindow("DEF"))))))
+          .build();
+      ELEMENTS_A_2 = BeamFnApi.Elements.newBuilder()
+          .addData(BeamFnApi.Elements.Data.newBuilder()
+              .setInstructionReference(KEY_A.getKey())
+              .setTarget(KEY_A.getValue())
+              .setData(ByteString.copyFrom(encodeToByteArray(CODER, valueInGlobalWindow("GHI")))))
+          .addData(BeamFnApi.Elements.Data.newBuilder()
+              .setInstructionReference(KEY_A.getKey())
+              .setTarget(KEY_A.getValue()))
+          .build();
+      ELEMENTS_B_1 = BeamFnApi.El
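
The LengthPrefixCoder wrapping used in this test frames each element's encoding with its length so the concatenated byte stream can be split back into elements. A standalone sketch of length-prefix framing (assumption: a fixed 4-byte big-endian length is used here for simplicity; the real coder's wire format differs):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class LengthPrefixSketch {
  // Frame a payload as [4-byte big-endian length][payload bytes].
  static byte[] encode(byte[] payload) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    out.writeInt(payload.length);
    out.write(payload);
    out.flush();
    return bos.toByteArray();
  }

  // Read the length prefix, then exactly that many payload bytes.
  static byte[] decode(byte[] framed) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed));
    byte[] payload = new byte[in.readInt()];
    in.readFully(payload);
    return payload;
  }

  public static void main(String[] args) throws IOException {
    byte[] framed = encode("ABC".getBytes(StandardCharsets.UTF_8));
    String roundTrip = new String(decode(framed), StandardCharsets.UTF_8);
    if (!roundTrip.equals("ABC") || framed.length != 4 + 3) {
      throw new IllegalStateException(roundTrip);
    }
    System.out.println("ok");
  }
}
```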

[jira] (BEAM-1347) Basic Java harness capable of understanding process bundle tasks and sending data over the Fn Api

2017-01-30 Thread ASF GitHub Bot (JIRA)

 ASF GitHub Bot commented on  BEAM-1347 

  Re: Basic Java harness capable of understanding process bundle tasks and sending data over the Fn Api  

Github user asfgit closed the pull request at: 
 https://github.com/apache/beam/pull/1801 

 This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d) 



[GitHub] beam pull request #1871: Revert python-sdk only changes in travis, and clean...

2017-01-30 Thread aaltay
GitHub user aaltay opened a pull request:

https://github.com/apache/beam/pull/1871

Revert python-sdk only changes in travis, and clean incubator keywords.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aaltay/incubator-beam travis

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/1871.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1871


commit d888060153101a5339a2869440ee67aaf793d987
Author: Ahmet Altay 
Date:   2017-01-30T20:51:15Z

Revert python-sdk only changes in travis, and clean incubator keywords.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] beam pull request #1866: [BEAM-1338] Moves ThreadPool creation to a util fun...

2017-01-30 Thread chamikaramj
Github user chamikaramj closed the pull request at:

https://github.com/apache/beam/pull/1866




[jira] (BEAM-1349) Merge python-sdk to master

2017-01-30 Thread Ahmet Altay (JIRA)

 Ahmet Altay created an issue 

 Beam /  BEAM-1349 

  Merge python-sdk to master  

Issue Type:  Bug 
Assignee:  Ahmet Altay 
Components:  sdk-py 
Created:  30/Jan/17 20:54 
Fix Versions:  0.6.0 
Priority:  Major 
Reporter:  Ahmet Altay 

[jira] (BEAM-1338) Move ThreadPool creation logic to a util function

2017-01-30 Thread ASF GitHub Bot (JIRA)

 ASF GitHub Bot commented on  BEAM-1338 

  Re: Move ThreadPool creation logic to a util function  

Github user chamikaramj closed the pull request at: 
 https://github.com/apache/beam/pull/1866 



[GitHub] beam pull request #1863: [BEAM-1333] Add mock time to slow bigquery unit tes...

2017-01-30 Thread aaltay
Github user aaltay closed the pull request at:

https://github.com/apache/beam/pull/1863




[GitHub] beam pull request #1870: Merge master into python-sdk

2017-01-30 Thread aaltay
Github user aaltay closed the pull request at:

https://github.com/apache/beam/pull/1870




[jira] (BEAM-1333) Biq query unit tests are slow

2017-01-30 Thread ASF GitHub Bot (JIRA)

 ASF GitHub Bot commented on  BEAM-1333 

  Re: Biq query unit tests are slow  

Github user aaltay closed the pull request at: 
 https://github.com/apache/beam/pull/1863 



[2/2] beam git commit: This closes #1815

2017-01-30 Thread amitsela
This closes #1815


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/847e4e9f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/847e4e9f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/847e4e9f

Branch: refs/heads/master
Commit: 847e4e9f0b84efa4726692cc8b7c9a0610703888
Parents: 343176c 62f9e7b
Author: Sela 
Authored: Mon Jan 30 22:53:56 2017 +0200
Committer: Sela 
Committed: Mon Jan 30 22:53:56 2017 +0200

--
 .../apache/beam/runners/spark/SparkRunner.java  | 21 -
 .../spark/aggregators/AccumulatorSingleton.java | 96 ++--
 .../spark/aggregators/SparkAggregators.java | 20 +++-
 .../translation/streaming/CheckpointDir.java| 69 ++
 .../SparkRunnerStreamingContextFactory.java | 44 ++---
 .../ResumeFromCheckpointStreamingTest.java  |  5 +-
 6 files changed, 230 insertions(+), 25 deletions(-)
--




[1/2] beam git commit: [BEAM-648] Persist and restore Aggergator values in case of recovery from failure

2017-01-30 Thread amitsela
Repository: beam
Updated Branches:
  refs/heads/master 343176c00 -> 847e4e9f0


[BEAM-648] Persist and restore Aggergator values in case of recovery from 
failure

Added javadoc and minor refactor

Moved creation of beam checkpoint dir


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/62f9e7b1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/62f9e7b1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/62f9e7b1

Branch: refs/heads/master
Commit: 62f9e7b1e1a8a8f2317e3508ccce615f2b30d4f6
Parents: 343176c
Author: Aviem Zur 
Authored: Sun Jan 22 14:30:44 2017 +0200
Committer: Sela 
Committed: Mon Jan 30 22:53:34 2017 +0200

--
 .../apache/beam/runners/spark/SparkRunner.java  | 21 -
 .../spark/aggregators/AccumulatorSingleton.java | 96 ++--
 .../spark/aggregators/SparkAggregators.java | 20 +++-
 .../translation/streaming/CheckpointDir.java| 69 ++
 .../SparkRunnerStreamingContextFactory.java | 44 ++---
 .../ResumeFromCheckpointStreamingTest.java  |  5 +-
 6 files changed, 230 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/62f9e7b1/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 92c07bb..578ed21 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -18,12 +18,14 @@
 
 package org.apache.beam.runners.spark;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.aggregators.SparkAggregators;
 import 
org.apache.beam.runners.spark.aggregators.metrics.AggregatorMetricSource;
@@ -32,6 +34,7 @@ import 
org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
 import org.apache.beam.runners.spark.translation.TransformEvaluator;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
+import org.apache.beam.runners.spark.translation.streaming.CheckpointDir;
 import 
org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
@@ -54,6 +57,7 @@ import org.apache.spark.SparkEnv$;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.metrics.MetricsSystem;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -130,7 +134,11 @@ public final class SparkRunner extends PipelineRunner {
   }
 
   private void registerMetrics(final SparkPipelineOptions opts, final JavaSparkContext jsc) {
-    final Accumulator<NamedAggregators> accum = SparkAggregators.getNamedAggregators(jsc);
+    Optional<CheckpointDir> maybeCheckpointDir =
+        opts.isStreaming() ? Optional.of(new CheckpointDir(opts.getCheckpointDir()))
+            : Optional.absent();
+    final Accumulator<NamedAggregators> accum =
+        SparkAggregators.getOrCreateNamedAggregators(jsc, maybeCheckpointDir);
 final NamedAggregators initialValue = accum.value();
 
 if (opts.getEnableSparkMetricSinks()) {
@@ -154,10 +162,17 @@ public final class SparkRunner extends PipelineRunner {
     detectTranslationMode(pipeline);
 
     if (mOptions.isStreaming()) {
+      CheckpointDir checkpointDir = new CheckpointDir(mOptions.getCheckpointDir());
       final SparkRunnerStreamingContextFactory contextFactory =
-          new SparkRunnerStreamingContextFactory(pipeline, mOptions);
+          new SparkRunnerStreamingContextFactory(pipeline, mOptions, checkpointDir);
       final JavaStreamingContext jssc =
-          JavaStreamingContext.getOrCreate(mOptions.getCheckpointDir(), contextFactory);
+          JavaStreamingContext.getOrCreate(checkpointDir.getSparkCheckpointDir().toString(),
+              contextFactory);
+
+      // Checkpoint aggregator values
+      jssc.addStreamingListener(
+          new JavaStreamingListenerWrapper(
+              new AccumulatorSingleton.AccumulatorCheckpointingSparkListener()));
 
   startPipeline = executorService.submit(new Runnable() {
 

http
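
Note the get-or-create pattern in the hunk above: JavaStreamingContext.getOrCreate restores a context from the checkpoint directory when one exists and otherwise invokes the factory. A minimal sketch of that pattern in isolation (illustrative names, not the actual Spark or Beam API):

```java
import java.util.Optional;
import java.util.function.Supplier;

public class GetOrCreateSketch {
  // Restore previously checkpointed state if present; otherwise build fresh.
  static String getOrCreate(Optional<String> checkpointed, Supplier<String> factory) {
    return checkpointed.orElseGet(factory);
  }

  public static void main(String[] args) {
    String fresh = getOrCreate(Optional.empty(), () -> "new-context");
    String restored = getOrCreate(Optional.of("restored-context"), () -> "new-context");
    if (!fresh.equals("new-context") || !restored.equals("restored-context")) {
      throw new IllegalStateException();
    }
    System.out.println("ok");
  }
}
```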

[jira] (BEAM-648) Persist and restore Aggergator values in case of recovery from failure

2017-01-30 Thread ASF GitHub Bot (JIRA)

 ASF GitHub Bot commented on  BEAM-648 

  Re: Persist and restore Aggergator values in case of recovery from failure  

Github user asfgit closed the pull request at: 
 https://github.com/apache/beam/pull/1815 



[GitHub] beam pull request #1815: [BEAM-648] Persist and restore Aggergator values in...

2017-01-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/1815




[jira] (BEAM-648) Persist and restore Aggergator values in case of recovery from failure

2017-01-30 Thread Amit Sela (JIRA)

 Amit Sela resolved as Fixed 

 Beam /  BEAM-648 

  Persist and restore Aggergator values in case of recovery from failure  

Change By:  Amit Sela 
Resolution:  Fixed 
Fix Version/s:  0.6.0 
Status:  Open → Resolved 



[1/2] beam git commit: Revert python-sdk only changes in travis, and clean incubator keywords.

2017-01-30 Thread davor
Repository: beam
Updated Branches:
  refs/heads/python-sdk 1390699c3 -> be0e32e36


Revert python-sdk only changes in travis, and clean incubator keywords.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0b4ee73a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0b4ee73a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0b4ee73a

Branch: refs/heads/python-sdk
Commit: 0b4ee73a36f47a8f1b5c7ece775eae2c68af4245
Parents: 1390699
Author: Ahmet Altay 
Authored: Mon Jan 30 12:51:15 2017 -0800
Committer: Davor Bonaci 
Committed: Mon Jan 30 13:25:32 2017 -0800

--
 .travis.yml  | 15 +++
 .../examples/cookbook/datastore_wordcount.py |  2 +-
 sdks/python/setup.py |  4 ++--
 3 files changed, 18 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/0b4ee73a/.travis.yml
--
diff --git a/.travis.yml b/.travis.yml
index cb6f790..a392f7d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -39,6 +39,20 @@ env:
 
 matrix:
   include:
+# On OSX, run with default JDK only.
+- os: osx
+
+# On Linux, run with specific JDKs only.
+- os: linux
+  env: CUSTOM_JDK="oraclejdk8" MAVEN_OVERRIDE="$MAVEN_OVERRIDE 
$MAVEN_CONTAINER_OVERRIDE"
+- os: linux
+  env: CUSTOM_JDK="oraclejdk7" MAVEN_OVERRIDE="$MAVEN_OVERRIDE 
$MAVEN_CONTAINER_OVERRIDE"
+- os: linux
+  env: CUSTOM_JDK="openjdk7" MAVEN_OVERRIDE="$MAVEN_OVERRIDE 
$MAVEN_CONTAINER_OVERRIDE"
+- os: linux
+  env: MAVEN_OVERRIDE="-Peclipse-jdt -DskipTests $MAVEN_OVERRIDE 
$MAVEN_CONTAINER_OVERRIDE" CUSTOM_JDK="oraclejdk8"
+
+# Python SDK tests.
 - os: osx
   env: TEST_PYTHON="1"
 - os: linux
@@ -51,6 +65,7 @@ before_install:
   - cat ~/.mavenrc
   - if [ "$TRAVIS_OS_NAME" == "osx" ]; then export 
JAVA_HOME=$(/usr/libexec/java_home); fi
   - if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; 
fi
+  - export BEAM_SUREFIRE_ARGLINE="-Xmx512m"
   # Python SDK environment settings.
   - export TOX_ENV=py27
   - if [ "$TRAVIS_OS_NAME" == "osx" ]; then export 
TOX_HOME=$HOME/Library/Python/2.7/bin; fi

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4ee73a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
--
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py 
b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 25abb3e..067cb80 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -23,7 +23,7 @@ Cloud Datastore operations.
 
 See https://developers.google.com/datastore/ for more details on Google Cloud
 Datastore.
-See http://beam.incubator.apache.org/get-started/quickstart on
+See https://beam.apache.org/get-started/quickstart on
 how to run a Beam pipeline.
 
 Read-only Mode: In this mode, this example reads Cloud Datastore entities using

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4ee73a/sdks/python/setup.py
--
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 37125c2..e75a583 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -37,10 +37,10 @@ def get_version():
 PACKAGE_NAME = 'apache-beam-sdk'
 PACKAGE_VERSION = get_version()
 PACKAGE_DESCRIPTION = 'Apache Beam SDK for Python'
-PACKAGE_URL = 'https://beam.incubator.apache.org'
+PACKAGE_URL = 'https://beam.apache.org'
 PACKAGE_DOWNLOAD_URL = 'TBD'
 PACKAGE_AUTHOR = 'Apache Software Foundation'
-PACKAGE_EMAIL = 'd...@beam.incubator.apache.org'
+PACKAGE_EMAIL = 'd...@beam.apache.org'
 PACKAGE_KEYWORDS = 'apache beam'
 PACKAGE_LONG_DESCRIPTION = '''
 TBD



[2/2] beam git commit: This closes #1871

2017-01-30 Thread davor
This closes #1871


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/be0e32e3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/be0e32e3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/be0e32e3

Branch: refs/heads/python-sdk
Commit: be0e32e36313390ed04106d57f4c9dfeabb91b4d
Parents: 1390699 0b4ee73
Author: Davor Bonaci 
Authored: Mon Jan 30 13:25:53 2017 -0800
Committer: Davor Bonaci 
Committed: Mon Jan 30 13:25:53 2017 -0800

--
 .travis.yml  | 15 +++
 .../examples/cookbook/datastore_wordcount.py |  2 +-
 sdks/python/setup.py |  4 ++--
 3 files changed, 18 insertions(+), 3 deletions(-)
--




[GitHub] beam pull request #1871: Revert python-sdk only changes in travis, and clean...

2017-01-30 Thread aaltay
Github user aaltay closed the pull request at:

https://github.com/apache/beam/pull/1871


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] (BEAM-1349) Merge python-sdk to master

2017-01-30 Thread ASF GitHub Bot (JIRA)

 ASF GitHub Bot commented on  BEAM-1349 

  Re: Merge python-sdk to master  
GitHub user aaltay opened a pull request: 
 https://github.com/apache/beam/pull/1872 
 [BEAM-1349] Merge python-sdk to master 
You can merge this pull request into a Git repository by running: 
 $ git pull https://github.com/apache/beam python-sdk 
Alternatively you can review and apply these changes as the patch at: 
 https://github.com/apache/beam/pull/1872.patch 
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: 
 This closes #1872 
 
commit 0a66721b5528483536eac753d5fc8cf28844b2eb Author: Maria Garcia Herrero  Date: 2016-11-22T17:57:50Z 
 Merge remote-tracking branch 'origin/master' into merge_master 
commit cc706608b281c3beeebd2487084946c06bc83f30 Author: Mark Liu  Date: 2016-11-17T17:53:01Z 
 Support @ValidatesRunner(RunnableOnService) in Python [1/2] 
commit 28bfd9090b0ce33c3da0bab0220fcc1ef8a72b4b Author: Thomas Groh  Date: 2016-11-22T18:11:22Z 
 This closes #1376 
commit b4187bd91e9e53c1562ee845ffa87cc9e734006f Author: Kenneth Knowles  Date: 2016-11-22T20:08:38Z 
 This closes #1416 
commit a6be102a9ebafd3e616ee6aca9a11fbb21c375d9 Author: Maria Garcia Herrero  Date: 2016-11-22T21:24:08Z 
 Remove tests for merge 
commit 3b5cd0efc5f5e4b8fad34ee0d976e5e6ba501065 Author: Kenneth Knowles  Date: 2016-11-22T21:56:37Z 
 This closes #1384 
commit 21f9c6d2cff052d662326ce73fdcf1fb08504dda Author: Kenneth Knowles  Date: 2016-11-22T23:31:10Z 
 This closes #1423 
commit 2b69cce0f311a2ef40fdef4fe60d3e6fc13a8868 Author: Vikas Kedigehalli  Date: 2016-11-16T00:41:24Z 
 Add DatastoreIO to Python SDK 
commit 9b9d016c80b9a7e73a7485d3e579ead3ada18ac6 Author: Davor Bonaci  Date: 2016-11-23T18:42:17Z 
 This closes #1398 
commit 4dd19782f2624bf8aed3df8484fa314f94904571 Author: Kenneth Knowles  Date: 2016-11-16T05:33:13Z 
 Reject stateful DoFn in SparkRunner 
commit 413a40243a30e059476395a2dcbfc98a94bb22f2 Author: Kenneth Knowles  Date: 2016-11-16T05:33:28Z 
 Reject stateful DoFn in FlinkRunner 
commit 255ad9a327133ab4f05ebbceca236d5fe0006028 Author: Kenneth Knowles  Date: 2016-11-21T23:41:13Z 
 Add JUnit category for stateful ParDo tests 
commit 1fc8d65a079e58d740a9b954da980963f20e9edf Author: Scott Wegner  Date: 2016-11-22T00:33:07Z 
 Update StarterPipeline 
  

[GitHub] beam pull request #1872: [BEAM-1349] Merge python-sdk to master

2017-01-30 Thread aaltay
GitHub user aaltay opened a pull request:

https://github.com/apache/beam/pull/1872

[BEAM-1349] Merge python-sdk to master



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apache/beam python-sdk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/1872.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1872


commit 0a66721b5528483536eac753d5fc8cf28844b2eb
Author: Maria Garcia Herrero 
Date:   2016-11-22T17:57:50Z

Merge remote-tracking branch 'origin/master' into merge_master

commit cc706608b281c3beeebd2487084946c06bc83f30
Author: Mark Liu 
Date:   2016-11-17T17:53:01Z

Support @ValidatesRunner(RunnableOnService) in Python [1/2]

commit 28bfd9090b0ce33c3da0bab0220fcc1ef8a72b4b
Author: Thomas Groh 
Date:   2016-11-22T18:11:22Z

This closes #1376

commit b4187bd91e9e53c1562ee845ffa87cc9e734006f
Author: Kenneth Knowles 
Date:   2016-11-22T20:08:38Z

This closes #1416

commit a6be102a9ebafd3e616ee6aca9a11fbb21c375d9
Author: Maria Garcia Herrero 
Date:   2016-11-22T21:24:08Z

Remove tests for merge

commit 3b5cd0efc5f5e4b8fad34ee0d976e5e6ba501065
Author: Kenneth Knowles 
Date:   2016-11-22T21:56:37Z

This closes #1384

commit 21f9c6d2cff052d662326ce73fdcf1fb08504dda
Author: Kenneth Knowles 
Date:   2016-11-22T23:31:10Z

This closes #1423

commit 2b69cce0f311a2ef40fdef4fe60d3e6fc13a8868
Author: Vikas Kedigehalli 
Date:   2016-11-16T00:41:24Z

Add DatastoreIO to Python SDK

commit 9b9d016c80b9a7e73a7485d3e579ead3ada18ac6
Author: Davor Bonaci 
Date:   2016-11-23T18:42:17Z

This closes #1398

commit 4dd19782f2624bf8aed3df8484fa314f94904571
Author: Kenneth Knowles 
Date:   2016-11-16T05:33:13Z

Reject stateful DoFn in SparkRunner

commit 413a40243a30e059476395a2dcbfc98a94bb22f2
Author: Kenneth Knowles 
Date:   2016-11-16T05:33:28Z

Reject stateful DoFn in FlinkRunner

commit 255ad9a327133ab4f05ebbceca236d5fe0006028
Author: Kenneth Knowles 
Date:   2016-11-21T23:41:13Z

Add JUnit category for stateful ParDo tests

commit 1fc8d65a079e58d740a9b954da980963f20e9edf
Author: Scott Wegner 
Date:   2016-11-22T00:33:07Z

Update StarterPipeline

Convert StarterPipeline ParDo to MapElements.

Use the new DoFn for non-outputting transforms.

commit 796ba7ab75bc8d01a3a59efc29cdc17bcd26af4a
Author: Kenneth Knowles 
Date:   2016-11-16T05:33:01Z

Reject stateful DoFn in ApexRunner

commit 6fa8f658abaac1d3a983bfc3b8c09422159af8aa
Author: bchambers 
Date:   2016-11-22T19:37:23Z

Simplify the API for managing MetricsEnvironment

1. setCurrentContainer returns the previous MetricsEnvironment
2. setCurrentContainer(null) resets the thread local
3. scopedCurrentContainer sets the container and returns a Closeable to
   reset the previous container.

commit f03b4fe11cb605edf216903738a6c305b3a91066
Author: Thomas Groh 
Date:   2016-11-22T22:51:39Z

Output Keyed Bundles in GroupAlsoByWindowEvaluator

This allows reuse of keys for downstream serialization.

commit dcd401ba0b5bd12343484b0df50b15b6ef10ace9
Author: Thomas Groh 
Date:   2016-11-23T00:14:29Z

Add TransformHierarchyTest

This tests basic features of TransformHierarchy

commit 6f86af612f97ad57cf4ba2cae21ba232f7494ada
Author: Kenneth Knowles 
Date:   2016-11-23T06:16:29Z

Use more natural class to find class loader in ReflectHelpers

commit 2e03bb8a136078064014a0a7101960f6d2019487
Author: Thomas Weise 
Date:   2016-11-22T19:38:00Z

Update transitive dependencies for Apex 3.5.0 snapshot version.

commit 3dbeb8edfdfe4c9e8987e4d8df4451fdb748dc07
Author: Davor Bonaci 
Date:   2016-11-24T00:02:41Z

This closes #1432

commit d46203b7fcdc9895c9cee1d82710f48aba31a748
Author: Vikas Kedigehalli 
Date:   2016-11-23T22:09:09Z

datastoreio write/delete ptransform

update datastore_wordcount example to include writes

commit 1530a17279d098ae7459f689ef02401f5116e54e
Author: Dan Halperin 
Date:   2016-11-28T23:54:27Z

Closes #1433

commit 7a059d37e71b62702e8cdeafec6956fc7e1e38c4
Author: Sourabh Bajaj 
Date:   2016-11-21T23:50:21Z

Improve the speed of getting file sizes

commit ad4dc87a472387b507545ab80dbd2fe42e02cea3
Author: Davor Bonaci 
Date:   2016-11-29T01:40:50Z

This closes #1404

commit 6c8c17a1c1977ed69860d25dc8ab45640e7a1c53
Author: Vikas Kedigehalli 
Date:   2016-11-29T17:54:00Z

Update googledatastore version

commit 5ce75a2eae31dbab4d07d301716b4d7e3218b8b9
Author: Dan Halperin 
Date:   2016-11-29T22:01:50Z

Closes #1453

commit 81e7a0f653864212a5c9d3d0802608f92bb34501
Author: Mark Liu 
Date:   2016-11-17T22:45:42Z

Support ValidatesRunner Attribute in Python

This is roughly equivalent to "RunnableOnService" in the Java SDK. See
BEAM-655

commit 70c1de9b95e9c20e5efb277d9ad50ae6348

[jira] (BEAM-1350) BigQueryIO does not honor SQL dialect or result flattening in DirectRunner

2017-01-30 Thread Daniel Halperin (JIRA)
Daniel Halperin created an issue

Beam / BEAM-1350
BigQueryIO does not honor SQL dialect or result flattening in DirectRunner

Issue Type: Bug
Assignee: Daniel Halperin
Components: sdk-java-gcp
Created: 30/Jan/17 22:06
Priority: Major
Reporter: Daniel Halperin
BigQueryTableRowIterator is used when reading on some runners that do not perform initial splitting. It has a bug: it does not propagate configuration correctly, ignoring the useLegacySQL and flattenResults flags.
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java#L395 
vs 
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1148 
This can cause failures on some runners, possibly including the DirectRunner. 
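The class of bug reported here can be sketched in miniature. The types below are hypothetical stand-ins, not the real BigQueryIO API: the point is that two code paths each assemble a query configuration, one silently drops the flags, and centralizing construction in a single factory removes the mismatch.

```java
/**
 * Sketch of the bug class: duplicated configuration assembly across files
 * versus a single factory. All names are illustrative.
 */
public final class QueryConfigDemo {

  /** Simplified stand-in for a BigQuery query configuration. */
  static final class QueryConfig {
    final String query;
    final boolean useLegacySql;
    final boolean flattenResults;
    QueryConfig(String query, boolean useLegacySql, boolean flattenResults) {
      this.query = query;
      this.useLegacySql = useLegacySql;
      this.flattenResults = flattenResults;
    }
  }

  /** The single place where user options become a QueryConfig. */
  static QueryConfig createQueryConfig(
      String query, boolean useLegacySql, boolean flattenResults) {
    return new QueryConfig(query, useLegacySql, flattenResults);
  }

  public static void main(String[] args) {
    // Buggy pattern: a second reader path builds its own config and drops
    // the flags (analogous to BigQueryTableRowIterator ignoring
    // useLegacySQL/flattenResults).
    QueryConfig userConfig = new QueryConfig("SELECT 1", true, true);
    QueryConfig readerConfig = new QueryConfig(userConfig.query, false, false);
    System.out.println(readerConfig.useLegacySql);  // prints "false": flag lost

    // Fixed pattern: every path goes through the one factory, so the
    // flags cannot diverge between files.
    QueryConfig fixed = createQueryConfig(userConfig.query, true, true);
    System.out.println(fixed.useLegacySql);  // prints "true"
  }
}
```

This is the design motivation behind moving all queryConfig construction into one class, as the follow-up refactoring describes.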
 
 
 
 
 
 
 
 
 
 
 
 

 

[GitHub] beam pull request #1873: [BEAM-1350] BigQuery: refactor services so that all...

2017-01-30 Thread dhalperi
GitHub user dhalperi opened a pull request:

https://github.com/apache/beam/pull/1873

[BEAM-1350] BigQuery: refactor services so that all queryConfig happens in 
BigQueryIO

By putting all the configuration in the same place, we can avoid
bugs that happen from mismatching code across files.

Also made a few unnecessarily-public APIs package-private.

And improved tests, removed a few dataflow references.

Forward port from Dataflow.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dhalperi/beam bigquery-io-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/1873.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1873


commit e643a85cbf64687de024ca0dc0f34b9c6ae451f5
Author: Dan Halperin 
Date:   2017-01-30T22:04:32Z

BigQuery: refactor services so that all queryConfig happens in BigQueryIO

By putting all the configuration in the same place, we can avoid
bugs that happen from mismatching code across files.

Also made a few unnecessarily-public APIs package-private.

And improved tests, removed a few dataflow references.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] (BEAM-1350) BigQueryIO does not honor SQL dialect or result flattening in DirectRunner

2017-01-30 Thread ASF GitHub Bot (JIRA)
ASF GitHub Bot commented on BEAM-1350

Re: BigQueryIO does not honor SQL dialect or result flattening in DirectRunner

GitHub user dhalperi opened a pull request: https://github.com/apache/beam/pull/1873
[BEAM-1350] BigQuery: refactor services so that all queryConfig happens in BigQueryIO

This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
 

[jira] (BEAM-1351) Upgrade AutoValue 1.1->1.3

2017-01-30 Thread Pei He (JIRA)
Pei He created an issue

Beam / BEAM-1351
Upgrade AutoValue 1.1->1.3

Issue Type: Task
Assignee: Pei He
Components: sdk-java-core
Created: 30/Jan/17 22:06
Priority: Major
Reporter: Pei He
Upgrade to allow using AutoValue on an inherited class. I am using this feature in the Beam FileSystem work to define file-system-specific CreateOptions.

Additional benefits:
1. The classes in the AutoValue jar are now shaded with a $, so they never appear in IDE autocompletion.
2. AutoValue now uses its own implementation of a subset of Apache Velocity, so there will no longer be interference between the Velocity bundled with AutoValue and other versions that might be present.
3. Explicit checks for nested @AutoValue classes being private or not static; previously the compiler errors could be hard to understand, especially in IDEs.
4. An Eclipse bug that could occasionally lead to exceptions in the IDE has been fixed (GitHub issue #200).
5. AutoValue now detects the confusing case where you think you are using JavaBeans conventions (like getFoo()) but are not, because at least one method does not follow them.
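The inheritance feature this upgrade enables can be illustrated without the annotation processor. The sketch below hand-writes the subclass that AutoValue would generate; the CreateOptions/GcsCreateOptions names mirror the issue's use case but are simplified and illustrative, not Beam's real classes.

```java
/**
 * Hand-written sketch of using a value class on an inheritance hierarchy:
 * an abstract base declares shared properties, a file-system-specific
 * abstract subclass adds its own, and a concrete "generated" class (which
 * AutoValue would emit as AutoValue_GcsCreateOptions) implements both.
 */
public final class AutoValueInheritanceDemo {

  /** Base options every file system shares. */
  abstract static class CreateOptions {
    abstract String mimeType();
  }

  /** A file-system-specific subclass adds its own properties. */
  abstract static class GcsCreateOptions extends CreateOptions {
    abstract int uploadBufferSizeBytes();

    static GcsCreateOptions of(String mimeType, int uploadBufferSizeBytes) {
      // With AutoValue this would be: new AutoValue_GcsCreateOptions(...)
      return new GeneratedGcsCreateOptions(mimeType, uploadBufferSizeBytes);
    }
  }

  /** What the AutoValue processor would generate for GcsCreateOptions. */
  private static final class GeneratedGcsCreateOptions extends GcsCreateOptions {
    private final String mimeType;
    private final int uploadBufferSizeBytes;

    GeneratedGcsCreateOptions(String mimeType, int uploadBufferSizeBytes) {
      this.mimeType = mimeType;
      this.uploadBufferSizeBytes = uploadBufferSizeBytes;
    }

    @Override String mimeType() { return mimeType; }
    @Override int uploadBufferSizeBytes() { return uploadBufferSizeBytes; }
  }

  public static void main(String[] args) {
    GcsCreateOptions opts = GcsCreateOptions.of("text/plain", 1 << 20);
    System.out.println(opts.mimeType());  // prints "text/plain"
  }
}
```

The inherited abstract method (mimeType) and the subclass's own method are both implemented by the single concrete class, which is exactly what older AutoValue versions could not generate for a non-trivial parent.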
 
 
 
 
 
 
 
 
 
 
 
 


[jira] (BEAM-1352) io/google-cloud-platform should not depend on runners/dataflow for testing

2017-01-30 Thread Pei He (JIRA)
Pei He created an issue

Beam / BEAM-1352
io/google-cloud-platform should not depend on runners/dataflow for testing

Issue Type: Task
Assignee: Pei He
Components: sdk-java-core
Created: 30/Jan/17 22:29
Priority: Major
Reporter: Pei He
The Dataflow runner needs to depend on io/google-cloud-platform to specialize configurations.
Currently this is done by putting GcsUtil in sdk.util: https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java#L189
That is no longer possible after the FileSystem refactoring, given that GcsFileSystem and its configuration will live in io/google-cloud-platform.
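One common way to break a compile-time dependency like this is runtime discovery via java.util.ServiceLoader: the core module defines a registrar interface, and each IO module registers its implementation through a META-INF/services file. The sketch below uses illustrative names under that assumption, not Beam's actual registrar API.

```java
import java.util.ServiceLoader;

/**
 * Sketch of inverting the dependency: the interface lives in the core
 * module, the GCS implementation lives in io/google-cloud-platform, and
 * the runner discovers it at runtime instead of compiling against it.
 */
public final class RegistrarDemo {

  /** Would live in the core module. */
  public interface FileSystemRegistrar {
    String scheme();
  }

  /**
   * Would live in io/google-cloud-platform and be listed in
   * META-INF/services/<fully.qualified.FileSystemRegistrar>.
   */
  public static final class GcsRegistrar implements FileSystemRegistrar {
    @Override public String scheme() { return "gs"; }
  }

  public static void main(String[] args) {
    // The runner iterates whatever implementations are on the classpath.
    // In this standalone sketch no service file is present, so the loop
    // body simply does not run.
    for (FileSystemRegistrar r : ServiceLoader.load(FileSystemRegistrar.class)) {
      System.out.println("found scheme: " + r.scheme());
    }
  }
}
```

With this shape, runners/dataflow only needs the core interface at compile time, and io/google-cloud-platform contributes GcsFileSystem configuration without the reverse test-time dependency.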
 
 
 
 
 
 
 
 
 
 
 
 

 
 
 
 
 
 
 
 
 

[06/50] [abbrv] beam git commit: Closes #1810

2017-01-30 Thread davor
Closes #1810


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/894461e6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/894461e6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/894461e6

Branch: refs/heads/master
Commit: 894461e64b09e6d719ba3eef282cd36cea550f7b
Parents: d0474ab f68c9dc
Author: Robert Bradshaw 
Authored: Mon Jan 23 09:48:55 2017 -0800
Committer: Robert Bradshaw 
Committed: Mon Jan 23 09:48:55 2017 -0800

--
 sdks/python/apache_beam/runners/common.pxd | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)
--




[39/50] [abbrv] beam git commit: Update the version.py file to match the latest beam version.

2017-01-30 Thread davor
Update the version.py file to match the latest beam version.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/38575a14
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/38575a14
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/38575a14

Branch: refs/heads/master
Commit: 38575a14e2b17c93de2d0e27fe6213daa7101695
Parents: 4aaaf8f
Author: Ahmet Altay 
Authored: Mon Jan 30 12:21:28 2017 -0800
Committer: Davor Bonaci 
Committed: Mon Jan 30 12:38:38 2017 -0800

--
 sdks/python/apache_beam/version.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/38575a14/sdks/python/apache_beam/version.py
--
diff --git a/sdks/python/apache_beam/version.py 
b/sdks/python/apache_beam/version.py
index 60d9634..12509fb 100644
--- a/sdks/python/apache_beam/version.py
+++ b/sdks/python/apache_beam/version.py
@@ -21,7 +21,7 @@
 import re
 
 
-__version__ = '0.3.0-incubating.dev'  # TODO: PEP 440 and incubating suffix
+__version__ = '0.6.0.dev'
 
 
 # The following utilities are legacy code from the Maven integration;
@@ -40,7 +40,6 @@ def get_version_from_pom():
 search = pattern.search(pom)
 version = search.group(1)
 version = version.replace("-SNAPSHOT", ".dev")
-# TODO: PEP 440 and incubating suffix
 return version
 
 



[13/50] [abbrv] beam git commit: Revert "Remove dataflow_test.py"

2017-01-30 Thread davor
Revert "Remove dataflow_test.py"

This reverts commit d5b90d8383e662e803ea79b31661250a043bcfd2.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/96fcc7d3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/96fcc7d3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/96fcc7d3

Branch: refs/heads/master
Commit: 96fcc7d31c2540f867c3a73903c2aa99183a6b8b
Parents: af49908
Author: Robert Bradshaw 
Authored: Tue Jan 24 09:28:38 2017 -0800
Committer: Robert Bradshaw 
Committed: Tue Jan 24 09:28:38 2017 -0800

--
 sdks/python/apache_beam/dataflow_test.py| 418 +++
 .../apache_beam/transforms/ptransform_test.py   |  67 ---
 .../apache_beam/transforms/sideinputs_test.py   | 208 +
 3 files changed, 419 insertions(+), 274 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/96fcc7d3/sdks/python/apache_beam/dataflow_test.py
--
diff --git a/sdks/python/apache_beam/dataflow_test.py 
b/sdks/python/apache_beam/dataflow_test.py
new file mode 100644
index 000..f410230
--- /dev/null
+++ b/sdks/python/apache_beam/dataflow_test.py
@@ -0,0 +1,418 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Integration tests for the dataflow package."""
+
+from __future__ import absolute_import
+
+import logging
+import re
+import unittest
+
+import apache_beam as beam
+from apache_beam.pvalue import AsDict
+from apache_beam.pvalue import AsIter as AllOf
+from apache_beam.pvalue import AsList
+from apache_beam.pvalue import AsSingleton
+from apache_beam.pvalue import EmptySideInput
+from apache_beam.pvalue import SideOutputValue
+from apache_beam.test_pipeline import TestPipeline
+from apache_beam.transforms import Create
+from apache_beam.transforms import DoFn
+from apache_beam.transforms import FlatMap
+from apache_beam.transforms import GroupByKey
+from apache_beam.transforms import Map
+from apache_beam.transforms import ParDo
+from apache_beam.transforms import WindowInto
+from apache_beam.transforms.util import assert_that
+from apache_beam.transforms.util import equal_to
+from apache_beam.transforms.window import IntervalWindow
+from apache_beam.transforms.window import WindowFn
+from nose.plugins.attrib import attr
+
+
+class DataflowTest(unittest.TestCase):
+  """Dataflow integration tests."""
+
+  SAMPLE_DATA = ['aa bb cc aa bb aa \n'] * 10
+  SAMPLE_RESULT = [('cc', 10), ('bb', 20), ('aa', 30)]
+
+  @beam.ptransform_fn
+  def Count(pcoll):  # pylint: disable=invalid-name, no-self-argument
+"""A Count transform: v, ... => (v, n), ..."""
+return (pcoll
+| 'AddCount' >> Map(lambda x: (x, 1))
+| 'GroupCounts' >> GroupByKey()
+| 'AddCounts' >> Map(lambda (x, ones): (x, sum(ones
+
+  @attr('ValidatesRunner')
+  def test_word_count(self):
+pipeline = TestPipeline()
+lines = pipeline | 'SomeWords' >> Create(DataflowTest.SAMPLE_DATA)
+result = (
+(lines | 'GetWords' >> FlatMap(lambda x: re.findall(r'\w+', x)))
+.apply('CountWords', DataflowTest.Count))
+assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT))
+pipeline.run()
+
+  @attr('ValidatesRunner')
+  def test_map(self):
+pipeline = TestPipeline()
+lines = pipeline | 'input' >> Create(['a', 'b', 'c'])
+result = (lines
+  | 'upper' >> Map(str.upper)
+  | 'prefix' >> Map(lambda x, prefix: prefix + x, 'foo-'))
+assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C']))
+pipeline.run()
+
+  @attr('ValidatesRunner')
+  def test_par_do_with_side_input_as_arg(self):
+pipeline = TestPipeline()
+words_list = ['aa', 'bb', 'cc']
+words = pipeline | 'SomeWords' >> Create(words_list)
+prefix = pipeline | 'SomeString' >> Create(['xyz'])  # side in
+suffix = 'zyx'
+result = words | FlatMap(
+'DecorateWords',
+lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)],
+AsSingleton(prefix), suffix)
+assert_that(result, equal_to(['xyz-%s-zyx' % x for x in words_list

[36/50] [abbrv] beam git commit: Merge remote-tracking branch 'origin/master' into python-sdk.

2017-01-30 Thread davor
Merge remote-tracking branch 'origin/master' into python-sdk.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c2859a55
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c2859a55
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c2859a55

Branch: refs/heads/master
Commit: c2859a55f89c9807a037adfde9f7e8f506c108ce
Parents: 1bc6859 34b4a6d
Author: Ahmet Altay 
Authored: Fri Jan 27 16:57:44 2017 -0800
Committer: Ahmet Altay 
Committed: Fri Jan 27 16:57:44 2017 -0800

--
 .jenkins/common_job_properties.groovy   |9 +-
 ...job_beam_PostCommit_Java_MavenInstall.groovy |2 +-
 .../job_beam_PreCommit_Java_MavenInstall.groovy |2 +-
 .../job_beam_Release_NightlySnapshot.groovy |2 +-
 .jenkins/job_seed.groovy|2 +-
 .travis/README.md   |2 +-
 DISCLAIMER  |   10 -
 NOTICE  |4 +-
 README.md   |   46 +-
 examples/java/README.md |   16 +-
 examples/java/pom.xml   |   21 +-
 .../beam/examples/DebuggingWordCount.java   |4 +-
 .../org/apache/beam/examples/WordCount.java |6 +-
 .../beam/examples/complete/AutoComplete.java|2 +-
 .../org/apache/beam/examples/complete/README.md |   14 +-
 .../apache/beam/examples/complete/TfIdf.java|2 +-
 .../examples/complete/TopWikipediaSessions.java |2 +-
 .../examples/complete/TrafficMaxLaneFlow.java   |2 +-
 .../beam/examples/complete/TrafficRoutes.java   |2 +-
 .../examples/cookbook/BigQueryTornadoes.java|2 +-
 .../cookbook/CombinePerKeyExamples.java |2 +-
 .../org/apache/beam/examples/cookbook/README.md |   14 +-
 .../beam/examples/cookbook/TriggerExample.java  |4 +-
 .../beam/examples/WindowedWordCountIT.java  |   16 +-
 examples/java8/pom.xml  |2 +-
 .../beam/examples/complete/game/GameStats.java  |7 +-
 .../examples/complete/game/LeaderBoard.java |5 +-
 .../beam/examples/complete/game/UserScore.java  |2 +-
 examples/pom.xml|   16 +-
 pom.xml |   41 +-
 runners/apex/README.md  |4 +-
 runners/apex/pom.xml|3 +-
 .../beam/runners/apex/ApexPipelineOptions.java  |7 +-
 .../apache/beam/runners/apex/ApexRunner.java|   43 +-
 .../beam/runners/apex/ApexYarnLauncher.java |   23 +-
 .../translation/CreateValuesTranslator.java |   18 +-
 .../FlattenPCollectionTranslator.java   |   28 +-
 .../apex/translation/GroupByKeyTranslator.java  |2 +-
 .../translation/ParDoBoundMultiTranslator.java  |   27 +-
 .../apex/translation/ParDoBoundTranslator.java  |4 +-
 .../apex/translation/TranslationContext.java|   27 +-
 .../apex/translation/WindowBoundTranslator.java |8 +-
 .../operators/ApexGroupByKeyOperator.java   |4 +-
 .../operators/ApexParDoOperator.java|6 +-
 .../ApexReadUnboundedInputOperator.java |   17 +-
 .../beam/runners/apex/ApexRunnerTest.java   |   75 ++
 .../beam/runners/apex/ApexYarnLauncherTest.java |9 +-
 .../runners/apex/examples/WordCountTest.java|2 +-
 .../translation/ParDoBoundTranslatorTest.java   |6 +-
 .../translation/ReadUnboundTranslatorTest.java  |8 +-
 .../utils/ApexStateInternalsTest.java   |2 +-
 .../test/resources/beam-runners-apex.properties |   20 +
 runners/core-java/pom.xml   |2 +-
 .../beam/runners/core/AssignWindowsDoFn.java|3 +-
 .../apache/beam/runners/core/DoFnAdapters.java  |  343 ++
 .../apache/beam/runners/core/DoFnRunner.java|   21 -
 .../apache/beam/runners/core/DoFnRunners.java   |  138 +--
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   10 +-
 .../runners/core/GroupAlsoByWindowsDoFn.java|5 +-
 .../beam/runners/core/KeyedWorkItemCoder.java   |4 +-
 .../core/LateDataDroppingDoFnRunner.java|1 -
 .../apache/beam/runners/core/NonEmptyPanes.java |2 +-
 .../org/apache/beam/runners/core/OldDoFn.java   |  472 
 .../runners/core/PerKeyCombineFnRunner.java |   70 --
 .../runners/core/PerKeyCombineFnRunners.java|  101 --
 .../beam/runners/core/SimpleDoFnRunner.java |   63 -
 .../beam/runners/core/SimpleOldDoFnRunner.java  |7 +-
 .../beam/runners/core/SplittableParDo.java  |7 -
 .../core/UnboundedReadFromBoundedSource.java|   14 +-
 .../AfterDelayFromFirstElementStateMachine.java |2 +-
 .../core/triggers/AfterPaneStateMachine.java|2 +-
 .../core/DoFnDelegatingAggregatorTest.java  |  144 +++
 .../core/GroupAlsoByWindowsProperties.java  |2 +-
 .../runners/core/KeyedWorkItemCoderTest.java|6 +
 .../core

[25/50] [abbrv] beam git commit: Revert "Revert "Remove dataflow_test.py""

2017-01-30 Thread davor
Revert "Revert "Remove dataflow_test.py""

This reverts commit 96fcc7d31c2540f867c3a73903c2aa99183a6b8b.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2aa7d47e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2aa7d47e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2aa7d47e

Branch: refs/heads/master
Commit: 2aa7d47e1491e0601b7b4d1476a8f182b2a14dc3
Parents: 4e1028b
Author: Robert Bradshaw 
Authored: Tue Jan 24 16:33:55 2017 -0800
Committer: Robert Bradshaw 
Committed: Wed Jan 25 16:18:09 2017 -0800

--
 sdks/python/apache_beam/dataflow_test.py| 418 ---
 .../apache_beam/transforms/ptransform_test.py   |  67 +++
 .../apache_beam/transforms/sideinputs_test.py   | 208 -
 3 files changed, 274 insertions(+), 419 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/2aa7d47e/sdks/python/apache_beam/dataflow_test.py
--
diff --git a/sdks/python/apache_beam/dataflow_test.py 
b/sdks/python/apache_beam/dataflow_test.py
deleted file mode 100644
index f410230..000
--- a/sdks/python/apache_beam/dataflow_test.py
+++ /dev/null
@@ -1,418 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Integration tests for the dataflow package."""
-
-from __future__ import absolute_import
-
-import logging
-import re
-import unittest
-
-import apache_beam as beam
-from apache_beam.pvalue import AsDict
-from apache_beam.pvalue import AsIter as AllOf
-from apache_beam.pvalue import AsList
-from apache_beam.pvalue import AsSingleton
-from apache_beam.pvalue import EmptySideInput
-from apache_beam.pvalue import SideOutputValue
-from apache_beam.test_pipeline import TestPipeline
-from apache_beam.transforms import Create
-from apache_beam.transforms import DoFn
-from apache_beam.transforms import FlatMap
-from apache_beam.transforms import GroupByKey
-from apache_beam.transforms import Map
-from apache_beam.transforms import ParDo
-from apache_beam.transforms import WindowInto
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
-from apache_beam.transforms.window import IntervalWindow
-from apache_beam.transforms.window import WindowFn
-from nose.plugins.attrib import attr
-
-
-class DataflowTest(unittest.TestCase):
-  """Dataflow integration tests."""
-
-  SAMPLE_DATA = ['aa bb cc aa bb aa \n'] * 10
-  SAMPLE_RESULT = [('cc', 10), ('bb', 20), ('aa', 30)]
-
-  @beam.ptransform_fn
-  def Count(pcoll):  # pylint: disable=invalid-name, no-self-argument
-"""A Count transform: v, ... => (v, n), ..."""
-return (pcoll
-| 'AddCount' >> Map(lambda x: (x, 1))
-| 'GroupCounts' >> GroupByKey()
-| 'AddCounts' >> Map(lambda (x, ones): (x, sum(ones
-
-  @attr('ValidatesRunner')
-  def test_word_count(self):
-pipeline = TestPipeline()
-lines = pipeline | 'SomeWords' >> Create(DataflowTest.SAMPLE_DATA)
-result = (
-(lines | 'GetWords' >> FlatMap(lambda x: re.findall(r'\w+', x)))
-.apply('CountWords', DataflowTest.Count))
-assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT))
-pipeline.run()
-
-  @attr('ValidatesRunner')
-  def test_map(self):
-pipeline = TestPipeline()
-lines = pipeline | 'input' >> Create(['a', 'b', 'c'])
-result = (lines
-  | 'upper' >> Map(str.upper)
-  | 'prefix' >> Map(lambda x, prefix: prefix + x, 'foo-'))
-assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C']))
-pipeline.run()
-
-  @attr('ValidatesRunner')
-  def test_par_do_with_side_input_as_arg(self):
-pipeline = TestPipeline()
-words_list = ['aa', 'bb', 'cc']
-words = pipeline | 'SomeWords' >> Create(words_list)
-prefix = pipeline | 'SomeString' >> Create(['xyz'])  # side in
-suffix = 'zyx'
-result = words | FlatMap(
-'DecorateWords',
-lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)],
-AsSingleton(prefix), suffix)
-assert_that(result, equal_to(['xyz-%s-zyx' % x for x 

[14/50] [abbrv] beam git commit: Closes #1831

2017-01-30 Thread davor
Closes #1831


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d0dc1f37
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d0dc1f37
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d0dc1f37

Branch: refs/heads/master
Commit: d0dc1f375982bab747eda8ea26f4a41b15a1ec01
Parents: af49908 96fcc7d
Author: Robert Bradshaw 
Authored: Tue Jan 24 10:06:03 2017 -0800
Committer: Robert Bradshaw 
Committed: Tue Jan 24 10:06:03 2017 -0800

--
 sdks/python/apache_beam/dataflow_test.py| 418 +++
 .../apache_beam/transforms/ptransform_test.py   |  67 ---
 .../apache_beam/transforms/sideinputs_test.py   | 208 +
 3 files changed, 419 insertions(+), 274 deletions(-)
--




[21/50] [abbrv] beam git commit: Closes #1811

2017-01-30 Thread davor
Closes #1811


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/59242205
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/59242205
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/59242205

Branch: refs/heads/master
Commit: 592422059e21bf72fc7b4842d6fd6df000a7d2a7
Parents: 9540cf1 61d8d3f
Author: Robert Bradshaw 
Authored: Wed Jan 25 12:38:03 2017 -0800
Committer: Robert Bradshaw 
Committed: Wed Jan 25 12:38:03 2017 -0800

--
 sdks/python/apache_beam/pipeline_test.py | 57 ++-
 1 file changed, 21 insertions(+), 36 deletions(-)
--




[11/50] [abbrv] beam git commit: Closes #1812

2017-01-30 Thread davor
Closes #1812


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/af49908b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/af49908b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/af49908b

Branch: refs/heads/master
Commit: af49908b8fb5bb34428343218461660d41ead399
Parents: deb2aea 6cb2f37
Author: Robert Bradshaw 
Authored: Mon Jan 23 14:37:45 2017 -0800
Committer: Robert Bradshaw 
Committed: Mon Jan 23 14:37:45 2017 -0800

--
 sdks/python/apache_beam/runners/common.py | 34 ++
 1 file changed, 19 insertions(+), 15 deletions(-)
--




[40/50] [abbrv] beam git commit: [BEAM-843] Use New DoFn Directly in Flink Runner

2017-01-30 Thread davor
[BEAM-843] Use New DoFn Directly in Flink Runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4aaaf8fb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4aaaf8fb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4aaaf8fb

Branch: refs/heads/master
Commit: 4aaaf8fb94222e6e283fa79d9c7dc8d9a730d278
Parents: 27cf68e
Author: JingsongLi 
Authored: Wed Jan 18 11:34:06 2017 +0800
Committer: Davor Bonaci 
Committed: Mon Jan 30 12:38:38 2017 -0800

--
 .../GroupAlsoByWindowViaWindowSetNewDoFn.java   | 156 +++
 .../wrappers/streaming/DoFnOperator.java|  69 
 .../wrappers/streaming/WindowDoFnOperator.java  | 143 +
 3 files changed, 264 insertions(+), 104 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/4aaaf8fb/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
new file mode 100644
index 000..cff6e00
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.util.Collection;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.util.state.TimerInternalsFactory;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/**
+ * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the
+ * {@link ReduceFnRunner}.
+ */
+@SystemDoFnInternal
+public class GroupAlsoByWindowViaWindowSetNewDoFn<
+        K, InputT, OutputT, W extends BoundedWindow, RinT extends KeyedWorkItem<K, InputT>>
+    extends DoFn<RinT, KV<K, OutputT>> {
+
+  private static final long serialVersionUID = 1L;
+
+  public static <K, InputT, OutputT, W extends BoundedWindow>
+      DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
+          WindowingStrategy<?, W> strategy,
+          StateInternalsFactory<K> stateInternalsFactory,
+          TimerInternalsFactory<K> timerInternalsFactory,
+          SideInputReader sideInputReader,
+          SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn,
+          DoFnRunners.OutputManager outputManager,
+          TupleTag<KV<K, OutputT>> mainTag) {
+    return new GroupAlsoByWindowViaWindowSetNewDoFn<>(
+        strategy, stateInternalsFactory, timerInternalsFactory, sideInputReader,
+        reduceFn, outputManager, mainTag);
+  }
+
+  protected final Aggregator<Long, Long> droppedDueToClosedWindow =
+      createAggregator(
+          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
+  protected final Aggregator<Long, Long> droppedDueToLateness =
+      createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
+  private final WindowingStrategy<Object, W> windowingStrategy;
+  private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
+  private transient StateInternalsFactory<K> stateInternalsFactory;
+  private transient TimerInternalsFactory<K> timerInternalsFactory;
+  private transient SideInputReader sideInputReader;
+  private transient DoFnRunners.OutputManager outputManager

[02/50] [abbrv] beam git commit: Closes #1809

2017-01-30 Thread davor
Closes #1809


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/946135f6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/946135f6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/946135f6

Branch: refs/heads/master
Commit: 946135f6a955d9e27e7553c4cefef354ecd2535d
Parents: c03e6f3 56512ab
Author: Robert Bradshaw 
Authored: Sat Jan 21 00:30:35 2017 -0800
Committer: Robert Bradshaw 
Committed: Sat Jan 21 00:30:35 2017 -0800

--
 sdks/python/apache_beam/runners/dataflow_runner.py  | 6 --
 sdks/python/apache_beam/runners/dataflow_runner_test.py | 5 ++---
 2 files changed, 6 insertions(+), 5 deletions(-)
--




[18/50] [abbrv] beam git commit: Closes #1820

2017-01-30 Thread davor
Closes #1820


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/43cb4d70
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/43cb4d70
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/43cb4d70

Branch: refs/heads/master
Commit: 43cb4d70980af758bfea9a3c65530ca53a6239ec
Parents: f983123 52fc95d
Author: Robert Bradshaw 
Authored: Tue Jan 24 16:31:03 2017 -0800
Committer: Robert Bradshaw 
Committed: Tue Jan 24 16:31:03 2017 -0800

--
 sdks/python/apache_beam/io/fileio.py| 542 +-
 sdks/python/apache_beam/io/fileio_test.py   | 729 +--
 .../runners/direct/transform_evaluator.py   |   5 -
 3 files changed, 3 insertions(+), 1273 deletions(-)
--




[05/50] [abbrv] beam git commit: Add some typing to prevent speed regression for old_dofn.

2017-01-30 Thread davor
Add some typing to prevent speed regression for old_dofn.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f68c9dc8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f68c9dc8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f68c9dc8

Branch: refs/heads/master
Commit: f68c9dc8d17881c43c31922375fec9593265cc5d
Parents: d0474ab
Author: Robert Bradshaw 
Authored: Sat Jan 21 20:52:02 2017 -0800
Committer: Robert Bradshaw 
Committed: Sat Jan 21 20:52:02 2017 -0800

--
 sdks/python/apache_beam/runners/common.pxd | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/f68c9dc8/sdks/python/apache_beam/runners/common.pxd
--
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index 06fe434..10d1f96 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -36,15 +36,17 @@ cdef class DoFnRunner(Receiver):
   cdef object tagged_receivers
   cdef LoggingContext logging_context
   cdef object step_name
-  cdef object is_new_dofn
+  cdef bint is_new_dofn
   cdef object args
-  cdef object kwargs
+  cdef dict kwargs
   cdef object side_inputs
   cdef bint has_windowed_side_inputs
 
   cdef Receiver main_receivers
 
   cpdef process(self, WindowedValue element)
+  cdef old_dofn_process(self, WindowedValue element)
+  cdef new_dofn_process(self, WindowedValue element)
 
   @cython.locals(windowed_value=WindowedValue)
   cpdef _process_outputs(self, WindowedValue element, results)
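The `.pxd` change above narrows `is_new_dofn` and `kwargs` from boxed `object` to the concrete C types `bint` and `dict`, and splits `process` into typed `old_dofn_process`/`new_dofn_process` paths, so the per-element dispatch stays cheap. A pure-Python sketch of that dispatch pattern (class and method names here are illustrative, not Beam's actual runner API):

```python
# Minimal sketch of DoFnRunner-style dispatch: the DoFn flavor is
# detected once at construction, and process() branches on a plain
# boolean flag per element. In the Cython version that flag is a
# `cdef bint`, which avoids Python-object boxing on the hot path.

class OldDoFn:
    # Old-style DoFns receive a context object.
    def process(self, context):
        yield context["element"] + 1

class NewDoFn:
    # New-style DoFns receive the element directly.
    def process(self, element):
        yield element + 1

class Runner:
    def __init__(self, fn):
        self.fn = fn
        self.is_new_dofn = isinstance(fn, NewDoFn)  # decided once

    def process(self, element):
        if self.is_new_dofn:
            return list(self.fn.process(element))
        return list(self.fn.process({"element": element}))

print(Runner(NewDoFn()).process(41))  # [42]
print(Runner(OldDoFn()).process(41))  # [42]
```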



[24/50] [abbrv] beam git commit: Use a temp directory for requirements cache in test_with_requirements_file

2017-01-30 Thread davor
Use a temp directory for requirements cache in test_with_requirements_file

The test fails if there are leftover files in the default requirements-cache
folder, either from earlier tests or from previous workspaces.
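The fix below applies a standard isolation pattern: every directory the test touches is a freshly created temp directory, removed in a `finally` block so no state leaks into later tests or later CI workspaces. A self-contained miniature of the pattern (function and file names are illustrative):

```python
# Isolate a test's on-disk state in throwaway temp directories,
# guaranteed to be cleaned up even if the test body raises.

import os
import shutil
import tempfile

def run_isolated():
    staging_dir = tempfile.mkdtemp()
    cache_dir = tempfile.mkdtemp()
    try:
        # ... exercise code that writes into staging_dir / cache_dir ...
        path = os.path.join(cache_dir, "cached.txt")
        with open(path, "w") as f:
            f.write("nothing")
        return os.path.isfile(path)
    finally:
        # Nothing survives the test, so later runs start clean.
        shutil.rmtree(staging_dir)
        shutil.rmtree(cache_dir)

print(run_isolated())  # True
```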


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5787e817
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5787e817
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5787e817

Branch: refs/heads/master
Commit: 5787e817a7eda4859963d535df21f2fa00edf8af
Parents: 5924220
Author: Ahmet Altay 
Authored: Wed Jan 25 09:57:18 2017 -0800
Committer: Robert Bradshaw 
Committed: Wed Jan 25 16:16:52 2017 -0800

--
 .../python/apache_beam/utils/dependency_test.py | 47 +++-
 1 file changed, 27 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/5787e817/sdks/python/apache_beam/utils/dependency_test.py
--
diff --git a/sdks/python/apache_beam/utils/dependency_test.py b/sdks/python/apache_beam/utils/dependency_test.py
index a484d60..75a89e2 100644
--- a/sdks/python/apache_beam/utils/dependency_test.py
+++ b/sdks/python/apache_beam/utils/dependency_test.py
@@ -106,27 +106,34 @@ class SetupTest(unittest.TestCase):
 dependency.stage_job_resources(options))
 
   def test_with_requirements_file(self):
-staging_dir = tempfile.mkdtemp()
-source_dir = tempfile.mkdtemp()
+try:
+  staging_dir = tempfile.mkdtemp()
+  requirements_cache_dir = tempfile.mkdtemp()
+  source_dir = tempfile.mkdtemp()
 
-options = PipelineOptions()
-options.view_as(GoogleCloudOptions).staging_location = staging_dir
-self.update_options(options)
-options.view_as(SetupOptions).requirements_file = os.path.join(
-source_dir, dependency.REQUIREMENTS_FILE)
-self.create_temp_file(
-os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing')
-self.assertEqual(
-sorted([dependency.REQUIREMENTS_FILE,
-'abc.txt', 'def.txt']),
-sorted(dependency.stage_job_resources(
-options,
-populate_requirements_cache=self.populate_requirements_cache)))
-self.assertTrue(
-os.path.isfile(
-os.path.join(staging_dir, dependency.REQUIREMENTS_FILE)))
-self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
-self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt')))
+  options = PipelineOptions()
+  options.view_as(GoogleCloudOptions).staging_location = staging_dir
+  self.update_options(options)
+  options.view_as(SetupOptions).requirements_cache = requirements_cache_dir
+  options.view_as(SetupOptions).requirements_file = os.path.join(
+  source_dir, dependency.REQUIREMENTS_FILE)
+  self.create_temp_file(
+  os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing')
+  self.assertEqual(
+  sorted([dependency.REQUIREMENTS_FILE,
+  'abc.txt', 'def.txt']),
+  sorted(dependency.stage_job_resources(
+  options,
+  populate_requirements_cache=self.populate_requirements_cache)))
+  self.assertTrue(
+  os.path.isfile(
+  os.path.join(staging_dir, dependency.REQUIREMENTS_FILE)))
+  self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
+  self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt')))
+finally:
+  shutil.rmtree(staging_dir)
+  shutil.rmtree(requirements_cache_dir)
+  shutil.rmtree(source_dir)
 
   def test_requirements_file_not_present(self):
 staging_dir = tempfile.mkdtemp()



[29/50] [abbrv] beam git commit: Refactoring metrics infrastructure

2017-01-30 Thread davor
Refactoring metrics infrastructure


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b148f5cc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b148f5cc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b148f5cc

Branch: refs/heads/master
Commit: b148f5cc9f3e414b9cd1f605b25d50e21f626b7a
Parents: e3849af
Author: Pablo 
Authored: Mon Jan 23 17:50:21 2017 -0800
Committer: Robert Bradshaw 
Committed: Thu Jan 26 15:28:49 2017 -0800

--
 sdks/python/apache_beam/metrics/execution.pxd   | 31 +
 sdks/python/apache_beam/metrics/execution.py| 70 
 sdks/python/apache_beam/runners/common.pxd  |  2 +
 sdks/python/apache_beam/runners/common.py   | 11 ++-
 .../apache_beam/runners/direct/executor.py  | 12 ++--
 .../runners/direct/transform_evaluator.py   | 54 ---
 sdks/python/setup.py|  1 +
 7 files changed, 125 insertions(+), 56 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/metrics/execution.pxd
--
diff --git a/sdks/python/apache_beam/metrics/execution.pxd b/sdks/python/apache_beam/metrics/execution.pxd
new file mode 100644
index 000..d89004f
--- /dev/null
+++ b/sdks/python/apache_beam/metrics/execution.pxd
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+cimport cython
+
+
+cdef class MetricsContainer(object):
+  cdef object step_name
+  cdef public object counters
+  cdef public object distributions
+
+
+cdef class ScopedMetricsContainer(object):
+  cpdef enter(self)
+  cpdef exit(self)
+  cdef list _stack
+  cdef MetricsContainer _container

http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/metrics/execution.py
--
diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py
index 8f04b7b..3ba1735 100644
--- a/sdks/python/apache_beam/metrics/execution.py
+++ b/sdks/python/apache_beam/metrics/execution.py
@@ -98,36 +98,49 @@ class MetricResult(object):
 self.key, self.committed, self.attempted)
 
 
-class MetricsEnvironment(object):
+class _MetricsEnvironment(object):
   """Holds the MetricsContainer for every thread and other metric information.
 
   This class is not meant to be instantiated, instead being used to keep
   track of global state.
   """
-  METRICS_SUPPORTED = False
-  _METRICS_SUPPORTED_LOCK = threading.Lock()
-
-  PER_THREAD = threading.local()
+  def __init__(self):
+self.METRICS_SUPPORTED = False
+self._METRICS_SUPPORTED_LOCK = threading.Lock()
+self.PER_THREAD = threading.local()
+self.set_container_stack()
+
+  def set_container_stack(self):
+if not hasattr(self.PER_THREAD, 'container'):
+  self.PER_THREAD.container = []
+
+  def container_stack(self):
+self.set_container_stack()
+return self.PER_THREAD.container
+
+  def set_metrics_supported(self, supported):
+self.set_container_stack()
+with self._METRICS_SUPPORTED_LOCK:
+  self.METRICS_SUPPORTED = supported
+
+  def current_container(self):
+self.set_container_stack()
+index = len(self.PER_THREAD.container) - 1
+if index < 0:
+  return None
+else:
+  return self.PER_THREAD.container[index]
 
-  @classmethod
-  def set_metrics_supported(cls, supported):
-with cls._METRICS_SUPPORTED_LOCK:
-  cls.METRICS_SUPPORTED = supported
+  def set_current_container(self, container):
+self.set_container_stack()
+self.PER_THREAD.container.append(container)
 
-  @classmethod
-  def current_container(cls):
-try:
-  return cls.PER_THREAD.container
-except AttributeError:
-  return None
+  def unset_current_container(self):
+self.set_container_stack()
+self.PER_THREAD.container.pop()
 
-  @classmethod
-  def set_current_container(cls, container):
-cls.PER_THREAD.container = container
 
-  @classmethod
-  def unset_current_conta
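The refactoring above (the diff is truncated in this archive) replaces a single thread-local container with a per-thread container *stack*: `set_current_container` pushes, `unset_current_container` pops, and `current_container` peeks, so nested scopes automatically restore the outer container. A runnable sketch of that pattern, simplified from the Beam class:

```python
# Thread-local stack of metrics containers. Each thread sees only
# its own stack; push/pop gives well-nested scoping of the "current"
# container without any global bookkeeping.

import threading

class MetricsEnvironment:
    def __init__(self):
        self._per_thread = threading.local()

    def _stack(self):
        # Lazily create the stack on first use in each thread.
        if not hasattr(self._per_thread, "container"):
            self._per_thread.container = []
        return self._per_thread.container

    def set_current_container(self, container):
        self._stack().append(container)

    def unset_current_container(self):
        self._stack().pop()

    def current_container(self):
        stack = self._stack()
        return stack[-1] if stack else None

env = MetricsEnvironment()
env.set_current_container("outer")
env.set_current_container("inner")
assert env.current_container() == "inner"
env.unset_current_container()
assert env.current_container() == "outer"   # outer scope restored
```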

[50/50] [abbrv] beam git commit: This closes #1872

2017-01-30 Thread davor
This closes #1872


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c3b97a28
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c3b97a28
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c3b97a28

Branch: refs/heads/master
Commit: c3b97a2878a6ccb7b380cb7724ee0719a1d25d2e
Parents: 847e4e9 2d7ce32
Author: Davor Bonaci 
Authored: Mon Jan 30 14:59:03 2017 -0800
Committer: Davor Bonaci 
Committed: Mon Jan 30 14:59:03 2017 -0800

--
 .gitignore  |   10 +
 .travis.yml |   21 +-
 pom.xml |   20 +
 sdks/pom.xml|2 +
 sdks/python/.pylintrc   |  164 +
 sdks/python/MANIFEST.in |   19 +
 sdks/python/README.md   |  372 ++
 sdks/python/apache_beam/__init__.py |   82 +
 sdks/python/apache_beam/coders/__init__.py  |   19 +
 sdks/python/apache_beam/coders/coder_impl.pxd   |  143 +
 sdks/python/apache_beam/coders/coder_impl.py|  597 +++
 sdks/python/apache_beam/coders/coders.py|  707 +++
 sdks/python/apache_beam/coders/coders_test.py   |  115 +
 .../apache_beam/coders/coders_test_common.py|  355 ++
 .../apache_beam/coders/fast_coders_test.py  |   37 +
 sdks/python/apache_beam/coders/observable.py|   38 +
 .../apache_beam/coders/observable_test.py   |   57 +
 .../coders/proto2_coder_test_messages_pb2.py|  318 ++
 .../apache_beam/coders/slow_coders_test.py  |   39 +
 sdks/python/apache_beam/coders/slow_stream.py   |  154 +
 .../apache_beam/coders/standard_coders.yaml |   67 +
 .../apache_beam/coders/standard_coders_test.py  |  136 +
 sdks/python/apache_beam/coders/stream.pxd   |   63 +
 sdks/python/apache_beam/coders/stream.pyx   |  215 +
 sdks/python/apache_beam/coders/stream_test.py   |  169 +
 sdks/python/apache_beam/coders/typecoders.py|  186 +
 .../apache_beam/coders/typecoders_test.py   |  117 +
 sdks/python/apache_beam/error.py|   42 +
 sdks/python/apache_beam/examples/__init__.py|   16 +
 .../apache_beam/examples/complete/__init__.py   |   16 +
 .../examples/complete/autocomplete.py   |   89 +
 .../examples/complete/autocomplete_test.py  |   52 +
 .../examples/complete/estimate_pi.py|  125 +
 .../examples/complete/estimate_pi_test.py   |   52 +
 .../examples/complete/juliaset/__init__.py  |   16 +
 .../complete/juliaset/juliaset/__init__.py  |   16 +
 .../complete/juliaset/juliaset/juliaset.py  |  123 +
 .../complete/juliaset/juliaset/juliaset_test.py |   86 +
 .../examples/complete/juliaset/juliaset_main.py |   58 +
 .../examples/complete/juliaset/setup.py |  116 +
 .../apache_beam/examples/complete/tfidf.py  |  208 +
 .../apache_beam/examples/complete/tfidf_test.py |   92 +
 .../examples/complete/top_wikipedia_sessions.py |  180 +
 .../complete/top_wikipedia_sessions_test.py |   62 +
 .../apache_beam/examples/cookbook/__init__.py   |   16 +
 .../examples/cookbook/bigquery_schema.py|  130 +
 .../examples/cookbook/bigquery_side_input.py|  123 +
 .../cookbook/bigquery_side_input_test.py|   54 +
 .../examples/cookbook/bigquery_tornadoes.py |   99 +
 .../cookbook/bigquery_tornadoes_test.py |   44 +
 .../apache_beam/examples/cookbook/bigshuffle.py |   95 +
 .../examples/cookbook/bigshuffle_test.py|   63 +
 .../apache_beam/examples/cookbook/coders.py |  101 +
 .../examples/cookbook/coders_test.py|   49 +
 .../examples/cookbook/combiners_test.py |   74 +
 .../examples/cookbook/custom_ptransform.py  |  134 +
 .../examples/cookbook/custom_ptransform_test.py |   53 +
 .../examples/cookbook/datastore_wordcount.py|  256 ++
 .../apache_beam/examples/cookbook/filters.py|  107 +
 .../examples/cookbook/filters_test.py   |   69 +
 .../examples/cookbook/group_with_coder.py   |  122 +
 .../examples/cookbook/group_with_coder_test.py  |   89 +
 .../examples/cookbook/mergecontacts.py  |  133 +
 .../examples/cookbook/mergecontacts_test.py |  125 +
 .../examples/cookbook/multiple_output_pardo.py  |  181 +
 .../cookbook/multiple_output_pardo_test.py  |   72 +
 .../apache_beam/examples/snippets/__init__.py   |   16 +
 .../apache_beam/examples/snippets/snippets.py   | 1142 +
 .../examples/snippets/snippets_test.py  |  758 
 .../apache_beam/examples/streaming_wordcap.py   |   64 +
 .../apache_beam/examples/streaming_wordcount.py |   74 +
 sdks/python/apache_beam/examples/wordcount.py   |  109 +
 .../apache_beam/examples/wordcount_debugging.py |  166 +
 .../examples/wordcount_debugging_test.py|   59 +
 .../apache_beam/examples/wordcount_it_test.py   |   59 +
 .../apache_beam/examples/wordcount_minimal.py   |  121 

[19/50] [abbrv] beam git commit: Install test dependencies in the post commit script.

2017-01-30 Thread davor
Install test dependencies in the post commit script.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/19789db9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/19789db9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/19789db9

Branch: refs/heads/master
Commit: 19789db9cab031e0891cb67c4ab6b8b03c6a8c09
Parents: 43cb4d7
Author: Ahmet Altay 
Authored: Tue Jan 24 15:13:03 2017 -0800
Committer: Robert Bradshaw 
Committed: Tue Jan 24 16:32:36 2017 -0800

--
 sdks/python/run_postcommit.sh | 5 +
 1 file changed, 5 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/19789db9/sdks/python/run_postcommit.sh
--
diff --git a/sdks/python/run_postcommit.sh b/sdks/python/run_postcommit.sh
index 2e419a5..3756075 100755
--- a/sdks/python/run_postcommit.sh
+++ b/sdks/python/run_postcommit.sh
@@ -70,6 +70,10 @@ python setup.py sdist
 
 SDK_LOCATION=$(find dist/apache-beam-sdk-*.tar.gz)
 
+# Install test dependencies for ValidatesRunner tests.
+echo "pyhamcrest" > postcommit_requirements.txt
+echo "mock" >> postcommit_requirements.txt
+
 # Run ValidatesRunner tests on Google Cloud Dataflow service
 echo ">>> RUNNING DATAFLOW RUNNER VALIDATESRUNNER TESTS"
 python setup.py nosetests \
@@ -80,6 +84,7 @@ python setup.py nosetests \
 --temp_location=$GCS_LOCATION/temp-validatesrunner-test \
 --sdk_location=$SDK_LOCATION \
 --job_name=$JOBNAME_VR_TEST \
+--requirements_file=postcommit_requirements.txt \
 --num_workers=1"
 
 # Run wordcount on the Google Cloud Dataflow service



[44/50] [abbrv] beam git commit: Add mock time to slow bigquery unit tests.

2017-01-30 Thread davor
Add mock time to slow bigquery unit tests.

Unit tests that exercise retries do not need to use real time. This change
reduces the total tox time for unit tests from 235 seconds to 73 seconds
locally.
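The speed-up comes from patching `time.sleep` so retry/backoff loops spin instantly while the retry logic itself is still exercised. A hedged sketch of the technique (the retry helper is illustrative, not Beam's actual wrapper):

```python
# Patch time.sleep with a no-op mock: the retry loop runs the same
# number of iterations, but the backoff delays cost nothing.

import time
from unittest import mock

calls = {"n": 0}

def flaky():
    # Fails twice, then succeeds, simulating a transient error.
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("transient")
    return "ok"

def with_retries(fn, tries=5, delay=1.0):
    for _ in range(tries):
        try:
            return fn()
        except ValueError:
            time.sleep(delay)  # instant once patched
    raise RuntimeError("gave up")

with mock.patch("time.sleep", return_value=None) as fake_sleep:
    assert with_retries(flaky) == "ok"
    assert fake_sleep.call_count == 2  # slept before each of the two retries
```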


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e02ddac3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e02ddac3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e02ddac3

Branch: refs/heads/master
Commit: e02ddac308b8b1ea0bd0cb0ae4f9ba4908a50595
Parents: 475707f
Author: Ahmet Altay 
Authored: Fri Jan 27 17:35:24 2017 -0800
Committer: Davor Bonaci 
Committed: Mon Jan 30 12:44:55 2017 -0800

--
 sdks/python/apache_beam/io/bigquery_test.py | 26 
 1 file changed, 18 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/e02ddac3/sdks/python/apache_beam/io/bigquery_test.py
--
diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py
index b8682d1..14eb035 100644
--- a/sdks/python/apache_beam/io/bigquery_test.py
+++ b/sdks/python/apache_beam/io/bigquery_test.py
@@ -539,7 +539,8 @@ class TestBigQueryReader(unittest.TestCase):
 
 class TestBigQueryWriter(unittest.TestCase):
 
-  def test_no_table_and_create_never(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_no_table_and_create_never(self, patched_time_sleep):
 client = mock.Mock()
 client.tables.Get.side_effect = HttpError(
 response={'status': '404'}, url='', content='')
@@ -572,7 +573,9 @@ class TestBigQueryWriter(unittest.TestCase):
 self.assertTrue(client.tables.Get.called)
 self.assertTrue(client.tables.Insert.called)
 
-  def test_no_table_and_create_if_needed_and_no_schema(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_no_table_and_create_if_needed_and_no_schema(
+  self, patched_time_sleep):
 client = mock.Mock()
 client.tables.Get.side_effect = HttpError(
 response={'status': '404'}, url='', content='')
@@ -587,7 +590,9 @@ class TestBigQueryWriter(unittest.TestCase):
 'Table project:dataset.table requires a schema. None can be inferred '
 'because the table does not exist.')
 
-  def test_table_not_empty_and_write_disposition_empty(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_table_not_empty_and_write_disposition_empty(
+  self, patched_time_sleep):
 client = mock.Mock()
 client.tables.Get.return_value = bigquery.Table(
 tableReference=bigquery.TableReference(
@@ -712,7 +717,8 @@ class TestBigQueryWrapper(unittest.TestCase):
 wrapper._delete_dataset('', '')
 self.assertTrue(client.datasets.Delete.called)
 
-  def test_delete_dataset_retries_fail(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_delete_dataset_retries_fail(self, patched_time_sleep):
 client = mock.Mock()
 client.datasets.Delete.side_effect = ValueError("Cannot delete")
 wrapper = beam.io.bigquery.BigQueryWrapper(client)
@@ -730,7 +736,8 @@ class TestBigQueryWrapper(unittest.TestCase):
 wrapper._delete_table('', '', '')
 self.assertTrue(client.tables.Delete.called)
 
-  def test_delete_table_retries_fail(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_delete_table_retries_fail(self, patched_time_sleep):
 client = mock.Mock()
 client.tables.Delete.side_effect = ValueError("Cannot delete")
 wrapper = beam.io.bigquery.BigQueryWrapper(client)
@@ -738,7 +745,8 @@ class TestBigQueryWrapper(unittest.TestCase):
   wrapper._delete_table('', '', '')
 self.assertTrue(client.tables.Delete.called)
 
-  def test_delete_dataset_retries_for_timeouts(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_delete_dataset_retries_for_timeouts(self, patched_time_sleep):
 client = mock.Mock()
 client.datasets.Delete.side_effect = [
 HttpError(
@@ -749,7 +757,8 @@ class TestBigQueryWrapper(unittest.TestCase):
 wrapper._delete_dataset('', '')
 self.assertTrue(client.datasets.Delete.called)
 
-  def test_delete_table_retries_for_timeouts(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_delete_table_retries_for_timeouts(self, patched_time_sleep):
 client = mock.Mock()
 client.tables.Delete.side_effect = [
 HttpError(
@@ -760,7 +769,8 @@ class TestBigQueryWrapper(unittest.TestCase):
 wrapper._delete_table('', '', '')
 self.assertTrue(client.tables.Delete.called)
 
-  def test_temporary_dataset_is_unique(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_temporary_dataset_is_unique(self, patched_time_sleep):
 client = mock.Mock()
 client.datasets.Get.return_value = bigquery.Dataset(
 datasetReference=bigquery.D

[16/50] [abbrv] beam git commit: Closes #1832

2017-01-30 Thread davor
Closes #1832


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f9831236
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f9831236
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f9831236

Branch: refs/heads/master
Commit: f9831236c4a94af35a0a40a649323b578a3d92e4
Parents: d0dc1f3 9052366
Author: Robert Bradshaw 
Authored: Tue Jan 24 13:48:12 2017 -0800
Committer: Robert Bradshaw 
Committed: Tue Jan 24 13:48:12 2017 -0800

--
 sdks/python/apache_beam/runners/common.py | 3 +++
 1 file changed, 3 insertions(+)
--




[03/50] [abbrv] beam git commit: Implement Annotation based NewDoFn in python SDK

2017-01-30 Thread davor
Implement Annotation based NewDoFn in python SDK


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9e272ecf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9e272ecf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9e272ecf

Branch: refs/heads/master
Commit: 9e272ecf639b7b13f23a83868fd101a437159c1c
Parents: 946135f
Author: Sourabh Bajaj 
Authored: Fri Jan 20 17:17:25 2017 -0800
Committer: Robert Bradshaw 
Committed: Sat Jan 21 20:37:07 2017 -0800

--
 sdks/python/apache_beam/pipeline_test.py| 100 -
 sdks/python/apache_beam/runners/common.pxd  |   4 +
 sdks/python/apache_beam/runners/common.py   | 221 +--
 .../runners/direct/transform_evaluator.py   |  15 +-
 sdks/python/apache_beam/transforms/core.py  | 113 +-
 sdks/python/apache_beam/typehints/decorators.py |   2 +-
 sdks/python/apache_beam/typehints/typecheck.py  | 145 
 7 files changed, 531 insertions(+), 69 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/9e272ecf/sdks/python/apache_beam/pipeline_test.py
--
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 336bf54..93b68d1 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -24,15 +24,23 @@ import unittest
 from apache_beam.pipeline import Pipeline
 from apache_beam.pipeline import PipelineOptions
 from apache_beam.pipeline import PipelineVisitor
+from apache_beam.pvalue import AsSingleton
 from apache_beam.runners.dataflow.native_io.iobase import NativeSource
 from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms import CombineGlobally
 from apache_beam.transforms import Create
 from apache_beam.transforms import FlatMap
 from apache_beam.transforms import Map
+from apache_beam.transforms import NewDoFn
+from apache_beam.transforms import ParDo
 from apache_beam.transforms import PTransform
 from apache_beam.transforms import Read
-from apache_beam.transforms.util import assert_that, equal_to
+from apache_beam.transforms import WindowInto
+from apache_beam.transforms.util import assert_that
+from apache_beam.transforms.util import equal_to
+from apache_beam.transforms.window import IntervalWindow
+from apache_beam.transforms.window import WindowFn
+from apache_beam.utils.timestamp import MIN_TIMESTAMP
 
 
 class FakeSource(NativeSource):
@@ -241,6 +249,96 @@ class PipelineTest(unittest.TestCase):
 self.assertEqual([1, 4, 9], p | Create([1, 2, 3]) | Map(lambda x: x*x))
 
 
+class NewDoFnTest(unittest.TestCase):
+
+  def setUp(self):
+self.runner_name = 'DirectRunner'
+
+  def test_element(self):
+class TestDoFn(NewDoFn):
+  def process(self, element):
+yield element + 10
+
+pipeline = TestPipeline(runner=self.runner_name)
+pcoll = pipeline | 'Create' >> Create([1, 2]) | 'Do' >> ParDo(TestDoFn())
+assert_that(pcoll, equal_to([11, 12]))
+pipeline.run()
+
+  def test_context_param(self):
+class TestDoFn(NewDoFn):
+  def process(self, element, context=NewDoFn.ContextParam):
+yield context.element + 10
+
+pipeline = TestPipeline(runner=self.runner_name)
+pcoll = pipeline | 'Create' >> Create([1, 2])| 'Do' >> ParDo(TestDoFn())
+assert_that(pcoll, equal_to([11, 12]))
+pipeline.run()
+
+  def test_side_input_no_tag(self):
+class TestDoFn(NewDoFn):
+  def process(self, element, prefix, suffix):
+return ['%s-%s-%s' % (prefix, element, suffix)]
+
+pipeline = TestPipeline()
+words_list = ['aa', 'bb', 'cc']
+words = pipeline | 'SomeWords' >> Create(words_list)
+prefix = 'zyx'
+suffix = pipeline | 'SomeString' >> Create(['xyz'])  # side in
+result = words | 'DecorateWordsDoFnNoTag' >> ParDo(
+TestDoFn(), prefix, suffix=AsSingleton(suffix))
+assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
+pipeline.run()
+
+  def test_side_input_tagged(self):
+class TestDoFn(NewDoFn):
+  def process(self, element, prefix, suffix=NewDoFn.SideInputParam):
+return ['%s-%s-%s' % (prefix, element, suffix)]
+
+pipeline = TestPipeline()
+words_list = ['aa', 'bb', 'cc']
+words = pipeline | 'SomeWords' >> Create(words_list)
+prefix = 'zyx'
+suffix = pipeline | 'SomeString' >> Create(['xyz'])  # side in
+result = words | 'DecorateWordsDoFnNoTag' >> ParDo(
+TestDoFn(), prefix, suffix=AsSingleton(suffix))
+assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
+pipeline.run()
+
+  def test_window_param(self):
+class TestDoFn(NewDoFn):
+  def process(self, element, window=NewDoFn.WindowParam):
+yield (float(window.start)
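The tests above (the diff is truncated here) show the core idea of the annotation-based `NewDoFn`: `process` declares what it needs by using well-known sentinel objects (`NewDoFn.ContextParam`, `NewDoFn.WindowParam`, `NewDoFn.SideInputParam`) as parameter defaults, and the runner inspects the signature to decide what to inject. A hedged sketch of that dispatch mechanism; the sentinel idea mirrors the tests, but the runner logic below is illustrative:

```python
# Inspect process()'s signature for sentinel defaults and substitute
# runner-provided values (here, the window) for those parameters.

import inspect

class DoFnParam:
    """Sentinel marking a parameter the runner should fill in."""
    def __init__(self, name):
        self.name = name
    def __repr__(self):
        return self.name

WindowParam = DoFnParam("WindowParam")

def invoke_process(dofn, element, window):
    sig = inspect.signature(dofn.process)
    kwargs = {}
    for name, param in sig.parameters.items():
        if param.default is WindowParam:
            kwargs[name] = window  # runner substitutes the real window
    return list(dofn.process(element, **kwargs))

class AddWindow:
    def process(self, element, window=WindowParam):
        yield (element, window)

print(invoke_process(AddWindow(), "a", (0, 10)))  # [('a', (0, 10))]
```

Using defaults as sentinels keeps user code plain Python: a DoFn that ignores windows simply omits the parameter, and the runner passes nothing extra.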

[31/50] [abbrv] beam git commit: Updating dataflow client protos to add new metrics.

2017-01-30 Thread davor
http://git-wip-us.apache.org/repos/asf/beam/blob/901a14c4/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_messages.py
--
diff --git a/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_messages.py b/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_messages.py
index 178a542..a42154e 100644
--- a/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_messages.py
+++ b/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_messages.py
@@ -24,6 +24,7 @@ and continuous computation.
 
 from apitools.base.protorpclite import messages as _messages
 from apitools.base.py import encoding
+from apitools.base.py import extra_types
 
 
 package = 'dataflow'
@@ -193,6 +194,7 @@ class CounterMetadata(_messages.Message):
   AND: Aggregated value represents the logical 'and' of all contributed
 values.
   SET: Aggregated value is a set of unique contributed values.
+  DISTRIBUTION: Aggregated value captures statistics about a distribution.
 """
 INVALID = 0
 SUM = 1
@@ -202,6 +204,7 @@ class CounterMetadata(_messages.Message):
 OR = 5
 AND = 6
 SET = 7
+DISTRIBUTION = 8
 
   class StandardUnitsValueValuesEnum(_messages.Enum):
 """System defined Units, see above enum.
@@ -308,6 +311,7 @@ class CounterUpdate(_messages.Message):
   aggregate value accumulated since the worker started working on this
   WorkItem. By default this is false, indicating that this counter is
   reported as a delta.
+distribution: Distribution data
 floatingPoint: Floating point value for Sum, Max, Min.
 floatingPointList: List of floating point numbers, for Set.
 floatingPointMean: Floating point mean aggregation value for Mean.
@@ -326,34 +330,38 @@ class CounterUpdate(_messages.Message):
 
   boolean = _messages.BooleanField(1)
   cumulative = _messages.BooleanField(2)
-  floatingPoint = _messages.FloatField(3)
-  floatingPointList = _messages.MessageField('FloatingPointList', 4)
-  floatingPointMean = _messages.MessageField('FloatingPointMean', 5)
-  integer = _messages.MessageField('SplitInt64', 6)
-  integerList = _messages.MessageField('IntegerList', 7)
-  integerMean = _messages.MessageField('IntegerMean', 8)
-  internal = _messages.MessageField('extra_types.JsonValue', 9)
-  nameAndKind = _messages.MessageField('NameAndKind', 10)
-  shortId = _messages.IntegerField(11)
-  stringList = _messages.MessageField('StringList', 12)
-  structuredNameAndMetadata = _messages.MessageField('CounterStructuredNameAndMetadata', 13)
+  distribution = _messages.MessageField('DistributionUpdate', 3)
+  floatingPoint = _messages.FloatField(4)
+  floatingPointList = _messages.MessageField('FloatingPointList', 5)
+  floatingPointMean = _messages.MessageField('FloatingPointMean', 6)
+  integer = _messages.MessageField('SplitInt64', 7)
+  integerList = _messages.MessageField('IntegerList', 8)
+  integerMean = _messages.MessageField('IntegerMean', 9)
+  internal = _messages.MessageField('extra_types.JsonValue', 10)
+  nameAndKind = _messages.MessageField('NameAndKind', 11)
+  shortId = _messages.IntegerField(12)
+  stringList = _messages.MessageField('StringList', 13)
+  structuredNameAndMetadata = 
_messages.MessageField('CounterStructuredNameAndMetadata', 14)
 
 
 class CreateJobFromTemplateRequest(_messages.Message):
-  """Request to create a Dataflow job.
+  """A request to create a Cloud Dataflow job from a template.
 
   Messages:
-ParametersValue: Dynamic parameterization of the job's runtime
-  environment.
+ParametersValue: The runtime parameters to pass to the job.
 
   Fields:
-gcsPath: A path to the serialized JSON representation of the job.
-parameters: Dynamic parameterization of the job's runtime environment.
+environment: The runtime environment for the job.
+gcsPath: Required. A Cloud Storage path to the template from which to
+  create the job. Must be a valid Cloud Storage URL, beginning with
+  `gs://`.
+jobName: Required. The job name to use for the created job.
+parameters: The runtime parameters to pass to the job.
   """
 
   @encoding.MapUnrecognizedFields('additionalProperties')
   class ParametersValue(_messages.Message):
-"""Dynamic parameterization of the job's runtime environment.
+"""The runtime parameters to pass to the job.
 
 Messages:
   AdditionalProperty: An additional property for a ParametersValue object.
@@ -375,8 +383,10 @@ class CreateJobFromTemplateRequest(_messages.Message):
 
 additionalProperties = _messages.MessageField('AdditionalProperty', 1, 
repeated=True)
 
-  gcsPath = _messages.StringField(1)
-  parameters = _messages.MessageField('ParametersValue', 2)
+  environment = _messages.MessageField('RuntimeEnvironment', 1)
+  gcsPath = _messages.StringField(2)
+  jobName = _messages.StringField(3)
+  parameters = _messages.MessageField('Paramet
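The new DISTRIBUTION counter kind added above aggregates summary statistics over all reported values rather than a single number. As a rough sketch (plain Python, not the generated apitools message classes; the class and field names here are illustrative only), such a counter accumulates:

```python
class DistributionAccumulator(object):
    """Illustrative accumulator for a DISTRIBUTION counter: tracks the
    count, sum, min, and max of all values reported so far."""

    def __init__(self):
        self.count = 0
        self.sum = 0
        self.min = None
        self.max = None

    def add(self, value):
        # Fold one reported value into the running statistics.
        self.count += 1
        self.sum += value
        self.min = value if self.min is None else min(self.min, value)
        self.max = value if self.max is None else max(self.max, value)


d = DistributionAccumulator()
for v in [3, 9, 1]:
    d.add(v)
print(d.count, d.sum, d.min, d.max)  # 3 13 1 9
```

Because count/sum add and min/max combine commutatively, accumulators like this can be merged across workers, which is what makes the kind usable as a cumulative counter update.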

[47/50] [abbrv] beam git commit: This closes #1871

2017-01-30 Thread davor
This closes #1871


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/be0e32e3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/be0e32e3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/be0e32e3

Branch: refs/heads/master
Commit: be0e32e36313390ed04106d57f4c9dfeabb91b4d
Parents: 1390699 0b4ee73
Author: Davor Bonaci 
Authored: Mon Jan 30 13:25:53 2017 -0800
Committer: Davor Bonaci 
Committed: Mon Jan 30 13:25:53 2017 -0800

--
 .travis.yml  | 15 +++
 .../examples/cookbook/datastore_wordcount.py |  2 +-
 sdks/python/setup.py |  4 ++--
 3 files changed, 18 insertions(+), 3 deletions(-)
--




[42/50] [abbrv] beam git commit: Updates places in the SDK that create thread pools.

2017-01-30 Thread davor
Updates places in the SDK that create thread pools.

Moves ThreadPool creation into a util function.
Records and resets the logging level, since apitools resets it when used
with a ThreadPool.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/51afc1cc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/51afc1cc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/51afc1cc

Branch: refs/heads/master
Commit: 51afc1ccfe78a0657b5f9bc139d1d4e7938ed672
Parents: f29527f
Author: Chamikara Jayalath 
Authored: Sat Jan 28 08:54:33 2017 -0800
Committer: Davor Bonaci 
Committed: Mon Jan 30 12:43:37 2017 -0800

--
 sdks/python/apache_beam/internal/util.py  | 33 ++
 sdks/python/apache_beam/io/filebasedsource.py | 17 +++
 sdks/python/apache_beam/io/fileio.py  | 11 ++--
 3 files changed, 40 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/51afc1cc/sdks/python/apache_beam/internal/util.py
--
diff --git a/sdks/python/apache_beam/internal/util.py 
b/sdks/python/apache_beam/internal/util.py
index 2d12d49..5b31e88 100644
--- a/sdks/python/apache_beam/internal/util.py
+++ b/sdks/python/apache_beam/internal/util.py
@@ -17,6 +17,11 @@
 
 """Utility functions used throughout the package."""
 
+import logging
+from multiprocessing.pool import ThreadPool
+import threading
+import weakref
+
 
 class ArgumentPlaceholder(object):
   """A place holder object replacing PValues in argument lists.
@@ -92,3 +97,31 @@ def insert_values_in_args(args, kwargs, values):
   (k, v_iter.next()) if isinstance(v, ArgumentPlaceholder) else (k, v)
   for k, v in sorted(kwargs.iteritems()))
   return (new_args, new_kwargs)
+
+
+def run_using_threadpool(fn_to_execute, inputs, pool_size):
+  """Runs the given function on given inputs using a thread pool.
+
+  Args:
+fn_to_execute: Function to execute
+inputs: Inputs on which given function will be executed in parallel.
+pool_size: Size of thread pool.
+  Returns:
+Results retrieved after executing the given function on given inputs.
+  """
+
+  # ThreadPool crashes in old versions of Python (< 2.7.5) if created
+  # from a child thread. (http://bugs.python.org/issue10015)
+  if not hasattr(threading.current_thread(), '_children'):
+    threading.current_thread()._children = weakref.WeakKeyDictionary()
+  pool = ThreadPool(min(pool_size, len(inputs)))
+  try:
+    # We record and reset the logging level here since the 'apitools' library
+    # that Beam depends on updates the logging level when used with a
+    # threadpool - https://github.com/google/apitools/issues/141
+    # TODO: Remove this once the above issue in 'apitools' is fixed.
+    old_level = logging.getLogger().level
+    return pool.map(fn_to_execute, inputs)
+  finally:
+    pool.terminate()
+    logging.getLogger().setLevel(old_level)
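As a standalone illustration of the pattern the new helper follows (size the pool with `min(pool_size, len(inputs))`, always `terminate()` in a `finally` block), here is a minimal sketch using only the standard library; it deliberately omits the logging and Python < 2.7.5 workarounds handled above:

```python
from multiprocessing.pool import ThreadPool


def run_using_threadpool(fn_to_execute, inputs, pool_size):
    # Cap the pool at the number of inputs so no idle threads are created.
    pool = ThreadPool(min(pool_size, len(inputs)))
    try:
        # map() blocks until every input has been processed and preserves
        # input order in the returned list.
        return pool.map(fn_to_execute, inputs)
    finally:
        pool.terminate()


lengths = run_using_threadpool(len, ['a', 'bb', 'ccc'], pool_size=10)
# lengths == [1, 2, 3]
```

Terminating in `finally` ensures worker threads are torn down even when `fn_to_execute` raises.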

http://git-wip-us.apache.org/repos/asf/beam/blob/51afc1cc/sdks/python/apache_beam/io/filebasedsource.py
--
diff --git a/sdks/python/apache_beam/io/filebasedsource.py 
b/sdks/python/apache_beam/io/filebasedsource.py
index 1bfde25..582d673 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -26,11 +26,9 @@ For an example implementation of ``FileBasedSource`` see 
``avroio.AvroSource``.
 """
 
 import random
-import threading
-import weakref
-from multiprocessing.pool import ThreadPool
 
 from apache_beam.internal import pickler
+from apache_beam.internal import util
 from apache_beam.io import concat_source
 from apache_beam.io import fileio
 from apache_beam.io import iobase
@@ -158,16 +156,9 @@ class FileBasedSource(iobase.BoundedSource):
       return [fileio.ChannelFactory.size_in_bytes(file_names[0])]
     else:
       if pattern is None:
-        # ThreadPool crashes in old versions of Python (< 2.7.5) if created
-        # from a child thread. (http://bugs.python.org/issue10015)
-        if not hasattr(threading.current_thread(), '_children'):
-          threading.current_thread()._children = weakref.WeakKeyDictionary()
-        pool = ThreadPool(
-            min(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION, len(file_names)))
-        try:
-          return pool.map(fileio.ChannelFactory.size_in_bytes, file_names)
-        finally:
-          pool.terminate()
+        return util.run_using_threadpool(
+            fileio.ChannelFactory.size_in_bytes, file_names,
+            MAX_NUM_THREADS_FOR_SIZE_ESTIMATION)
       else:
         file_sizes = fileio.ChannelFactory.size_of_files_in_glob(pattern,
                                                                  file_names)

http://git-wip-us.apache.org/repos/asf/beam/blob

[10/50] [abbrv] beam git commit: Remove dataflow_test.py

2017-01-30 Thread davor
Remove dataflow_test.py

Many of these tests were redundant with tests elsewhere; the ones that
weren't have been moved closer to similar tests.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d5b90d83
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d5b90d83
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d5b90d83

Branch: refs/heads/master
Commit: d5b90d8383e662e803ea79b31661250a043bcfd2
Parents: 01b3628
Author: Robert Bradshaw 
Authored: Sat Jan 21 21:53:42 2017 -0800
Committer: Robert Bradshaw 
Committed: Mon Jan 23 14:36:55 2017 -0800

--
 sdks/python/apache_beam/dataflow_test.py| 418 ---
 .../apache_beam/transforms/ptransform_test.py   |  67 +++
 .../apache_beam/transforms/sideinputs_test.py   | 208 -
 3 files changed, 274 insertions(+), 419 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/d5b90d83/sdks/python/apache_beam/dataflow_test.py
--
diff --git a/sdks/python/apache_beam/dataflow_test.py 
b/sdks/python/apache_beam/dataflow_test.py
deleted file mode 100644
index f410230..000
--- a/sdks/python/apache_beam/dataflow_test.py
+++ /dev/null
@@ -1,418 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Integration tests for the dataflow package."""
-
-from __future__ import absolute_import
-
-import logging
-import re
-import unittest
-
-import apache_beam as beam
-from apache_beam.pvalue import AsDict
-from apache_beam.pvalue import AsIter as AllOf
-from apache_beam.pvalue import AsList
-from apache_beam.pvalue import AsSingleton
-from apache_beam.pvalue import EmptySideInput
-from apache_beam.pvalue import SideOutputValue
-from apache_beam.test_pipeline import TestPipeline
-from apache_beam.transforms import Create
-from apache_beam.transforms import DoFn
-from apache_beam.transforms import FlatMap
-from apache_beam.transforms import GroupByKey
-from apache_beam.transforms import Map
-from apache_beam.transforms import ParDo
-from apache_beam.transforms import WindowInto
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
-from apache_beam.transforms.window import IntervalWindow
-from apache_beam.transforms.window import WindowFn
-from nose.plugins.attrib import attr
-
-
-class DataflowTest(unittest.TestCase):
-  """Dataflow integration tests."""
-
-  SAMPLE_DATA = ['aa bb cc aa bb aa \n'] * 10
-  SAMPLE_RESULT = [('cc', 10), ('bb', 20), ('aa', 30)]
-
-  @beam.ptransform_fn
-  def Count(pcoll):  # pylint: disable=invalid-name, no-self-argument
-"""A Count transform: v, ... => (v, n), ..."""
-return (pcoll
-| 'AddCount' >> Map(lambda x: (x, 1))
-| 'GroupCounts' >> GroupByKey()
-| 'AddCounts' >> Map(lambda (x, ones): (x, sum(ones
-
-  @attr('ValidatesRunner')
-  def test_word_count(self):
-pipeline = TestPipeline()
-lines = pipeline | 'SomeWords' >> Create(DataflowTest.SAMPLE_DATA)
-result = (
-(lines | 'GetWords' >> FlatMap(lambda x: re.findall(r'\w+', x)))
-.apply('CountWords', DataflowTest.Count))
-assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT))
-pipeline.run()
-
-  @attr('ValidatesRunner')
-  def test_map(self):
-pipeline = TestPipeline()
-lines = pipeline | 'input' >> Create(['a', 'b', 'c'])
-result = (lines
-  | 'upper' >> Map(str.upper)
-  | 'prefix' >> Map(lambda x, prefix: prefix + x, 'foo-'))
-assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C']))
-pipeline.run()
-
-  @attr('ValidatesRunner')
-  def test_par_do_with_side_input_as_arg(self):
-pipeline = TestPipeline()
-words_list = ['aa', 'bb', 'cc']
-words = pipeline | 'SomeWords' >> Create(words_list)
-prefix = pipeline | 'SomeString' >> Create(['xyz'])  # side in
-suffix = 'zyx'
-result = words | FlatMap(
-'DecorateWords',
-lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)],
-AsSingleton(prefix), suffix)
-assert_that(resu

[33/50] [abbrv] beam git commit: Closes #1857

2017-01-30 Thread davor
Closes #1857


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/52d97e2f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/52d97e2f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/52d97e2f

Branch: refs/heads/master
Commit: 52d97e2fc2e383a58969447addd45ebe3eed4f5f
Parents: 3d6f20d 901a14c
Author: Robert Bradshaw 
Authored: Fri Jan 27 12:00:25 2017 -0800
Committer: Robert Bradshaw 
Committed: Fri Jan 27 12:00:25 2017 -0800

--
 .../clients/dataflow/dataflow_v1b3_client.py| 578 
 .../clients/dataflow/dataflow_v1b3_messages.py  | 931 +--
 2 files changed, 1075 insertions(+), 434 deletions(-)
--




[23/50] [abbrv] beam git commit: Closes #1844

2017-01-30 Thread davor
Closes #1844


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4e1028b3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4e1028b3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4e1028b3

Branch: refs/heads/master
Commit: 4e1028b3dfeaf02e51eb9f3b5d1a5e78c1cfcbb9
Parents: 5924220 5787e81
Author: Robert Bradshaw 
Authored: Wed Jan 25 16:16:52 2017 -0800
Committer: Robert Bradshaw 
Committed: Wed Jan 25 16:16:52 2017 -0800

--
 .../python/apache_beam/utils/dependency_test.py | 47 +++-
 1 file changed, 27 insertions(+), 20 deletions(-)
--




[17/50] [abbrv] beam git commit: Removes Dataflow native text source and sink from Beam SDK.

2017-01-30 Thread davor
Removes Dataflow native text source and sink from Beam SDK.

Users should use the Beam text source and sink available in the module
'textio.py' instead.

Also removes the Dataflow native file source/sink, which is only used by the
native text source/sink.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/52fc95dd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/52fc95dd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/52fc95dd

Branch: refs/heads/master
Commit: 52fc95ddebceaaf27897c4f6d5b97d08bd4b3a1e
Parents: f983123
Author: Chamikara Jayalath 
Authored: Mon Jan 23 13:23:45 2017 -0800
Committer: Robert Bradshaw 
Committed: Tue Jan 24 16:31:02 2017 -0800

--
 sdks/python/apache_beam/io/fileio.py| 542 +-
 sdks/python/apache_beam/io/fileio_test.py   | 729 +--
 .../runners/direct/transform_evaluator.py   |   5 -
 3 files changed, 3 insertions(+), 1273 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/52fc95dd/sdks/python/apache_beam/io/fileio.py
--
diff --git a/sdks/python/apache_beam/io/fileio.py 
b/sdks/python/apache_beam/io/fileio.py
index ebc4fed..52f31c6 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -34,12 +34,10 @@ import weakref
 from apache_beam import coders
 from apache_beam.io import gcsio
 from apache_beam.io import iobase
-from apache_beam.io import range_trackers
-from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
 from apache_beam.transforms.display import DisplayDataItem
 
 
-__all__ = ['TextFileSource', 'TextFileSink']
+__all__ = ['TextFileSink']
 
 DEFAULT_SHARD_NAME_TEMPLATE = '-S-of-N'
 
@@ -111,326 +109,6 @@ class CompressionTypes(object):
 return cls.UNCOMPRESSED
 
 
-class NativeFileSource(dataflow_io.NativeSource):
-  """A source implemented by Dataflow service from a GCS or local file or 
files.
-
-  This class is to be only inherited by sources natively implemented by Cloud
-  Dataflow service, hence should not be sub-classed by users.
-  """
-
-  def __init__(self,
-   file_path,
-   start_offset=None,
-   end_offset=None,
-   coder=coders.BytesCoder(),
-   compression_type=CompressionTypes.AUTO,
-   mime_type='application/octet-stream'):
-"""Initialize a NativeFileSource.
-
-Args:
-  file_path: The file path to read from as a local file path or a GCS
-gs:// path. The path can contain glob characters (*, ?, and [...]
-sets).
-  start_offset: The byte offset in the source file that the reader
-should start reading. By default is 0 (beginning of file).
-  end_offset: The byte offset in the file that the reader should stop
-reading. By default it is the end of the file.
-  compression_type: Used to handle compressed input files. Typical value
-  is CompressionTypes.AUTO, in which case the file_path's extension 
will
-  be used to detect the compression.
-  coder: Coder used to decode each record.
-
-Raises:
-  TypeError: if file_path is not a string.
-
-If the file_path contains glob characters then the start_offset and
-end_offset must not be specified.
-
-The 'start_offset' and 'end_offset' pair provide a mechanism to divide the
-file into multiple pieces for individual sources. Because the offset
-is measured by bytes, some complication arises when the offset splits in
-the middle of a record. To avoid the scenario where two adjacent sources
-each get a fraction of a line we adopt the following rules:
-
-If start_offset falls inside a record (any character except the first one)
-then the source will skip the record and start with the next one.
-
-If end_offset falls inside a record (any character except the first one)
-then the source will contain that entire record.
-"""
-if not isinstance(file_path, basestring):
-  raise TypeError('%s: file_path must be a string;  got %r instead' %
-  (self.__class__.__name__, file_path))
-
-self.file_path = file_path
-self.start_offset = start_offset
-self.end_offset = end_offset
-self.compression_type = compression_type
-self.coder = coder
-self.mime_type = mime_type
-
-  def display_data(self):
-return {'file_pattern': DisplayDataItem(self.file_path,
-label="File Pattern"),
-'compression': DisplayDataItem(str(self.compression_type),
-   label='Compression')}
-
-  def __eq__(self, other):
-return (self.file_path == other.file_path and
-self.start_offset == o

[48/50] [abbrv] beam git commit: Remove sdks/python/LICENSE

2017-01-30 Thread davor
Remove sdks/python/LICENSE


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e0ff9973
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e0ff9973
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e0ff9973

Branch: refs/heads/master
Commit: e0ff9973940c5585376ac2beb6edd1e20de962ac
Parents: be0e32e
Author: Davor Bonaci 
Authored: Mon Jan 30 14:41:32 2017 -0800
Committer: Davor Bonaci 
Committed: Mon Jan 30 14:41:32 2017 -0800

--
 sdks/python/LICENSE | 202 ---
 1 file changed, 202 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/e0ff9973/sdks/python/LICENSE
--
diff --git a/sdks/python/LICENSE b/sdks/python/LICENSE
deleted file mode 100644
index d645695..000
--- a/sdks/python/LICENSE
+++ /dev/null
@@ -1,202 +0,0 @@
-
- Apache License
-   Version 2.0, January 2004
-http://www.apache.org/licenses/
-
-   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
-   1. Definitions.
-
-  "License" shall mean the terms and conditions for use, reproduction,
-  and distribution as defined by Sections 1 through 9 of this document.
-
-  "Licensor" shall mean the copyright owner or entity authorized by
-  the copyright owner that is granting the License.
-
-  "Legal Entity" shall mean the union of the acting entity and all
-  other entities that control, are controlled by, or are under common
-  control with that entity. For the purposes of this definition,
-  "control" means (i) the power, direct or indirect, to cause the
-  direction or management of such entity, whether by contract or
-  otherwise, or (ii) ownership of fifty percent (50%) or more of the
-  outstanding shares, or (iii) beneficial ownership of such entity.
-
-  "You" (or "Your") shall mean an individual or Legal Entity
-  exercising permissions granted by this License.
-
-  "Source" form shall mean the preferred form for making modifications,
-  including but not limited to software source code, documentation
-  source, and configuration files.
-
-  "Object" form shall mean any form resulting from mechanical
-  transformation or translation of a Source form, including but
-  not limited to compiled object code, generated documentation,
-  and conversions to other media types.
-
-  "Work" shall mean the work of authorship, whether in Source or
-  Object form, made available under the License, as indicated by a
-  copyright notice that is included in or attached to the work
-  (an example is provided in the Appendix below).
-
-  "Derivative Works" shall mean any work, whether in Source or Object
-  form, that is based on (or derived from) the Work and for which the
-  editorial revisions, annotations, elaborations, or other modifications
-  represent, as a whole, an original work of authorship. For the purposes
-  of this License, Derivative Works shall not include works that remain
-  separable from, or merely link (or bind by name) to the interfaces of,
-  the Work and Derivative Works thereof.
-
-  "Contribution" shall mean any work of authorship, including
-  the original version of the Work and any modifications or additions
-  to that Work or Derivative Works thereof, that is intentionally
-  submitted to Licensor for inclusion in the Work by the copyright owner
-  or by an individual or Legal Entity authorized to submit on behalf of
-  the copyright owner. For the purposes of this definition, "submitted"
-  means any form of electronic, verbal, or written communication sent
-  to the Licensor or its representatives, including but not limited to
-  communication on electronic mailing lists, source code control systems,
-  and issue tracking systems that are managed by, or on behalf of, the
-  Licensor for the purpose of discussing and improving the Work, but
-  excluding communication that is conspicuously marked or otherwise
-  designated in writing by the copyright owner as "Not a Contribution."
-
-  "Contributor" shall mean Licensor and any individual or Legal Entity
-  on behalf of whom a Contribution has been received by Licensor and
-  subsequently incorporated within the Work.
-
-   2. Grant of Copyright License. Subject to the terms and conditions of
-  this License, each Contributor hereby grants to You a perpetual,
-  worldwide, non-exclusive, no-charge, royalty-free, irrevocable
-  copyright license to reproduce, prepare Derivative Works of,
-  publicly display, publicly perform, sublicense, and distribute the
- 

[22/50] [abbrv] beam git commit: Cleanup tests in pipeline_test.

2017-01-30 Thread davor
Cleanup tests in pipeline_test.

Notably, the runner_name parameter has been obsolete since the removal
of DiskCachedRunnerPipelineTest and is an inferior version of what
TestPipeline provides.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/61d8d3f0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/61d8d3f0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/61d8d3f0

Branch: refs/heads/master
Commit: 61d8d3f0690142f6dc87b1484d3ebd148a706837
Parents: 9540cf1
Author: Robert Bradshaw 
Authored: Sat Jan 21 21:07:39 2017 -0800
Committer: Robert Bradshaw 
Committed: Wed Jan 25 12:38:03 2017 -0800

--
 sdks/python/apache_beam/pipeline_test.py | 57 ++-
 1 file changed, 21 insertions(+), 36 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/61d8d3f0/sdks/python/apache_beam/pipeline_test.py
--
diff --git a/sdks/python/apache_beam/pipeline_test.py 
b/sdks/python/apache_beam/pipeline_test.py
index 93b68d1..833293f 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -38,8 +38,8 @@ from apache_beam.transforms import Read
 from apache_beam.transforms import WindowInto
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
-from apache_beam.transforms.window import IntervalWindow
-from apache_beam.transforms.window import WindowFn
+from apache_beam.transforms.window import SlidingWindows
+from apache_beam.transforms.window import TimestampedValue
 from apache_beam.utils.timestamp import MIN_TIMESTAMP
 
 
@@ -70,9 +70,6 @@ class FakeSource(NativeSource):
 
 class PipelineTest(unittest.TestCase):
 
-  def setUp(self):
-self.runner_name = 'DirectRunner'
-
   @staticmethod
   def custom_callable(pcoll):
 return pcoll | '+1' >> FlatMap(lambda x: [x + 1])
@@ -103,7 +100,7 @@ class PipelineTest(unittest.TestCase):
   self.leave_composite.append(transform_node)
 
   def test_create(self):
-pipeline = TestPipeline(runner=self.runner_name)
+pipeline = TestPipeline()
 pcoll = pipeline | 'label1' >> Create([1, 2, 3])
 assert_that(pcoll, equal_to([1, 2, 3]))
 
@@ -114,19 +111,19 @@ class PipelineTest(unittest.TestCase):
 pipeline.run()
 
   def test_create_singleton_pcollection(self):
-pipeline = TestPipeline(runner=self.runner_name)
+pipeline = TestPipeline()
 pcoll = pipeline | 'label' >> Create([[1, 2, 3]])
 assert_that(pcoll, equal_to([[1, 2, 3]]))
 pipeline.run()
 
   def test_read(self):
-pipeline = TestPipeline(runner=self.runner_name)
+pipeline = TestPipeline()
 pcoll = pipeline | 'read' >> Read(FakeSource([1, 2, 3]))
 assert_that(pcoll, equal_to([1, 2, 3]))
 pipeline.run()
 
   def test_visit_entire_graph(self):
-pipeline = Pipeline(self.runner_name)
+pipeline = Pipeline()
 pcoll1 = pipeline | 'pcoll' >> Create([1, 2, 3])
 pcoll2 = pcoll1 | 'do1' >> FlatMap(lambda x: [x + 1])
 pcoll3 = pcoll2 | 'do2' >> FlatMap(lambda x: [x + 1])
@@ -145,14 +142,14 @@ class PipelineTest(unittest.TestCase):
 self.assertEqual(visitor.leave_composite[0].transform, transform)
 
   def test_apply_custom_transform(self):
-pipeline = TestPipeline(runner=self.runner_name)
+pipeline = TestPipeline()
 pcoll = pipeline | 'pcoll' >> Create([1, 2, 3])
 result = pcoll | PipelineTest.CustomTransform()
 assert_that(result, equal_to([2, 3, 4]))
 pipeline.run()
 
   def test_reuse_custom_transform_instance(self):
-pipeline = Pipeline(self.runner_name)
+pipeline = Pipeline()
 pcoll1 = pipeline | 'pcoll1' >> Create([1, 2, 3])
 pcoll2 = pipeline | 'pcoll2' >> Create([4, 5, 6])
 transform = PipelineTest.CustomTransform()
@@ -167,7 +164,7 @@ class PipelineTest(unittest.TestCase):
 'pvalue | "label" >> transform')
 
   def test_reuse_cloned_custom_transform_instance(self):
-pipeline = TestPipeline(runner=self.runner_name)
+pipeline = TestPipeline()
 pcoll1 = pipeline | 'pc1' >> Create([1, 2, 3])
 pcoll2 = pipeline | 'pc2' >> Create([4, 5, 6])
 transform = PipelineTest.CustomTransform()
@@ -240,7 +237,7 @@ class PipelineTest(unittest.TestCase):
 def raise_exception(exn):
   raise exn
 with self.assertRaises(ValueError):
-  with Pipeline(self.runner_name) as p:
+  with Pipeline() as p:
 # pylint: disable=expression-not-assigned
 p | Create([ValueError]) | Map(raise_exception)
 
@@ -251,15 +248,12 @@ class PipelineTest(unittest.TestCase):
 
 class NewDoFnTest(unittest.TestCase):
 
-  def setUp(self):
-self.runner_name = 'DirectRunner'
-
   def test_element(self):
 class TestDoFn(NewDoFn):
   def process(self, element):
 yield elemen

[45/50] [abbrv] beam git commit: This closes #1863

2017-01-30 Thread davor
This closes #1863


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1390699c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1390699c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1390699c

Branch: refs/heads/master
Commit: 1390699c37596ebe34a773627660b6c496375a8e
Parents: 475707f e02ddac
Author: Davor Bonaci 
Authored: Mon Jan 30 12:45:03 2017 -0800
Committer: Davor Bonaci 
Committed: Mon Jan 30 12:45:03 2017 -0800

--
 sdks/python/apache_beam/io/bigquery_test.py | 26 
 1 file changed, 18 insertions(+), 8 deletions(-)
--




[27/50] [abbrv] beam git commit: Fix read/write display data

2017-01-30 Thread davor
Fix read/write display data


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e4eda3c3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e4eda3c3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e4eda3c3

Branch: refs/heads/master
Commit: e4eda3c335b5767bdaf40b56b2dd5d67d7348f20
Parents: c6420df
Author: Pablo 
Authored: Fri Jan 13 11:25:36 2017 -0800
Committer: Robert Bradshaw 
Committed: Thu Jan 26 14:51:56 2017 -0800

--
 sdks/python/apache_beam/io/avroio_test.py |  6 
 sdks/python/apache_beam/io/fileio.py  | 10 ++-
 sdks/python/apache_beam/io/fileio_test.py |  2 --
 sdks/python/apache_beam/io/iobase.py  | 38 +-
 sdks/python/apache_beam/io/textio.py  | 25 +
 sdks/python/apache_beam/io/textio_test.py | 30 
 6 files changed, 47 insertions(+), 64 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/avroio_test.py
--
diff --git a/sdks/python/apache_beam/io/avroio_test.py 
b/sdks/python/apache_beam/io/avroio_test.py
index aed468d..d2fb1d1 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -196,9 +196,6 @@ class TestAvro(unittest.TestCase):
 'file_pattern',
 'some_avro_sink-%(shard_num)05d-of-%(num_shards)05d.end'),
 DisplayDataItemMatcher(
-'shards',
-0),
-DisplayDataItemMatcher(
 'codec',
 'null'),
 DisplayDataItemMatcher(
@@ -219,9 +216,6 @@ class TestAvro(unittest.TestCase):
 'file_pattern',
 'some_avro_sink-%(shard_num)05d-of-%(num_shards)05d'),
 DisplayDataItemMatcher(
-'shards',
-0),
-DisplayDataItemMatcher(
 'codec',
 'deflate'),
 DisplayDataItemMatcher(

http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/fileio.py
--
diff --git a/sdks/python/apache_beam/io/fileio.py 
b/sdks/python/apache_beam/io/fileio.py
index 52f31c6..f67dca9 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -547,7 +547,8 @@ class FileSink(iobase.Sink):
 
   def display_data(self):
     return {'shards':
-            DisplayDataItem(self.num_shards, label='Number of Shards'),
+            DisplayDataItem(self.num_shards,
+                            label='Number of Shards').drop_if_default(0),
             'compression':
             DisplayDataItem(str(self.compression_type)),
             'file_pattern':
@@ -787,6 +788,13 @@ class TextFileSink(FileSink):
   '\'textio.WriteToText()\' instead of directly '
   'instantiating a TextFileSink object.')
 
+  def display_data(self):
+    dd_parent = super(TextFileSink, self).display_data()
+    dd_parent['append_newline'] = DisplayDataItem(
+        self.append_trailing_newlines,
+        label='Append Trailing New Lines')
+    return dd_parent
+
   def write_encoded_record(self, file_handle, encoded_value):
     """Writes a single encoded record."""
     file_handle.write(encoded_value)
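The `drop_if_default(0)` call above is what removes the `'shards'` item from display data when the user left the shard count at its default, which is why the matching `'shards'` expectations are deleted from the tests in this commit. A minimal stand-in (not Beam's real `DisplayDataItem` implementation; the `should_include` name here is hypothetical) showing the intended behavior:

```python
class DisplayDataItem(object):
    """Simplified stand-in for Beam's DisplayDataItem, illustrating only
    the drop_if_default pattern (hypothetical implementation)."""

    def __init__(self, value, label=None):
        self.value = value
        self.label = label
        self._drop_default = None

    def drop_if_default(self, default):
        # Remember the default; the item is skipped when value == default.
        self._drop_default = default
        return self

    def should_include(self):
        return self._drop_default is None or self.value != self._drop_default


# A sink left at the default of 0 shards reports no 'shards' item,
# while an explicit shard count is still displayed.
print(DisplayDataItem(0, label='Number of Shards')
      .drop_if_default(0).should_include())  # False
print(DisplayDataItem(5, label='Number of Shards')
      .drop_if_default(0).should_include())  # True
```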

http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/fileio_test.py
--
diff --git a/sdks/python/apache_beam/io/fileio_test.py 
b/sdks/python/apache_beam/io/fileio_test.py
index ad77dc5..6c33f53 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -142,8 +142,6 @@ class TestFileSink(unittest.TestCase):
 dd = DisplayData.create_from(sink)
 expected_items = [
 DisplayDataItemMatcher(
-'shards', 0),
-DisplayDataItemMatcher(
 'compression', 'auto'),
 DisplayDataItemMatcher(
 'file_pattern',

http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/iobase.py
--
diff --git a/sdks/python/apache_beam/io/iobase.py 
b/sdks/python/apache_beam/io/iobase.py
index 12af3b6..1266ed3 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -759,16 +759,15 @@ class WriteImpl(ptransform.PTransform):
   write_result_coll = (keyed_pcoll
| core.WindowInto(window.GlobalWindows())
| core.GroupByKey()
-   | 'WriteBundles' >> core.Map(
-   _write_keyed_bundle, self.sink,
+   | 'WriteBundles' >> core.ParDo(
+   

[37/50] [abbrv] beam git commit: Update pom.xml for sdks/python.

2017-01-30 Thread davor
Update pom.xml for sdks/python.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f1b8679c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f1b8679c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f1b8679c

Branch: refs/heads/master
Commit: f1b8679c4af283d1e751043e2e765b7f295af0b2
Parents: c2859a5
Author: Ahmet Altay 
Authored: Fri Jan 27 17:04:21 2017 -0800
Committer: Ahmet Altay 
Committed: Fri Jan 27 17:04:21 2017 -0800

--
 sdks/python/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/f1b8679c/sdks/python/pom.xml
--
diff --git a/sdks/python/pom.xml b/sdks/python/pom.xml
index cc90969..615ddc5 100644
--- a/sdks/python/pom.xml
+++ b/sdks/python/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-parent</artifactId>
-    <version>0.5.0-incubating-SNAPSHOT</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 



[43/50] [abbrv] beam git commit: This closes #1866

2017-01-30 Thread davor
This closes #1866


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/475707f0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/475707f0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/475707f0

Branch: refs/heads/master
Commit: 475707f0ffd7bc82ca78fa3f3c9e78f661478b99
Parents: f29527f 51afc1c
Author: Davor Bonaci 
Authored: Mon Jan 30 12:43:48 2017 -0800
Committer: Davor Bonaci 
Committed: Mon Jan 30 12:43:48 2017 -0800

--
 sdks/python/apache_beam/internal/util.py  | 33 ++
 sdks/python/apache_beam/io/filebasedsource.py | 17 +++
 sdks/python/apache_beam/io/fileio.py  | 11 ++--
 3 files changed, 40 insertions(+), 21 deletions(-)
--




[08/50] [abbrv] beam git commit: Closes #1818

2017-01-30 Thread davor
Closes #1818


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/01b36280
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/01b36280
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/01b36280

Branch: refs/heads/master
Commit: 01b362807724b03969775c3a17af0854bb4b29a6
Parents: 894461e 1811458
Author: Robert Bradshaw 
Authored: Mon Jan 23 14:36:00 2017 -0800
Committer: Robert Bradshaw 
Committed: Mon Jan 23 14:36:00 2017 -0800

--
 sdks/python/apache_beam/runners/dataflow_runner.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--




[38/50] [abbrv] beam git commit: Closes #1861

2017-01-30 Thread davor
Closes #1861


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/27cf68ee
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/27cf68ee
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/27cf68ee

Branch: refs/heads/master
Commit: 27cf68ee72bd58475c170712f7afe20102601606
Parents: 1bc6859 f1b8679
Author: Dan Halperin 
Authored: Sun Jan 29 08:21:18 2017 -0800
Committer: Dan Halperin 
Committed: Sun Jan 29 08:21:18 2017 -0800

--
 .jenkins/common_job_properties.groovy   |9 +-
 ...job_beam_PostCommit_Java_MavenInstall.groovy |2 +-
 .../job_beam_PreCommit_Java_MavenInstall.groovy |2 +-
 .../job_beam_Release_NightlySnapshot.groovy |2 +-
 .jenkins/job_seed.groovy|2 +-
 .travis/README.md   |2 +-
 DISCLAIMER  |   10 -
 NOTICE  |4 +-
 README.md   |   46 +-
 examples/java/README.md |   16 +-
 examples/java/pom.xml   |   21 +-
 .../beam/examples/DebuggingWordCount.java   |4 +-
 .../org/apache/beam/examples/WordCount.java |6 +-
 .../beam/examples/complete/AutoComplete.java|2 +-
 .../org/apache/beam/examples/complete/README.md |   14 +-
 .../apache/beam/examples/complete/TfIdf.java|2 +-
 .../examples/complete/TopWikipediaSessions.java |2 +-
 .../examples/complete/TrafficMaxLaneFlow.java   |2 +-
 .../beam/examples/complete/TrafficRoutes.java   |2 +-
 .../examples/cookbook/BigQueryTornadoes.java|2 +-
 .../cookbook/CombinePerKeyExamples.java |2 +-
 .../org/apache/beam/examples/cookbook/README.md |   14 +-
 .../beam/examples/cookbook/TriggerExample.java  |4 +-
 .../beam/examples/WindowedWordCountIT.java  |   16 +-
 examples/java8/pom.xml  |2 +-
 .../beam/examples/complete/game/GameStats.java  |7 +-
 .../examples/complete/game/LeaderBoard.java |5 +-
 .../beam/examples/complete/game/UserScore.java  |2 +-
 examples/pom.xml|   16 +-
 pom.xml |   41 +-
 runners/apex/README.md  |4 +-
 runners/apex/pom.xml|3 +-
 .../beam/runners/apex/ApexPipelineOptions.java  |7 +-
 .../apache/beam/runners/apex/ApexRunner.java|   43 +-
 .../beam/runners/apex/ApexYarnLauncher.java |   23 +-
 .../translation/CreateValuesTranslator.java |   18 +-
 .../FlattenPCollectionTranslator.java   |   28 +-
 .../apex/translation/GroupByKeyTranslator.java  |2 +-
 .../translation/ParDoBoundMultiTranslator.java  |   27 +-
 .../apex/translation/ParDoBoundTranslator.java  |4 +-
 .../apex/translation/TranslationContext.java|   27 +-
 .../apex/translation/WindowBoundTranslator.java |8 +-
 .../operators/ApexGroupByKeyOperator.java   |4 +-
 .../operators/ApexParDoOperator.java|6 +-
 .../ApexReadUnboundedInputOperator.java |   17 +-
 .../beam/runners/apex/ApexRunnerTest.java   |   75 ++
 .../beam/runners/apex/ApexYarnLauncherTest.java |9 +-
 .../runners/apex/examples/WordCountTest.java|2 +-
 .../translation/ParDoBoundTranslatorTest.java   |6 +-
 .../translation/ReadUnboundTranslatorTest.java  |8 +-
 .../utils/ApexStateInternalsTest.java   |2 +-
 .../test/resources/beam-runners-apex.properties |   20 +
 runners/core-java/pom.xml   |2 +-
 .../beam/runners/core/AssignWindowsDoFn.java|3 +-
 .../apache/beam/runners/core/DoFnAdapters.java  |  343 ++
 .../apache/beam/runners/core/DoFnRunner.java|   21 -
 .../apache/beam/runners/core/DoFnRunners.java   |  138 +--
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   10 +-
 .../runners/core/GroupAlsoByWindowsDoFn.java|5 +-
 .../beam/runners/core/KeyedWorkItemCoder.java   |4 +-
 .../core/LateDataDroppingDoFnRunner.java|1 -
 .../apache/beam/runners/core/NonEmptyPanes.java |2 +-
 .../org/apache/beam/runners/core/OldDoFn.java   |  472 
 .../runners/core/PerKeyCombineFnRunner.java |   70 --
 .../runners/core/PerKeyCombineFnRunners.java|  101 --
 .../beam/runners/core/SimpleDoFnRunner.java |   63 -
 .../beam/runners/core/SimpleOldDoFnRunner.java  |7 +-
 .../beam/runners/core/SplittableParDo.java  |7 -
 .../core/UnboundedReadFromBoundedSource.java|   14 +-
 .../AfterDelayFromFirstElementStateMachine.java |2 +-
 .../core/triggers/AfterPaneStateMachine.java|2 +-
 .../core/DoFnDelegatingAggregatorTest.java  |  144 +++
 .../core/GroupAlsoByWindowsProperties.java  |2 +-
 .../runners/core/KeyedWorkItemCoderTest.java|6 +
 .../core/LateDataDroppingDoFnRunnerTest.java|2 

[35/50] [abbrv] beam git commit: This closes #1807

2017-01-30 Thread davor
This closes #1807


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1bc68598
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1bc68598
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1bc68598

Branch: refs/heads/master
Commit: 1bc685980092e0922504858fa6c08adc8c44acaa
Parents: 52d97e2 e5d8810
Author: Dan Halperin 
Authored: Fri Jan 27 14:30:28 2017 -0800
Committer: Dan Halperin 
Committed: Fri Jan 27 14:30:28 2017 -0800

--
 sdks/python/apache_beam/coders/coders.py |  1 -
 sdks/python/run_pylint.sh| 44 ---
 2 files changed, 12 insertions(+), 33 deletions(-)
--




[20/50] [abbrv] beam git commit: Closes #1836

2017-01-30 Thread davor
Closes #1836


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9540cf17
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9540cf17
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9540cf17

Branch: refs/heads/master
Commit: 9540cf1762d8595126a1f96301c35524b0a804c2
Parents: 43cb4d7 19789db
Author: Robert Bradshaw 
Authored: Tue Jan 24 16:32:37 2017 -0800
Committer: Robert Bradshaw 
Committed: Tue Jan 24 16:32:37 2017 -0800

--
 sdks/python/run_postcommit.sh | 5 +
 1 file changed, 5 insertions(+)
--




[41/50] [abbrv] beam git commit: This closes #1870

2017-01-30 Thread davor
This closes #1870


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f29527f6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f29527f6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f29527f6

Branch: refs/heads/master
Commit: f29527f68b8de92caf18b183e3a7e97eb190f67e
Parents: 27cf68e 38575a1
Author: Davor Bonaci 
Authored: Mon Jan 30 12:38:53 2017 -0800
Committer: Davor Bonaci 
Committed: Mon Jan 30 12:38:53 2017 -0800

--
 .../GroupAlsoByWindowViaWindowSetNewDoFn.java   | 156 +++
 .../wrappers/streaming/DoFnOperator.java|  69 
 .../wrappers/streaming/WindowDoFnOperator.java  | 143 +
 sdks/python/apache_beam/version.py  |   3 +-
 4 files changed, 265 insertions(+), 106 deletions(-)
--



