[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-10-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=332481=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-332481
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 23/Oct/19 07:43
Start Date: 23/Oct/19 07:43
Worklog Time Spent: 10m 
  Work Description: rezarokni commented on issue #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#issuecomment-545314318
 
 
   The Dataflow Runner and Flink Runner both pass the tests, as does the current 
DirectRunner. I will add some more tests to this PR to confirm, then reopen it so we 
can start the library extension. 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 332481)
Time Spent: 7h 50m  (was: 7h 40m)

> Add Utility BiTemporalStreamJoin
> 
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-ideas
>Affects Versions: 2.12.0
>Reporter: Reza ardeshir rokni
>Assignee: Reza ardeshir rokni
>Priority: Major
>  Time Spent: 7h 50m
>  Remaining Estimate: 0h
>
> Add a utility class that enables a temporal join between two streams, where 
> each element a of Stream A is matched to the element b of Stream B whose
> timestamp is max(b.timestamp) where b.timestamp <= a.timestamp.
> This will use the following overall flow:
> KV(key, Timestamped) 
> | Window
> | GBK
> | Stateful DoFn
>  
>  
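
The matching rule in the description above (for each element of Stream A, take
the element of Stream B with the greatest timestamp that is still <= the A
timestamp) can be sketched in plain Java. This is only an illustration of the
rule for a single key, not the code from the pull request: the class name
AsOfJoinSketch, the method matchAsOf, and the use of a TreeMap keyed by epoch
milliseconds are all assumptions made for the example. The Window, GBK and
stateful DoFn steps are left out.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical illustration of the matching rule; not the BiTemporalStreams code. */
public class AsOfJoinSketch {

  /**
   * Returns the value from stream B whose timestamp is the greatest one that is
   * less than or equal to aTimestamp, or empty if B has no such element.
   * Stream B is assumed to be held for one key as a map of timestamp to value.
   */
  public static <V> Optional<V> matchAsOf(TreeMap<Long, V> streamB, long aTimestamp) {
    // floorEntry returns the entry with the greatest key <= the given key, or null.
    Map.Entry<Long, V> entry = streamB.floorEntry(aTimestamp);
    return Optional.ofNullable(entry).map(Map.Entry::getValue);
  }

  public static void main(String[] args) {
    TreeMap<Long, String> streamB = new TreeMap<>();
    streamB.put(100L, "b@100");
    streamB.put(200L, "b@200");

    System.out.println(matchAsOf(streamB, 150L)); // Optional[b@100]
    System.out.println(matchAsOf(streamB, 200L)); // Optional[b@200]
    System.out.println(matchAsOf(streamB, 50L));  // Optional.empty
  }
}
```

A map keyed by timestamp holds one value per timestamp, so a second B element
with the same timestamp would overwrite the first in this sketch.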



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-10-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=331110=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-331110
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 20/Oct/19 20:55
Start Date: 20/Oct/19 20:55
Worklog Time Spent: 10m 
  Work Description: stale[bot] commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032
 
 
   
 



Issue Time Tracking
---

Worklog Id: (was: 331110)
Time Spent: 7h 40m  (was: 7.5h)



[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-10-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=331109=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-331109
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 20/Oct/19 20:55
Start Date: 20/Oct/19 20:55
Worklog Time Spent: 10m 
  Work Description: stale[bot] commented on issue #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#issuecomment-544291666
 
 
   This pull request has been closed due to lack of activity. If you think that 
is incorrect, or the pull request requires review, you can revive the PR at any 
time.
   
 



Issue Time Tracking
---

Worklog Id: (was: 331109)
Time Spent: 7.5h  (was: 7h 20m)



[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-10-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=327560=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-327560
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 13/Oct/19 20:02
Start Date: 13/Oct/19 20:02
Worklog Time Spent: 10m 
  Work Description: stale[bot] commented on issue #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#issuecomment-541453259
 
 
   This pull request has been marked as stale due to 60 days of inactivity. It 
will be closed in 1 week if no further activity occurs. If you think that’s 
incorrect or this pull request requires a review, please simply write any 
comment. If closed, you can revive the PR at any time and @mention a reviewer 
or discuss it on the d...@beam.apache.org list. Thank you for your 
contributions.
   
 



Issue Time Tracking
---

Worklog Id: (was: 327560)
Time Spent: 7h 20m  (was: 7h 10m)



[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-08-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=294974=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294974
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 14/Aug/19 19:34
Start Date: 14/Aug/19 19:34
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r313952183
 
 

 ##
 File path: 
sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java
 ##
 @@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.util.*;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.state.*;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This sample will take two streams left stream and right stream. It will match the left stream to
+ * the nearest right stream based on their timestamp. Nearest left stream is either <= to the right
+ * stream timestamp.
+ *
+ * If two values in the right stream have the same timestamp then the results are
+ * non-deterministic.
+ */
+@Experimental
+public class BiTemporalStreams {
+
+  public static  BiTemporalJoin join(
+  TupleTag leftTag, TupleTag rightTag, Duration window) {
+return new BiTemporalJoin<>(leftTag, rightTag, window);
+  }
+
+  @Experimental
+  public static class BiTemporalJoin
+  extends PTransform, PCollection>> {
+
+private static final Logger LOG = LoggerFactory.getLogger(BiTemporalStreams.class);
+
+// Sets the limit at which point the processed left stream values are garbage collected
+static int LEFT_STREAM_GC_LIMIT = 1000;
+
+Coder leftCoder;
+Coder rightCoder;
+
+TupleTag leftTag;
+TupleTag rightTag;
+
+Duration window;
+
+private BiTemporalJoin(TupleTag leftTag, TupleTag rightTag, Duration window) {
+  this.leftTag = leftTag;
+  this.rightTag = rightTag;
+  this.window = window;
+}
+
+public static  BiTemporalJoin create(
+TupleTag leftTag, TupleTag rightTag, Duration window) {
+  return new BiTemporalJoin(leftTag, rightTag, window);
+}
+
+public BiTemporalJoin setGCLimit(int gcLimit) {
+  LEFT_STREAM_GC_LIMIT = gcLimit;
+  return this;
+}
+
+@Override
+public PCollection> expand(KeyedPCollectionTuple input) {
+
+  List> collections =
+  input.getKeyedCollections();
+
+  PCollection> leftCollection = null;
+  PCollection> rightCollection = null;
+
+  for (KeyedPCollectionTuple.TaggedKeyedPCollection t : collections) {
+
+if (t.getTupleTag().equals(leftTag)) {
+  leftCollection =
+  ((KeyedPCollectionTuple.TaggedKeyedPCollection) t).getCollection();
+}
+
+if (t.getTupleTag().equals(rightTag)) {
+  rightCollection =
+  ((KeyedPCollectionTuple.TaggedKeyedPCollection) t).getCollection();
+}
+  }
+
+  leftCoder = ((KvCoder) leftCollection.getCoder()).getValueCoder();
+  rightCoder = ((KvCoder) rightCollection.getCoder()).getValueCoder();
+
+  BiTemporalJoinResultCoder 

[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-08-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=294816=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294816
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 14/Aug/19 15:54
Start Date: 14/Aug/19 15:54
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r313952183
 
 


[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-08-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=294807=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294807
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 14/Aug/19 15:28
Start Date: 14/Aug/19 15:28
Worklog Time Spent: 10m 
  Work Description: rezarokni commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r313938990
 
 


[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-08-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=294775=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294775
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 14/Aug/19 14:29
Start Date: 14/Aug/19 14:29
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r313900817
 
 


[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-08-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=294771=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294771
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 14/Aug/19 14:29
Start Date: 14/Aug/19 14:29
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r313893768
 
 


[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-08-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=294776=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294776
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 14/Aug/19 14:29
Start Date: 14/Aug/19 14:29
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r313896435
 
 

 ##
 File path: 
sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java
 ##
 @@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.util.*;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.state.*;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This sample will take two streams left stream and right stream. It will match the left stream to
+ * the nearest right stream based on their timestamp. Nearest left stream is either <= to the right
+ * stream timestamp.
+ *
+ * If two values in the right stream have the same timestamp then the results are
+ * non-deterministic.
+ */
+@Experimental
+public class BiTemporalStreams {
+
+  public static  BiTemporalJoin join(
+  TupleTag leftTag, TupleTag rightTag, Duration window) {
+return new BiTemporalJoin<>(leftTag, rightTag, window);
+  }
+
+  @Experimental
+  public static class BiTemporalJoin
+  extends PTransform, PCollection>> {
+
+private static final Logger LOG = LoggerFactory.getLogger(BiTemporalStreams.class);
+
+// Sets the limit at which point the processed left stream values are garbage collected
+static int LEFT_STREAM_GC_LIMIT = 1000;
+
+Coder leftCoder;
+Coder rightCoder;
+
+TupleTag leftTag;
+TupleTag rightTag;
+
+Duration window;
+
+private BiTemporalJoin(TupleTag leftTag, TupleTag rightTag, Duration window) {
+  this.leftTag = leftTag;
+  this.rightTag = rightTag;
+  this.window = window;
+}
+
+public static  BiTemporalJoin create(
+TupleTag leftTag, TupleTag rightTag, Duration window) {
+  return new BiTemporalJoin(leftTag, rightTag, window);
 
 Review comment:
   ```suggestion
 return new BiTemporalJoin<>(leftTag, rightTag, window);
   ```
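   As background for the suggestion above: calling a generic constructor
   without type arguments creates a raw type, which compiles but produces an
   unchecked warning, while the diamond `<>` lets the compiler infer the type
   arguments. A minimal sketch, where `Holder` is a hypothetical stand-in for
   the PR's generic class and both factory methods are illustrative only:

   ```java
/** Illustrative only: Holder is a hypothetical stand-in, not a class from the PR. */
final class DiamondSketch {

  /** Minimal generic class used to show the two construction styles. */
  static final class Holder<T> {
    final T value;

    Holder(T value) {
      this.value = value;
    }
  }

  // Raw-type construction: compiles, but the compiler reports rawtypes and
  // unchecked warnings, which are suppressed here only to keep the sketch quiet.
  @SuppressWarnings({"rawtypes", "unchecked"})
  static <T> Holder<T> createRaw(T value) {
    return new Holder(value);
  }

  // Diamond construction: T is inferred from the return type, no warning.
  static <T> Holder<T> createDiamond(T value) {
    return new Holder<>(value);
  }

  public static void main(String[] args) {
    System.out.println(createRaw("raw").value);
    System.out.println(createDiamond("diamond").value);
  }
}
   ```

   Both methods behave the same at runtime; the difference is only in what
   the compiler can check.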
 



Issue Time Tracking
---

Worklog Id: (was: 294776)
Time Spent: 6h 20m  (was: 6h 10m)

> Add Utility BiTemporalStreamJoin
> 
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-ideas
>Affects Versions: 2.12.0
>Reporter: Reza ardeshir rokni
>Assignee: Reza ardeshir rokni
>Priority: Major
>  Time Spent: 6h 20m
>  Remaining 

[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-08-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=294774=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294774
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 14/Aug/19 14:29
Start Date: 14/Aug/19 14:29
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r313899337
 
 

 ##
 File path: 
sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java
 ##
 @@ -0,0 +1,464 @@
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.util.*;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.state.*;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This sample will take two streams left stream and right stream. It will match the left stream to
+ * the nearest right stream based on their timestamp. Nearest left stream is either <= to the right
+ * stream timestamp.
+ *
+ * If two values in the right stream have the same timestamp then the results are
+ * non-deterministic.
+ */
+@Experimental
+public class BiTemporalStreams {
+
+  public static  BiTemporalJoin join(
+  TupleTag leftTag, TupleTag rightTag, Duration window) {
+return new BiTemporalJoin<>(leftTag, rightTag, window);
+  }
+
+  @Experimental
+  public static class BiTemporalJoin
+  extends PTransform, PCollection>> {
+
+private static final Logger LOG = LoggerFactory.getLogger(BiTemporalStreams.class);
+
+// Sets the limit at which point the processed left stream values are garbage collected
+static int LEFT_STREAM_GC_LIMIT = 1000;
+
+Coder leftCoder;
+Coder rightCoder;
+
+TupleTag leftTag;
+TupleTag rightTag;
+
+Duration window;
+
+private BiTemporalJoin(TupleTag leftTag, TupleTag rightTag, Duration window) {
+  this.leftTag = leftTag;
+  this.rightTag = rightTag;
+  this.window = window;
+}
+
+public static  BiTemporalJoin create(
+TupleTag leftTag, TupleTag rightTag, Duration window) {
+  return new BiTemporalJoin(leftTag, rightTag, window);
+}
+
+public BiTemporalJoin setGCLimit(int gcLimit) {
+  LEFT_STREAM_GC_LIMIT = gcLimit;
+  return this;
+}
+
+@Override
+public PCollection> expand(KeyedPCollectionTuple input) {
+
+  List> collections =
+  input.getKeyedCollections();
+
+  PCollection> leftCollection = null;
+  PCollection> rightCollection = null;
+
+  for (KeyedPCollectionTuple.TaggedKeyedPCollection t : collections) {
+
+if (t.getTupleTag().equals(leftTag)) {
+  leftCollection =
+  ((KeyedPCollectionTuple.TaggedKeyedPCollection) t).getCollection();
+}
+
+if (t.getTupleTag().equals(rightTag)) {
+  rightCollection =
+  ((KeyedPCollectionTuple.TaggedKeyedPCollection) t).getCollection();
+}
+  }
+
+  leftCoder = ((KvCoder) leftCollection.getCoder()).getValueCoder();
+  rightCoder = ((KvCoder) rightCollection.getCoder()).getValueCoder();
+
+  BiTemporalJoinResultCoder 

[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-08-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=294772=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294772
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 14/Aug/19 14:29
Start Date: 14/Aug/19 14:29
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r313901332
 
 

 ##
 File path: 
sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java
 ##
 @@ -0,0 +1,464 @@
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.util.*;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.state.*;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This sample will take two streams left stream and right stream. It will match the left stream to
+ * the nearest right stream based on their timestamp. Nearest left stream is either <= to the right
+ * stream timestamp.
+ *
+ * If two values in the right stream have the same timestamp then the results are
+ * non-deterministic.
+ */
+@Experimental
+public class BiTemporalStreams {
+
+  public static  BiTemporalJoin join(
+  TupleTag leftTag, TupleTag rightTag, Duration window) {
+return new BiTemporalJoin<>(leftTag, rightTag, window);
+  }
+
+  @Experimental
+  public static class BiTemporalJoin
+  extends PTransform, PCollection>> {
+
+private static final Logger LOG = LoggerFactory.getLogger(BiTemporalStreams.class);
+
+// Sets the limit at which point the processed left stream values are garbage collected
+static int LEFT_STREAM_GC_LIMIT = 1000;
+
+Coder leftCoder;
+Coder rightCoder;
+
+TupleTag leftTag;
+TupleTag rightTag;
+
+Duration window;
+
+private BiTemporalJoin(TupleTag leftTag, TupleTag rightTag, Duration window) {
+  this.leftTag = leftTag;
+  this.rightTag = rightTag;
+  this.window = window;
+}
+
+public static  BiTemporalJoin create(
+TupleTag leftTag, TupleTag rightTag, Duration window) {
+  return new BiTemporalJoin(leftTag, rightTag, window);
+}
+
+public BiTemporalJoin setGCLimit(int gcLimit) {
+  LEFT_STREAM_GC_LIMIT = gcLimit;
+  return this;
+}
+
+@Override
+public PCollection> expand(KeyedPCollectionTuple input) {
+
+  List> collections =
+  input.getKeyedCollections();
+
+  PCollection> leftCollection = null;
+  PCollection> rightCollection = null;
+
+  for (KeyedPCollectionTuple.TaggedKeyedPCollection t : collections) {
+
+if (t.getTupleTag().equals(leftTag)) {
+  leftCollection =
+  ((KeyedPCollectionTuple.TaggedKeyedPCollection) t).getCollection();
+}
+
+if (t.getTupleTag().equals(rightTag)) {
+  rightCollection =
+  ((KeyedPCollectionTuple.TaggedKeyedPCollection) t).getCollection();
+}
+  }
+
+  leftCoder = ((KvCoder) leftCollection.getCoder()).getValueCoder();
+  rightCoder = ((KvCoder) rightCollection.getCoder()).getValueCoder();
+
+  BiTemporalJoinResultCoder 

[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-08-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=294777=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294777
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 14/Aug/19 14:29
Start Date: 14/Aug/19 14:29
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r313897433
 
 

 ##
 File path: 
sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java
 ##
 @@ -0,0 +1,464 @@
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.util.*;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.state.*;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This sample will take two streams left stream and right stream. It will match the left stream to
+ * the nearest right stream based on their timestamp. Nearest left stream is either <= to the right
+ * stream timestamp.
+ *
+ * If two values in the right stream have the same timestamp then the results are
+ * non-deterministic.
+ */
+@Experimental
+public class BiTemporalStreams {
+
+  public static  BiTemporalJoin join(
+  TupleTag leftTag, TupleTag rightTag, Duration window) {
+return new BiTemporalJoin<>(leftTag, rightTag, window);
+  }
+
+  @Experimental
+  public static class BiTemporalJoin
+  extends PTransform, PCollection>> {
+
+private static final Logger LOG = LoggerFactory.getLogger(BiTemporalStreams.class);
+
+// Sets the limit at which point the processed left stream values are garbage collected
+static int LEFT_STREAM_GC_LIMIT = 1000;
+
+Coder leftCoder;
+Coder rightCoder;
+
+TupleTag leftTag;
+TupleTag rightTag;
+
+Duration window;
 
 Review comment:
   Hm, I'm wondering, does this have to work in FixedWindows? I think there 
should be no blocker for this to work on GlobalWindows as well. Even for any 
other windows, provided the windowFn is the same on both sides, right?
 



Issue Time Tracking
---

Worklog Id: (was: 294777)
Time Spent: 6.5h  (was: 6h 20m)

> Add Utility BiTemporalStreamJoin
> 
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-ideas
>Affects Versions: 2.12.0
>Reporter: Reza ardeshir rokni
>Assignee: Reza ardeshir rokni
>Priority: Major
>  Time Spent: 6.5h
>  Remaining Estimate: 0h
>
> Add utility class that enables a temporal join between two streams where 
> Stream A is matched to Stream B where
> A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) 
> This will use the 

[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-08-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=294778=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294778
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 14/Aug/19 14:29
Start Date: 14/Aug/19 14:29
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r313901093
 
 

 ##
 File path: 
sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java
 ##
 @@ -0,0 +1,464 @@
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.util.*;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.state.*;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This sample will take two streams left stream and right stream. It will match the left stream to
+ * the nearest right stream based on their timestamp. Nearest left stream is either <= to the right
+ * stream timestamp.
+ *
+ * If two values in the right stream have the same timestamp then the results are
+ * non-deterministic.
+ */
+@Experimental
+public class BiTemporalStreams {
+
+  public static  BiTemporalJoin join(
+  TupleTag leftTag, TupleTag rightTag, Duration window) {
+return new BiTemporalJoin<>(leftTag, rightTag, window);
+  }
+
+  @Experimental
+  public static class BiTemporalJoin
+  extends PTransform, PCollection>> {
+
+private static final Logger LOG = LoggerFactory.getLogger(BiTemporalStreams.class);
+
+// Sets the limit at which point the processed left stream values are garbage collected
+static int LEFT_STREAM_GC_LIMIT = 1000;
+
+Coder leftCoder;
+Coder rightCoder;
+
+TupleTag leftTag;
+TupleTag rightTag;
+
+Duration window;
+
+private BiTemporalJoin(TupleTag leftTag, TupleTag rightTag, Duration window) {
+  this.leftTag = leftTag;
+  this.rightTag = rightTag;
+  this.window = window;
+}
+
+public static  BiTemporalJoin create(
+TupleTag leftTag, TupleTag rightTag, Duration window) {
+  return new BiTemporalJoin(leftTag, rightTag, window);
+}
+
+public BiTemporalJoin setGCLimit(int gcLimit) {
+  LEFT_STREAM_GC_LIMIT = gcLimit;
+  return this;
+}
+
+@Override
+public PCollection> expand(KeyedPCollectionTuple input) {
+
+  List> collections =
+  input.getKeyedCollections();
+
+  PCollection> leftCollection = null;
+  PCollection> rightCollection = null;
+
+  for (KeyedPCollectionTuple.TaggedKeyedPCollection t : collections) {
+
+if (t.getTupleTag().equals(leftTag)) {
+  leftCollection =
+  ((KeyedPCollectionTuple.TaggedKeyedPCollection) t).getCollection();
+}
+
+if (t.getTupleTag().equals(rightTag)) {
+  rightCollection =
+  ((KeyedPCollectionTuple.TaggedKeyedPCollection) t).getCollection();
+}
+  }
+
+  leftCoder = ((KvCoder) leftCollection.getCoder()).getValueCoder();
+  rightCoder = ((KvCoder) rightCollection.getCoder()).getValueCoder();
+
+  BiTemporalJoinResultCoder 

[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-08-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=294773=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294773
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 14/Aug/19 14:29
Start Date: 14/Aug/19 14:29
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r313895681
 
 

 ##
 File path: 
sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java
 ##
 @@ -0,0 +1,464 @@
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.util.*;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.state.*;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This sample will take two streams left stream and right stream. It will match the left stream to
+ * the nearest right stream based on their timestamp. Nearest left stream is either <= to the right
+ * stream timestamp.
+ *
+ * If two values in the right stream have the same timestamp then the results are
+ * non-deterministic.
+ */
+@Experimental
+public class BiTemporalStreams {
+
+  public static  BiTemporalJoin join(
+  TupleTag leftTag, TupleTag rightTag, Duration window) {
+return new BiTemporalJoin<>(leftTag, rightTag, window);
+  }
+
+  @Experimental
+  public static class BiTemporalJoin
 
 Review comment:
   Should this be called `BiTemporalLeftJoin`? It seems not to work equally in 
both directions.
 



Issue Time Tracking
---

Worklog Id: (was: 294773)
Time Spent: 5h 50m  (was: 5h 40m)

> Add Utility BiTemporalStreamJoin
> 
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-ideas
>Affects Versions: 2.12.0
>Reporter: Reza ardeshir rokni
>Assignee: Reza ardeshir rokni
>Priority: Major
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> Add utility class that enables a temporal join between two streams where 
> Stream A is matched to Stream B where
> A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) 
> This will use the following overall flow:
> KV(key, Timestamped) 
> | Window
> | GBK
> | Statefull DoFn
>  
>  
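
The matching rule in the description above (for each element of stream A, take
the element of stream B with the greatest timestamp that is less than or equal
to A's timestamp) can be illustrated outside Beam with a sorted map. This is
only a sketch of the lookup semantics for a single key, not the PR's
implementation: the class `AsOfJoinSketch`, the method `matchAsOf`, and the
sample data are hypothetical, and the Window, GBK and stateful DoFn steps of
the flow are left out.

```java
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration of the as-of matching rule; hypothetical, not Beam code. */
final class AsOfJoinSketch {

  /**
   * Returns the right-side entry with the greatest timestamp that is
   * less than or equal to leftTimestamp, or null if no such entry exists.
   */
  static Map.Entry<Long, String> matchAsOf(
      TreeMap<Long, String> rightByTimestamp, long leftTimestamp) {
    return rightByTimestamp.floorEntry(leftTimestamp);
  }

  public static void main(String[] args) {
    // Right stream (stream B) for one key, indexed by timestamp.
    TreeMap<Long, String> right = new TreeMap<>();
    right.put(10L, "b10");
    right.put(20L, "b20");
    right.put(30L, "b30");

    // Left stream (stream A) timestamps to be matched.
    for (long leftTimestamp : new long[] {5L, 20L, 25L}) {
      Map.Entry<Long, String> match = matchAsOf(right, leftTimestamp);
      System.out.println(
          leftTimestamp + " -> " + (match == null ? "no match" : match.getValue()));
    }
  }
}
```

`TreeMap.floorEntry` returns the entry with the greatest key less than or equal
to the argument, or null when there is none, so a left timestamp earlier than
every right timestamp has no match. A map keeps one value per key, so the
sketch simply keeps the last right value inserted for a given timestamp; the
PR's Javadoc describes that case as non-deterministic.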



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-08-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=293688=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-293688
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 13/Aug/19 07:32
Start Date: 13/Aug/19 07:32
Worklog Time Spent: 10m 
  Work Description: rezarokni commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r313254684
 
 

 ##
 File path: 
sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java
 ##
 @@ -0,0 +1,462 @@
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.util.*;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.state.*;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This sample will take two streams left stream and right stream. It will match the left stream to
+ * the nearest right stream based on their timestamp. Nearest left stream is either <= to the right
+ * stream timestamp.
+ *
+ * If two values in the right stream have the same timestamp then the results are
+ * non-deterministic.
+ */
+@Experimental
+public class BiTemporalStreams {
+
+  public static  BiTemporalJoin join(
+  TupleTag leftTag, TupleTag rightTag, Duration window) {
+return new BiTemporalJoin<>(leftTag, rightTag, window);
+  }
+
+  @Experimental
+  public static class BiTemporalJoin
+  extends PTransform, PCollection>> {
+
+private static final Logger LOG = LoggerFactory.getLogger(BiTemporalStreams.class);
+
+// Sets the limit at which point the processed left stream values are garbage collected
+static int GC_LIMIT = 1000;
 
 Review comment:
   Looks like one of the keys is no longer getting processed. That's the 
symptom; still digging into the cause.
 



Issue Time Tracking
---

Worklog Id: (was: 293688)
Time Spent: 5h 20m  (was: 5h 10m)

> Add Utility BiTemporalStreamJoin
> 
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-ideas
>Affects Versions: 2.12.0
>Reporter: Reza ardeshir rokni
>Assignee: Reza ardeshir rokni
>Priority: Major
>  Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> Add utility class that enables a temporal join between two streams where 
> Stream A is matched to Stream B where
> A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) 
> This will use the following overall flow:
> KV(key, Timestamped) 
> | Window
> | GBK
> | Statefull DoFn
>  
>  





[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-30 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=284960=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-284960
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 30/Jul/19 14:02
Start Date: 30/Jul/19 14:02
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r308739373
 
 

 ##
 File path: 
sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java
 ##
 @@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.util.*;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.state.*;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This sample takes two streams, a left stream and a right stream. It matches the left stream to
+ * the nearest right stream based on their timestamps. The nearest left stream timestamp is <= the
+ * right stream timestamp.
+ *
+ * If two values in the right stream have the same timestamp then the results are
+ * non-deterministic.
+ */
+@Experimental
+public class BiTemporalStreams {
+
+  public static  BiTemporalJoin join(
+  TupleTag leftTag, TupleTag rightTag, Duration window) {
+return new BiTemporalJoin<>(leftTag, rightTag, window);
+  }
+
+  @Experimental
+  public static class BiTemporalJoin
+  extends PTransform, 
PCollection>> {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(BiTemporalStreams.class);
+
+// Sets the limit at which point the processed left stream values are 
garbage collected
+static int GC_LIMIT = 1000;
 
 Review comment:
   @rezarokni I have managed to (hopefully finally) fix the issue with timers. 
The related PR is #9190. I cherry-picked the two commits into PR rezarokni#2, and 
the failing tests are green on my local machine, with the exception of 
`BiTemporalCacheTest.cacheTest`, which is still failing (consistently). I don't 
know what the issue is with that, but I hope it is no longer related to the 
timer ordering. Could you validate that?
 



Issue Time Tracking
---

Worklog Id: (was: 284960)
Time Spent: 5h 10m  (was: 5h)

> Add Utility BiTemporalStreamJoin
> 
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-ideas
>Affects Versions: 2.12.0
>Reporter: Reza ardeshir rokni
>Assignee: Reza ardeshir rokni
>Priority: Major
>  Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> Add utility class that enables a temporal join between two streams where 
> Stream A is matched to Stream B where
> A.timestamp = 

[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=284392=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-284392
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 29/Jul/19 16:41
Start Date: 29/Jul/19 16:41
Worklog Time Spent: 10m 
  Work Description: rezarokni commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r308327824
 
 

 ##
 File path: 
sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java
 ##
 @@ -0,0 +1,462 @@
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.util.*;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.state.*;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This sample takes two streams, a left stream and a right stream. It matches the left stream to
+ * the nearest right stream based on their timestamps. The nearest left stream timestamp is <= the
+ * right stream timestamp.
+ *
+ * If two values in the right stream have the same timestamp then the results are
+ * non-deterministic.
+ */
+@Experimental
+public class BiTemporalStreams {
+
+  public static  BiTemporalJoin join(
+  TupleTag leftTag, TupleTag rightTag, Duration window) {
+return new BiTemporalJoin<>(leftTag, rightTag, window);
+  }
+
+  @Experimental
+  public static class BiTemporalJoin
+  extends PTransform, 
PCollection>> {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(BiTemporalStreams.class);
+
+// Sets the limit at which point the processed left stream values are 
garbage collected
+static int GC_LIMIT = 1000;
+
+Coder leftCoder;
+Coder rightCoder;
+
+TupleTag leftTag;
+TupleTag rightTag;
+
+Duration window;
+
+private BiTemporalJoin(TupleTag leftTag, TupleTag rightTag, 
Duration window) {
+  this.leftTag = leftTag;
+  this.rightTag = rightTag;
+  this.window = window;
+}
+
+public static  BiTemporalJoin create(
+TupleTag leftTag, TupleTag rightTag, Duration window) {
+  return new BiTemporalJoin(leftTag, rightTag, window);
+}
+
+public BiTemporalJoin setGCLimit(int gcLimit) {
+  GC_LIMIT = gcLimit;
+  return this;
+}
+
+@Override
+public PCollection> 
expand(KeyedPCollectionTuple input) {
+
+  List> collections =
+  input.getKeyedCollections();
+
+  PCollection> leftCollection = null;
+  PCollection> rightCollection = null;
+
+  for (KeyedPCollectionTuple.TaggedKeyedPCollection t : collections) 
{
+
+if (t.getTupleTag().equals(leftTag)) {
+  leftCollection =
+  ((KeyedPCollectionTuple.TaggedKeyedPCollection) 
t).getCollection();
+}
+
+if (t.getTupleTag().equals(rightTag)) {
+  rightCollection =
+  ((KeyedPCollectionTuple.TaggedKeyedPCollection) 
t).getCollection();
+}
+  }
+
+  leftCoder = ((KvCoder) leftCollection.getCoder()).getValueCoder();
+  rightCoder = ((KvCoder) 
rightCollection.getCoder()).getValueCoder();
+
+  BiTemporalJoinResultCoder biStreamJoinResultCoder =
+   

[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=284363=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-284363
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 29/Jul/19 16:10
Start Date: 29/Jul/19 16:10
Worklog Time Spent: 10m 
  Work Description: rezarokni commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r308314412
 
 

 ##
 File path: 
sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java
 ##
 @@ -0,0 +1,462 @@
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.util.*;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.state.*;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This sample takes two streams, a left stream and a right stream. It matches the left stream to
+ * the nearest right stream based on their timestamps. The nearest left stream timestamp is <= the
+ * right stream timestamp.
+ *
+ * If two values in the right stream have the same timestamp then the results are
+ * non-deterministic.
+ */
+@Experimental
+public class BiTemporalStreams {
+
+  public static  BiTemporalJoin join(
+  TupleTag leftTag, TupleTag rightTag, Duration window) {
+return new BiTemporalJoin<>(leftTag, rightTag, window);
+  }
+
+  @Experimental
+  public static class BiTemporalJoin
+  extends PTransform, 
PCollection>> {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(BiTemporalStreams.class);
+
+// Sets the limit at which point the processed left stream values are 
garbage collected
+static int GC_LIMIT = 1000;
 
 Review comment:
   With regard to je-ik's first comment: 
   @amaliujia @kennknowles I had a quick glance at the StatefulDoFnRunner; it 
uses a Timer to do GC. I don't think all runners use Timers for this operation, 
but as je-ik mentioned, some do. If I recall correctly, there are some issues with 
how multiple timers interact with each other when a Timer resets itself. 
Should this be standardized? As it stands, I think this could mean that 
things work on some runners and not on others.
 



Issue Time Tracking
---

Worklog Id: (was: 284363)
Time Spent: 4h 50m  (was: 4h 40m)

> Add Utility BiTemporalStreamJoin
> 
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-ideas
>Affects Versions: 2.12.0
>Reporter: Reza ardeshir rokni
>Assignee: Reza ardeshir rokni
>Priority: Major
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> Add utility class that enables a temporal join between two streams 

[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=284359=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-284359
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 29/Jul/19 16:03
Start Date: 29/Jul/19 16:03
Worklog Time Spent: 10m 
  Work Description: rezarokni commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r308310753
 
 

 ##
 File path: 
sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java
 ##
 @@ -0,0 +1,462 @@
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.util.*;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.state.*;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This sample takes two streams, a left stream and a right stream. It matches the left stream to
+ * the nearest right stream based on their timestamps. The nearest left stream timestamp is <= the
+ * right stream timestamp.
+ *
+ * If two values in the right stream have the same timestamp then the results are
+ * non-deterministic.
+ */
+@Experimental
+public class BiTemporalStreams {
+
+  public static  BiTemporalJoin join(
+  TupleTag leftTag, TupleTag rightTag, Duration window) {
+return new BiTemporalJoin<>(leftTag, rightTag, window);
+  }
+
+  @Experimental
+  public static class BiTemporalJoin
+  extends PTransform, 
PCollection>> {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(BiTemporalStreams.class);
+
+// Sets the limit at which point the processed left stream values are 
garbage collected
+static int GC_LIMIT = 1000;
 
 Review comment:
   I am OOO at the moment, but the current experimental implementation does not deal 
with late data (I think I had a note in the Javadoc). When I am back I will 
add a check that throws an IllegalStateException if allowed lateness is not set 
to ZERO. Until we have retractions, the best way to deal with late data is to 
dead-letter it.
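
A minimal sketch of the dead-letter idea, written in plain Java rather than against the Beam API. The class `LateDataSplitSketch` is hypothetical and does not appear in the PR. It assumes a caller-supplied watermark and treats any timestamp strictly behind it as late, which is a simplification of Beam's window-based notion of lateness.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: separates late elements from on-time ones instead of joining them. */
final class LateDataSplitSketch {
  final List<Long> onTime = new ArrayList<>();
  final List<Long> deadLetter = new ArrayList<>();

  /** Routes each timestamp; anything strictly behind the watermark goes to the dead letter list. */
  void route(List<Long> timestamps, long watermark) {
    for (long ts : timestamps) {
      if (ts < watermark) {
        deadLetter.add(ts); // late: set aside for inspection, never joined
      } else {
        onTime.add(ts); // on time: eligible for the join
      }
    }
  }
}
```

Keeping the late elements in a separate output, rather than dropping them silently, leaves room to inspect or reprocess them later.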
   
   The GC_LIMIT (which should have a better name; I will fix that when I am 
back) is actually there to reduce the amount of processing needed to sort the 
left-hand side. The whole list needs to be sorted on each finish bundle, but 
once an element is processed it is no longer needed. Once we have Sorted Map 
State, most of the code for this class can be deleted, as there will be no need 
for the cache that we have to use at the moment.  :-)
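
The pruning described above can be sketched as follows. This is a plain-Java illustration under stated assumptions, not the PR's implementation: `LeftBufferSketch`, `pruneThreshold` and `processUpTo` are hypothetical names, and the threshold is held per instance rather than in a static field.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch: buffers left-side elements and prunes processed ones past a threshold. */
final class LeftBufferSketch {

  /** A left-side element plus a flag recording whether it has already been processed. */
  static final class Element {
    final long timestamp;
    boolean processed;

    Element(long timestamp) {
      this.timestamp = timestamp;
    }
  }

  private final int pruneThreshold;
  private final List<Element> buffer = new ArrayList<>();
  private int processedCount = 0;

  LeftBufferSketch(int pruneThreshold) {
    this.pruneThreshold = pruneThreshold;
  }

  void add(long timestamp) {
    buffer.add(new Element(timestamp));
  }

  /** Sorts the buffer, marks elements at or before the bound as processed, prunes if needed. */
  int processUpTo(long bound) {
    buffer.sort(Comparator.comparingLong((Element e) -> e.timestamp));
    int newlyProcessed = 0;
    for (Element e : buffer) {
      if (e.timestamp > bound) {
        break; // the buffer is sorted, so nothing later can qualify
      }
      if (!e.processed) {
        e.processed = true;
        newlyProcessed++;
      }
    }
    processedCount += newlyProcessed;
    if (processedCount > pruneThreshold) {
      // Processed elements are never needed again, so dropping them keeps later sorts cheap.
      buffer.removeIf(e -> e.processed);
      processedCount = 0;
    }
    return newlyProcessed;
  }

  int size() {
    return buffer.size();
  }
}
```

Holding the threshold in an instance field means that two joins in the same pipeline can use different limits without affecting each other.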
   
   
 



Issue Time Tracking
---

Worklog Id: (was: 284359)
Time Spent: 4h 40m  (was: 4.5h)

> Add Utility BiTemporalStreamJoin
> 
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
>  Issue Type: Improvement
>  

[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=283817=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-283817
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 27/Jul/19 20:13
Start Date: 27/Jul/19 20:13
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r307975929
 
 

 ##
 File path: 
sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java
 ##
 @@ -0,0 +1,462 @@
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.util.*;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.state.*;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This sample takes two streams, a left stream and a right stream. It matches the left stream to
+ * the nearest right stream based on their timestamps. The nearest left stream timestamp is <= the
+ * right stream timestamp.
+ *
+ * If two values in the right stream have the same timestamp then the results are
+ * non-deterministic.
+ */
+@Experimental
+public class BiTemporalStreams {
+
+  public static  BiTemporalJoin join(
+  TupleTag leftTag, TupleTag rightTag, Duration window) {
+return new BiTemporalJoin<>(leftTag, rightTag, window);
+  }
+
+  @Experimental
+  public static class BiTemporalJoin
+  extends PTransform, 
PCollection>> {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(BiTemporalStreams.class);
+
+// Sets the limit at which point the processed left stream values are 
garbage collected
+static int GC_LIMIT = 1000;
 
 Review comment:
   Do I understand correctly that this effectively acts as allowed lateness, 
so that elements from the right side that arrive too late will not be joined 
because the left side will already have been garbage collected?
 



Issue Time Tracking
---

Worklog Id: (was: 283817)
Time Spent: 4.5h  (was: 4h 20m)

> Add Utility BiTemporalStreamJoin
> 
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-ideas
>Affects Versions: 2.12.0
>Reporter: Reza ardeshir rokni
>Assignee: Reza ardeshir rokni
>Priority: Major
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> Add utility class that enables a temporal join between two streams where 
> Stream A is matched to Stream B where
> A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) 
> This will use the following overall flow:
> KV(key, Timestamped) 
> | Window
> | GBK
> | Stateful DoFn
>  
>  



--
This message was sent by Atlassian JIRA

[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=283816=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-283816
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 27/Jul/19 20:13
Start Date: 27/Jul/19 20:13
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r307976024
 
 

 ##
 File path: 
sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java
 ##
 @@ -0,0 +1,462 @@
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.util.*;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.state.*;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This sample takes two streams, a left stream and a right stream. It matches the left stream to
+ * the nearest right stream based on their timestamps. The nearest left stream timestamp is <= the
+ * right stream timestamp.
+ *
+ * If two values in the right stream have the same timestamp then the results are
+ * non-deterministic.
+ */
+@Experimental
+public class BiTemporalStreams {
+
+  public static  BiTemporalJoin join(
+  TupleTag leftTag, TupleTag rightTag, Duration window) {
+return new BiTemporalJoin<>(leftTag, rightTag, window);
+  }
+
+  @Experimental
+  public static class BiTemporalJoin
+  extends PTransform, 
PCollection>> {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(BiTemporalStreams.class);
+
+// Sets the limit at which point the processed left stream values are 
garbage collected
+static int GC_LIMIT = 1000;
 
 Review comment:
   And if so, would this work correctly in batch?
 



Issue Time Tracking
---

Worklog Id: (was: 283816)
Time Spent: 4h 20m  (was: 4h 10m)

> Add Utility BiTemporalStreamJoin
> 
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-ideas
>Affects Versions: 2.12.0
>Reporter: Reza ardeshir rokni
>Assignee: Reza ardeshir rokni
>Priority: Major
>  Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> Add utility class that enables a temporal join between two streams where 
> Stream A is matched to Stream B where
> A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) 
> This will use the following overall flow:
> KV(key, Timestamped) 
> | Window
> | GBK
> | Stateful DoFn
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=283762=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-283762
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 27/Jul/19 08:12
Start Date: 27/Jul/19 08:12
Worklog Time Spent: 10m 
  Work Description: rezarokni commented on issue #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#issuecomment-515664865
 
 
   @amaliujia thanks!
   @kennknowles would you have time for a review, please? 
 



Issue Time Tracking
---

Worklog Id: (was: 283762)
Time Spent: 4h 10m  (was: 4h)

> Add Utility BiTemporalStreamJoin
> 
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-ideas
>Affects Versions: 2.12.0
>Reporter: Reza ardeshir rokni
>Assignee: Reza ardeshir rokni
>Priority: Major
>  Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> Add utility class that enables a temporal join between two streams where 
> Stream A is matched to Stream B where
> A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) 
> This will use the following overall flow:
> KV(key, Timestamped) 
> | Window
> | GBK
> | Stateful DoFn
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=282784=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-282784
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 25/Jul/19 17:24
Start Date: 25/Jul/19 17:24
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#issuecomment-515134878
 
 
   @rezarokni
   
   You also need to find a committer to review and merge this PR.
 



Issue Time Tracking
---

Worklog Id: (was: 282784)
Time Spent: 4h  (was: 3h 50m)

> Add Utility BiTemporalStreamJoin
> 
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-ideas
>Affects Versions: 2.12.0
>Reporter: Reza ardeshir rokni
>Assignee: Reza ardeshir rokni
>Priority: Major
>  Time Spent: 4h
>  Remaining Estimate: 0h
>
> Add utility class that enables a temporal join between two streams where 
> Stream A is matched to Stream B where
> A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) 
> This will use the following overall flow:
> KV(key, Timestamped) 
> | Window
> | GBK
> | Stateful DoFn
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=281353=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-281353
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 23/Jul/19 22:21
Start Date: 23/Jul/19 22:21
Worklog Time Spent: 10m 
  Work Description: rezarokni commented on issue #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#issuecomment-514405257
 
 
   > @rezarokni
   > 
   > I saw that not all checks had succeeded, so I assumed you were working on 
making them pass. I have now looked at the logs, though, and the failures do 
not seem relevant, so I will take a closer look at the main stream class soon.
   
   There are some checkstyle changes that need to be made; if that is OK, I 
will fix them up together with your next comments.
 



Issue Time Tracking
---

Worklog Id: (was: 281353)
Time Spent: 3h 50m  (was: 3h 40m)

> Add Utility BiTemporalStreamJoin
> 
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-ideas
>Affects Versions: 2.12.0
>Reporter: Reza ardeshir rokni
>Assignee: Reza ardeshir rokni
>Priority: Major
>  Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> Add utility class that enables a temporal join between two streams where 
> Stream A is matched to Stream B where
> A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) 
> This will use the following overall flow:
> KV(key, Timestamped) 
> | Window
> | GBK
> | Stateful DoFn
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=281307=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-281307
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 23/Jul/19 20:45
Start Date: 23/Jul/19 20:45
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#issuecomment-514376941
 
 
   @rezarokni 
   
   I saw that not all checks had succeeded, so I assumed you were working on 
making them pass. I have now looked at the logs, though, and the failures do 
not seem relevant, so I will take a closer look at the main stream class soon. 
 



Issue Time Tracking
---

Worklog Id: (was: 281307)
Time Spent: 3h 40m  (was: 3.5h)

> Add Utility BiTemporalStreamJoin
> 
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-ideas
>Affects Versions: 2.12.0
>Reporter: Reza ardeshir rokni
>Assignee: Reza ardeshir rokni
>Priority: Major
>  Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> Add utility class that enables a temporal join between two streams where 
> Stream A is matched to Stream B where
> A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) 
> This will use the following overall flow:
> KV(key, Timestamped) 
> | Window
> | GBK
> | Stateful DoFn
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=281304=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-281304
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 23/Jul/19 20:43
Start Date: 23/Jul/19 20:43
Worklog Time Spent: 10m 
  Work Description: rezarokni commented on issue #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#issuecomment-514376058
 
 
   @amaliujia Would you be able to have a look at the main stream class this 
week?
 



Issue Time Tracking
---

Worklog Id: (was: 281304)
Time Spent: 3.5h  (was: 3h 20m)

> Add Utility BiTemporalStreamJoin
> 
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-ideas
>Affects Versions: 2.12.0
>Reporter: Reza ardeshir rokni
>Assignee: Reza ardeshir rokni
>Priority: Major
>  Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> Add utility class that enables a temporal join between two streams where 
> Stream A is matched to Stream B where
> A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) 
> This will use the following overall flow:
> KV(key, Timestamped) 
> | Window
> | GBK
> | Stateful DoFn
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=281303=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-281303
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 23/Jul/19 20:42
Start Date: 23/Jul/19 20:42
Worklog Time Spent: 10m 
  Work Description: rezarokni commented on issue #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#issuecomment-514375707
 
 
   @ruwang Any views on the main stream class :-)
 



Issue Time Tracking
---

Worklog Id: (was: 281303)
Time Spent: 3h 20m  (was: 3h 10m)

> Add Utility BiTemporalStreamJoin
> 
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-ideas
>Affects Versions: 2.12.0
>Reporter: Reza ardeshir rokni
>Assignee: Reza ardeshir rokni
>Priority: Major
>  Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> Add utility class that enables a temporal join between two streams where 
> Stream A is matched to Stream B where
> A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) 
> This will use the following overall flow:
> KV(key, Timestamped) 
> | Window
> | GBK
> | Stateful DoFn
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=281302=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-281302
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 23/Jul/19 20:41
Start Date: 23/Jul/19 20:41
Worklog Time Spent: 10m 
  Work Description: rezarokni commented on issue #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#issuecomment-514375707
 
 
   @ruwang Any views on the main stream class :-)
 



Issue Time Tracking
---

Worklog Id: (was: 281302)
Time Spent: 3h 10m  (was: 3h)

> Add Utility BiTemporalStreamJoin
> 
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-ideas
>Affects Versions: 2.12.0
>Reporter: Reza ardeshir rokni
>Assignee: Reza ardeshir rokni
>Priority: Major
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> Add utility class that enables a temporal join between two streams where 
> Stream A is matched to Stream B where
> A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) 
> This will use the following overall flow:
> KV(key, Timestamped) 
> | Window
> | GBK
> | Stateful DoFn
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-15 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=276598=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-276598
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 15/Jul/19 10:14
Start Date: 15/Jul/19 10:14
Worklog Time Spent: 10m 
  Work Description: rezarokni commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r303367452
 
 

 ##
 File path: 
sdks/java/extensions/timeseries/src/test/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalCacheTestIT.java
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.io.Serializable;
+import 
org.apache.beam.sdk.extensions.timeseries.joins.BiTemporalTestUtils.QuoteData;
+import 
org.apache.beam.sdk.extensions.timeseries.joins.BiTemporalTestUtils.TradeData;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BiTemporalCacheTestIT implements Serializable {
 
 Review comment:
   I think I found the way this is done. I changed build.gradle to 
accommodate it.
 



Issue Time Tracking
---

Worklog Id: (was: 276598)
Time Spent: 3h  (was: 2h 50m)

> Add Utility BiTemporalStreamJoin
> 
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-ideas
>Affects Versions: 2.12.0
>Reporter: Reza ardeshir rokni
>Assignee: Reza ardeshir rokni
>Priority: Major
>  Time Spent: 3h
>  Remaining Estimate: 0h
>
> Add utility class that enables a temporal join between two streams where 
> Stream A is matched to Stream B where
> A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) 
> This will use the following overall flow:
> KV(key, Timestamped) 
> | Window
> | GBK
> | Stateful DoFn
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-15 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=276597=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-276597
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 15/Jul/19 10:12
Start Date: 15/Jul/19 10:12
Worklog Time Spent: 10m 
  Work Description: rezarokni commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r303366582
 
 

 ##
 File path: 
sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiStreamJoinResultCoder.java
 ##
 @@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.*;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeParameter;
+
+public class BiStreamJoinResultCoder
+extends StructuredCoder> {
+
+  private final Coder leftCoder;
+  private final Coder rightCoder;
+  private final Coder keyCoder;
+
+  public BiStreamJoinResultCoder(Coder keyCoder, Coder leftCoder, 
Coder rightCoder) {
+this.leftCoder = NullableCoder.of(leftCoder);
+this.rightCoder = NullableCoder.of(rightCoder);
+this.keyCoder = NullableCoder.of(keyCoder);
+  }
+
+  public Coder getLeftCoder() {
+return leftCoder;
+  }
+
+  public Coder getRightCoder() {
+return rightCoder;
+  }
+
+  public Coder getKeyCoder() {
+return keyCoder;
+  }
+
+  public static  BiStreamJoinResultCoder of(
+  Coder keyCoder, Coder leftCoder, Coder rightCoder) {
+
+return new BiStreamJoinResultCoder<>(keyCoder, leftCoder, rightCoder);
+  }
+
+  @Override
+  public void encode(BiTemporalJoinResult value, OutputStream 
outStream)
+  throws IOException {
+
+if (value == null) {
 
 Review comment:
   The BiStreamJoinResultCoder file was a leftover artifact from renaming the 
class. Thanks for catching it. 
 



Issue Time Tracking
---

Worklog Id: (was: 276597)
Time Spent: 2h 50m  (was: 2h 40m)

> Add Utility BiTemporalStreamJoin
> 
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-ideas
>Affects Versions: 2.12.0
>Reporter: Reza ardeshir rokni
>Assignee: Reza ardeshir rokni
>Priority: Major
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> Add utility class that enables a temporal join between two streams where 
> Stream A is matched to Stream B where
> A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) 
> This will use the following overall flow:
> KV(key, Timestamped) 
> | Window
> | GBK
> | Stateful DoFn
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-15 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=276593=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-276593
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 15/Jul/19 09:59
Start Date: 15/Jul/19 09:59
Worklog Time Spent: 10m 
  Work Description: rezarokni commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r303362169
 
 

 ##
 File path: 
sdks/java/extensions/timeseries/src/test/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalCacheTestIT.java
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.io.Serializable;
+import 
org.apache.beam.sdk.extensions.timeseries.joins.BiTemporalTestUtils.QuoteData;
+import 
org.apache.beam.sdk.extensions.timeseries.joins.BiTemporalTestUtils.TradeData;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BiTemporalCacheTestIT implements Serializable {
 
 Review comment:
   @lukecwik What is the best way to set up IT tests? Does the framework just 
pick these up based on RequiresRunner? 
 



Issue Time Tracking
---

Worklog Id: (was: 276593)
Time Spent: 2h 40m  (was: 2.5h)

> Add Utility BiTemporalStreamJoin
> 
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-ideas
>Affects Versions: 2.12.0
>Reporter: Reza ardeshir rokni
>Assignee: Reza ardeshir rokni
>Priority: Major
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> Add utility class that enables a temporal join between two streams where 
> Stream A is matched to Stream B where
> A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) 
> This will use the following overall flow:
> KV(key, Timestamped) 
> | Window
> | GBK
> | Stateful DoFn
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-15 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=276591=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-276591
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 15/Jul/19 09:57
Start Date: 15/Jul/19 09:57
Worklog Time Spent: 10m 
  Work Description: rezarokni commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r303361737
 
 

 ##
 File path: sdks/java/extensions/timeseries/build.gradle
 ##
 @@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature()
+
+description = "Apache Beam :: SDKs :: Java :: Extensions :: Timeseries"
+ext.summary = """Beam TIMESERIES provides helper utils to deal with timeseries 
processing"""
+
+dependencies {
+  compile library.java.guava
+  compile project(path: ":sdks:java:core", configuration: "shadow")
+  // Needed to run the Example.
+  compile project(path: ":runners:direct-java")
+  testCompile library.java.hamcrest_core
+  testCompile library.java.hamcrest_library
+  testCompile library.java.junit
+  //testRuntimeOnly project(path: ":runners:direct-java")
 
 Review comment:
   Done
 



Issue Time Tracking
---

Worklog Id: (was: 276591)
Time Spent: 2h 20m  (was: 2h 10m)

> Add Utility BiTemporalStreamJoin
> 
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-ideas
>Affects Versions: 2.12.0
>Reporter: Reza ardeshir rokni
>Assignee: Reza ardeshir rokni
>Priority: Major
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> Add utility class that enables a temporal join between two streams where 
> Stream A is matched to Stream B where
> A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) 
> This will use the following overall flow:
> KV(key, Timestamped) 
> | Window
> | GBK
> | Stateful DoFn
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-15 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=276592=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-276592
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 15/Jul/19 09:57
Start Date: 15/Jul/19 09:57
Worklog Time Spent: 10m 
  Work Description: rezarokni commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r303361809
 
 

 ##
 File path: 
sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiStreamJoinResultCoder.java
 ##
 @@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.*;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeParameter;
+
+public class BiStreamJoinResultCoder
 
 Review comment:
   Fixed.
 



Issue Time Tracking
---

Worklog Id: (was: 276592)
Time Spent: 2.5h  (was: 2h 20m)

> Add Utility BiTemporalStreamJoin
> 
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-ideas
>Affects Versions: 2.12.0
>Reporter: Reza ardeshir rokni
>Assignee: Reza ardeshir rokni
>Priority: Major
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> Add utility class that enables a temporal join between two streams where 
> Stream A is matched to Stream B where
> A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) 
> This will use the following overall flow:
> KV(key, Timestamped) 
> | Window
> | GBK
> | Stateful DoFn
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=276233=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-276233
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 13/Jul/19 05:05
Start Date: 13/Jul/19 05:05
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on issue #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#issuecomment-511089275
 
 
   Precommits are back
 



Issue Time Tracking
---

Worklog Id: (was: 276233)
Time Spent: 2h 10m  (was: 2h)

> Add Utility BiTemporalStreamJoin
> 
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-ideas
>Affects Versions: 2.12.0
>Reporter: Reza ardeshir rokni
>Assignee: Reza ardeshir rokni
>Priority: Major
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> Add utility class that enables a temporal join between two streams where 
> Stream A is matched to Stream B where
> A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) 
> This will use the following overall flow:
> KV(key, Timestamped) 
> | Window
> | GBK
> | Stateful DoFn
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=275673=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-275673
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 12/Jul/19 06:40
Start Date: 12/Jul/19 06:40
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#issuecomment-510766791
 
 
   I do not see the Java precommit tests being triggered. I am not very 
familiar with how they are enabled; my guess is that it has to be set here: 
https://github.com/apache/beam/blob/master/build.gradle#L132  
 



Issue Time Tracking
---

Worklog Id: (was: 275673)
Time Spent: 2h  (was: 1h 50m)

> Add Utility BiTemporalStreamJoin
> 
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-ideas
>Affects Versions: 2.12.0
>Reporter: Reza ardeshir rokni
>Assignee: Reza ardeshir rokni
>Priority: Major
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> Add utility class that enables a temporal join between two streams where 
> Stream A is matched to Stream B where
> A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) 
> This will use the following overall flow:
> KV(key, Timestamped) 
> | Window
> | GBK
> | Stateful DoFn
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=275663=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-275663
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 12/Jul/19 06:31
Start Date: 12/Jul/19 06:31
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r302838285
 
 

 ##
 File path: 
sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiStreamJoinResultCoder.java
 ##
 @@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.*;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeParameter;
+
+public class BiStreamJoinResultCoder
+extends StructuredCoder> {
+
+  private final Coder leftCoder;
+  private final Coder rightCoder;
+  private final Coder keyCoder;
+
+  public BiStreamJoinResultCoder(Coder keyCoder, Coder leftCoder, 
Coder rightCoder) {
+this.leftCoder = NullableCoder.of(leftCoder);
+this.rightCoder = NullableCoder.of(rightCoder);
+this.keyCoder = NullableCoder.of(keyCoder);
+  }
+
+  public Coder getLeftCoder() {
+return leftCoder;
+  }
+
+  public Coder getRightCoder() {
+return rightCoder;
+  }
+
+  public Coder getKeyCoder() {
+return keyCoder;
+  }
+
+  public static  BiStreamJoinResultCoder of(
+  Coder keyCoder, Coder leftCoder, Coder rightCoder) {
+
+return new BiStreamJoinResultCoder<>(keyCoder, leftCoder, rightCoder);
+  }
+
+  @Override
+  public void encode(BiTemporalJoinResult value, OutputStream outStream)
+  throws IOException {
+
+if (value == null) {
 
 Review comment:
   The key, left, and right coders are already wrapped in `NullableCoder`, which 
tolerates `null` values?
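
   One way to read the question: a null-marker wrapper covers null *components*, so a separate guard matters only if the enclosing result object can itself be null. The sketch below is plain Java and is not Beam's `NullableCoder` implementation; the class name, method names, and the marker values (0 for absent, 1 for present) are assumptions chosen for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration of null-marker encoding; not part of the PR.
final class NullMarkerSketch {

  // Writes a one-byte marker (0 = null, 1 = present) before the payload.
  static byte[] encode(String value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      if (value == null) {
        out.writeByte(0);
      } else {
        out.writeByte(1);
        out.writeUTF(value);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Reads the marker first and only decodes a payload when one is present.
  static String decode(byte[] encoded) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      int marker = in.readByte();
      if (marker == 0) {
        return null;
      }
      if (marker != 1) {
        throw new IOException("Unexpected marker byte: " + marker);
      }
      return in.readUTF();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(decode(encode(null)));    // prints "null"
    System.out.println(decode(encode("quote"))); // prints "quote"
  }
}
```

   With each component wrapped this way, a null key, left value, or right value round-trips without an explicit check at the call site.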
 



Issue Time Tracking
---

Worklog Id: (was: 275663)
Time Spent: 1.5h  (was: 1h 20m)



[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=275667=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-275667
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 12/Jul/19 06:31
Start Date: 12/Jul/19 06:31
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#issuecomment-510764741
 
 
   Retest this please
 



Issue Time Tracking
---

Worklog Id: (was: 275667)
Time Spent: 1h 50m  (was: 1h 40m)



[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=275665=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-275665
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 12/Jul/19 06:31
Start Date: 12/Jul/19 06:31
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r302843754
 
 

 ##
 File path: 
sdks/java/extensions/timeseries/src/test/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalCacheTestIT.java
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.extensions.timeseries.joins.BiTemporalTestUtils.QuoteData;
+import org.apache.beam.sdk.extensions.timeseries.joins.BiTemporalTestUtils.TradeData;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BiTemporalCacheTestIT implements Serializable {
 
 Review comment:
   How does this IT run? Will `./gradlew test` trigger it?
 



Issue Time Tracking
---

Worklog Id: (was: 275665)



[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=275666=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-275666
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 12/Jul/19 06:31
Start Date: 12/Jul/19 06:31
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r302798339
 
 

 ##
 File path: sdks/java/extensions/timeseries/build.gradle
 ##
 @@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature()
+
+description = "Apache Beam :: SDKs :: Java :: Extensions :: Timeseries"
+ext.summary = """Beam TIMESERIES provides helper utils to deal with timeseries processing"""
+
+dependencies {
+  compile library.java.guava
+  compile project(path: ":sdks:java:core", configuration: "shadow")
+  // Needed to run the Example.
+  compile project(path: ":runners:direct-java")
+  testCompile library.java.hamcrest_core
+  testCompile library.java.hamcrest_library
+  testCompile library.java.junit
+  //testRuntimeOnly project(path: ":runners:direct-java")
 
 Review comment:
   remove this line? 
 



Issue Time Tracking
---

Worklog Id: (was: 275666)
Time Spent: 1h 40m  (was: 1.5h)



[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=275664=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-275664
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 12/Jul/19 06:31
Start Date: 12/Jul/19 06:31
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r302843161
 
 

 ##
 File path: 
sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiStreamJoinResultCoder.java
 ##
 @@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.*;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeParameter;
+
+public class BiStreamJoinResultCoder
 
 Review comment:
   Are `BiStreamJoinResultCoder` and `BiTemporalJoinResultCoder` duplicates?
 



Issue Time Tracking
---

Worklog Id: (was: 275664)
Time Spent: 1.5h  (was: 1h 20m)



[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=275469=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-275469
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 11/Jul/19 19:08
Start Date: 11/Jul/19 19:08
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#issuecomment-510615471
 
 
   Thanks, Kenn! Will take a look soon.
 



Issue Time Tracking
---

Worklog Id: (was: 275469)
Time Spent: 1h 20m  (was: 1h 10m)



[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=275455=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-275455
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 11/Jul/19 18:26
Start Date: 11/Jul/19 18:26
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on issue #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#issuecomment-510600426
 
 
   I believe you are looking for @amaliujia 
 



Issue Time Tracking
---

Worklog Id: (was: 275455)
Time Spent: 1h 10m  (was: 1h)



[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=275259=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-275259
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 11/Jul/19 08:59
Start Date: 11/Jul/19 08:59
Worklog Time Spent: 10m 
  Work Description: rezarokni commented on issue #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#issuecomment-510397666
 
 
   @ruwang
 



Issue Time Tracking
---

Worklog Id: (was: 275259)
Time Spent: 1h  (was: 50m)



[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=274882=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-274882
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 10/Jul/19 14:11
Start Date: 10/Jul/19 14:11
Worklog Time Spent: 10m 
  Work Description: rezarokni commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r302086326
 
 

 ##
 File path: 
sdks/java/extensions/timeseries/src/test/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalCoderTests.java
 ##
 @@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.coders.*;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(JUnit4.class)
+public class BiTemporalCoderTests implements Serializable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BiTemporalCoderTests.class);
+
+  @Rule public transient TestPipeline p = TestPipeline.create();
+
+  @Test
+  public void testBeforeValues() {}
+
+  private static class CoderAndData {
+Coder coder;
+List data;
+  }
+
+  private static class AnyCoderAndData {
+private CoderAndData coderAndData;
+  }
+
+  private static  AnyCoderAndData coderAndData(Coder coder, List data) {
+CoderAndData coderAndData = new CoderAndData<>();
+coderAndData.coder = coder;
+coderAndData.data = data;
+AnyCoderAndData res = new AnyCoderAndData();
+res.coderAndData = coderAndData;
+return res;
+  }
+
+  private static final List TEST_DATA =
+  Arrays.asList(
+  coderAndData(
+  VarIntCoder.of(), Arrays.asList(-1, 0, 1, 13, Integer.MAX_VALUE, Integer.MIN_VALUE)),
+  coderAndData(
+  BigEndianLongCoder.of(),
+  Arrays.asList(-1L, 0L, 1L, 13L, Long.MAX_VALUE, Long.MIN_VALUE)),
+  coderAndData(StringUtf8Coder.of(), Arrays.asList("", "hello", "goodbye", "1")),
+  coderAndData(
+  KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
+  Arrays.asList(KV.of("", -1), KV.of("hello", 0), KV.of("goodbye", Integer.MAX_VALUE))),
+  coderAndData(
+  ListCoder.of(VarLongCoder.of()),
+  Arrays.asList(Arrays.asList(1L, 2L, 3L), Collections.emptyList())));
+
+  @Test
+  @SuppressWarnings("rawtypes")
+  public void testDecodeEncodeEqual() throws Exception {
+
+Instant time = new Instant("2000-01-01");
+
+for (AnyCoderAndData keyCoderAndData : TEST_DATA) {
+  Coder keyCoder = keyCoderAndData.coderAndData.coder;
+  for (Object key : keyCoderAndData.coderAndData.data) {
+for (AnyCoderAndData valueCoderAndData : TEST_DATA) {
+  Coder valueCoder = valueCoderAndData.coderAndData.coder;
+  for (Object value : valueCoderAndData.coderAndData.data) {
+CoderProperties.coderDecodeEncodeEqual(
+BiTemporalJoinResultCoder.of(keyCoder, valueCoder, valueCoder),
+BiTemporalJoinResult.of()
+.setLeftData(KV.of(key, value), time)
+.setRightData(KV.of(key, value), time));
+  }
+}
+  }
+}
+  }
+
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() throws Exception {
+CoderProperties.coderSerializable(
+BiTemporalJoinResultCoder.of(
+GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE));
+  }
+
+  /** 

[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=274753=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-274753
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 10/Jul/19 09:13
Start Date: 10/Jul/19 09:13
Worklog Time Spent: 10m 
  Work Description: rezarokni commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032
 
 
   The extensions/timeseries module is intended to be an incubation area for utilities 
that assist in working with time series data.
   This sample takes two streams, a left stream and a right stream, and matches 
each left-stream element to the nearest right-stream element by timestamp. 
The nearest match is the right-stream element with the greatest timestamp that is <= 
the left-stream element's timestamp.
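
   The matching rule can be sketched in plain Java, following the BEAM-7386 description: each left element pairs with the right element whose timestamp is the greatest one that is <= its own. This is only an illustration under stated assumptions. It keeps the right stream in an in-memory `TreeMap` rather than using Beam windows, state, or timers, and the class name, method name, and sample data are hypothetical and not taken from the PR.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical as-of lookup illustrating the join condition; not the PR's code.
final class TemporalJoinSketch {

  // Returns the right-hand value with the greatest timestamp <= leftTs,
  // or null when no right-hand element is that old.
  static <V> V latestAtOrBefore(TreeMap<Long, V> rightByTimestamp, long leftTs) {
    Map.Entry<Long, V> match = rightByTimestamp.floorEntry(leftTs);
    return match == null ? null : match.getValue();
  }

  public static void main(String[] args) {
    // Hypothetical right stream (quotes), keyed by event time in milliseconds.
    TreeMap<Long, String> quotes = new TreeMap<>();
    quotes.put(100L, "quote@100");
    quotes.put(200L, "quote@200");
    quotes.put(300L, "quote@300");

    // Hypothetical left stream (trades) event times.
    long[] tradeTimes = {50L, 100L, 250L, 999L};
    for (long t : tradeTimes) {
      System.out.println("trade@" + t + " -> " + latestAtOrBefore(quotes, t));
    }
    // prints:
    // trade@50 -> null
    // trade@100 -> quote@100
    // trade@250 -> quote@200
    // trade@999 -> quote@300
  }
}
```

   A left element that is older than every right element has no match, which is one reason a join result would need to tolerate a missing right side.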
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [x] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [x] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   

[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=274749=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-274749
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 10/Jul/19 09:07
Start Date: 10/Jul/19 09:07
Worklog Time Spent: 10m 
  Work Description: rezarokni commented on issue #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#issuecomment-509978751
 
 
   @reuvenlax , @kennknowles 
 



Issue Time Tracking
---

Worklog Id: (was: 274749)
Time Spent: 20m  (was: 10m)



[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=274750=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-274750
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 10/Jul/19 09:07
Start Date: 10/Jul/19 09:07
Worklog Time Spent: 10m 
  Work Description: rezarokni commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032
 
 
   
 



Issue Time Tracking
---

Worklog Id: (was: 274750)
Time Spent: 0.5h  (was: 20m)



[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin

2019-07-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=274746=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-274746
 ]

ASF GitHub Bot logged work on BEAM-7386:


Author: ASF GitHub Bot
Created on: 10/Jul/19 09:05
Start Date: 10/Jul/19 09:05
Worklog Time Spent: 10m 
  Work Description: rezarokni commented on pull request #9032: [BEAM-7386] 
Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032
 
 
   The extensions/timeseries module is intended to be an incubation area for utilities 
that assist in working with time series data.
   This sample takes two streams, a left stream and a right stream, and matches 
each left-stream element to the nearest right-stream element by timestamp. 
The nearest match is the right-stream element with the greatest timestamp that is <= 
the left-stream element's timestamp.
   
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/) [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/) [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/) [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)
   Python | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Python3_Verify/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python3_Verify/lastCompletedBuild/)
 | --- | [![Build