The records have a property value in common, yes. For example, 
record[“record_kind”] == “foo” or record[“record_kind”] == “bar”. However, I’d 
be curious whether the answer would change if I wanted to do this globally for 
the whole stream.

Thanks!

Ray

From: Lukasz Cwik <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Wednesday, December 21, 2016 at 5:52 PM
To: "[email protected]" <[email protected]>
Subject: Re: One-to-many mapping between unbounded input source and pipelines 
with session windows

My first question was about how you know that two or more records are related, 
or whether this is global for the entire stream.

The reason I asked whether you can map the qualifiers onto a fixed set of 
states is that I was wondering whether you could either use the State API 
(WIP https://issues.apache.org/jira/browse/BEAM-25) and timers API 
(WIP https://issues.apache.org/jira/browse/BEAM-27) to transition between a 
fixed number of states, or create composite session keys based upon the "id" 
plus some small set of qualifiers and do a GBK to do a join. 

In this example, how do you know the two records are related to each other (do 
they share a common attribute or can a common attribute be computed)?
- Any time we see a record with record[“id”] == 1 && record[“field_6”] == 
“some_value” *not* followed by a record with record[“id”] == 2 && 
record[“field_7”] == “other_value” in the subsequent 10 minutes.
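
As a rough illustration of the fixed-state idea for this "not followed by" 
case, here is a minimal sketch in plain Python. It is not Beam code and does 
not reflect the State or timers APIs, which are still work in progress. A 
matching start record sets a timer for its key, a matching follow-up clears 
it, and an expired timer reports the start record. The class name 
AbsenceDetector, its methods, and the use of a shared attribute such as 
record_kind as the key are all assumptions made for the example.

```python
from typing import Callable, Dict, List, Tuple

Record = dict
Predicate = Callable[[Record], bool]


class AbsenceDetector:
    """Hypothetical per-key state machine: idle -> waiting (timer set) -> idle."""

    def __init__(self, is_start: Predicate, is_follow_up: Predicate,
                 timeout_s: float = 600.0) -> None:
        self.is_start = is_start
        self.is_follow_up = is_follow_up
        self.timeout_s = timeout_s
        # key -> (deadline in event-time seconds, the start record)
        self.waiting: Dict[str, Tuple[float, Record]] = {}

    def advance_to(self, now: float) -> List[Record]:
        """Fire timers whose deadline is strictly before `now`."""
        expired = [k for k, (deadline, _) in self.waiting.items() if deadline < now]
        return [self.waiting.pop(k)[1] for k in expired]

    def process(self, key: str, record: Record, event_time: float) -> List[Record]:
        """Feed one record in event-time order; return newly confirmed absences."""
        alerts = self.advance_to(event_time)
        if self.is_start(record):
            # A repeated start record for the same key restarts the timer.
            self.waiting[key] = (event_time + self.timeout_s, record)
        elif key in self.waiting and self.is_follow_up(record):
            del self.waiting[key]  # follow-up arrived in time: cancel the timer
        return alerts


detector = AbsenceDetector(
    is_start=lambda r: r.get("id") == 1 and r.get("field_6") == "some_value",
    is_follow_up=lambda r: r.get("id") == 2 and r.get("field_7") == "other_value",
)
```

The sketch assumes records arrive in event-time order for each key and that 
something calls advance_to as time moves forward; late data is not handled.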



On Wed, Dec 21, 2016 at 2:14 PM, Ray Ruvinskiy <[email protected]> 
wrote:
I’m unsure about your first question. Are you asking whether there’s an 
attribute that all the records have in common?

I think I’m looking for more flexibility than a fixed set of values but perhaps 
I’m overlooking something. To flesh out the example, let’s say the records are 
JSON documents, with fields. So, to express my examples again, I want to know:
- Any time we see record_1[“type”] == “type1” && record_1[“field1”] == 
“value1”, followed within no more than a minute by record_2[“type”] == “type1” 
&& record_2[“field2”].contains(“some_substring”), followed within no more than 
5 minutes by record_3[“type”] == “type2” && record_3[“field3”] == “value3”
- Any time we see N records where record[“id”] == 123 within 5 hours of each 
other, followed by another record with record[“id”] == 456 no more than an hour 
later than the group of N records
- Any time we see a record with record[“id”] == 1 && record[“field_6”] == 
“some_value” *not* followed by a record with record[“id”] == 2 && 
record[“field_7”] == “other_value” in the subsequent 10 minutes.
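
To make the first pattern concrete, here is a minimal sketch in plain Python 
(not Beam) of matching an ordered sequence of predicates with a maximum gap 
between consecutive steps. The function name find_sequences and the 
(predicate, max_gap_seconds) step format are hypothetical, and the sketch 
assumes the records are already sorted by event time.

```python
from typing import Callable, Iterable, List, Tuple

Record = dict
Step = Tuple[Callable[[Record], bool], float]  # (predicate, max gap in seconds)


def find_sequences(events: Iterable[Tuple[float, Record]],
                   steps: List[Step]) -> List[List[Record]]:
    """Return every completed sequence; the gap of the first step is ignored."""
    partial = []  # entries: (index of next step, time of last match, records so far)
    matches: List[List[Record]] = []
    for t, rec in events:
        still_open = []
        for idx, last_t, matched in partial:
            pred, max_gap = steps[idx]
            if t - last_t > max_gap:
                continue  # too late for this partial match: drop it
            if pred(rec):
                extended = matched + [rec]
                if idx + 1 == len(steps):
                    matches.append(extended)
                else:
                    still_open.append((idx + 1, t, extended))
            else:
                still_open.append((idx, last_t, matched))
        if steps[0][0](rec):
            if len(steps) == 1:
                matches.append([rec])
            else:
                still_open.append((1, t, [rec]))
        partial = still_open
    return matches


# The first pattern above: 1 minute between steps 1 and 2, 5 minutes between 2 and 3.
steps = [
    (lambda r: r.get("type") == "type1" and r.get("field1") == "value1", 0),
    (lambda r: r.get("type") == "type1" and "some_substring" in r.get("field2", ""), 60),
    (lambda r: r.get("type") == "type2" and r.get("field3") == "value3", 300),
]
```

One design choice to note: a partial match is consumed by the first record 
that extends it, so a later record cannot extend the same prefix again. 
Whether that is the desired semantics depends on the use case.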

If data is late, *ideally* it’s taken into account, but we don’t need to 
account for data being late for an arbitrary amount of time. We can say that if 
data is, for instance, less than an hour late it should be taken into 
account, but if it’s more than an hour late we can ignore it.
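
The one-hour cutoff could be sketched as follows, in plain Python rather than 
Beam. The function name split_by_lateness is hypothetical, and using the 
largest event time seen so far as the watermark is a simplifying assumption; 
a real runner estimates its watermark differently.

```python
from typing import Iterable, List, Tuple

ALLOWED_LATENESS_S = 3600  # one hour, as in the example above


def split_by_lateness(arrivals: Iterable[Tuple[float, dict]],
                      allowed_lateness_s: float = ALLOWED_LATENESS_S
                      ) -> Tuple[List[dict], List[dict]]:
    """Split records, given in arrival order, into (accepted, dropped)."""
    watermark = float("-inf")  # crude watermark: max event time seen so far
    accepted: List[dict] = []
    dropped: List[dict] = []
    for event_time, record in arrivals:
        if watermark - event_time > allowed_lateness_s:
            dropped.append(record)  # more than an hour behind: ignore
        else:
            accepted.append(record)
        watermark = max(watermark, event_time)
    return accepted, dropped
```

A record exactly one hour behind the watermark is accepted in this sketch; 
the boundary could just as well go the other way.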

Thanks!

Ray

From: Lukasz Cwik <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Wednesday, December 21, 2016 at 4:47 PM
To: "[email protected]" <[email protected]>
Subject: Re: One-to-many mapping between unbounded input source and pipelines 
with session windows

Do the records have another attribute Z which joins them all together?
Are the attributes A, B, C, X, Y, K, L from a fixed set of values, like enums, 
or can they be mapped onto a certain number of states (for example, an 
attribute A > 50 can be mapped onto a state "exceeds threshold")?
For your examples, what should occur when there is late data in your three 
scenarios?


On Wed, Dec 21, 2016 at 9:05 AM, Ray Ruvinskiy <[email protected]> 
wrote:
Hello,

I am trying to figure out if Apache Beam is the right framework for my use 
case. I have an unbounded stream, and there are a number of questions I would 
like to ask regarding the records in the stream:

- For example, one question may be trying to find a record with attribute A 
followed within no more than a minute by a record with attribute B followed 
within no more than 5 minutes by a record with attribute C.
- Another question may be trying to find a sequence of at least N records with 
attribute X within 5 hours of each other, followed by a record with attribute 
Y no more than an hour later.
- A third question would be seeing if there exists a record with attribute K 
*not* followed by a record with attribute L in the next 10 minutes.

Every time I encounter the pattern of records I’m looking for, I would like to 
perform an action. If I understand the Beam model correctly, each question 
would correspond to a separate pipeline I would create, and it also sounds like 
I’m looking for session windows. However, I’m assuming I would need to tee the 
input source to all the separate pipelines? I have tried to look for 
documentation and/or examples on whether Apache Beam can be used to express 
such a setup and how to do it if so, but I haven’t been able to find anything 
concrete. Any help would be appreciated.

Thanks!

Ray





