Re: In Flink, is there a way to merge two streams in stateful manner

2023-09-05 Thread Muazim Wani
Thank you for the detailed explanation.


Re: In Flink, is there a way to merge two streams in stateful manner

2023-08-29 Thread Ken Krugler
If you need a specific output order, then merge the three streams, key by a 
constant (like 1), and run that into a KeyedProcessFunction.

That function can buffer out-of-order records, and set up a timer to fire when 
it gets a MAX_WATERMARK (which indicates that all streams are finished) so that 
it can flush any pending data from state, in the proper order.

You’d have a ListState for the reduced data, and a 
ValueState for the footer.

Once you get the header, you can flush all reduced data, and no longer buffer 
it.

When the timer fires, you can flush the footer.

— Ken
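
For illustration, the buffering rules described in this message can be modeled in plain Java without any Flink dependency. This is only a sketch under stated assumptions: the class OrderedMergeSketch, the Tagged type, and the Kind enum are hypothetical names, the fields stand in for ListState and ValueState, and onEndOfInput() stands in for the timer that fires at the max watermark. In a real job the same logic would sit in the processElement() and onTimer() methods of a KeyedProcessFunction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of the buffering rules; it does not use Flink. */
class OrderedMergeSketch {

    enum Kind { HEADER, ROW, FOOTER }

    /** A line tagged with the stream it came from (hypothetical type). */
    static final class Tagged {
        final Kind kind;
        final String line;

        Tagged(Kind kind, String line) {
            this.kind = kind;
            this.line = line;
        }
    }

    // Stand-ins for keyed state: a ListState for rows, a ValueState for the footer.
    private final List<String> bufferedRows = new ArrayList<>();
    private String bufferedFooter;
    private boolean headerSeen;

    private final List<String> output = new ArrayList<>();

    /** Models processElement(): called once per element, in arrival order. */
    void processElement(Tagged element) {
        switch (element.kind) {
            case HEADER:
                output.add(element.line);
                headerSeen = true;
                // The header is out, so everything buffered so far can follow it.
                output.addAll(bufferedRows);
                bufferedRows.clear();
                break;
            case ROW:
                if (headerSeen) {
                    output.add(element.line); // no need to buffer any longer
                } else {
                    bufferedRows.add(element.line);
                }
                break;
            case FOOTER:
                bufferedFooter = element.line; // held back until all input is done
                break;
        }
    }

    /** Models onTimer() at the max watermark: every input stream has finished. */
    void onEndOfInput() {
        output.addAll(bufferedRows); // only non-empty if no header ever arrived
        bufferedRows.clear();
        if (bufferedFooter != null) {
            output.add(bufferedFooter);
            bufferedFooter = null;
        }
    }

    /** Runs the model over elements in the given arrival order. */
    static List<String> merge(List<Tagged> arrivalOrder) {
        OrderedMergeSketch fn = new OrderedMergeSketch();
        for (Tagged element : arrivalOrder) {
            fn.processElement(element);
        }
        fn.onEndOfInput();
        return fn.output;
    }

    public static void main(String[] args) {
        // Elements arrive interleaved, as they might after merging the streams.
        List<Tagged> arrival = Arrays.asList(
                new Tagged(Kind.ROW, "Static Header"),
                new Tagged(Kind.FOOTER, "Dynamic footer"),
                new Tagged(Kind.ROW, "JohnDoe, 12"),
                new Tagged(Kind.HEADER, "Dynamic Header 33"),
                new Tagged(Kind.ROW, "Alice, 21"));
        for (String line : merge(arrival)) {
            System.out.println(line);
        }
    }
}
```

Rows keep their relative arrival order; only their position relative to the header and footer is corrected. The model ignores fault tolerance, which the Flink state classes would provide.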




Re: In Flink, is there a way to merge two streams in stateful manner

2023-08-10 Thread Muazim Wani
Thank you so much for taking the time to provide me with such a detailed
response. Your assistance has been incredibly helpful in clarifying my
understanding!
Let me describe the exact scenario, as I think there might be some
misunderstanding. All the streams are bounded and parallelism is set to 1.
I am writing to one file only. *So the main use case is to provide support
for dynamic headers and footers with aggregated values.*
E.g., if my input is

Static Header
JohnDoe, 12
Alice, 21

My dynamic header is "Dynamic Header" and my dynamic footer is "Dynamic
footer". *These headers go on top of the static headers, which are already
present in the (bounded) DataStream.* The output would look like this:

Dynamic Header 33
Static Header
JohnDoe, 12
Alice, 21
Dynamic footer
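
To make the arithmetic in this example explicit, here is a small plain-Java sketch (no Flink involved) that builds the expected output from the sample input. It assumes that data rows have the form "name, value" and that the aggregate is a sum over the second column; the class HeaderFooterExample and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class HeaderFooterExample {

    /** Sums the second column of "name, value" rows; lines without a comma are skipped. */
    static int sumValues(List<String> lines) {
        int sum = 0;
        for (String line : lines) {
            String[] parts = line.split(",");
            if (parts.length == 2) {
                sum += Integer.parseInt(parts[1].trim());
            }
        }
        return sum;
    }

    /** Puts the dynamic header (with the aggregate) first and the dynamic footer last. */
    static List<String> render(String header, List<String> lines, String footer) {
        List<String> out = new ArrayList<>();
        out.add(header + " " + sumValues(lines)); // needs every row before it can be written
        out.addAll(lines);
        out.add(footer);
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("Static Header", "JohnDoe, 12", "Alice, 21");
        for (String line : render("Dynamic Header", input, "Dynamic footer")) {
            System.out.println(line);
        }
    }
}
```

The first output line depends on the sum of all rows (12 + 21 = 33), so it cannot be produced until the whole input has been read, even though it must be written first. That is why the rows have to be held somewhere until the header is available.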

In this particular case I am writing to one file only, and I have set my
parallelism to 1. I have one input DataStream, and on top of that I have one
dynamic header stream and one dynamic footer stream (*which contain some
dynamic params, such as an aggregated value over some fields, e.g. salary*).
*So I am left with three transformed streams at the sink operator:
1) the dynamic header stream with the aggregated value, 2) the input
DataStream, and 3) the dynamic footer stream with the aggregated value.*

I could have used a String for both the dynamic headers and footers, emitting
the headers in the open() method and the footers in the close() method of
TextOutputFormat; that would have solved my use case. *But the aggregation
(currently a reduce function) gives me back a DataStream (with only one
value, i.e. the final sum).* I am adding the headers to that DataStream, and
similarly for the footers. Now I am not able to merge them while maintaining
the order.

Below is my implementation of the reduce function:

public DataStream sum(
        SumFunction reduceFunction, DataStream stream) {

    // Map every input record to the Number that should be aggregated.
    DataStream inputRecordTransformedDataStream =
            stream.map(this::transform)
                    .returns((TypeInformation) Types.GENERIC(Number.class));

    // Key by a constant so that all values end up in a single reduce.
    return inputRecordTransformedDataStream
            .keyBy(value -> "key")
            .reduce(reduceFunction);
}


Below I am adding my headers to the sum stream:

public static DataStream getAggregatedStream(
        String header, DataStream sinkStream) {

    // Prefix each aggregated value with the header text and wrap it in a PojoClass.
    return sinkStream
            .keyBy(key -> "value")
            .flatMap(
                    (FlatMapFunction)
                            (value, out) ->
                                    out.collect(PojoClass.builder().data(header + value).build()))
            .returns(PojoClass.class);
}

The header stream is a bounded DataStream with the dynamic headers and the
aggregated value:

DataStream headerStream = addHeaderRows(sinkStream);

DataStream footerStream = addFooterRows(finalStream);

DataStream sinkStream;

Thanks a lot for your time and the advice.
Muazim Wani



Re: In Flink, is there a way to merge two streams in stateful manner

2023-08-10 Thread Hang Ruan
PS: I forgot the link: Hybrid Source[1]

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/hybridsource/



Re: In Flink, is there a way to merge two streams in stateful manner

2023-08-10 Thread Hang Ruan
Hi, Muazim.

I think the Hybrid Source[1] may be helpful for your case.

Best,
Hang



Re: In Flink, is there a way to merge two streams in stateful manner

2023-08-10 Thread Ken Krugler
As (almost) always, the devil is in the details.

You haven’t said, but I’m assuming you’re writing out multiple files, each with 
a different schema, as otherwise you could just leverage the existing Flink 
support for CSV.

So then you could combine the header/footer streams (adding a flag for header 
vs. footer), and connect that to the row data stream, then use a 
KeyedCoProcessFunction (I’m assuming you can key by something that identifies 
which schema). You’d buffer the row data & footer (separately in state). You 
would also need to set up a timer to fire at the max watermark, to flush out 
pending rows/footer when all of the input data has been processed.

After that function you could configure the sink to bucket by the target schema.

— Ken
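
As a rough illustration of the two-input, per-schema variant described in this message, here is a plain-Java model that does not depend on Flink. Everything in it is an assumption made for the sketch: the class PerSchemaMergeSketch and its method names are hypothetical, the map of SchemaState objects stands in for keyed state, the two process methods stand in for the two inputs of a KeyedCoProcessFunction, finish() stands in for the max-watermark timer firing for each key, and the output map stands in for a sink that buckets by schema.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of per-schema buffering with two inputs; it does not use Flink. */
class PerSchemaMergeSketch {

    /** Stand-in for keyed state: Flink would keep one copy of this per key. */
    private static final class SchemaState {
        final List<String> bufferedRows = new ArrayList<>();
        String bufferedFooter;
        boolean headerSeen;
    }

    private final Map<String, SchemaState> stateBySchema = new TreeMap<>();

    // Output grouped by schema, standing in for a sink that buckets by schema.
    private final Map<String, List<String>> outputBySchema = new TreeMap<>();

    private SchemaState stateFor(String schema) {
        return stateBySchema.computeIfAbsent(schema, k -> new SchemaState());
    }

    private void emit(String schema, String line) {
        outputBySchema.computeIfAbsent(schema, k -> new ArrayList<>()).add(line);
    }

    /** First input: the combined header/footer stream, with a flag telling them apart. */
    void processHeaderOrFooter(String schema, boolean isHeader, String line) {
        SchemaState state = stateFor(schema);
        if (isHeader) {
            emit(schema, line);
            state.headerSeen = true;
            for (String row : state.bufferedRows) {
                emit(schema, row);
            }
            state.bufferedRows.clear();
        } else {
            state.bufferedFooter = line; // held back until all input is processed
        }
    }

    /** Second input: the row data. */
    void processRow(String schema, String line) {
        SchemaState state = stateFor(schema);
        if (state.headerSeen) {
            emit(schema, line);
        } else {
            state.bufferedRows.add(line);
        }
    }

    /** Models the max-watermark timer firing for every key after all input is processed. */
    Map<String, List<String>> finish() {
        for (Map.Entry<String, SchemaState> entry : stateBySchema.entrySet()) {
            SchemaState state = entry.getValue();
            for (String row : state.bufferedRows) {
                emit(entry.getKey(), row);
            }
            state.bufferedRows.clear();
            if (state.bufferedFooter != null) {
                emit(entry.getKey(), state.bufferedFooter);
                state.bufferedFooter = null;
            }
        }
        return outputBySchema;
    }

    public static void main(String[] args) {
        PerSchemaMergeSketch fn = new PerSchemaMergeSketch();
        fn.processRow("users", "JohnDoe, 12");
        fn.processHeaderOrFooter("orders", false, "orders footer");
        fn.processRow("orders", "order-1");
        fn.processHeaderOrFooter("users", true, "users header");
        fn.processRow("users", "Alice, 21");
        fn.processHeaderOrFooter("orders", true, "orders header");
        fn.processHeaderOrFooter("users", false, "users footer");
        System.out.println(fn.finish());
    }
}
```

Each schema is ordered independently of the others, so a late header for one schema does not hold back output for another.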



--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: In Flink, is there a way to merge two streams in stateful manner

2023-08-10 Thread Muazim Wani
Thanks for the response!
I have a specific use case where I am writing to a TextFile sink. I have a
bounded stream of header data and need to merge it with another bounded
stream. While writing the data to a text file, the header data should be
written before the original data (from the other bounded stream). Finally,
I have another stream of footers, for which I would repeat the same process.
I tried keeping an identifier for all three streams, and based on these
identifiers I added the data to three different ListStates using KeyedProcess
functions. For headers I emit the value directly, but the main data and
footers I store in state. The issue is that outside the KeyedProcess function
I am not able to emit the data in order.
Is there any way I can achieve this ordering of the DataStreams? I also
tried union, but it seems to interleave the data from the streams
arbitrarily.
Thanks and regards



Re: In Flink, is there a way to merge two streams in stateful manner

2023-08-10 Thread Ken Krugler
Hi Muazim,

In Flink, a stream of data (unless bounded) is assumed to never end.

So in your example below, this means stream 2 would NEVER be emitted, because 
stream 1 would never end (there is no time at which you know for sure that 
stream 1 is done).

And this in turn means stream 2 would be buffered forever in state, thus 
growing unbounded.

I would suggest elaborating on your use case.

Regards,

— Ken


> On Aug 10, 2023, at 10:11 AM, Muazim Wani  wrote:
> 
> Hi Team,
> I have a use case where I have two streams and want to join them in a
> stateful manner.
> E.g., data from stream 1 should be emitted before data from stream 2.
> I tried to store the data in ListState in a KeyedProcessFunction, but I am
> not able to access the state outside processElement().
> Is there any way I could achieve this?
> Thanks and regards
> 

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch