[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=332481=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-332481 ]

ASF GitHub Bot logged work on BEAM-7386:

Author: ASF GitHub Bot
Created on: 23/Oct/19 07:43
Start Date: 23/Oct/19 07:43
Worklog Time Spent: 10m
Work Description: rezarokni commented on issue #9032: [BEAM-7386] Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#issuecomment-545314318

Dataflow Runner and Flink Runner all pass the tests, as does the current DirectRunner. Will add some more tests to this PR to confirm and reopen so we can start the library extension.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 332481)
Time Spent: 7h 50m (was: 7h 40m)

> Add Utility BiTemporalStreamJoin
>
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
> Issue Type: Improvement
> Components: sdk-ideas
> Affects Versions: 2.12.0
> Reporter: Reza ardeshir rokni
> Assignee: Reza ardeshir rokni
> Priority: Major
> Time Spent: 7h 50m
> Remaining Estimate: 0h
>
> Add utility class that enables a temporal join between two streams where
> Stream A is matched to Stream B where
> A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp)
> This will use the following overall flow:
> KV(key, Timestamped)
> | Window
> | GBK
> | Stateful DoFn
>
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
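The matching rule in the issue description above (pair each Stream A element with the Stream B element that has the largest timestamp not exceeding its own) can be illustrated with a minimal plain-Java sketch. This is not the code from the pull request, which relies on Beam windowing, state, and timers; here a `TreeMap` stands in for the per-key state, and the names `TemporalMatchSketch` and `matchLatestAtOrBefore` as well as the string payloads are hypothetical, chosen only for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class, not part of the Beam SDK or the PR.
class TemporalMatchSketch {

  /**
   * Returns the right-stream value whose timestamp is the largest one that is
   * less than or equal to leftTimestamp, or null when no such value exists.
   */
  static String matchLatestAtOrBefore(TreeMap<Long, String> rightByTimestamp, long leftTimestamp) {
    // floorEntry returns the entry with the greatest key <= the given key.
    Map.Entry<Long, String> entry = rightByTimestamp.floorEntry(leftTimestamp);
    return entry == null ? null : entry.getValue();
  }

  public static void main(String[] args) {
    // Assumed sample data: right-stream values keyed by event timestamp (millis).
    TreeMap<Long, String> right = new TreeMap<>();
    right.put(100L, "right@100");
    right.put(200L, "right@200");
    right.put(300L, "right@300");

    System.out.println(matchLatestAtOrBefore(right, 250L)); // right@200
    System.out.println(matchLatestAtOrBefore(right, 300L)); // right@300
    System.out.println(matchLatestAtOrBefore(right, 50L));  // null
  }
}
```

Because the map holds one value per timestamp, a second right-stream value with the same timestamp would simply overwrite the first in this sketch; a streaming implementation has to decide how to handle that case explicitly.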
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=331110=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-331110 ]

ASF GitHub Bot logged work on BEAM-7386:

Author: ASF GitHub Bot
Created on: 20/Oct/19 20:55
Start Date: 20/Oct/19 20:55
Worklog Time Spent: 10m
Work Description: stale[bot] commented on pull request #9032: [BEAM-7386] Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032

Issue Time Tracking
---
Worklog Id: (was: 331110)
Time Spent: 7h 40m (was: 7.5h)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=331109=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-331109 ]

ASF GitHub Bot logged work on BEAM-7386:

Author: ASF GitHub Bot
Created on: 20/Oct/19 20:55
Start Date: 20/Oct/19 20:55
Worklog Time Spent: 10m
Work Description: stale[bot] commented on issue #9032: [BEAM-7386] Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#issuecomment-544291666

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

Issue Time Tracking
---
Worklog Id: (was: 331109)
Time Spent: 7.5h (was: 7h 20m)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=327560=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-327560 ]

ASF GitHub Bot logged work on BEAM-7386:

Author: ASF GitHub Bot
Created on: 13/Oct/19 20:02
Start Date: 13/Oct/19 20:02
Worklog Time Spent: 10m
Work Description: stale[bot] commented on issue #9032: [BEAM-7386] Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#issuecomment-541453259

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the d...@beam.apache.org list. Thank you for your contributions.

Issue Time Tracking
---
Worklog Id: (was: 327560)
Time Spent: 7h 20m (was: 7h 10m)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=294974=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294974 ]

ASF GitHub Bot logged work on BEAM-7386:

Author: ASF GitHub Bot
Created on: 14/Aug/19 19:34
Start Date: 14/Aug/19 19:34
Worklog Time Spent: 10m
Work Description: je-ik commented on pull request #9032: [BEAM-7386] Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r313952183

## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java ##
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.util.*;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.state.*;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This sample will take two streams left stream and right stream. It will match the left stream to
+ * the nearest right stream based on their timestamp. Nearest left stream is either <= to the right
+ * stream timestamp.
+ *
+ * If two values in the right steam have the same timestamp then the results are
+ * non-deterministic.
+ */
+@Experimental
+public class BiTemporalStreams {
+
+  public static BiTemporalJoin join(
+      TupleTag leftTag, TupleTag rightTag, Duration window) {
+    return new BiTemporalJoin<>(leftTag, rightTag, window);
+  }
+
+  @Experimental
+  public static class BiTemporalJoin
+      extends PTransform, PCollection>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BiTemporalStreams.class);
+
+    // Sets the limit at which point the processed left stream values are garbage collected
+    static int LEFT_STREAM_GC_LIMIT = 1000;
+
+    Coder leftCoder;
+    Coder rightCoder;
+
+    TupleTag leftTag;
+    TupleTag rightTag;
+
+    Duration window;
+
+    private BiTemporalJoin(TupleTag leftTag, TupleTag rightTag, Duration window) {
+      this.leftTag = leftTag;
+      this.rightTag = rightTag;
+      this.window = window;
+    }
+
+    public static BiTemporalJoin create(
+        TupleTag leftTag, TupleTag rightTag, Duration window) {
+      return new BiTemporalJoin(leftTag, rightTag, window);
+    }
+
+    public BiTemporalJoin setGCLimit(int gcLimit) {
+      LEFT_STREAM_GC_LIMIT = gcLimit;
+      return this;
+    }
+
+    @Override
+    public PCollection> expand(KeyedPCollectionTuple input) {
+
+      List> collections =
+          input.getKeyedCollections();
+
+      PCollection> leftCollection = null;
+      PCollection> rightCollection = null;
+
+      for (KeyedPCollectionTuple.TaggedKeyedPCollection t : collections) {
+
+        if (t.getTupleTag().equals(leftTag)) {
+          leftCollection =
+              ((KeyedPCollectionTuple.TaggedKeyedPCollection) t).getCollection();
+        }
+
+        if (t.getTupleTag().equals(rightTag)) {
+          rightCollection =
+              ((KeyedPCollectionTuple.TaggedKeyedPCollection) t).getCollection();
+        }
+      }
+
+      leftCoder = ((KvCoder) leftCollection.getCoder()).getValueCoder();
+      rightCoder = ((KvCoder) rightCollection.getCoder()).getValueCoder();
+
+      BiTemporalJoinResultCoder
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=294816=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294816 ]

ASF GitHub Bot logged work on BEAM-7386:

Author: ASF GitHub Bot
Created on: 14/Aug/19 15:54
Start Date: 14/Aug/19 15:54
Worklog Time Spent: 10m
Work Description: je-ik commented on pull request #9032: [BEAM-7386] Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r313952183

## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java ##
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=294807=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294807 ]

ASF GitHub Bot logged work on BEAM-7386:

Author: ASF GitHub Bot
Created on: 14/Aug/19 15:28
Start Date: 14/Aug/19 15:28
Worklog Time Spent: 10m
Work Description: rezarokni commented on pull request #9032: [BEAM-7386] Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r313938990

## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java ##
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=294775=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294775 ]

ASF GitHub Bot logged work on BEAM-7386:

Author: ASF GitHub Bot
Created on: 14/Aug/19 14:29
Start Date: 14/Aug/19 14:29
Worklog Time Spent: 10m
Work Description: je-ik commented on pull request #9032: [BEAM-7386] Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r313900817

## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java ##
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=294771=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294771 ]

ASF GitHub Bot logged work on BEAM-7386:

Author: ASF GitHub Bot
Created on: 14/Aug/19 14:29
Start Date: 14/Aug/19 14:29
Worklog Time Spent: 10m
Work Description: je-ik commented on pull request #9032: [BEAM-7386] Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r313893768

## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java ##
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=294776=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294776 ]

ASF GitHub Bot logged work on BEAM-7386:

Author: ASF GitHub Bot
Created on: 14/Aug/19 14:29
Start Date: 14/Aug/19 14:29
Worklog Time Spent: 10m
Work Description: je-ik commented on pull request #9032: [BEAM-7386] Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r313896435

## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java ##
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.util.*;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.state.*;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This sample will take two streams left stream and right stream. It will match the left stream to
+ * the nearest right stream based on their timestamp. Nearest left stream is either <= to the right
+ * stream timestamp.
+ *
+ * If two values in the right steam have the same timestamp then the results are
+ * non-deterministic.
+ */
+@Experimental
+public class BiTemporalStreams {
+
+  public static BiTemporalJoin join(
+      TupleTag leftTag, TupleTag rightTag, Duration window) {
+    return new BiTemporalJoin<>(leftTag, rightTag, window);
+  }
+
+  @Experimental
+  public static class BiTemporalJoin
+      extends PTransform, PCollection>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BiTemporalStreams.class);
+
+    // Sets the limit at which point the processed left stream values are garbage collected
+    static int LEFT_STREAM_GC_LIMIT = 1000;
+
+    Coder leftCoder;
+    Coder rightCoder;
+
+    TupleTag leftTag;
+    TupleTag rightTag;
+
+    Duration window;
+
+    private BiTemporalJoin(TupleTag leftTag, TupleTag rightTag, Duration window) {
+      this.leftTag = leftTag;
+      this.rightTag = rightTag;
+      this.window = window;
+    }
+
+    public static BiTemporalJoin create(
+        TupleTag leftTag, TupleTag rightTag, Duration window) {
+      return new BiTemporalJoin(leftTag, rightTag, window);

Review comment:
```suggestion
      return new BiTemporalJoin<>(leftTag, rightTag, window);
```

Issue Time Tracking
---
Worklog Id: (was: 294776)
Time Spent: 6h 20m (was: 6h 10m)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=294774=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294774 ] ASF GitHub Bot logged work on BEAM-7386: Author: ASF GitHub Bot Created on: 14/Aug/19 14:29 Start Date: 14/Aug/19 14:29 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #9032: [BEAM-7386] Bi-Temporal Join URL: https://github.com/apache/beam/pull/9032#discussion_r313899337 ## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java ## @@ -0,0 +1,464 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.timeseries.joins; + +import java.util.*; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.state.*; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.*; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This sample will take two streams left stream and right stream. It will match the left stream to + * the nearest right stream based on their timestamp. Nearest left stream is either <= to the right + * stream timestamp. + * + * If two values in the right steam have the same timestamp then the results are + * non-deterministic. 
+ */ +@Experimental +public class BiTemporalStreams { + + public static BiTemporalJoin join( + TupleTag leftTag, TupleTag rightTag, Duration window) { +return new BiTemporalJoin<>(leftTag, rightTag, window); + } + + @Experimental + public static class BiTemporalJoin + extends PTransform, PCollection>> { + +private static final Logger LOG = LoggerFactory.getLogger(BiTemporalStreams.class); + +// Sets the limit at which point the processed left stream values are garbage collected +static int LEFT_STREAM_GC_LIMIT = 1000; + +Coder leftCoder; +Coder rightCoder; + +TupleTag leftTag; +TupleTag rightTag; + +Duration window; + +private BiTemporalJoin(TupleTag leftTag, TupleTag rightTag, Duration window) { + this.leftTag = leftTag; + this.rightTag = rightTag; + this.window = window; +} + +public static BiTemporalJoin create( +TupleTag leftTag, TupleTag rightTag, Duration window) { + return new BiTemporalJoin(leftTag, rightTag, window); +} + +public BiTemporalJoin setGCLimit(int gcLimit) { + LEFT_STREAM_GC_LIMIT = gcLimit; + return this; +} + +@Override +public PCollection> expand(KeyedPCollectionTuple input) { + + List> collections = + input.getKeyedCollections(); + + PCollection> leftCollection = null; + PCollection> rightCollection = null; + + for (KeyedPCollectionTuple.TaggedKeyedPCollection t : collections) { + +if (t.getTupleTag().equals(leftTag)) { + leftCollection = + ((KeyedPCollectionTuple.TaggedKeyedPCollection) t).getCollection(); +} + +if (t.getTupleTag().equals(rightTag)) { + rightCollection = + ((KeyedPCollectionTuple.TaggedKeyedPCollection) t).getCollection(); +} + } + + leftCoder = ((KvCoder) leftCollection.getCoder()).getValueCoder(); + rightCoder = ((KvCoder) rightCollection.getCoder()).getValueCoder(); + + BiTemporalJoinResultCoder
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=294772=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294772 ] ASF GitHub Bot logged work on BEAM-7386: Author: ASF GitHub Bot Created on: 14/Aug/19 14:29 Start Date: 14/Aug/19 14:29 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #9032: [BEAM-7386] Bi-Temporal Join URL: https://github.com/apache/beam/pull/9032#discussion_r313901332 ## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java ## @@ -0,0 +1,464 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.timeseries.joins; + +import java.util.*; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.state.*; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.*; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This sample will take two streams left stream and right stream. It will match the left stream to + * the nearest right stream based on their timestamp. Nearest left stream is either <= to the right + * stream timestamp. + * + * If two values in the right steam have the same timestamp then the results are + * non-deterministic. 
+ */ +@Experimental +public class BiTemporalStreams { + + public static BiTemporalJoin join( + TupleTag leftTag, TupleTag rightTag, Duration window) { +return new BiTemporalJoin<>(leftTag, rightTag, window); + } + + @Experimental + public static class BiTemporalJoin + extends PTransform, PCollection>> { + +private static final Logger LOG = LoggerFactory.getLogger(BiTemporalStreams.class); + +// Sets the limit at which point the processed left stream values are garbage collected +static int LEFT_STREAM_GC_LIMIT = 1000; + +Coder leftCoder; +Coder rightCoder; + +TupleTag leftTag; +TupleTag rightTag; + +Duration window; + +private BiTemporalJoin(TupleTag leftTag, TupleTag rightTag, Duration window) { + this.leftTag = leftTag; + this.rightTag = rightTag; + this.window = window; +} + +public static BiTemporalJoin create( +TupleTag leftTag, TupleTag rightTag, Duration window) { + return new BiTemporalJoin(leftTag, rightTag, window); +} + +public BiTemporalJoin setGCLimit(int gcLimit) { + LEFT_STREAM_GC_LIMIT = gcLimit; + return this; +} + +@Override +public PCollection> expand(KeyedPCollectionTuple input) { + + List> collections = + input.getKeyedCollections(); + + PCollection> leftCollection = null; + PCollection> rightCollection = null; + + for (KeyedPCollectionTuple.TaggedKeyedPCollection t : collections) { + +if (t.getTupleTag().equals(leftTag)) { + leftCollection = + ((KeyedPCollectionTuple.TaggedKeyedPCollection) t).getCollection(); +} + +if (t.getTupleTag().equals(rightTag)) { + rightCollection = + ((KeyedPCollectionTuple.TaggedKeyedPCollection) t).getCollection(); +} + } + + leftCoder = ((KvCoder) leftCollection.getCoder()).getValueCoder(); + rightCoder = ((KvCoder) rightCollection.getCoder()).getValueCoder(); + + BiTemporalJoinResultCoder
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=294777=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294777 ] ASF GitHub Bot logged work on BEAM-7386: Author: ASF GitHub Bot Created on: 14/Aug/19 14:29 Start Date: 14/Aug/19 14:29 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #9032: [BEAM-7386] Bi-Temporal Join URL: https://github.com/apache/beam/pull/9032#discussion_r313897433 ## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java ## @@ -0,0 +1,464 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.timeseries.joins; + +import java.util.*; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.state.*; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.*; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This sample will take two streams left stream and right stream. It will match the left stream to + * the nearest right stream based on their timestamp. Nearest left stream is either <= to the right + * stream timestamp. + * + * If two values in the right steam have the same timestamp then the results are + * non-deterministic. 
+ */ +@Experimental +public class BiTemporalStreams { + + public static BiTemporalJoin join( + TupleTag leftTag, TupleTag rightTag, Duration window) { +return new BiTemporalJoin<>(leftTag, rightTag, window); + } + + @Experimental + public static class BiTemporalJoin + extends PTransform, PCollection>> { + +private static final Logger LOG = LoggerFactory.getLogger(BiTemporalStreams.class); + +// Sets the limit at which point the processed left stream values are garbage collected +static int LEFT_STREAM_GC_LIMIT = 1000; + +Coder leftCoder; +Coder rightCoder; + +TupleTag leftTag; +TupleTag rightTag; + +Duration window; Review comment: Hm, I'm wondering, does this have to work in FixedWindows? I think there should be no blocker for this to work on GlobalWindows as well. Even for any other windows, provided the windowFn is the same on both sides, right? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 294777) Time Spent: 6.5h (was: 6h 20m) > Add Utility BiTemporalStreamJoin > > > Key: BEAM-7386 > URL: https://issues.apache.org/jira/browse/BEAM-7386 > Project: Beam > Issue Type: Improvement > Components: sdk-ideas >Affects Versions: 2.12.0 >Reporter: Reza ardeshir rokni >Assignee: Reza ardeshir rokni >Priority: Major > Time Spent: 6.5h > Remaining Estimate: 0h > > Add utility class that enables a temporal join between two streams where > Stream A is matched to Stream B where > A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) > This will use the
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=294778=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294778 ] ASF GitHub Bot logged work on BEAM-7386: Author: ASF GitHub Bot Created on: 14/Aug/19 14:29 Start Date: 14/Aug/19 14:29 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #9032: [BEAM-7386] Bi-Temporal Join URL: https://github.com/apache/beam/pull/9032#discussion_r313901093 ## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java ## @@ -0,0 +1,464 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.timeseries.joins; + +import java.util.*; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.state.*; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.*; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This sample will take two streams left stream and right stream. It will match the left stream to + * the nearest right stream based on their timestamp. Nearest left stream is either <= to the right + * stream timestamp. + * + * If two values in the right steam have the same timestamp then the results are + * non-deterministic. 
+ */ +@Experimental +public class BiTemporalStreams { + + public static BiTemporalJoin join( + TupleTag leftTag, TupleTag rightTag, Duration window) { +return new BiTemporalJoin<>(leftTag, rightTag, window); + } + + @Experimental + public static class BiTemporalJoin + extends PTransform, PCollection>> { + +private static final Logger LOG = LoggerFactory.getLogger(BiTemporalStreams.class); + +// Sets the limit at which point the processed left stream values are garbage collected +static int LEFT_STREAM_GC_LIMIT = 1000; + +Coder leftCoder; +Coder rightCoder; + +TupleTag leftTag; +TupleTag rightTag; + +Duration window; + +private BiTemporalJoin(TupleTag leftTag, TupleTag rightTag, Duration window) { + this.leftTag = leftTag; + this.rightTag = rightTag; + this.window = window; +} + +public static BiTemporalJoin create( +TupleTag leftTag, TupleTag rightTag, Duration window) { + return new BiTemporalJoin(leftTag, rightTag, window); +} + +public BiTemporalJoin setGCLimit(int gcLimit) { + LEFT_STREAM_GC_LIMIT = gcLimit; + return this; +} + +@Override +public PCollection> expand(KeyedPCollectionTuple input) { + + List> collections = + input.getKeyedCollections(); + + PCollection> leftCollection = null; + PCollection> rightCollection = null; + + for (KeyedPCollectionTuple.TaggedKeyedPCollection t : collections) { + +if (t.getTupleTag().equals(leftTag)) { + leftCollection = + ((KeyedPCollectionTuple.TaggedKeyedPCollection) t).getCollection(); +} + +if (t.getTupleTag().equals(rightTag)) { + rightCollection = + ((KeyedPCollectionTuple.TaggedKeyedPCollection) t).getCollection(); +} + } + + leftCoder = ((KvCoder) leftCollection.getCoder()).getValueCoder(); + rightCoder = ((KvCoder) rightCollection.getCoder()).getValueCoder(); + + BiTemporalJoinResultCoder
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=294773=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294773 ] ASF GitHub Bot logged work on BEAM-7386: Author: ASF GitHub Bot Created on: 14/Aug/19 14:29 Start Date: 14/Aug/19 14:29 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #9032: [BEAM-7386] Bi-Temporal Join URL: https://github.com/apache/beam/pull/9032#discussion_r313895681 ## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java ## @@ -0,0 +1,464 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.timeseries.joins; + +import java.util.*; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.state.*; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.*; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This sample will take two streams left stream and right stream. It will match the left stream to + * the nearest right stream based on their timestamp. Nearest left stream is either <= to the right + * stream timestamp. + * + * If two values in the right steam have the same timestamp then the results are + * non-deterministic. + */ +@Experimental +public class BiTemporalStreams { + + public static BiTemporalJoin join( + TupleTag leftTag, TupleTag rightTag, Duration window) { +return new BiTemporalJoin<>(leftTag, rightTag, window); + } + + @Experimental + public static class BiTemporalJoin Review comment: Should this be called `BiTemporalLeftJoin`? It seems not to work equally in both directions. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 294773) Time Spent: 5h 50m (was: 5h 40m) > Add Utility BiTemporalStreamJoin > > > Key: BEAM-7386 > URL: https://issues.apache.org/jira/browse/BEAM-7386 > Project: Beam > Issue Type: Improvement > Components: sdk-ideas >Affects Versions: 2.12.0 >Reporter: Reza ardeshir rokni >Assignee: Reza ardeshir rokni >Priority: Major > Time Spent: 5h 50m > Remaining Estimate: 0h > > Add utility class that enables a temporal join between two streams where > Stream A is matched to Stream B where > A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) > This will use the following overall flow: > KV(key, Timestamped) > | Window > | GBK > | Statefull DoFn > > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
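The matching rule quoted in the issue description, A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp), can be illustrated outside Beam with a sorted map lookup. The following is only a minimal sketch under stated assumptions, not the implementation in the PR: the class name `AsOfJoinSketch` and the method `matchRight` are hypothetical, timestamps are plain `long` values rather than Beam `Instant`s, and the right stream for a single key is assumed to fit in memory.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the "latest right value at or before the left timestamp" rule.
public class AsOfJoinSketch {

  /**
   * Returns the right-stream value whose timestamp is the greatest one that is
   * less than or equal to leftTimestamp, or null if no such value exists.
   */
  public static <V> V matchRight(TreeMap<Long, V> rightByTimestamp, long leftTimestamp) {
    // floorEntry returns the entry with the greatest key <= the given key, or null.
    Map.Entry<Long, V> entry = rightByTimestamp.floorEntry(leftTimestamp);
    return entry == null ? null : entry.getValue();
  }

  public static void main(String[] args) {
    // Right stream for one key, indexed by event timestamp (assumed unique per timestamp).
    TreeMap<Long, String> right = new TreeMap<>();
    right.put(10L, "right@10");
    right.put(20L, "right@20");
    right.put(30L, "right@30");

    System.out.println(matchRight(right, 25L)); // right@20
    System.out.println(matchRight(right, 30L)); // right@30
    System.out.println(matchRight(right, 5L));  // null
  }
}
```

Because the lookup only ever goes from a left element to an earlier-or-equal right element, the sketch is asymmetric in the same way the review comment above points out. Keying the map by timestamp also means a second right value with the same timestamp would overwrite the first, which is one concrete way the "same timestamp" case noted in the class Javadoc can become order-dependent.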
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=293688=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-293688 ] ASF GitHub Bot logged work on BEAM-7386: Author: ASF GitHub Bot Created on: 13/Aug/19 07:32 Start Date: 13/Aug/19 07:32 Worklog Time Spent: 10m Work Description: rezarokni commented on pull request #9032: [BEAM-7386] Bi-Temporal Join URL: https://github.com/apache/beam/pull/9032#discussion_r313254684 ## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java ## @@ -0,0 +1,462 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.timeseries.joins; + +import java.util.*; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.state.*; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.*; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This sample will take two streams left stream and right stream. It will match the left stream to + * the nearest right stream based on their timestamp. Nearest left stream is either <= to the right + * stream timestamp. + * + * If two values in the right steam have the same timestamp then the results are + * non-deterministic. + */ +@Experimental +public class BiTemporalStreams { + + public static BiTemporalJoin join( + TupleTag leftTag, TupleTag rightTag, Duration window) { +return new BiTemporalJoin<>(leftTag, rightTag, window); + } + + @Experimental + public static class BiTemporalJoin + extends PTransform, PCollection>> { + +private static final Logger LOG = LoggerFactory.getLogger(BiTemporalStreams.class); + +// Sets the limit at which point the processed left stream values are garbage collected +static int GC_LIMIT = 1000; Review comment: Looks like one of the keys is no longer getting processed. 
That's the symptom; I'm digging into the cause. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 293688) Time Spent: 5h 20m (was: 5h 10m) > Add Utility BiTemporalStreamJoin > > > Key: BEAM-7386 > URL: https://issues.apache.org/jira/browse/BEAM-7386 > Project: Beam > Issue Type: Improvement > Components: sdk-ideas >Affects Versions: 2.12.0 >Reporter: Reza ardeshir rokni >Assignee: Reza ardeshir rokni >Priority: Major > Time Spent: 5h 20m > Remaining Estimate: 0h > > Add utility class that enables a temporal join between two streams where > Stream A is matched to Stream B where > A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) > This will use the following overall flow: > KV(key, Timestamped) > | Window > | GBK > | Statefull DoFn > > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=284960=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-284960 ] ASF GitHub Bot logged work on BEAM-7386: Author: ASF GitHub Bot Created on: 30/Jul/19 14:02 Start Date: 30/Jul/19 14:02 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #9032: [BEAM-7386] Bi-Temporal Join URL: https://github.com/apache/beam/pull/9032#discussion_r308739373 ## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java ## @@ -0,0 +1,462 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.timeseries.joins; + +import java.util.*; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.state.*; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.*; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This sample will take two streams left stream and right stream. It will match the left stream to + * the nearest right stream based on their timestamp. Nearest left stream is either <= to the right + * stream timestamp. + * + * If two values in the right steam have the same timestamp then the results are + * non-deterministic. 
+ */ +@Experimental +public class BiTemporalStreams { + + public static BiTemporalJoin join( + TupleTag leftTag, TupleTag rightTag, Duration window) { +return new BiTemporalJoin<>(leftTag, rightTag, window); + } + + @Experimental + public static class BiTemporalJoin + extends PTransform, PCollection>> { + +private static final Logger LOG = LoggerFactory.getLogger(BiTemporalStreams.class); + +// Sets the limit at which point the processed left stream values are garbage collected +static int GC_LIMIT = 1000; Review comment: @rezarokni I have managed to (hopefully finally) fix the issue with timers. Related PR is #9190, I cherry picked the two commits into PR rezarokni#2 and the failing tests are green on my local machine, with the exception of `BiTemporalCacheTest.cacheTest`, which is still failing (consistently). Don't know what is the issue with that, but I hope it is no longer related to the timer ordering. Could you validate that? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 284960) Time Spent: 5h 10m (was: 5h) > Add Utility BiTemporalStreamJoin > > > Key: BEAM-7386 > URL: https://issues.apache.org/jira/browse/BEAM-7386 > Project: Beam > Issue Type: Improvement > Components: sdk-ideas >Affects Versions: 2.12.0 >Reporter: Reza ardeshir rokni >Assignee: Reza ardeshir rokni >Priority: Major > Time Spent: 5h 10m > Remaining Estimate: 0h > > Add utility class that enables a temporal join between two streams where > Stream A is matched to Stream B where > A.timestamp =
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=284392=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-284392 ] ASF GitHub Bot logged work on BEAM-7386: Author: ASF GitHub Bot Created on: 29/Jul/19 16:41 Start Date: 29/Jul/19 16:41 Worklog Time Spent: 10m Work Description: rezarokni commented on pull request #9032: [BEAM-7386] Bi-Temporal Join URL: https://github.com/apache/beam/pull/9032#discussion_r308327824 ## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java ## @@ -0,0 +1,462 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.timeseries.joins; + +import java.util.*; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.state.*; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.*; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This sample takes two streams, a left stream and a right stream, and matches each element of + * the left stream to the nearest element of the right stream based on their timestamps. The + * nearest right-stream element is the one with the greatest timestamp that is <= the left-stream + * element's timestamp. + * + * If two values in the right stream have the same timestamp then the results are + * non-deterministic. 
+ */ +@Experimental +public class BiTemporalStreams { + + public static BiTemporalJoin join( + TupleTag leftTag, TupleTag rightTag, Duration window) { +return new BiTemporalJoin<>(leftTag, rightTag, window); + } + + @Experimental + public static class BiTemporalJoin + extends PTransform, PCollection>> { + +private static final Logger LOG = LoggerFactory.getLogger(BiTemporalStreams.class); + +// Sets the limit at which point the processed left stream values are garbage collected +static int GC_LIMIT = 1000; + +Coder leftCoder; +Coder rightCoder; + +TupleTag leftTag; +TupleTag rightTag; + +Duration window; + +private BiTemporalJoin(TupleTag leftTag, TupleTag rightTag, Duration window) { + this.leftTag = leftTag; + this.rightTag = rightTag; + this.window = window; +} + +public static BiTemporalJoin create( +TupleTag leftTag, TupleTag rightTag, Duration window) { + return new BiTemporalJoin(leftTag, rightTag, window); +} + +public BiTemporalJoin setGCLimit(int gcLimit) { + GC_LIMIT = gcLimit; + return this; +} + +@Override +public PCollection> expand(KeyedPCollectionTuple input) { + + List> collections = + input.getKeyedCollections(); + + PCollection> leftCollection = null; + PCollection> rightCollection = null; + + for (KeyedPCollectionTuple.TaggedKeyedPCollection t : collections) { + +if (t.getTupleTag().equals(leftTag)) { + leftCollection = + ((KeyedPCollectionTuple.TaggedKeyedPCollection) t).getCollection(); +} + +if (t.getTupleTag().equals(rightTag)) { + rightCollection = + ((KeyedPCollectionTuple.TaggedKeyedPCollection) t).getCollection(); +} + } + + leftCoder = ((KvCoder) leftCollection.getCoder()).getValueCoder(); + rightCoder = ((KvCoder) rightCollection.getCoder()).getValueCoder(); + + BiTemporalJoinResultCoder biStreamJoinResultCoder = +
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=284363=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-284363 ]

ASF GitHub Bot logged work on BEAM-7386:

    Author: ASF GitHub Bot
    Created on: 29/Jul/19 16:10
    Start Date: 29/Jul/19 16:10
    Worklog Time Spent: 10m
    Work Description: rezarokni commented on pull request #9032: [BEAM-7386] Bi-Temporal Join
    URL: https://github.com/apache/beam/pull/9032#discussion_r308314412

## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java ##
+// Sets the limit at which point the processed left stream values are garbage collected
+static int GC_LIMIT = 1000;

Review comment:
With regard to Je-ik's first comment: @amaliujia @kennknowles, I had a quick glance at the StatefulDoFnRunner; it uses a Timer to do GC. I don't think all runners use Timers for this operation, but as je-ik mentioned, some do. If I recall correctly, there are some issues with how multiple timers interact with each other when a Timer resets itself. Should this be standardized? As things stand, I think this could mean that things work on some runners and not on others.

Issue Time Tracking
-------------------
    Worklog Id: (was: 284363)
    Time Spent: 4h 50m (was: 4h 40m)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=284359=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-284359 ]

ASF GitHub Bot logged work on BEAM-7386:

    Author: ASF GitHub Bot
    Created on: 29/Jul/19 16:03
    Start Date: 29/Jul/19 16:03
    Worklog Time Spent: 10m
    Work Description: rezarokni commented on pull request #9032: [BEAM-7386] Bi-Temporal Join
    URL: https://github.com/apache/beam/pull/9032#discussion_r308310753

## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java ##
+// Sets the limit at which point the processed left stream values are garbage collected
+static int GC_LIMIT = 1000;

Review comment:
OOO at the moment, but the current experimental implementation does not deal with late data (I think I had a note in the Javadoc). When I am back I will add a check that throws an illegal-state exception if allowed lateness is not set to ZERO. Until we have retractions, the best way to deal with late data is to dead-letter it.

The GC_LIMIT (which should have a better name; I will fix that when I am back) is actually there to reduce the amount of processing needed to sort the left-hand side. The whole list needs to be sorted on each finish bundle, but once an element has been processed it is no longer needed.

Once we have Sorted Map State, most of the code for this class can be deleted, as there will be no need for the cache that we have to use at the moment. :-)

Issue Time Tracking
-------------------
    Worklog Id: (was: 284359)
    Time Spent: 4h 40m (was: 4.5h)
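The idea in the comment above (sort the buffered left-hand side on each flush, and drop elements once they have been processed so that later sorts touch less data) can be sketched as follows. This is a minimal illustration in plain Java, not the code from the PR: the class `LeftBufferSketch`, its methods, and the exact pruning rule are assumptions made for the example, and `gcLimit` only plays the role that the comment describes for `GC_LIMIT`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of a left-side buffer that prunes processed elements. Not from the PR. */
final class LeftBufferSketch {

  /** A buffered left-stream element, reduced here to its timestamp. */
  private static final class Entry {
    final long timestamp;
    boolean processed;

    Entry(long timestamp) {
      this.timestamp = timestamp;
    }
  }

  private final List<Entry> buffer = new ArrayList<>();
  private final int gcLimit; // assumed meaning: prune once this many entries are processed
  private int processedCount = 0;

  LeftBufferSketch(int gcLimit) {
    this.gcLimit = gcLimit;
  }

  void add(long timestamp) {
    buffer.add(new Entry(timestamp));
  }

  /**
   * Sorts the whole buffer, returns the unprocessed timestamps that are <= upTo in ascending
   * order, and marks them as processed. Processed entries are removed once gcLimit is reached.
   */
  List<Long> flush(long upTo) {
    buffer.sort(Comparator.comparingLong((Entry e) -> e.timestamp));
    List<Long> ready = new ArrayList<>();
    for (Entry e : buffer) {
      if (e.timestamp > upTo) {
        break; // the buffer is sorted, so nothing later can be ready
      }
      if (!e.processed) {
        e.processed = true;
        processedCount++;
        ready.add(e.timestamp);
      }
    }
    if (processedCount >= gcLimit) {
      buffer.removeIf(e -> e.processed); // shrink the list that the next flush has to sort
      processedCount = 0;
    }
    return ready;
  }

  int size() {
    return buffer.size();
  }
}
```

With a small limit the buffer shrinks as soon as enough elements have been emitted; with a large limit, processed elements stay in the list and are sorted again on every flush, which is the cost the comment says the limit is meant to bound.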
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=283817=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-283817 ]

ASF GitHub Bot logged work on BEAM-7386:

    Author: ASF GitHub Bot
    Created on: 27/Jul/19 20:13
    Start Date: 27/Jul/19 20:13
    Worklog Time Spent: 10m
    Work Description: je-ik commented on pull request #9032: [BEAM-7386] Bi-Temporal Join
    URL: https://github.com/apache/beam/pull/9032#discussion_r307975929

## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java ##
+// Sets the limit at which point the processed left stream values are garbage collected
+static int GC_LIMIT = 1000;

Review comment:
Do I understand correctly that this in effect stands in for allowed lateness? That is, elements from the right side that arrive too late will not be joined, because the left side will already have been garbage collected?

Issue Time Tracking
-------------------
    Worklog Id: (was: 283817)
    Time Spent: 4.5h (was: 4h 20m)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=283816=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-283816 ]

ASF GitHub Bot logged work on BEAM-7386:

    Author: ASF GitHub Bot
    Created on: 27/Jul/19 20:13
    Start Date: 27/Jul/19 20:13
    Worklog Time Spent: 10m
    Work Description: je-ik commented on pull request #9032: [BEAM-7386] Bi-Temporal Join
    URL: https://github.com/apache/beam/pull/9032#discussion_r307976024

## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalStreams.java ##
+// Sets the limit at which point the processed left stream values are garbage collected
+static int GC_LIMIT = 1000;

Review comment:
And if so, would this work correctly in batch?

Issue Time Tracking
-------------------
    Worklog Id: (was: 283816)
    Time Spent: 4h 20m (was: 4h 10m)

> Add Utility BiTemporalStreamJoin
>
> Key: BEAM-7386
> URL: https://issues.apache.org/jira/browse/BEAM-7386
> Project: Beam
> Issue Type: Improvement
> Components: sdk-ideas
> Affects Versions: 2.12.0
> Reporter: Reza ardeshir rokni
> Assignee: Reza ardeshir rokni
> Priority: Major
> Time Spent: 4h 20m
> Remaining Estimate: 0h
>
> Add utility class that enables a temporal join between two streams where
> Stream A is matched to Stream B where
> A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp)
> This will use the following overall flow:
> KV(key, Timestamped)
> | Window
> | GBK
> | Stateful DoFn

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
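The matching rule in the issue description (pair each element of Stream A with the element of Stream B that has the greatest timestamp not exceeding A's timestamp) can be illustrated without Beam. The sketch below is a simplification and not the PR's implementation: it assumes a single key, keeps the right-hand side in an in-memory `TreeMap`, and the names `AsOfJoinSketch` and `matchRight` are made up for the example.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, Beam-free illustration of the "latest right value at or before left" rule. */
final class AsOfJoinSketch {

  private AsOfJoinSketch() {}

  /**
   * Returns the right-stream value whose timestamp is the greatest one that is <= leftTimestamp,
   * or null when every right-stream timestamp is later than leftTimestamp.
   */
  static <V> V matchRight(TreeMap<Long, V> rightByTimestamp, long leftTimestamp) {
    // floorEntry returns the entry with the greatest key <= the given key, or null if none.
    Map.Entry<Long, V> entry = rightByTimestamp.floorEntry(leftTimestamp);
    return entry == null ? null : entry.getValue();
  }

  public static void main(String[] args) {
    TreeMap<Long, String> right = new TreeMap<>();
    right.put(10L, "right@10");
    right.put(20L, "right@20");

    System.out.println(matchRight(right, 15L)); // matches the value stored at timestamp 10
    System.out.println(matchRight(right, 20L)); // an equal timestamp also matches
    System.out.println(matchRight(right, 5L));  // no right value at or before 5
  }
}
```

A map keyed by timestamp keeps only one value per timestamp, so two right-hand values with the same timestamp overwrite each other here; the class Javadoc in the PR describes that case as non-deterministic.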
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=283762=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-283762 ]

ASF GitHub Bot logged work on BEAM-7386:

    Author: ASF GitHub Bot
    Created on: 27/Jul/19 08:12
    Start Date: 27/Jul/19 08:12
    Worklog Time Spent: 10m
    Work Description: rezarokni commented on issue #9032: [BEAM-7386] Bi-Temporal Join
    URL: https://github.com/apache/beam/pull/9032#issuecomment-515664865

@amaliujia thanks! @kennknowles, would you have time for a review, please?

Issue Time Tracking
-------------------
    Worklog Id: (was: 283762)
    Time Spent: 4h 10m (was: 4h)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=282784=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-282784 ]

ASF GitHub Bot logged work on BEAM-7386:

    Author: ASF GitHub Bot
    Created on: 25/Jul/19 17:24
    Start Date: 25/Jul/19 17:24
    Worklog Time Spent: 10m
    Work Description: amaliujia commented on issue #9032: [BEAM-7386] Bi-Temporal Join
    URL: https://github.com/apache/beam/pull/9032#issuecomment-515134878

@rezarokni You will also need to find a committer to review and merge this PR.

Issue Time Tracking
-------------------
    Worklog Id: (was: 282784)
    Time Spent: 4h (was: 3h 50m)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=281353=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-281353 ]

ASF GitHub Bot logged work on BEAM-7386:

    Author: ASF GitHub Bot
    Created on: 23/Jul/19 22:21
    Start Date: 23/Jul/19 22:21
    Worklog Time Spent: 10m
    Work Description: rezarokni commented on issue #9032: [BEAM-7386] Bi-Temporal Join
    URL: https://github.com/apache/beam/pull/9032#issuecomment-514405257

> @rezarokni
>
> Was seeing not all checks succeeded so thought you worked on making those checks pass. But just clicked into logs and looked like they are not relevant, so will take a closer look on main stream class soon.

There are some checkstyle changes that need to be made. If that is OK, I will fix them up together with your next comments.

Issue Time Tracking
-------------------
    Worklog Id: (was: 281353)
    Time Spent: 3h 50m (was: 3h 40m)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=281307=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-281307 ]

ASF GitHub Bot logged work on BEAM-7386:

    Author: ASF GitHub Bot
    Created on: 23/Jul/19 20:45
    Start Date: 23/Jul/19 20:45
    Worklog Time Spent: 10m
    Work Description: amaliujia commented on issue #9032: [BEAM-7386] Bi-Temporal Join
    URL: https://github.com/apache/beam/pull/9032#issuecomment-514376941

@rezarokni I saw that not all checks had succeeded, so I thought you were working on making them pass. I just clicked into the logs, though, and the failures do not look relevant, so I will take a closer look at the main stream class soon.

Issue Time Tracking
-------------------
    Worklog Id: (was: 281307)
    Time Spent: 3h 40m (was: 3.5h)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=281304=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-281304 ]

ASF GitHub Bot logged work on BEAM-7386:

    Author: ASF GitHub Bot
    Created on: 23/Jul/19 20:43
    Start Date: 23/Jul/19 20:43
    Worklog Time Spent: 10m
    Work Description: rezarokni commented on issue #9032: [BEAM-7386] Bi-Temporal Join
    URL: https://github.com/apache/beam/pull/9032#issuecomment-514376058

@amaliujia Would you be able to have a look at the main stream class this week?

Issue Time Tracking
-------------------
    Worklog Id: (was: 281304)
    Time Spent: 3.5h (was: 3h 20m)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=281303=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-281303 ]

ASF GitHub Bot logged work on BEAM-7386:

    Author: ASF GitHub Bot
    Created on: 23/Jul/19 20:42
    Start Date: 23/Jul/19 20:42
    Worklog Time Spent: 10m
    Work Description: rezarokni commented on issue #9032: [BEAM-7386] Bi-Temporal Join
    URL: https://github.com/apache/beam/pull/9032#issuecomment-514375707

@ruwang Any views on the main stream class? :-)

Issue Time Tracking
-------------------
    Worklog Id: (was: 281303)
    Time Spent: 3h 20m (was: 3h 10m)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=281302=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-281302 ] ASF GitHub Bot logged work on BEAM-7386: Author: ASF GitHub Bot Created on: 23/Jul/19 20:41 Start Date: 23/Jul/19 20:41 Worklog Time Spent: 10m Work Description: rezarokni commented on issue #9032: [BEAM-7386] Bi-Temporal Join URL: https://github.com/apache/beam/pull/9032#issuecomment-514375707 @ruwang Any views on the main stream class :-) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 281302) Time Spent: 3h 10m (was: 3h) > Add Utility BiTemporalStreamJoin > > > Key: BEAM-7386 > URL: https://issues.apache.org/jira/browse/BEAM-7386 > Project: Beam > Issue Type: Improvement > Components: sdk-ideas >Affects Versions: 2.12.0 >Reporter: Reza ardeshir rokni >Assignee: Reza ardeshir rokni >Priority: Major > Time Spent: 3h 10m > Remaining Estimate: 0h > > Add utility class that enables a temporal join between two streams where > Stream A is matched to Stream B where > A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) > This will use the following overall flow: > KV(key, Timestamped) > | Window > | GBK > | Statefull DoFn > > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=276598=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-276598 ] ASF GitHub Bot logged work on BEAM-7386: Author: ASF GitHub Bot Created on: 15/Jul/19 10:14 Start Date: 15/Jul/19 10:14 Worklog Time Spent: 10m Work Description: rezarokni commented on pull request #9032: [BEAM-7386] Bi-Temporal Join URL: https://github.com/apache/beam/pull/9032#discussion_r303367452 ## File path: sdks/java/extensions/timeseries/src/test/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalCacheTestIT.java ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.timeseries.joins; + +import java.io.Serializable; +import org.apache.beam.sdk.extensions.timeseries.joins.BiTemporalTestUtils.QuoteData; +import org.apache.beam.sdk.extensions.timeseries.joins.BiTemporalTestUtils.TradeData; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BiTemporalCacheTestIT implements Serializable { Review comment: I think I found the way this is done. Changed the build.gradle to accommodate it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 276598) Time Spent: 3h (was: 2h 50m) > Add Utility BiTemporalStreamJoin > > > Key: BEAM-7386 > URL: https://issues.apache.org/jira/browse/BEAM-7386 > Project: Beam > Issue Type: Improvement > Components: sdk-ideas >Affects Versions: 2.12.0 >Reporter: Reza ardeshir rokni >Assignee: Reza ardeshir rokni >Priority: Major > Time Spent: 3h > Remaining Estimate: 0h > > Add utility class that enables a temporal join between two streams where > Stream A is matched to Stream B where > A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) > This will use the following overall flow: > KV(key, Timestamped) > | Window > | GBK > | Statefull DoFn > > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=276597=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-276597 ] ASF GitHub Bot logged work on BEAM-7386: Author: ASF GitHub Bot Created on: 15/Jul/19 10:12 Start Date: 15/Jul/19 10:12 Worklog Time Spent: 10m Work Description: rezarokni commented on pull request #9032: [BEAM-7386] Bi-Temporal Join URL: https://github.com/apache/beam/pull/9032#discussion_r303366582 ## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiStreamJoinResultCoder.java ## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.timeseries.joins; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.coders.*; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeParameter; + +public class BiStreamJoinResultCoder +extends StructuredCoder> { + + private final Coder leftCoder; + private final Coder rightCoder; + private final Coder keyCoder; + + public BiStreamJoinResultCoder(Coder keyCoder, Coder leftCoder, Coder rightCoder) { +this.leftCoder = NullableCoder.of(leftCoder); +this.rightCoder = NullableCoder.of(rightCoder); +this.keyCoder = NullableCoder.of(keyCoder); + } + + public Coder getLeftCoder() { +return leftCoder; + } + + public Coder getRightCoder() { +return rightCoder; + } + + public Coder getKeyCoder() { +return keyCoder; + } + + public static BiStreamJoinResultCoder of( + Coder keyCoder, Coder leftCoder, Coder rightCoder) { + +return new BiStreamJoinResultCoder<>(keyCoder, leftCoder, rightCoder); + } + + @Override + public void encode(BiTemporalJoinResult value, OutputStream outStream) + throws IOException { + +if (value == null) { Review comment: The BiStreamJoinResultCoder file was a leftover artifact from the rename of the class. Thanks for catching it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 276597) Time Spent: 2h 50m (was: 2h 40m) > Add Utility BiTemporalStreamJoin > > > Key: BEAM-7386 > URL: https://issues.apache.org/jira/browse/BEAM-7386 > Project: Beam > Issue Type: Improvement > Components: sdk-ideas >Affects Versions: 2.12.0 >Reporter: Reza ardeshir rokni >Assignee: Reza ardeshir rokni >Priority: Major > Time Spent: 2h 50m > Remaining Estimate: 0h > > Add utility class that enables a temporal join between two streams where > Stream A is matched to Stream B where > A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) > This will use the following overall flow: > KV(key, Timestamped) > | Window > | GBK > | Statefull DoFn > > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=276593=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-276593 ] ASF GitHub Bot logged work on BEAM-7386: Author: ASF GitHub Bot Created on: 15/Jul/19 09:59 Start Date: 15/Jul/19 09:59 Worklog Time Spent: 10m Work Description: rezarokni commented on pull request #9032: [BEAM-7386] Bi-Temporal Join URL: https://github.com/apache/beam/pull/9032#discussion_r303362169 ## File path: sdks/java/extensions/timeseries/src/test/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalCacheTestIT.java ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.timeseries.joins; + +import java.io.Serializable; +import org.apache.beam.sdk.extensions.timeseries.joins.BiTemporalTestUtils.QuoteData; +import org.apache.beam.sdk.extensions.timeseries.joins.BiTemporalTestUtils.TradeData; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BiTemporalCacheTestIT implements Serializable { Review comment: @lukecwik What is the best way to setup IT tests, does the framework just pick these up based on RequiresRunner? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 276593) Time Spent: 2h 40m (was: 2.5h) > Add Utility BiTemporalStreamJoin > > > Key: BEAM-7386 > URL: https://issues.apache.org/jira/browse/BEAM-7386 > Project: Beam > Issue Type: Improvement > Components: sdk-ideas >Affects Versions: 2.12.0 >Reporter: Reza ardeshir rokni >Assignee: Reza ardeshir rokni >Priority: Major > Time Spent: 2h 40m > Remaining Estimate: 0h > > Add utility class that enables a temporal join between two streams where > Stream A is matched to Stream B where > A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) > This will use the following overall flow: > KV(key, Timestamped) > | Window > | GBK > | Statefull DoFn > > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=276591=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-276591 ] ASF GitHub Bot logged work on BEAM-7386: Author: ASF GitHub Bot Created on: 15/Jul/19 09:57 Start Date: 15/Jul/19 09:57 Worklog Time Spent: 10m Work Description: rezarokni commented on pull request #9032: [BEAM-7386] Bi-Temporal Join URL: https://github.com/apache/beam/pull/9032#discussion_r303361737 ## File path: sdks/java/extensions/timeseries/build.gradle ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { id 'org.apache.beam.module' } +applyJavaNature() + +description = "Apache Beam :: SDKs :: Java :: Extensions :: Timeseries" +ext.summary = """Beam TIMESERIES provides helper utils to deal with timeseries processing""" + +dependencies { + compile library.java.guava + compile project(path: ":sdks:java:core", configuration: "shadow") + // Needed to run the Example. 
+ compile project(path: ":runners:direct-java") + testCompile library.java.hamcrest_core + testCompile library.java.hamcrest_library + testCompile library.java.junit + //testRuntimeOnly project(path: ":runners:direct-java") Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 276591) Time Spent: 2h 20m (was: 2h 10m) > Add Utility BiTemporalStreamJoin > > > Key: BEAM-7386 > URL: https://issues.apache.org/jira/browse/BEAM-7386 > Project: Beam > Issue Type: Improvement > Components: sdk-ideas >Affects Versions: 2.12.0 >Reporter: Reza ardeshir rokni >Assignee: Reza ardeshir rokni >Priority: Major > Time Spent: 2h 20m > Remaining Estimate: 0h > > Add utility class that enables a temporal join between two streams where > Stream A is matched to Stream B where > A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) > This will use the following overall flow: > KV(key, Timestamped) > | Window > | GBK > | Statefull DoFn > > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=276592=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-276592 ] ASF GitHub Bot logged work on BEAM-7386: Author: ASF GitHub Bot Created on: 15/Jul/19 09:57 Start Date: 15/Jul/19 09:57 Worklog Time Spent: 10m Work Description: rezarokni commented on pull request #9032: [BEAM-7386] Bi-Temporal Join URL: https://github.com/apache/beam/pull/9032#discussion_r303361809 ## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiStreamJoinResultCoder.java ## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.timeseries.joins; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.coders.*; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeParameter; + +public class BiStreamJoinResultCoder Review comment: Fixed. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 276592) Time Spent: 2.5h (was: 2h 20m) > Add Utility BiTemporalStreamJoin > > > Key: BEAM-7386 > URL: https://issues.apache.org/jira/browse/BEAM-7386 > Project: Beam > Issue Type: Improvement > Components: sdk-ideas >Affects Versions: 2.12.0 >Reporter: Reza ardeshir rokni >Assignee: Reza ardeshir rokni >Priority: Major > Time Spent: 2.5h > Remaining Estimate: 0h > > Add utility class that enables a temporal join between two streams where > Stream A is matched to Stream B where > A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) > This will use the following overall flow: > KV(key, Timestamped) > | Window > | GBK > | Statefull DoFn > > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=276233=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-276233 ] ASF GitHub Bot logged work on BEAM-7386: Author: ASF GitHub Bot Created on: 13/Jul/19 05:05 Start Date: 13/Jul/19 05:05 Worklog Time Spent: 10m Work Description: kennknowles commented on issue #9032: [BEAM-7386] Bi-Temporal Join URL: https://github.com/apache/beam/pull/9032#issuecomment-511089275 Precommits are back This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 276233) Time Spent: 2h 10m (was: 2h) > Add Utility BiTemporalStreamJoin > > > Key: BEAM-7386 > URL: https://issues.apache.org/jira/browse/BEAM-7386 > Project: Beam > Issue Type: Improvement > Components: sdk-ideas >Affects Versions: 2.12.0 >Reporter: Reza ardeshir rokni >Assignee: Reza ardeshir rokni >Priority: Major > Time Spent: 2h 10m > Remaining Estimate: 0h > > Add utility class that enables a temporal join between two streams where > Stream A is matched to Stream B where > A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) > This will use the following overall flow: > KV(key, Timestamped) > | Window > | GBK > | Statefull DoFn > > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=275673=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-275673 ] ASF GitHub Bot logged work on BEAM-7386: Author: ASF GitHub Bot Created on: 12/Jul/19 06:40 Start Date: 12/Jul/19 06:40 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #9032: [BEAM-7386] Bi-Temporal Join URL: https://github.com/apache/beam/pull/9032#issuecomment-510766791 I am not seeing the Java precommit tests being triggered. I am less familiar with how that is enabled. I am guessing it has to be set here: https://github.com/apache/beam/blob/master/build.gradle#L132 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 275673) Time Spent: 2h (was: 1h 50m) > Add Utility BiTemporalStreamJoin > > > Key: BEAM-7386 > URL: https://issues.apache.org/jira/browse/BEAM-7386 > Project: Beam > Issue Type: Improvement > Components: sdk-ideas >Affects Versions: 2.12.0 >Reporter: Reza ardeshir rokni >Assignee: Reza ardeshir rokni >Priority: Major > Time Spent: 2h > Remaining Estimate: 0h > > Add utility class that enables a temporal join between two streams where > Stream A is matched to Stream B where > A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) > This will use the following overall flow: > KV(key, Timestamped) > | Window > | GBK > | Statefull DoFn > > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=275663=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-275663 ] ASF GitHub Bot logged work on BEAM-7386: Author: ASF GitHub Bot Created on: 12/Jul/19 06:31 Start Date: 12/Jul/19 06:31 Worklog Time Spent: 10m Work Description: amaliujia commented on pull request #9032: [BEAM-7386] Bi-Temporal Join URL: https://github.com/apache/beam/pull/9032#discussion_r302838285 ## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiStreamJoinResultCoder.java ## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.timeseries.joins; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.coders.*; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeParameter; + +public class BiStreamJoinResultCoder +extends StructuredCoder> { + + private final Coder leftCoder; + private final Coder rightCoder; + private final Coder keyCoder; + + public BiStreamJoinResultCoder(Coder keyCoder, Coder leftCoder, Coder rightCoder) { +this.leftCoder = NullableCoder.of(leftCoder); +this.rightCoder = NullableCoder.of(rightCoder); +this.keyCoder = NullableCoder.of(keyCoder); + } + + public Coder getLeftCoder() { +return leftCoder; + } + + public Coder getRightCoder() { +return rightCoder; + } + + public Coder getKeyCoder() { +return keyCoder; + } + + public static BiStreamJoinResultCoder of( + Coder keyCoder, Coder leftCoder, Coder rightCoder) { + +return new BiStreamJoinResultCoder<>(keyCoder, leftCoder, rightCoder); + } + + @Override + public void encode(BiTemporalJoinResult value, OutputStream outStream) + throws IOException { + +if (value == null) { Review comment: key, left and right coders are already `NullableCoder` which tolerates `NULL`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 275663) Time Spent: 1.5h (was: 1h 20m) > Add Utility BiTemporalStreamJoin > > > Key: BEAM-7386 > URL: https://issues.apache.org/jira/browse/BEAM-7386 > Project: Beam > Issue Type: Improvement > Components: sdk-ideas >Affects Versions: 2.12.0 >Reporter: Reza ardeshir rokni >Assignee: Reza ardeshir rokni >Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > > Add utility class that enables a temporal join between two streams where > Stream A is matched to Stream B where > A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) > This will use the following overall flow: > KV(key, Timestamped) > | Window > | GBK > | Statefull DoFn > > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
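Note on the review question above about null handling: a null-tolerant wrapper typically writes a one-byte presence marker before the payload, so a wrapped field may be null without any extra check in the enclosing coder. The sketch below illustrates that general technique in plain Java, assuming a String payload. `NullableEncodingSketch` and its methods are hypothetical names and do not reproduce Beam's `NullableCoder` or the coder under review; the exact bytes Beam writes are not shown in these messages.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical sketch of presence-marker encoding; not Beam's NullableCoder. */
final class NullableEncodingSketch {

  /** Writes 0 for null, or 1 followed by the value when it is present. */
  static byte[] encode(String value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      if (value == null) {
        out.writeByte(0); // marker only, no payload
      } else {
        out.writeByte(1); // marker, then the payload
        out.writeUTF(value);
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Reads the marker first and only decodes a payload when one was written. */
  static String decode(byte[] encoded) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
      return in.readByte() == 0 ? null : in.readUTF();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(decode(encode(null)));
    System.out.println(decode(encode("trade")));
  }
}
```

A wrapper like this covers null fields only. Whether the enclosing result object itself can be null, which is what the `value == null` check in the quoted `encode` method guards against, is a separate question that these messages leave open.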
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=275667=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-275667 ] ASF GitHub Bot logged work on BEAM-7386: Author: ASF GitHub Bot Created on: 12/Jul/19 06:31 Start Date: 12/Jul/19 06:31 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #9032: [BEAM-7386] Bi-Temporal Join URL: https://github.com/apache/beam/pull/9032#issuecomment-510764741 Retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 275667) Time Spent: 1h 50m (was: 1h 40m) > Add Utility BiTemporalStreamJoin > > > Key: BEAM-7386 > URL: https://issues.apache.org/jira/browse/BEAM-7386 > Project: Beam > Issue Type: Improvement > Components: sdk-ideas >Affects Versions: 2.12.0 >Reporter: Reza ardeshir rokni >Assignee: Reza ardeshir rokni >Priority: Major > Time Spent: 1h 50m > Remaining Estimate: 0h > > Add utility class that enables a temporal join between two streams where > Stream A is matched to Stream B where > A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) > This will use the following overall flow: > KV(key, Timestamped) > | Window > | GBK > | Statefull DoFn > > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=275665=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-275665 ] ASF GitHub Bot logged work on BEAM-7386: Author: ASF GitHub Bot Created on: 12/Jul/19 06:31 Start Date: 12/Jul/19 06:31 Worklog Time Spent: 10m Work Description: amaliujia commented on pull request #9032: [BEAM-7386] Bi-Temporal Join URL: https://github.com/apache/beam/pull/9032#discussion_r302843754 ## File path: sdks/java/extensions/timeseries/src/test/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalCacheTestIT.java ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.timeseries.joins; + +import java.io.Serializable; +import org.apache.beam.sdk.extensions.timeseries.joins.BiTemporalTestUtils.QuoteData; +import org.apache.beam.sdk.extensions.timeseries.joins.BiTemporalTestUtils.TradeData; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BiTemporalCacheTestIT implements Serializable { Review comment: How does this IT run? Will `./gradlew test` trigger it? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 275665) > Add Utility BiTemporalStreamJoin > > > Key: BEAM-7386 > URL: https://issues.apache.org/jira/browse/BEAM-7386 > Project: Beam > Issue Type: Improvement > Components: sdk-ideas >Affects Versions: 2.12.0 >Reporter: Reza ardeshir rokni >Assignee: Reza ardeshir rokni >Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > > Add utility class that enables a temporal join between two streams where > Stream A is matched to Stream B where > A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp) > This will use the following overall flow: > KV(key, Timestamped) > | Window > | GBK > | Statefull DoFn > > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=275666=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-275666 ] ASF GitHub Bot logged work on BEAM-7386: Author: ASF GitHub Bot Created on: 12/Jul/19 06:31 Start Date: 12/Jul/19 06:31 Worklog Time Spent: 10m Work Description: amaliujia commented on pull request #9032: [BEAM-7386] Bi-Temporal Join URL: https://github.com/apache/beam/pull/9032#discussion_r302798339 ## File path: sdks/java/extensions/timeseries/build.gradle ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { id 'org.apache.beam.module' } +applyJavaNature() + +description = "Apache Beam :: SDKs :: Java :: Extensions :: Timeseries" +ext.summary = """Beam TIMESERIES provides helper utils to deal with timeseries processing""" + +dependencies { + compile library.java.guava + compile project(path: ":sdks:java:core", configuration: "shadow") + // Needed to run the Example. + compile project(path: ":runners:direct-java") + testCompile library.java.hamcrest_core + testCompile library.java.hamcrest_library + testCompile library.java.junit + //testRuntimeOnly project(path: ":runners:direct-java") Review comment: remove this line? 
Issue Time Tracking
Worklog Id: (was: 275666)
Time Spent: 1h 40m (was: 1.5h)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=275664=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-275664 ]

ASF GitHub Bot logged work on BEAM-7386:
Author: ASF GitHub Bot
Created on: 12/Jul/19 06:31
Start Date: 12/Jul/19 06:31
Worklog Time Spent: 10m
Work Description: amaliujia commented on pull request #9032: [BEAM-7386] Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r302843161

## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/joins/BiStreamJoinResultCoder.java ##
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.*;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeParameter;
+
+public class BiStreamJoinResultCoder

Review comment: `BiStreamJoinResultCoder` and `BiTemporalJoinResultCoder` are duplicates?
Issue Time Tracking
Worklog Id: (was: 275664)
Time Spent: 1.5h (was: 1h 20m)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=275469=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-275469 ]

ASF GitHub Bot logged work on BEAM-7386:
Author: ASF GitHub Bot
Created on: 11/Jul/19 19:08
Start Date: 11/Jul/19 19:08
Worklog Time Spent: 10m
Work Description: amaliujia commented on issue #9032: [BEAM-7386] Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#issuecomment-510615471

Thanks, Kenn! Will take a look soon.

Issue Time Tracking
Worklog Id: (was: 275469)
Time Spent: 1h 20m (was: 1h 10m)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=275455=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-275455 ]

ASF GitHub Bot logged work on BEAM-7386:
Author: ASF GitHub Bot
Created on: 11/Jul/19 18:26
Start Date: 11/Jul/19 18:26
Worklog Time Spent: 10m
Work Description: kennknowles commented on issue #9032: [BEAM-7386] Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#issuecomment-510600426

I believe you are looking for @amaliujia

Issue Time Tracking
Worklog Id: (was: 275455)
Time Spent: 1h 10m (was: 1h)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=275259=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-275259 ]

ASF GitHub Bot logged work on BEAM-7386:
Author: ASF GitHub Bot
Created on: 11/Jul/19 08:59
Start Date: 11/Jul/19 08:59
Worklog Time Spent: 10m
Work Description: rezarokni commented on issue #9032: [BEAM-7386] Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#issuecomment-510397666

@ruwang

Issue Time Tracking
Worklog Id: (was: 275259)
Time Spent: 1h (was: 50m)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=274882=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-274882 ]

ASF GitHub Bot logged work on BEAM-7386:
Author: ASF GitHub Bot
Created on: 10/Jul/19 14:11
Start Date: 10/Jul/19 14:11
Worklog Time Spent: 10m
Work Description: rezarokni commented on pull request #9032: [BEAM-7386] Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#discussion_r302086326

## File path: sdks/java/extensions/timeseries/src/test/java/org/apache/beam/sdk/extensions/timeseries/joins/BiTemporalCoderTests.java ##
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.timeseries.joins;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.coders.*;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(JUnit4.class)
+public class BiTemporalCoderTests implements Serializable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BiTemporalCoderTests.class);
+
+  @Rule public transient TestPipeline p = TestPipeline.create();
+
+  @Test
+  public void testBeforeValues() {}
+
+  private static class CoderAndData<T> {
+    Coder<T> coder;
+    List<T> data;
+  }
+
+  private static class AnyCoderAndData {
+    private CoderAndData<?> coderAndData;
+  }
+
+  private static <T> AnyCoderAndData coderAndData(Coder<T> coder, List<T> data) {
+    CoderAndData<T> coderAndData = new CoderAndData<>();
+    coderAndData.coder = coder;
+    coderAndData.data = data;
+    AnyCoderAndData res = new AnyCoderAndData();
+    res.coderAndData = coderAndData;
+    return res;
+  }
+
+  private static final List<AnyCoderAndData> TEST_DATA =
+      Arrays.asList(
+          coderAndData(
+              VarIntCoder.of(), Arrays.asList(-1, 0, 1, 13, Integer.MAX_VALUE, Integer.MIN_VALUE)),
+          coderAndData(
+              BigEndianLongCoder.of(),
+              Arrays.asList(-1L, 0L, 1L, 13L, Long.MAX_VALUE, Long.MIN_VALUE)),
+          coderAndData(StringUtf8Coder.of(), Arrays.asList("", "hello", "goodbye", "1")),
+          coderAndData(
+              KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
+              Arrays.asList(KV.of("", -1), KV.of("hello", 0), KV.of("goodbye", Integer.MAX_VALUE))),
+          coderAndData(
+              ListCoder.of(VarLongCoder.of()),
+              Arrays.asList(Arrays.asList(1L, 2L, 3L), Collections.emptyList())));
+
+  @Test
+  @SuppressWarnings("rawtypes")
+  public void testDecodeEncodeEqual() throws Exception {
+
+    Instant time = new Instant("2000-01-01");
+
+    for (AnyCoderAndData keyCoderAndData : TEST_DATA) {
+      Coder keyCoder = keyCoderAndData.coderAndData.coder;
+      for (Object key : keyCoderAndData.coderAndData.data) {
+        for (AnyCoderAndData valueCoderAndData : TEST_DATA) {
+          Coder valueCoder = valueCoderAndData.coderAndData.coder;
+          for (Object value : valueCoderAndData.coderAndData.data) {
+            CoderProperties.coderDecodeEncodeEqual(
+                BiTemporalJoinResultCoder.of(keyCoder, valueCoder, valueCoder),
+                BiTemporalJoinResult.of()
+                    .setLeftData(KV.of(key, value), time)
+                    .setRightData(KV.of(key, value), time));
+          }
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() throws Exception {
+    CoderProperties.coderSerializable(
+        BiTemporalJoinResultCoder.of(
+            GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE));
+  }
+
+  /**
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=274753=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-274753 ]

ASF GitHub Bot logged work on BEAM-7386:
Author: ASF GitHub Bot
Created on: 10/Jul/19 09:13
Start Date: 10/Jul/19 09:13
Worklog Time Spent: 10m
Work Description: rezarokni commented on pull request #9032: [BEAM-7386] Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032

extensions/timeseries is intended to be an incubation area for utilities that assist with working with time series data.

This sample takes two streams, a left stream and a right stream. It matches the left stream to the nearest right stream based on their timestamps. The nearest left stream is one whose timestamp is <= the right stream timestamp.

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- [x] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
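
Editor's sketch (not part of the pull request): the matching rule stated in the issue description, `A.timestamp = (max(b.timestamp) where b.timestamp <= a.timestamp)`, can be illustrated for a single key with plain Java collections. This is a minimal sketch under assumptions: the class name `TemporalJoinSketch`, the `Match` holder, and the use of a `TreeMap` in place of Beam state are all hypothetical, and none of it reflects how the PR's stateful DoFn is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, Beam-free illustration of the "latest b at or before a" match. */
public class TemporalJoinSketch {

  /** Hypothetical holder for one joined pair; not a class from the PR. */
  static final class Match {
    final long leftTimestamp;
    final String leftValue;
    final Long rightTimestamp; // null when no right element is early enough
    final String rightValue;

    Match(long leftTimestamp, String leftValue, Long rightTimestamp, String rightValue) {
      this.leftTimestamp = leftTimestamp;
      this.leftValue = leftValue;
      this.rightTimestamp = rightTimestamp;
      this.rightValue = rightValue;
    }

    @Override
    public String toString() {
      return leftValue + "@" + leftTimestamp + " -> " + rightValue + "@" + rightTimestamp;
    }
  }

  /**
   * For every left element a, picks the right element b with the greatest
   * timestamp such that b.timestamp <= a.timestamp. Both maps are keyed by
   * timestamp and hold the elements of one join key.
   */
  static List<Match> join(TreeMap<Long, String> left, TreeMap<Long, String> right) {
    List<Match> out = new ArrayList<>();
    for (Map.Entry<Long, String> a : left.entrySet()) {
      // floorEntry returns the entry with the greatest key <= the argument, or null.
      Map.Entry<Long, String> b = right.floorEntry(a.getKey());
      out.add(
          new Match(
              a.getKey(),
              a.getValue(),
              b == null ? null : b.getKey(),
              b == null ? null : b.getValue()));
    }
    return out;
  }

  public static void main(String[] args) {
    TreeMap<Long, String> left = new TreeMap<>();
    left.put(5L, "a5");
    left.put(10L, "a10");
    left.put(20L, "a20");

    TreeMap<Long, String> right = new TreeMap<>();
    right.put(7L, "b7");
    right.put(10L, "b10");
    right.put(15L, "b15");

    for (Match m : join(left, right)) {
      System.out.println(m);
    }
  }
}
```

In this sketch a left element with no earlier right element is emitted with a null right side; whether the PR emits, drops, or defers such elements is not stated in these messages.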
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=274749=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-274749 ]

ASF GitHub Bot logged work on BEAM-7386:
Author: ASF GitHub Bot
Created on: 10/Jul/19 09:07
Start Date: 10/Jul/19 09:07
Worklog Time Spent: 10m
Work Description: rezarokni commented on issue #9032: [BEAM-7386] Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032#issuecomment-509978751

@reuvenlax, @kennknowles

Issue Time Tracking
Worklog Id: (was: 274749)
Time Spent: 20m (was: 10m)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=274750=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-274750 ]

ASF GitHub Bot logged work on BEAM-7386:
Author: ASF GitHub Bot
Created on: 10/Jul/19 09:07
Start Date: 10/Jul/19 09:07
Worklog Time Spent: 10m
Work Description: rezarokni commented on pull request #9032: [BEAM-7386] Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032

Issue Time Tracking
Worklog Id: (was: 274750)
Time Spent: 0.5h (was: 20m)
[jira] [Work logged] (BEAM-7386) Add Utility BiTemporalStreamJoin
[ https://issues.apache.org/jira/browse/BEAM-7386?focusedWorklogId=274746=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-274746 ]

ASF GitHub Bot logged work on BEAM-7386:
Author: ASF GitHub Bot
Created on: 10/Jul/19 09:05
Start Date: 10/Jul/19 09:05
Worklog Time Spent: 10m
Work Description: rezarokni commented on pull request #9032: [BEAM-7386] Bi-Temporal Join
URL: https://github.com/apache/beam/pull/9032

extensions/timeseries is intended to be an incubation area for utilities that assist with working with time series data.

This sample takes two streams, a left stream and a right stream. It matches the left stream to the nearest right stream based on their timestamps. The nearest left stream is one whose timestamp is <= the right stream timestamp.

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).