The records have a property value in common, yes. For example, record[“record_kind”] == “foo” or record[“record_kind”] == “bar”. However, I’d be curious whether the answer changes if I wanted to do this globally for the whole stream.
Thanks!

Ray

From: Lukasz Cwik <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Wednesday, December 21, 2016 at 5:52 PM
To: "[email protected]" <[email protected]>
Subject: Re: One-to-many mapping between unbounded input source and pipelines with session windows

My first question was about how you know that two or more records are related, or whether this is global for the entire stream.

The reason I asked whether you can map the qualifiers onto a fixed set of states is that I was wondering whether you could either use the State API (WIP https://issues.apache.org/jira/browse/BEAM-25) and the timers API (WIP https://issues.apache.org/jira/browse/BEAM-27) and just transition between a fixed number of states, or create composite session keys based upon the "id" plus some small set of qualifiers and do a GBK to do a join.

In this example, how do you know the two records are related to each other (do they share a common attribute, or can a common attribute be computed)?

- Any time we see a record with record[“id”] == 1 && record[“field_6”] == “some_value” *not* followed by a record with record[“id”] == 2 && record[“field_7”] == “other_value” in the subsequent 10 minutes.

On Wed, Dec 21, 2016 at 2:14 PM, Ray Ruvinskiy <[email protected]> wrote:

I’m unsure about your first question. Are you asking whether there’s an attribute that all the records have in common? I think I’m looking for more flexibility than a fixed set of values but perhaps I’m overlooking something. To flesh out the example, let’s say the records are JSON documents, with fields.
So, to express my examples again, I want to know:

- Any time we see record_1[“type”] == “type1” && record_1[“field1”] == “value1”, followed within no more than a minute by record_2[“type”] == “type1” && record_2[“field2”].contains(“some_substring”), followed within no more than 5 minutes by record_3[“type”] == “type2” && record_3[“field3”] == “value3”
- Any time we see N records where record[“id”] == 123 within 5 hours of each other, followed by another record with record[“id”] == 456 no more than an hour later than the group of N records
- Any time we see a record with record[“id”] == 1 && record[“field_6”] == “some_value” *not* followed by a record with record[“id”] == 2 && record[“field_7”] == “other_value” in the subsequent 10 minutes.

If data is late, *ideally* it’s taken into account, but we don’t need to account for data being late for an arbitrary amount of time. We can say that if data is, for instance, less than an hour late it should be taken into account, but if it’s more than an hour late we can ignore it.

Thanks!

Ray

From: Lukasz Cwik <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Wednesday, December 21, 2016 at 4:47 PM
To: "[email protected]" <[email protected]>
Subject: Re: One-to-many mapping between unbounded input source and pipelines with session windows

Do the records have another attribute Z which joins them all together?

Is the set of attributes A, B, C, X, Y, K, L drawn from a fixed set of values, like enums, or can the attributes be mapped onto a certain number of states (like an attribute A > 50 can be mapped onto a state "exceeds threshold")?

For your examples, what should occur when there is late data in your three scenarios?

On Wed, Dec 21, 2016 at 9:05 AM, Ray Ruvinskiy <[email protected]> wrote:

Hello,

I am trying to figure out if Apache Beam is the right framework for my use case.
I have an unbounded stream, and there are a number of questions I would like to ask regarding the records in the stream:

- For example, one question may be trying to find a record with attribute A followed within no more than a minute by a record with attribute B followed within no more than 5 minutes by a record with attribute C.
- Another question may be trying to find a sequence of at least N records with attribute X within 5 hours of each other, followed by a record with attribute Y no more than an hour later.
- A third question would be seeing if there exists a record with attribute K *not* followed by a record with attribute L in the next 10 minutes.

Every time I encounter the pattern of records I’m looking for, I would like to perform an action.

If I understand the Beam model correctly, each question would correspond to a separate pipeline I would create, and it also sounds like I’m looking for session windows. However, I’m assuming I would need to tee the input source to all the separate pipelines?

I have tried to look for documentation and/or examples on whether Apache Beam can be used to express such a setup and how to do it if so, but I haven’t been able to find anything concrete. Any help would be appreciated.

Thanks!

Ray
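
Editorial illustration: the first question in the thread (A, then B within a minute, then C within 5 minutes) can be sketched in plain Python to make the matching logic concrete. This is not Beam code and not something proposed by either poster. It assumes the records that share a key have already been collected and sorted by event time (for example, as the output of a per-key grouping step), that timestamps are integer seconds, and that late data is not handled. The names `Step`, `find_sequences`, `events`, and `steps` are hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Sequence, Tuple

Record = Dict[str, object]
Event = Tuple[int, Record]  # (event time in seconds, parsed JSON record)


@dataclass(frozen=True)
class Step:
    """One stage of the pattern (hypothetical helper, not a Beam class)."""
    predicate: Callable[[Record], bool]
    max_gap_seconds: int  # allowed delay after the previous step; unused for step 0


def find_sequences(events: Sequence[Event], steps: Sequence[Step]) -> List[Tuple[int, ...]]:
    """Return the timestamps of every complete match.

    Assumes `events` is sorted by timestamp. A partial match stays alive
    until the deadline for its next step has passed, so one record may
    take part in several matches.
    """
    matches: List[Tuple[int, ...]] = []
    partial: List[List[int]] = []  # timestamps matched so far, per candidate
    for ts, record in events:
        still_alive: List[List[int]] = []
        candidates: List[List[int]] = []
        for matched in partial:
            step = steps[len(matched)]
            if ts - matched[-1] > step.max_gap_seconds:
                continue  # too late for the next step: drop this candidate
            still_alive.append(matched)
            if step.predicate(record):
                candidates.append(matched + [ts])
        if steps[0].predicate(record):
            candidates.append([ts])  # this record starts a new candidate
        for candidate in candidates:
            if len(candidate) == len(steps):
                matches.append(tuple(candidate))
            else:
                still_alive.append(candidate)
        partial = still_alive
    return matches


# The first example from the thread, with the field names used there.
steps = [
    Step(lambda r: r.get("type") == "type1" and r.get("field1") == "value1", 0),
    Step(lambda r: r.get("type") == "type1"
         and "some_substring" in str(r.get("field2", "")), 60),
    Step(lambda r: r.get("type") == "type2" and r.get("field3") == "value3", 300),
]

events = [
    (0, {"type": "type1", "field1": "value1"}),
    (30, {"type": "type1", "field2": "has some_substring inside"}),
    (200, {"type": "type2", "field3": "value3"}),
    (1000, {"type": "type1", "field1": "value1"}),
    (1100, {"type": "type1", "field2": "some_substring"}),  # 100 s after A: too late
    (1200, {"type": "type2", "field3": "value3"}),
]

matches = find_sequences(events, steps)
print(matches)
```

The "not followed by" question would need a different shape: a match can only be reported once the 10-minute deadline has passed without the second record arriving, which in a streaming system is the kind of thing a timer would be used for.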
