Arvid,
I’m hoping to get your input on a process I’m working on. Originally I was
using a streaming solution, but the data held in the sliding windows grew
too large over longer intervals and the job sometimes stopped processing
altogether. Since the total counts are a fixed number anyway, a batch
process seems like a better fit.

The use case is this: count occurrences of each key over 30 minutes of
data, then take a 30-second time slice of the same data (possibly several
consecutive slices), and run both sets of results through a single
function.

Originally my code looked like this, using sliding processing-time windows
in streaming mode:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
//env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

DataStream<FluentdMessage> stream = env
        .addSource(getConsumer(properties))
        .name("Kafka Source");

// Per-key counts over a 30-minute window that slides (and purges) every 30 seconds.
DataStream<Tuple2<String, Long>> keyedCounts = stream
        .filter(value -> value.getGrokName() != null)
        .map(new MapFunction<FluentdMessage, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(FluentdMessage value) throws Exception {
                return Tuple2.of(value.getGrokName(), 1L);
            }
        })
        .keyBy(value -> value.f0)
        .window(SlidingProcessingTimeWindows.of(Time.minutes(30), Time.seconds(30)))
        .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
        //.sum(2);
        .reduce((ReduceFunction<Tuple2<String, Long>>) (data1, data2) ->
                Tuple2.of(data1.f0, data1.f1 + data2.f1));

// Gather all per-key counts into one non-parallel window and score them.
keyedCounts
        .windowAll(SlidingProcessingTimeWindows.of(Time.minutes(30), Time.seconds(30)))
        .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
        .process(new ProcessAllWindowFunction<Tuple2<String, Long>, Tuple5<String, Long, Long, String, Long>, TimeWindow>() {

            private ValueState<Long> currentCount;

            @Override
            public void open(Configuration parameters) throws Exception {
                currentCount = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("count", Long.class));
            }

            @Override
            public void process(Context context,
                                Iterable<Tuple2<String, Long>> iterable,
                                Collector<Tuple5<String, Long, Long, String, Long>> out) throws Exception {
                long count = StreamSupport.stream(iterable.spliterator(), false).count();
                if (currentCount.value() == null) {
                    currentCount.update(0L);
                }
                Iterator<Tuple2<String, Long>> iterator = iterable.iterator();
                Map<String, Long> map = new HashMap<>();
                Map<String, List<Long>> keyTotalMap = new HashMap<>();
                if (currentCount.value() < count) {
                    while (iterator.hasNext()) {
                        Tuple2<String, Long> tuple = iterator.next();
                        map.put(tuple.f0, keyDifference(tuple.f0, iterable));
                        keyTotalMap.computeIfAbsent(tuple.f0, k -> new ArrayList<>()).add(tuple.f1);
                        //out.collect(Tuple3.of(tuple.f0, keyDifference(tuple.f0, iterable), sum(iterable)));
                    }
                    map.forEach((key, value) -> {
                        if (value > 0L) {
                            out.collect(Tuple5.of(
                                    key,
                                    value,
                                    sum(key, keyTotalMap),
                                    getChiSqrLoggerScore(value, sumKeys(map), sum(key, keyTotalMap), sum(keyTotalMap)),
                                    System.currentTimeMillis()));
                        }
                    });
                    //out.collect(Tuple5.of(null, null, null, null, null));
                    currentCount.update(count);
                } else {
                    // This is currently the only way to force the job to end.
                    throw new InterruptedException();
                }
            }
        })
        .addSink(new RichChiLoggerInputSink())
        .name("Postgres Sink");

//globalCounts.writeAsText("s3://argo-workflow-bucket/output.txt");

env.execute("Flink Kafka Chi Log Runner");
This does not work in batch mode, so I need some guidance.
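
For context, per your earlier notes I’m planning to swap the source over to
the new bounded KafkaSource, roughly like this (just a sketch; I’ve
simplified the value type to String here):

env.setRuntimeMode(RuntimeExecutionMode.BATCH);

KafkaSource<String> source = KafkaSource
        .<String>builder()
        .setBootstrapServers("kafka-headless:9092")
        .setTopics(Arrays.asList("log-input"))
        .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
        .setBounded(OffsetsInitializer.latest())
        .build();

DataStream<String> stream = env.fromSource(
        source, WatermarkStrategy.noWatermarks(), "Kafka Source");

Thanks!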
On Tue, Jan 5, 2021 at 11:29 AM Arvid Heise <[email protected]> wrote:
> Sorry, Robert, for not checking the complete example. New sources are used
> with fromSource instead of addSource. It's not ideal, but hopefully we can
> remove the old way fairly soon to avoid confusion.
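>
> Concretely, that would look something like this (a sketch; the watermark
> strategy argument is required by the signature even for bounded input):
>
> DataStream<String> stream = env.fromSource(
>         source,
>         WatermarkStrategy.noWatermarks(),
>         "Kafka Source");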
>
> On Tue, Jan 5, 2021 at 5:23 PM Robert Cullen <[email protected]>
> wrote:
>
>> Arvid,
>>
>> Thank you. Sorry, my last post was not clear. This line:
>>
>> env.addSource(source)
>>
>> does not compile, since addSource expects a SourceFunction rather than a
>> KafkaSource.
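>>
>> (As far as I can tell, addSource is declared against the old interface,
>> roughly
>>
>> public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function)
>>
>> while KafkaSource implements the new unified Source interface, so the
>> overload doesn't match.)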
>>
>> On Tue, Jan 5, 2021 at 11:16 AM Arvid Heise <[email protected]> wrote:
>>
>>> Robert,
>>>
>>> here I modified your example with some highlights.
>>>
>>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>>
>>> KafkaSource<String> source = KafkaSource
>>>         .<String>builder()
>>>         .setBootstrapServers("kafka-headless:9092")
>>>         .setTopics(Arrays.asList("log-input"))
>>>         .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>>>         .*setBounded*(OffsetsInitializer.latest())
>>>         .build();
>>>
>>> env.addSource(source);
>>>
>>> You can also set the batch mode explicitly, but that shouldn't be
>>> necessary (and may make things more complicated once you also want to
>>> execute the application in streaming mode):
>>>
>>> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>>>
>>>
>>> On Tue, Jan 5, 2021 at 4:51 PM Robert Cullen <[email protected]>
>>> wrote:
>>>
>>>> Arvid,
>>>>
>>>> Thanks. Can you show me an example of how the source is tied to the
>>>> ExecutionEnvironment?
>>>>
>>>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>>>
>>>> KafkaSource<String> source = KafkaSource
>>>>         .<String>builder()
>>>>         .setBootstrapServers("kafka-headless:9092")
>>>>         .setTopics(Arrays.asList("log-input"))
>>>>         .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>>>>         .setUnbounded(OffsetsInitializer.latest())
>>>>         .build();
>>>>
>>>> env.addSource(source);
>>>>
>>>>
>>>> On Tue, Jan 5, 2021 at 7:28 AM Arvid Heise <[email protected]> wrote:
>>>>
>>>>> Hi Robert,
>>>>>
>>>>> you basically just (re)write your application with the DataStream API,
>>>>> use the new KafkaSource, and then let the automatic batch detection
>>>>> mode do its work [1].
>>>>> The most important part is that all your sources need to be bounded.
>>>>> Assuming that you just have a Kafka source, you need to use setBounded
>>>>> with the appropriate end offset/timestamp.
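>>>>>
>>>>> For example, a minimal sketch (brokers, topic, and the end-offsets
>>>>> initializer here are placeholders):
>>>>>
>>>>> KafkaSource<String> source = KafkaSource
>>>>>         .<String>builder()
>>>>>         .setBootstrapServers("...")
>>>>>         .setTopics(Arrays.asList("my-topic"))
>>>>>         .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>>>>>         .setBounded(OffsetsInitializer.latest())
>>>>>         .build();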
>>>>>
>>>>> Note that the rewritten Kafka source still has a couple of issues that
>>>>> should be addressed by the first bugfix release of 1.12 this month. So
>>>>> while it's safe to use for development, I'd wait for 1.12.1 to roll it
>>>>> out to production.
>>>>>
>>>>> If you have specific questions on the migration from DataSet to
>>>>> DataStream, please let me know.
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html
>>>>>
>>>>> On Mon, Jan 4, 2021 at 7:34 PM Robert Cullen <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> I have a Kafka source that I would like to run a batch job on. Since
>>>>>> version 1.12.0 soft-deprecates the DataSet API in favor of the
>>>>>> DataStream API, can someone show me an example of this using
>>>>>> DataStream?
>>>>>>
>>>>>> thanks
--
Robert Cullen
240-475-4490