Re: Need help using AggregateFunction instead of FoldFunction

2019-12-11 Thread Arvid Heise
Hi Devin,

for event-time based windows, you need to give Flink two types of
information:
- the timestamps of the records, which I assume are already embedded
in your Pulsar records, and
- a watermark assigner.

Watermarks let Flink determine when windows can be closed with respect
to out-of-order and late events. How to generate them is highly
use-case-specific and usually cannot be inferred automatically, so you
need to specify a watermark assigner for event-time windows to work.
Pulsar offers an API similar to Kafka's, so you can refer to the
respective documentation [1]. The other sections of that page give a
more general overview of the options, which may be interesting for
future use cases where you want to aggregate records by event time.
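For illustration only, the watermark rule described above can be modelled in a few lines of plain Java. This is a standalone sketch, not Flink code; the class name BoundedOutOfOrdernessModel and the 2-second tolerance used in the usage example are assumptions. As far as I know, Flink 1.9 ships a built-in BoundedOutOfOrdernessTimestampExtractor that follows roughly this rule; it is attached with DataStream#assignTimestampsAndWatermarks after setting TimeCharacteristic.EventTime on the environment.

```java
/**
 * Standalone model (not Flink code) of a bounded-out-of-orderness watermark:
 * the watermark trails the highest event timestamp seen so far by a fixed tolerance.
 */
class BoundedOutOfOrdernessModel {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedOutOfOrdernessModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records the timestamp embedded in an incoming record (e.g. a Pulsar message). */
    void onEvent(long eventTimestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
    }

    /** Records at or before this time are assumed to have arrived already. */
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no records yet: event time has not advanced
        }
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }
}
```

Until the first record is seen, the watermark stays at Long.MIN_VALUE, which is effectively the situation of a job with no watermark assigner at all: event time never advances, so no event-time window can fire. The tolerance is the use-case-specific part; it should reflect how far out of order records on the topic can arrive.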

Best,

Arvid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition

Re: Need help using AggregateFunction instead of FoldFunction

2019-12-10 Thread Devin Bost
I confirmed that I got no output after 20 seconds, nor after waiting
over a minute between batches and then sending additional data.

My code looks like this:

PulsarSourceBuilder<String> builder = PulsarSourceBuilder
  .builder(new SimpleStringSchema())
  .serviceUrl(SERVICE_URL)
  .topic(INPUT_TOPIC)
  .subscriptionName(SUBSCRIPTION_NAME);
SourceFunction<String> src = builder.build();
DataStream<String> dataStream = env.addSource(src);

DataStream<String> combinedEnvelopes = dataStream
  .map(new MapFunction<String, Tuple2<String, String>>() {
     @Override
     public Tuple2<String, String> map(String incomingMessage) throws Exception {
       return mapToTuple(incomingMessage);
     }
  })
  .keyBy(0)
  //.timeWindow(Time.seconds(5))
  .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
  .aggregate(new JsonConcatenator());
//dataStream.print();

Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
logger.info("Ran dataStream. Adding sink next");
combinedEnvelopes.addSink(new FlinkPulsarProducer<>(
  SERVICE_URL,
  OUTPUT_TOPIC,
  new AuthenticationDisabled(), // probably need to fix: AuthenticationTls()
  combinedData -> combinedData.toString().getBytes(UTF_8),
  combinedData -> "test")
);
logger.info("Added sink. Executing job.");
// execute program
env.execute("Flink Streaming Java API Skeleton");


Here is the JsonConcatenator class:

private static class JsonConcatenator
  implements AggregateFunction<Tuple2<String, String>,
    Tuple2<String, String>, String> {
   Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
   @Override
   public Tuple2<String, String> createAccumulator() {
      return new Tuple2<>("", "");
   }

   @Override
   public Tuple2<String, String> add(Tuple2<String, String> value,
       Tuple2<String, String> accumulator) {
      logger.info("Running Add on value.f0: " + value.f0
          + " and value.f1: " + value.f1);
      return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
   }

   @Override
   public String getResult(Tuple2<String, String> accumulator) {
      logger.info("Running getResult on accumulator.f1: " + accumulator.f1);
      return "[" + accumulator.f1.substring(1) + "]";
   }

   @Override
   public Tuple2<String, String> merge(Tuple2<String, String> a,
       Tuple2<String, String> b) {
      // merge() is called when two session windows of the same key are merged.
      logger.info("Running merge on (a.f0: " + a.f0 + " and a.f1: " + a.f1
          + " and b.f1: " + b.f1);
      if (!b.f1.isEmpty() && b.f1.charAt(0) == '[') {
         logger.info("During merge, we detected the right message "
             + "starts with the '[' character. Removing it.");
         b.f1 = b.f1.substring(1);
      }
      // add() always prefixes ", ", so b.f1 already starts with the separator.
      return new Tuple2<>(a.f0, a.f1 + b.f1);
   }
}
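A standalone check of the string logic in the JsonConcatenator above may help here. It is plain Java with no Flink, and SeparatorCheck and its method names are hypothetical. Every accumulator built by add() starts with ", ", so a merge that inserts another ", " between two accumulators doubles the separator, while plain concatenation does not. Dropping the first two characters when building the result also avoids the leading space that substring(1) leaves.

```java
/** Standalone check (not Flink code) of the string accumulator kept in JsonConcatenator's f1 field. */
class SeparatorCheck {
    // add(): every call prefixes ", ", so a non-empty accumulator always starts with ", "
    static String add(String acc, String value) {
        return acc + ", " + value;
    }

    // Merge that inserts another separator between two such accumulators
    static String mergeWithExtraSeparator(String a, String b) {
        return a + ", " + b;
    }

    // Merge that relies on b already carrying its own leading ", "
    static String merge(String a, String b) {
        return a + b;
    }

    // getResult(): drop the leading ", " (two characters) before wrapping in brackets
    static String result(String acc) {
        return "[" + (acc.isEmpty() ? "" : acc.substring(2)) + "]";
    }
}
```

For example, merging ", 1, 2" with ", 3" gives ", 1, 2, 3" and the result "[1, 2, 3]", whereas the extra separator gives ", 1, 2, , 3".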


Devin G. Bost

Re: Need help using AggregateFunction instead of FoldFunction

2019-12-10 Thread Arvid Heise
getResult will only be called when the window is triggered. For a
fixed-time window, it triggers at the end of the window.

However, for EventTimeSessionWindows you need gaps in the data. Can you
verify that there actually is a 20-second pause between data points for
your keys?
Additionally, the problem may be in extracting the event time from the
sources. Could you post the relevant code as well?
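To make the triggering condition concrete, here is a simplified single-key model in plain Java (not Flink code; SessionWindowModel is a hypothetical name). It assumes the behaviour I understand Flink's EventTimeSessionWindows and EventTimeTrigger to have: each record opens a window [timestamp, timestamp + gap), overlapping windows of the same key are merged, and a window fires once the watermark reaches its last timestamp (end - 1). If the watermark never advances, no session fires and getResult is never called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Standalone model (not Flink code) of event-time session windows for a single key. */
class SessionWindowModel {
    /** A session covering [start, end), where end = last event timestamp + gap. */
    static final class Session {
        final long start;
        final long end;

        Session(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** Last timestamp that belongs to the window (end is exclusive). */
        long maxTimestamp() {
            return end - 1;
        }
    }

    /** Each event opens [ts, ts + gap); overlapping or touching windows are merged. */
    static List<Session> sessions(long[] timestamps, long gapMs) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<Session> result = new ArrayList<>();
        for (long ts : sorted) {
            Session last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts <= last.end) {
                // the event falls within the gap of the open session: extend that session
                result.set(result.size() - 1, new Session(last.start, Math.max(last.end, ts + gapMs)));
            } else {
                result.add(new Session(ts, ts + gapMs));
            }
        }
        return result;
    }

    /** An event-time trigger fires once the watermark reaches the window's max timestamp. */
    static boolean fires(Session session, long watermark) {
        return watermark >= session.maxTimestamp();
    }
}
```

With a 5-second gap, records at 1 s, 3 s and 30 s form two sessions, [1 s, 8 s) and [30 s, 35 s). The first one fires only when the watermark reaches 7.999 s; with a watermark stuck at Long.MIN_VALUE, neither ever fires.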

Best,

Arvid

Re: Need help using AggregateFunction instead of FoldFunction

2019-12-08 Thread vino yang
Hi dev,

The time parameter of a window can have different semantics.
For a session window, it is only the gap; the size of the window is
driven by the activity of the events.
For a tumbling or sliding window, it is the size of the window.

For more details, please see the official documentation [1].
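The difference can be sketched in plain Java (illustrative only; WindowTimeSemantics is a hypothetical name, and window offsets are ignored). For a tumbling window, the argument fixes the window boundaries in advance; for a session window, it only bounds the allowed pause between two consecutive events of the same key.

```java
/** Standalone illustration (not Flink code) of what the time argument means per window type. */
class WindowTimeSemantics {
    /** Tumbling window: the argument is the window SIZE; starts are aligned to multiples of it. */
    static long tumblingWindowStart(long timestampMs, long sizeMs) {
        // floorMod keeps the alignment correct for negative timestamps as well
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Session window: the argument is only the GAP; the session continues if the next event arrives within it. */
    static boolean sameSession(long previousEventMs, long nextEventMs, long gapMs) {
        return nextEventMs - previousEventMs <= gapMs;
    }
}
```

A record at 12.345 s always belongs to the 5-second tumbling window starting at 10 s, regardless of other records, whereas whether it belongs to an existing session depends on when the previous record of the same key arrived.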

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#session-windows

Re: Need help using AggregateFunction instead of FoldFunction

2019-12-06 Thread devinbost
I think there might be a bug in
`.window(EventTimeSessionWindows.withGap(Time.seconds(5)))`
(unless I'm just not using it correctly), because I get output
when I use the simpler window
`.timeWindow(Time.seconds(5))`,
but I don't get any output when I use the session-based window.


--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Need help using AggregateFunction instead of FoldFunction

2019-12-05 Thread devinbost
They released Pulsar 2.4.2, and I was able to pull its dependencies and
successfully submit the Flink job.
The job receives messages from the Pulsar topic successfully. However, I
still don't think I'm using the AggregateFunction correctly.

I added logging statements everywhere in my code, and I can see my
message reach the `add` method in the AggregateFunction that I implemented,
but the getResult method is never called.

I also never see the "Ran dataStream. Adding sink next" line from the
code below appear in my log, and the only log statements from the
JsonConcatenator class come from the `add` method, as shown below.



DataStream<String> combinedEnvelopes = dataStream
    .map(new MapFunction<String, Tuple2<String, String>>() {
        @Override
        public Tuple2<String, String> map(String incomingMessage) throws Exception {
            return mapToTuple(incomingMessage);
        }
    })
    .keyBy(0)
    .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
    .aggregate(new JsonConcatenator());

Logger logger = LoggerFactory.getLogger(StreamJob.class);
logger.info("Ran dataStream. Adding sink next");

-

private static class JsonConcatenator
    implements AggregateFunction<Tuple2<String, String>,
        Tuple2<String, String>, String> {
    Logger logger = LoggerFactory.getLogger(SplinklerJob.class);

    @Override
    public Tuple2<String, String> createAccumulator() {
        return new Tuple2<>("", "");
    }

    @Override
    public Tuple2<String, String> add(Tuple2<String, String> value,
            Tuple2<String, String> accumulator) {
        logger.info("Running Add on value.f0: " + value.f0
            + " and value.f1: " + value.f1);
        return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
    }

    @Override
    public String getResult(Tuple2<String, String> accumulator) {
        logger.info("Running getResult on accumulator.f1: " +
            accumulator.f1);
        return "[" + accumulator.f1 + "]";
    }

    @Override
    public Tuple2<String, String> merge(Tuple2<String, String> a,
            Tuple2<String, String> b) {
        logger.info("Running merge on (a.f0: " + a.f0 + " and a.f1: " + a.f1
            + " and b.f1: " + b.f1);
        return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
    }
}




Any ideas? 


Re: Need help using AggregateFunction instead of FoldFunction

2019-12-05 Thread Chris Miller
I hit the same problem; as far as I can tell, it should be fixed in
Pulsar 2.4.2. The release has already passed voting, so I hope it
will be available in a day or two.


https://github.com/apache/pulsar/pull/5068


Re: Need help using AggregateFunction instead of FoldFunction

2019-12-04 Thread devinbost
It turns out that the exception I was getting is actually related to
Pulsar, since I'm using the Pulsar Flink connector. I found the exact issue
reported here: https://github.com/apache/pulsar/issues/4721


Re: Need help using AggregateFunction instead of FoldFunction

2019-12-04 Thread devinbost
Thanks for the help.
I was able to make more progress (based on the documentation you provided),
but now I'm getting this exception:

org.apache.pulsar.client.impl.DefaultBatcherBuilder@3b5fad2d is not
serializable. The object probably contains or references non serializable
fields.
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
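For context, Flink's ClosureCleaner checks that user functions, and everything they reference, are Java-serializable before the job is submitted; that check is where this message comes from. The plain-Java sketch below (SerializabilityCheck and its nested classes are hypothetical) reproduces the same kind of failure with ObjectOutputStream and shows the usual workaround: a transient field that is created lazily, which in a Flink rich function would typically happen in open(). It only illustrates the mechanism; the actual cause here was inside the Pulsar connector, so it is not a fix for that issue.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Standalone illustration of a serializability check on "function" objects. */
class SerializabilityCheck {
    /** Stand-in for a non-serializable client object, such as a batcher or a connection. */
    static class NonSerializableClient { }

    /** Holds the client in a regular field, so serializing the function fails. */
    static class BadFunction implements Serializable {
        final NonSerializableClient client = new NonSerializableClient();
    }

    /** Same idea, but the field is transient and only created where it is used. */
    static class GoodFunction implements Serializable {
        private transient NonSerializableClient client;

        NonSerializableClient getClient() {
            if (client == null) {
                client = new NonSerializableClient(); // in Flink, typically done in open()
            }
            return client;
        }
    }

    /** Returns true if the object graph can be written with Java serialization. */
    static boolean isSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
}
```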

Here's my code now:

DataStream<String> combinedEnvelopes = dataStream
    .map(new MapFunction<String, Tuple2<String, String>>() {
        @Override
        public Tuple2<String, String> map(String incomingMessage) throws Exception {
            return mapToTuple(incomingMessage);
        }
    })
    .keyBy(0)
    .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
    .aggregate(new JsonConcatenator())
    .returns(String.class);

Here's the JsonConcatenator that I'm referencing above:

private static class JsonConcatenator
    implements AggregateFunction<Tuple2<String, String>, Tuple2<String, String>, String> {
    @Override
    public Tuple2<String, String> createAccumulator() {
        return new Tuple2<>("", "");
    }

    @Override
    public Tuple2<String, String> add(Tuple2<String, String> value,
            Tuple2<String, String> accumulator) {
        return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
    }

    @Override
    public String getResult(Tuple2<String, String> accumulator) {
        return "[" + accumulator.f1 + "]";
    }

    @Override
    public Tuple2<String, String> merge(Tuple2<String, String> a,
            Tuple2<String, String> b) {
        return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
    }
}



Re: Need help using AggregateFunction instead of FoldFunction

2019-12-04 Thread vino yang
Hi devinbost,

Here are two examples that may help:

   - the example code in the official documentation [1];
   - a StackOverflow answer to a similar question [2].

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#aggregatefunction
[2]:
https://stackoverflow.com/questions/47123785/flink-how-to-convert-the-deprecated-fold-to-aggregrate

I hope these resources are helpful to you.

Best,
Vino

Need help using AggregateFunction instead of FoldFunction

2019-12-04 Thread devinbost
Hi,

In my use case, I am attempting to create a keyed stream (keyed on a string),
window that stream (which represents keyed JSON objects) with
EventTimeSessionWindows (so that I have a separate window for each set of
JSON messages, according to the key), and then concatenate the JSON objects
by their keys. (For example, if message1, message2, and message3 all have the
same key, they should be concatenated into a JSON array like [message1,
message2, message3].)

I think my code expresses my intent conceptually, but I learned that fold is
deprecated because it can't perform partial aggregations. Instead, I need to
use AggregateFunction, but I'm having trouble understanding the API
documentation. How do I convert this code to an implementation that uses
AggregateFunction instead?

DataStream<String> combinedEnvelopes = dataStream
    .map(new MapFunction<String, Tuple2<String, String>>() {
        @Override
        public Tuple2<String, String> map(String incomingMessage) throws Exception {
            return mapToTuple(incomingMessage);
        }
    })
    .keyBy(0)
    .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
    .fold("[", new FoldFunction<Tuple2<String, String>, String>() {
        @Override
        public String fold(String concatenatedJsonArray,
                Tuple2<String, String> incomingMessage) {
            return concatenatedJsonArray + ", " +
                incomingMessage.f1.toString();
        }
    })
    .map(new MapFunction<String, String>() {
        @Override
        public String map(String jsonPartialArray) throws Exception {
            return jsonPartialArray + "]";
        }
    })
    .returns(String.class);
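As a sketch of how the fold/map pair above maps onto the four AggregateFunction methods, here is a plain-Java version. To keep it runnable without Flink, it declares a local interface AggregateFunctionLike with the same four methods as Flink's AggregateFunction<IN, ACC, OUT>; that interface and the JsonArrayAggregator class are hypothetical, and the input is the JSON string itself rather than the Tuple2 the real job would pass (there you would use value.f1). The initial "[" of the fold moves into getResult, the trailing map disappears, and merge() is what lets session windows combine partial results, which, as I understand it, fold cannot do with a merging window assigner.

```java
import java.util.ArrayList;
import java.util.List;

/** Local stand-in mirroring the four methods of Flink's AggregateFunction (hypothetical, for a Flink-free sketch). */
interface AggregateFunctionLike<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

/** Concatenates the JSON payloads of one key into a JSON array string. */
class JsonArrayAggregator implements AggregateFunctionLike<String, List<String>, String> {
    @Override
    public List<String> createAccumulator() {
        return new ArrayList<>(); // replaces the fold's initial value "["
    }

    @Override
    public List<String> add(String json, List<String> accumulator) {
        accumulator.add(json); // reusing and returning the accumulator avoids copying
        return accumulator;
    }

    @Override
    public String getResult(List<String> accumulator) {
        // replaces the trailing map that appended "]"; separators go only between elements
        return "[" + String.join(",", accumulator) + "]";
    }

    @Override
    public List<String> merge(List<String> a, List<String> b) {
        a.addAll(b); // combines two partial sessions of the same key
        return a;
    }
}
```

Keeping a list until getResult means the separator logic lives in one place, so neither add nor merge has to worry about leading or doubled commas.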



