Re: Limited join with stop condition
Hi,

Agreed with the others that this does not sound like a good fit... But to explore ideas, one possible (complicated and error-prone) way this could be done: Beam does not support cycles, but you could use an external unbounded source as a way of sending an impulse out and then back into the system to read more data. Assuming you are not using the standard source IOs and you're reading data via a DoFn (it would not work with the built-in source IOs):

- Create a streaming pipeline that reads from an unbounded source; this source is just used for signals to read more data.
- You start the initial read by sending a Start event to the unbounded source.
- In the pipeline you branch the start event to two DoFns, DoFnReadFromSource1 and DoFnReadFromSource2. These will each read X records, which are then wrapped in an Event object and sent forward. You will also need sequence IDs and an EndRead Event object (in case a source has been exhausted).
- You send the events to a stateful DoFn (in the global window) which does the following:
  - If the condition is not met, send a Start event message back to the unbounded source (which will result in more data being read).
  - If the condition is met, send out the joined event and GC the data that has been joined.
  - Keep the other elements around for the next time you send a Start event into the unbounded source.

I am sure there are many corner cases I have not thought of... (for example, when both sources are exhausted and you don't have a join condition match, what should it do?). Also, this will result in a pipeline that is always up and running.

Cheers
Reza

On Fri, 11 Oct 2019 at 11:19, Kenneth Knowles wrote:
> Interesting! I agree with Luke that it seems not a great fit for Beam in
> the most rigorous sense. There are many considerations:
>
> 1. We assume ParDo has side effects by default. So the model actually
> *requires* eager evaluation, not lazy, in order to make all the side
> effects happen.
But for your case let us assume somehow we know it is > all @Pure. > 2. Lazy evaluation and parallelism are in opposition. In pure computations > like Haskell, literally everything (except monadic sequence) is parallel > for free, but the problem is nothing starts until it is needed so > parallelism requires forcing computations early. > > On the other hand, we can think about ways forward here. A first step is > if the join is a "side lookup join" where we always process all of source 1 > but try to process less of source 2. If source 2 is feeding into a map side > input then this could be lazy in some way. When an element from source 1 > calls the side input lookup it could be a blocking call that triggers reads > from source 2 until a match is found. This computation strategy is > consistent with the model and will read all of source 1 but only the prefix > of source 2 needed to join all of source 1. I think you could implement > this pattern with parallelism on both the main input and side input. Then, > to read less of source 1 you need feedback from the sink to the source. We > have nothing like that... This is all very abstract hypotheticals. > > If we get to practical implementation "today" then every runner pretty > much reads all of a bounded source before even starting the next transform, > no?. I wonder if it makes sense to convert them to unbounded (which is > still allowed to terminate but does not support dynamic splits). Then you > just terminate the pipeline when you have enough output. You will read more > than you need but maybe that is not so bad, and anyhow hard to avoid. Also > a vague idea... > > And I have to ask, though, can you build indices instead of brute force > for the join? 
> > Kenn > > On Thu, Oct 10, 2019 at 10:47 AM Luke Cwik wrote: > >> This doesn't seem like a good fit for Apache Beam but have you tried: >> * using a StatefulDoFn that performs all the joining and signals the >> service powering the sources to stop sending data once your criteria is met >> (most services powering these sources won't have a way to be controlled >> this way)? >> * using a StatefulDoFn that performs all the joining and to write out the >> data to the output directly and then shutdown the pipeline (you can't have >> any transforms that are after the StatefulDoFn)? >> >> Both of these ideas remove a lot of the parallelism that Apache Beam >> provides. >> >> >> >> On Thu, Oct 10, 2019 at 10:36 AM Alexey Romanenko < >> aromanenko@gmail.com> wrote: >> >>> Hello, >>> >>> We have a use case and it's not clear how it can be solved/implemented >>> with Beam. I count on community help with this, maybe I miss something that >>> lays on the surface. >>> >>> Let’s say, there are two different bounded sources and one join >>> transform (say GBK) downstream. This Join transform is like INNER JOIN >>> which joins elements of two collections only if they have common key >>> (though, it could be any other
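The control flow Reza describes above (buffer per-source records in state, emit a join when the condition is met and GC the matched data, otherwise signal upstream to read more) can be sketched in plain Java. This is an illustrative model only, not Beam code: the class and method names are hypothetical, the state here would live in a stateful DoFn, and needsMoreData() stands in for sending a Start event back to the external unbounded source.

```java
import java.util.*;

// Plain-Java model of the stateful join-and-request-more loop described above.
public class LoopJoin {
    private final Map<String, String> pendingSource1 = new HashMap<>();
    private final Map<String, String> pendingSource2 = new HashMap<>();
    private final List<String> joined = new ArrayList<>();
    private final int targetMatches;

    public LoopJoin(int targetMatches) { this.targetMatches = targetMatches; }

    /** Buffer a record from one source; emit a joined record when the key matches. */
    public void accept(int source, String key, String value) {
        Map<String, String> mine  = (source == 1) ? pendingSource1 : pendingSource2;
        Map<String, String> other = (source == 1) ? pendingSource2 : pendingSource1;
        if (other.containsKey(key)) {
            String otherValue = other.remove(key);           // GC the matched element
            String left  = (source == 1) ? value : otherValue;
            String right = (source == 1) ? otherValue : value;
            joined.add(key + "=" + left + "|" + right);
        } else {
            mine.put(key, value);                            // keep for the next batch
        }
    }

    /** After a batch, decide whether to send another Start event upstream. */
    public boolean needsMoreData() { return joined.size() < targetMatches; }

    public List<String> results() { return joined; }
}
```

The unmatched elements stay buffered across batches, which is exactly why this state has to live in a global window and why the pipeline stays up indefinitely.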
Re: Limited join with stop condition
Interesting! I agree with Luke that it seems not a great fit for Beam in the most rigorous sense. There are many considerations:

1. We assume ParDo has side effects by default. So the model actually *requires* eager evaluation, not lazy, in order to make all the side effects happen. But for your case let us assume somehow we know it is all @Pure.
2. Lazy evaluation and parallelism are in opposition. In pure computations like Haskell, literally everything (except monadic sequence) is parallel for free, but the problem is nothing starts until it is needed, so parallelism requires forcing computations early.

On the other hand, we can think about ways forward here. A first step is if the join is a "side lookup join" where we always process all of source 1 but try to process less of source 2. If source 2 is feeding into a map side input then this could be lazy in some way. When an element from source 1 calls the side input lookup, it could be a blocking call that triggers reads from source 2 until a match is found. This computation strategy is consistent with the model and will read all of source 1 but only the prefix of source 2 needed to join all of source 1. I think you could implement this pattern with parallelism on both the main input and side input. Then, to read less of source 1, you need feedback from the sink to the source. We have nothing like that... These are all very abstract hypotheticals.

If we get to practical implementation "today", then every runner pretty much reads all of a bounded source before even starting the next transform, no? I wonder if it makes sense to convert them to unbounded (which is still allowed to terminate but does not support dynamic splits). Then you just terminate the pipeline when you have enough output. You will read more than you need, but maybe that is not so bad, and anyhow hard to avoid. Also a vague idea...

And I have to ask, though: can you build indices instead of brute force for the join?
Kenn On Thu, Oct 10, 2019 at 10:47 AM Luke Cwik wrote: > This doesn't seem like a good fit for Apache Beam but have you tried: > * using a StatefulDoFn that performs all the joining and signals the > service powering the sources to stop sending data once your criteria is met > (most services powering these sources won't have a way to be controlled > this way)? > * using a StatefulDoFn that performs all the joining and to write out the > data to the output directly and then shutdown the pipeline (you can't have > any transforms that are after the StatefulDoFn)? > > Both of these ideas remove a lot of the parallelism that Apache Beam > provides. > > > > On Thu, Oct 10, 2019 at 10:36 AM Alexey Romanenko < > aromanenko@gmail.com> wrote: > >> Hello, >> >> We have a use case and it's not clear how it can be solved/implemented >> with Beam. I count on community help with this, maybe I miss something that >> lays on the surface. >> >> Let’s say, there are two different bounded sources and one join transform >> (say GBK) downstream. This Join transform is like INNER JOIN which joins >> elements of two collections only if they have common key (though, it could >> be any other join logic there, doesn’t matter). What matters is that this >> Join has to return only N records as output and then we have to stop >> pipeline after they have been processed. It means that, in the best case, >> we need to read only N records from every source, join them and move >> downstream and after pipeline should be stopped. In other cases, if some >> records don’t have common key in other collection, we need to read another >> bunch of records and see if it would be enough to have N joined records >> after Join. >> >> Below, here is a simple example of this. Say, every source contains 1M of >> records but after Join we need to have only 1K of joined records. 
So, we >> don’t want to read all two millions from 2 sources in case if we can have >> an output after reading much less records in the end. So, 1K of joined >> records is a stop condition. >> >> 1M >> — >> | Source 1 | >> — | ——— >> |———> | Join |———> Output 1K and stop >> 1M | ——— >> — | >> | Source 2 | >> — >> >> So, it looks like I need to have ability to read new portion of data "on >> demand” or like to have a back pressure mechanizm which signals from >> downstream to upstream that “please, give me only N elements and then wait >> until I ask for more”. I’m not sure that Beam supports something like this. >> >> As an idea, I was trying to split initial inputs into fixed Windows with >> trigger “AfterPane.elementCountAtLeast(N)” to read data by limited batches >> and use another “AfterPane.elementCountAtLeast(N)” after Join which should >> trigger only once. It doesn’t work and still, it won’t read data “on >> demand” and stop the whole pipeline, I guess. >> >> Do you think it can be feasible to do in Beam?
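The "side lookup join" strategy Kenn sketches above can be modeled in a few lines of plain Java: process all of source 1, and pull from source 2 lazily, one record at a time, only until each lookup finds its match. This is a sketch of the computation strategy only (the class name and shapes are hypothetical, not a Beam API); in Beam the cache would be the map side input and the pull would be the blocking read.

```java
import java.util.*;

// Lazy "side lookup join": source 2 is consumed only as far as needed.
public class SideLookupJoin {
    private final Iterator<Map.Entry<String, String>> source2;
    private final Map<String, String> sideMap = new HashMap<>();
    int source2Reads = 0;  // exposed so the laziness is observable

    public SideLookupJoin(Iterator<Map.Entry<String, String>> source2) {
        this.source2 = source2;
    }

    /** Blocking lookup: pull from source 2 until the key appears or source 2 ends. */
    public String lookup(String key) {
        while (!sideMap.containsKey(key) && source2.hasNext()) {
            Map.Entry<String, String> e = source2.next();
            source2Reads++;
            sideMap.put(e.getKey(), e.getValue());
        }
        return sideMap.get(key); // null if source 2 was exhausted without a match
    }
}
```

As Kenn notes, this reads all of source 1 but only the prefix of source 2 needed to satisfy the lookups seen so far.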
Re: Spring with Apache Beam
You shouldn't need to call it before running the pipeline as you are doing (you can if you want, but it's not necessary). Have you created a service META-INF entry for the JvmInitializer you have created, or are you using @AutoService? This is the relevant bit of the documentation[1]. Here are some good docs for how to use @AutoService[2].

1: https://github.com/apache/beam/blob/f3ce8669b50837d48ab0d0ee9a1298ce3b5bc61c/sdks/java/core/src/main/java/org/apache/beam/sdk/harness/JvmInitializer.java#L30
2: https://github.com/google/auto/tree/master/service

On Thu, Oct 10, 2019 at 5:29 PM Jitendra kumavat wrote:
> Hi Luke,
>
> I tried the JvmInitializer.beforeProcessing method to initialize the Spring
> application context. But it doesn't seem to work.
>
> *Below are my class definitions.*
>
> JvmInitializer class with context initialization:
>
> public class GcmJvmInitializer implements JvmInitializer {
>   @Override
>   public void beforeProcessing(PipelineOptions options) {
>     System.out.println("Starting Custom GcmJvmInitializer");
>     ApplicationContext applicationContext =
>         new AnnotationConfigApplicationContext(AppConfig.class);
>     for (String beanName : applicationContext.getBeanDefinitionNames()) {
>       System.out.println(beanName);
>     }
>     System.out.println("Stopping Custom GcmJvmInitializer");
>   }
> }
>
> *AppConfig*
>
> @Configuration
> @PropertySource("classpath:gcm.properties")
> @ComponentScan(basePackages = {"com.liveramp.intl.gcm"})
> public class AppConfig {
>   // Bean definitions with @Bean annotation.
> }
>
> And I am using JvmInitializers to invoke it. Below is my main method:
>
> public static void main(String[] args) {
>   PCollection lines = pipeline.apply("Read Lines",
>       TextIO.read().from(inputPathProvider));
>   // ... other pipeline transformations follow.
>
>   // run pipeline
>   JvmInitializers.runBeforeProcessing(options);
>   pipeline.run().waitUntilFinish();
> }
>
> Is this the right way to use it?
Or is there other way round? Please let me > know. > > > Thanks, > > Jitendra > > > On Wed, Oct 9, 2019 at 1:32 PM Luke Cwik wrote: > >> 1. won't work since it is happening at pipeline construction time and not >> pipeline execution time. >> 2. only works if your application context is scoped to the DoFn instance >> and doesn't have things you want to possibly share across DoFn instances. >> >> You could also try and make it a PipelineOption that is tagged >> with @JsonIgnore and also has a @Default.InstanceFactory like this[1]. This >> way when it is accessed by your DoFn it will be initialized for the first >> time and shared within your process. Making it a PipelineOption would also >> allow you to pass in preinitialized versions for testing. >> >> 1: >> https://github.com/apache/beam/blob/8267c223425bc201be700babbe596d133b79686e/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java#L127 >> >> On Wed, Oct 9, 2019 at 1:10 PM Jitendra kumavat >> wrote: >> >>> Hi Luke, >>> >>> Thanks a lot for your reply. >>> I tried couple of options which is as follows. >>> >>> 1. Initialise the context in main method only. and use it. Creating the >>> context: >>> new AnnotationConfigApplicationContext(AppConfig.class); >>> 2. Creating the context on DoFn.Startup method. >>> >>> Unfortunately none of the worked perfectly, later works but it has issue >>> with @ComponentScan. >>> Please let me know your comments for the same. >>> >>> I will also try this JvmInitializer for context initialisation. >>> >>> Thanks, >>> Jitendra >>> >>> On Wed, Oct 9, 2019 at 12:48 PM Luke Cwik wrote: >>> -d...@beam.apache.org, +user@beam.apache.org How are you trying to inject your application context? Have you looked at the JvmInitializer.beforeProcessing[1] to create your application context? 
1: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/harness/JvmInitializer.java On Fri, Oct 4, 2019 at 12:32 PM Jitendra kumavat < jkumavat1...@gmail.com> wrote: > Hi, > > I want to add Spring framework in my apache beam project. Somehow i > am unable to inject the Spring Application context to executing ParDo > functions. I couldn't find the way to do so? Can you please let me know > how > to integrate Spring runtime application context with Apache Beam pipeline. > > Thanks, > Jitendra >
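For reference, the registration Luke is asking about is a small fragment. This is a sketch assuming the AutoService annotation processor and the Beam SDK are on the classpath, reusing the class name from Jitendra's example; it is not runnable on its own.

```
// Registration sketch: @AutoService makes the build generate the
// META-INF/services/org.apache.beam.sdk.harness.JvmInitializer entry,
// so workers discover the initializer via ServiceLoader automatically.
// No manual call to JvmInitializers.runBeforeProcessing is then needed.
@AutoService(JvmInitializer.class)
public class GcmJvmInitializer implements JvmInitializer {
  @Override
  public void beforeProcessing(PipelineOptions options) {
    // build the Spring application context here
  }
}
```

Without @AutoService, the equivalent is to ship a resource file named META-INF/services/org.apache.beam.sdk.harness.JvmInitializer whose single line is the fully qualified class name of the initializer.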
ETL with Beam?
Hello, all.

I still have not been given the tasking to convert my work project to use Beam, but it is still something that I am looking to do in the fairly near future. Our data workflow consists of ingest and transformation, and I was hoping that there are ETL frameworks that work well with Beam. Does anyone have some recommendations and maybe some samples that show how people might use an ETL framework with Beam?

Thanks in advance and have a great day!
Re: Joining PCollections to aggregates of themselves
Looking at the naive solution:

    PCollectionView agg = input
        .apply(Windows.sliding(10mins, 1sec hops)
            .trigger(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
        .apply(Mean.globally())
        .apply(View.asSingleton());

    PCollection output = input
        .apply(ParDo.of(new Joiner().withSideInputs(agg)));

the constraint (C) is being violated because of your AfterPane.elementCountAtLeast trigger, which will emit averages before having seen all the values that should contribute to the average. It will emit a (speculative) average as soon as the first element comes in, and at each subsequent element, but this order may not correspond to the order in which elements get passed into the Joiner. Removing this trigger would hold back processing in the Joiner until this element (as well as all other elements in the window) has been seen, to produce the final average.

Your computation seems to be sensitive to the order in which elements arrive. What would you expect the output to be for

    Time: 00:08:00 Input: Output:
    Time: 00:13:00 Input: Output:
    Time: 00:00:00 Input: Output:
    Time: 00:02:00 Input: Output:

Are you really trying to emit elements with the mean of all elements with timestamps up to 10 minutes prior to the current value? That's a bit different than sliding windows. In that case you could do something with a stateful DoFn that buffers elements and for each incoming element sets a timer at T which then reads the buffer, computes the output, and discards elements older than 10 minutes. You could also possibly do this with a custom WindowFn.

On Thu, Oct 10, 2019 at 10:56 AM rahul patwari wrote:
>
> With Stateful DoFn, each instance of DoFn will have elements which belong to
> the same window and have the same key. So, the parallelism is limited by [no.
> of keys * no. of Windows]

On a practical note, no runner actually parallelizes across windows (and indeed sometimes, e.g. for merging windows like sessions, it's not actually possible).
> In batch mode, as all the elements belong to the same window, i.e. Global > Window, the parallelism will be limited by the [no. of keys]. So, if you only > have one key, only one instance of DoFn will be running. > > AFAIK, it is not possible to pass the elements through the DoFn in the > desired order. That is correct. > On Thu, Oct 10, 2019 at 10:11 PM Sam Stephens wrote: >> >> Hi Rahul, >> >> Thanks for the response. >> >> I did consider State, but actually I was tentative because of a different >> requirement that I didn't specify - the same pipeline should work for batch >> and stream modes. I'm not sure how Stateful DoFn's behave in the batch >> world: can you get Beam to pass the elements through the DoFn in a desired >> order, e.g. by sorted by event time? >> >> Most aggregations we're likely to be running are per-key rather than global, >> so the parallelism issue might not be such a big deal. >> >> Regards, >> Sam >> >> On Thu, Oct 10, 2019 at 5:30 PM rahul patwari >> wrote: >>> >>> Hi Sam, >>> >>> (Assuming all the tuples have the same key) One solution could be to use >>> ParDo with State(to calculate mean) => For each element as they occur, >>> calculate the Mean(store the sum and count as the state) and emit the tuple >>> with the new average value. >>> But it will limit the parallelism count. >>> >>> Regards, >>> Rahul >>> >>> On Thu, Oct 10, 2019 at 9:15 PM Sam Stephens >>> wrote: My team and I have been puzzling for a while how to solve a specific problem. Say you have an input stream of tuples: And you want to output a stream containing: Where the average is an aggregation over a 10 minute sliding window of the "value" field. 
There are a couple of extra requirements: A) We want to output a single result for each input tuple B) We want to output a result as early as possible after the input arrives (low latency) C) We want the average value in result_i to have *seen* the value from input_i An illustration of the input stream with corresponding output Time: 00:00:00 Input: Output: Time: 00:02:00 Input: Output: Time: 00:08:00 Input: Output: Time: 00:13:00 Input: Output: The issue we have is that without some magic tricks and hacky code, achieving all 3 extra requirements is tough. A naive solution looks like this (beam pseudo-code): PCollectionView agg = input .apply(Windows.sliding(10mins, 1sec hops).trigger(Repeatedly.forever(AfterPane.elementCountAtLeast(1 .apply(Mean.globally()) .apply(View.asSingleton()); PCollection output = input .apply(ParDo.of(new Joiner().withSideInputs(agg))); The problem is that theres a race-condition - input elements can pass through the Joiner DoFn before
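The buffering approach suggested in this thread (a stateful DoFn that buffers elements and, per incoming element, computes the mean over the preceding 10 minutes and discards anything older) boils down to a small amount of bookkeeping. Here is a plain-Java sketch of just that logic, independent of Beam; in a pipeline the buffer would be a BagState with a timer driving eviction, and for simplicity this model keeps at most one value per timestamp.

```java
import java.util.*;

// Trailing 10-minute mean over a buffer of (timestamp, value) pairs.
public class TrailingMean {
    private static final long WINDOW_MILLIS = 10 * 60 * 1000;
    private final TreeMap<Long, Double> buffer = new TreeMap<>();

    /** Add a value at the given timestamp; return the mean over the last 10 minutes. */
    public double add(long timestampMillis, double value) {
        buffer.put(timestampMillis, value);
        // Discard everything strictly older than (timestamp - 10 minutes).
        buffer.headMap(timestampMillis - WINDOW_MILLIS, false).clear();
        double sum = 0;
        for (double v : buffer.values()) sum += v;
        return sum / buffer.size();
    }
}
```

Because the mean is computed at the moment the element arrives, every output has "seen" its own input, which is requirement (C) from the original question.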
Re: Joining PCollections to aggregates of themselves
" input elements can pass through the Joiner DoFn before the sideInput corresponding to that element is present" I don't think this is correct. Runners will evaluate a DoFn with side inputs on elements in a given window only after all side inputs are ready (have triggered at least once) in this window, so your code should be safe. However, runners will not rerun the DoFn with side inputs on subsequent triggerings of the side inputs, so you won't be able to update the results. On Thu, Oct 10, 2019 at 8:45 AM Sam Stephens wrote: > My team and I have been puzzling for a while how to solve a specific > problem. > > Say you have an input stream of tuples: > > > > And you want to output a stream containing: > > > > Where the average is an aggregation over a 10 minute sliding window of the > "value" field. > > There are a couple of extra requirements: > A) We want to output a single result for each input tuple > B) We want to output a result as early as possible after the input arrives > (low latency) > C) We want the average value in result_i to have *seen* the value from > input_i > > An illustration of the input stream with corresponding output > > Time: 00:00:00 > Input: > Output: > > Time: 00:02:00 > Input: > Output: > > Time: 00:08:00 > Input: > Output: > > Time: 00:13:00 > Input: > Output: > > The issue we have is that without some magic tricks and hacky code, > achieving all 3 extra requirements is tough. A naive solution looks like > this (beam pseudo-code): > > > PCollectionView agg = input > .apply(Windows.sliding(10mins, 1sec > hops).trigger(Repeatedly.forever(AfterPane.elementCountAtLeast(1 > .apply(Mean.globally()) > .apply(View.asSingleton()); > > PCollection output = input > .apply(ParDo.of(new Joiner().withSideInputs(agg))); > > > The problem is that theres a race-condition - input elements can pass > through the Joiner DoFn before the sideInput corresponding to that element > is present. 
This makes solving the A, B, C requirements listed above > difficult. > > Has anyone solved a similar problem to this before? Any neat ideas? >
Re: Joining PCollections to aggregates of themselves
With Stateful DoFn, each instance of DoFn will have elements which belong to the same window and have the same key. So, the parallelism is limited by [no. of keys * no. of Windows] In batch mode, as all the elements belong to the same window, i.e. Global Window, the parallelism will be limited by the [no. of keys]. So, if you only have one key, only one instance of DoFn will be running. AFAIK, it is not possible to pass the elements through the DoFn in the desired order. On Thu, Oct 10, 2019 at 10:11 PM Sam Stephens wrote: > Hi Rahul, > > Thanks for the response. > > I did consider State, but actually I was tentative because of a different > requirement that I didn't specify - the same pipeline should work for batch > and stream modes. I'm not sure how Stateful DoFn's behave in the batch > world: can you get Beam to pass the elements through the DoFn in a desired > order, e.g. by sorted by event time? > > Most aggregations we're likely to be running are per-key rather than > global, so the parallelism issue might not be such a big deal. > > Regards, > Sam > > On Thu, Oct 10, 2019 at 5:30 PM rahul patwari > wrote: > >> Hi Sam, >> >> (Assuming all the tuples have the same key) One solution could be to use >> ParDo with State(to calculate mean) => For each element as they occur, >> calculate the Mean(store the sum and count as the state) and emit the tuple >> with the new average value. >> But it will limit the parallelism count. >> >> Regards, >> Rahul >> >> On Thu, Oct 10, 2019 at 9:15 PM Sam Stephens >> wrote: >> >>> My team and I have been puzzling for a while how to solve a specific >>> problem. >>> >>> Say you have an input stream of tuples: >>> >>> >>> >>> And you want to output a stream containing: >>> >>> >>> >>> Where the average is an aggregation over a 10 minute sliding window of >>> the "value" field. 
>>> >>> There are a couple of extra requirements: >>> A) We want to output a single result for each input tuple >>> B) We want to output a result as early as possible after the input >>> arrives (low latency) >>> C) We want the average value in result_i to have *seen* the value from >>> input_i >>> >>> An illustration of the input stream with corresponding output >>> >>> Time: 00:00:00 >>> Input: >>> Output: >>> >>> Time: 00:02:00 >>> Input: >>> Output: >>> >>> Time: 00:08:00 >>> Input: >>> Output: >>> >>> Time: 00:13:00 >>> Input: >>> Output: >>> >>> The issue we have is that without some magic tricks and hacky code, >>> achieving all 3 extra requirements is tough. A naive solution looks like >>> this (beam pseudo-code): >>> >>> >>> PCollectionView agg = input >>> .apply(Windows.sliding(10mins, 1sec >>> hops).trigger(Repeatedly.forever(AfterPane.elementCountAtLeast(1 >>> .apply(Mean.globally()) >>> .apply(View.asSingleton()); >>> >>> PCollection output = input >>> .apply(ParDo.of(new Joiner().withSideInputs(agg))); >>> >>> >>> The problem is that theres a race-condition - input elements can pass >>> through the Joiner DoFn before the sideInput corresponding to that element >>> is present. This makes solving the A, B, C requirements listed above >>> difficult. >>> >>> Has anyone solved a similar problem to this before? Any neat ideas? >>> >>
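Rahul's suggestion above, keeping a running sum and count as state and emitting each tuple with the updated mean, reduces to very little arithmetic. A plain-Java model (Beam-free; in a pipeline the sum/count pair would live in state cells inside a stateful ParDo, one pair per key):

```java
import java.util.*;

// Per-key running mean: state is a {sum, count} pair per key.
public class RunningMean {
    private final Map<String, double[]> state = new HashMap<>(); // key -> {sum, count}

    /** Fold a value into the per-key state and return the new mean for that key. */
    public double update(String key, double value) {
        double[] s = state.computeIfAbsent(key, k -> new double[2]);
        s[0] += value;
        s[1] += 1;
        return s[0] / s[1];
    }
}
```

As noted in the thread, the parallelism of this approach is bounded by the number of distinct keys, since all updates for one key must be serialized through its state.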
Re: Limited join with stop condition
This doesn't seem like a good fit for Apache Beam, but have you tried:

* using a StatefulDoFn that performs all the joining and signals the service powering the sources to stop sending data once your criteria are met (most services powering these sources won't have a way to be controlled this way)?
* using a StatefulDoFn that performs all the joining and writes out the data to the output directly, and then shutting down the pipeline (you can't have any transforms after the StatefulDoFn)?

Both of these ideas remove a lot of the parallelism that Apache Beam provides.

On Thu, Oct 10, 2019 at 10:36 AM Alexey Romanenko wrote:
> Hello,
>
> We have a use case and it's not clear how it can be solved/implemented
> with Beam. I count on community help with this, maybe I miss something
> that lays on the surface.
>
> Let’s say, there are two different bounded sources and one join transform
> (say GBK) downstream. This Join transform is like INNER JOIN which joins
> elements of two collections only if they have common key (though, it
> could be any other join logic there, doesn’t matter). What matters is
> that this Join has to return only N records as output and then we have to
> stop pipeline after they have been processed. It means that, in the best
> case, we need to read only N records from every source, join them and
> move downstream and after pipeline should be stopped. In other cases, if
> some records don’t have common key in other collection, we need to read
> another bunch of records and see if it would be enough to have N joined
> records after Join.
>
> Below, here is a simple example of this. Say, every source contains 1M of
> records but after Join we need to have only 1K of joined records. So, we
> don’t want to read all two millions from 2 sources in case if we can have
> an output after reading much less records in the end. So, 1K of joined
> records is a stop condition.
> > 1M > — > | Source 1 | > — | ——— > |———> | Join |———> Output 1K and stop > 1M | ——— > — | > | Source 2 | > — > > So, it looks like I need to have ability to read new portion of data "on > demand” or like to have a back pressure mechanizm which signals from > downstream to upstream that “please, give me only N elements and then wait > until I ask for more”. I’m not sure that Beam supports something like this. > > As an idea, I was trying to split initial inputs into fixed Windows with > trigger “AfterPane.elementCountAtLeast(N)” to read data by limited batches > and use another “AfterPane.elementCountAtLeast(N)” after Join which should > trigger only once. It doesn’t work and still, it won’t read data “on > demand” and stop the whole pipeline, I guess. > > Do you think it can be feasible to do in Beam? > Any ideas or advices are very welcomed! > > >
Limited join with stop condition
Hello,

We have a use case and it's not clear how it can be solved/implemented with Beam. I count on community help with this; maybe I miss something that lies on the surface.

Let’s say there are two different bounded sources and one join transform (say, GBK) downstream. This Join transform is like an INNER JOIN, which joins elements of two collections only if they have a common key (though it could be any other join logic there, it doesn’t matter). What matters is that this Join has to return only N records as output, and then we have to stop the pipeline after they have been processed. It means that, in the best case, we need to read only N records from every source, join them, move them downstream, and afterwards the pipeline should be stopped. In other cases, if some records don’t have a common key in the other collection, we need to read another bunch of records and see if it would be enough to have N joined records after the Join.

Below is a simple example of this. Say every source contains 1M records, but after the Join we need to have only 1K of joined records. So we don’t want to read all two million records from the 2 sources if we can have an output after reading many fewer records. So, 1K of joined records is a stop condition.

     1M
    +----------+
    | Source 1 |----+
    +----------+    |     +------+
                    +---> | Join |---> Output 1K and stop
     1M             |     +------+
    +----------+    |
    | Source 2 |----+
    +----------+

So it looks like I need the ability to read a new portion of data "on demand", or to have a back pressure mechanism which signals from downstream to upstream: "please, give me only N elements and then wait until I ask for more". I’m not sure that Beam supports something like this.

As an idea, I was trying to split the initial inputs into fixed windows with the trigger "AfterPane.elementCountAtLeast(N)" to read data in limited batches, and to use another "AfterPane.elementCountAtLeast(N)" after the Join, which should trigger only once. It doesn’t work and, still, it won’t read data "on demand" and stop the whole pipeline, I guess.

Do you think this is feasible to do in Beam? Any ideas or advice are very welcome!
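Outside of Beam, the desired behavior is easy to state: pull records from both sources incrementally, join on key, and stop as soon as N joined records exist, so that neither source is read in full. The sketch below models exactly that (all names are illustrative; a Beam runner provides no such pull-driven loop, which is the crux of the thread):

```java
import java.util.*;

// Incremental hash join that stops after n joined records.
public class LimitedJoin {
    /** Pull alternately from both sources, join on key, stop at n joined records. */
    public static List<String> join(Iterator<String[]> source1,
                                    Iterator<String[]> source2, int n) {
        Map<String, String> left = new HashMap<>();   // unmatched source-1 records
        Map<String, String> right = new HashMap<>();  // unmatched source-2 records
        List<String> out = new ArrayList<>();
        while (out.size() < n && (source1.hasNext() || source2.hasNext())) {
            if (source1.hasNext()) {
                String[] r = source1.next();          // r = {key, value}
                if (right.containsKey(r[0])) out.add(r[0] + "=" + r[1] + "|" + right.remove(r[0]));
                else left.put(r[0], r[1]);
            }
            if (out.size() >= n) break;               // stop condition reached
            if (source2.hasNext()) {
                String[] r = source2.next();
                if (left.containsKey(r[0])) out.add(r[0] + "=" + left.remove(r[0]) + "|" + r[1]);
                else right.put(r[0], r[1]);
            }
        }
        return out;
    }
}
```

The loop stops with both iterators only partially consumed, which is the "read on demand" property the question asks for; the replies in this thread discuss why Beam's push-based, eagerly evaluated model makes this hard to express directly.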
Re: Joining PCollections to aggregates of themselves
Hi Rahul,

Thanks for the response.

I did consider State, but actually I was tentative because of a different requirement that I didn't specify: the same pipeline should work in both batch and stream modes. I'm not sure how stateful DoFns behave in the batch world: can you get Beam to pass the elements through the DoFn in a desired order, e.g. sorted by event time?

Most aggregations we're likely to be running are per-key rather than global, so the parallelism issue might not be such a big deal.

Regards,
Sam

On Thu, Oct 10, 2019 at 5:30 PM rahul patwari wrote: > Hi Sam, > > (Assuming all the tuples have the same key) One solution could be to use > ParDo with State(to calculate mean) => For each element as they occur, > calculate the Mean(store the sum and count as the state) and emit the tuple > with the new average value. > But it will limit the parallelism count. > > Regards, > Rahul > > On Thu, Oct 10, 2019 at 9:15 PM Sam Stephens > wrote: > >> My team and I have been puzzling for a while how to solve a specific >> problem. >> >> Say you have an input stream of tuples: >> >> >> >> And you want to output a stream containing: >> >> >> >> Where the average is an aggregation over a 10 minute sliding window of >> the "value" field. >> >> There are a couple of extra requirements: >> A) We want to output a single result for each input tuple >> B) We want to output a result as early as possible after the input >> arrives (low latency) >> C) We want the average value in result_i to have *seen* the value from >> input_i >> >> An illustration of the input stream with corresponding output >> >> Time: 00:00:00 >> Input: >> Output: >> >> Time: 00:02:00 >> Input: >> Output: >> >> Time: 00:08:00 >> Input: >> Output: >> >> Time: 00:13:00 >> Input: >> Output: >> >> The issue we have is that without some magic tricks and hacky code, >> achieving all 3 extra requirements is tough.
A naive solution looks like >> this (beam pseudo-code): >> >> >> PCollectionView agg = input >> .apply(Windows.sliding(10mins, 1sec >> hops).trigger(Repeatedly.forever(AfterPane.elementCountAtLeast(1 >> .apply(Mean.globally()) >> .apply(View.asSingleton()); >> >> PCollection output = input >> .apply(ParDo.of(new Joiner().withSideInputs(agg))); >> >> >> The problem is that theres a race-condition - input elements can pass >> through the Joiner DoFn before the sideInput corresponding to that element >> is present. This makes solving the A, B, C requirements listed above >> difficult. >> >> Has anyone solved a similar problem to this before? Any neat ideas? >> >
Re: Joining PCollections to aggregates of themselves
Hi Sam, (Assuming all the tuples have the same key) One solution could be to use ParDo with State(to calculate mean) => For each element as they occur, calculate the Mean(store the sum and count as the state) and emit the tuple with the new average value. But it will limit the parallelism count. Regards, Rahul On Thu, Oct 10, 2019 at 9:15 PM Sam Stephens wrote: > My team and I have been puzzling for a while how to solve a specific > problem. > > Say you have an input stream of tuples: > > > > And you want to output a stream containing: > > > > Where the average is an aggregation over a 10 minute sliding window of the > "value" field. > > There are a couple of extra requirements: > A) We want to output a single result for each input tuple > B) We want to output a result as early as possible after the input arrives > (low latency) > C) We want the average value in result_i to have *seen* the value from > input_i > > An illustration of the input stream with corresponding output > > Time: 00:00:00 > Input: > Output: > > Time: 00:02:00 > Input: > Output: > > Time: 00:08:00 > Input: > Output: > > Time: 00:13:00 > Input: > Output: > > The issue we have is that without some magic tricks and hacky code, > achieving all 3 extra requirements is tough. A naive solution looks like > this (beam pseudo-code): > > > PCollectionView agg = input > .apply(Windows.sliding(10mins, 1sec > hops).trigger(Repeatedly.forever(AfterPane.elementCountAtLeast(1 > .apply(Mean.globally()) > .apply(View.asSingleton()); > > PCollection output = input > .apply(ParDo.of(new Joiner().withSideInputs(agg))); > > > The problem is that theres a race-condition - input elements can pass > through the Joiner DoFn before the sideInput corresponding to that element > is present. This makes solving the A, B, C requirements listed above > difficult. > > Has anyone solved a similar problem to this before? Any neat ideas? >
Joining PCollections to aggregates of themselves
My team and I have been puzzling for a while over how to solve a specific problem.

Say you have an input stream of tuples:

And you want to output a stream containing:

Where the average is an aggregation over a 10 minute sliding window of the "value" field.

There are a couple of extra requirements:
A) We want to output a single result for each input tuple
B) We want to output a result as early as possible after the input arrives (low latency)
C) We want the average value in result_i to have *seen* the value from input_i

An illustration of the input stream with corresponding output:

    Time: 00:00:00 Input: Output:
    Time: 00:02:00 Input: Output:
    Time: 00:08:00 Input: Output:
    Time: 00:13:00 Input: Output:

The issue we have is that without some magic tricks and hacky code, achieving all 3 extra requirements is tough. A naive solution looks like this (beam pseudo-code):

    PCollectionView agg = input
        .apply(Windows.sliding(10mins, 1sec hops)
            .trigger(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
        .apply(Mean.globally())
        .apply(View.asSingleton());

    PCollection output = input
        .apply(ParDo.of(new Joiner().withSideInputs(agg)));

The problem is that there's a race condition: input elements can pass through the Joiner DoFn before the side input corresponding to that element is present. This makes solving the A, B, C requirements listed above difficult.

Has anyone solved a similar problem to this before? Any neat ideas?