One thing to add: the Flink KafkaProducer provides at-least-once guarantees
only if flush-on-checkpoint is enabled [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.html#setFlushOnCheckpoint-boolean-
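
A minimal sketch of enabling it (Flink 1.1 Java API; the broker address,
topic name, schema, and the "stream" variable are placeholders of mine):

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
            "localhost:9092",          // broker list (placeholder)
            "cases",                   // topic (placeholder)
            new SimpleStringSchema()); // serialization schema (placeholder)

    // Without this, records still buffered in the Kafka client when a
    // failure hits can be lost, i.e. delivery is only best-effort.
    producer.setFlushOnCheckpoint(true);

    stream.addSink(producer); // stream: an existing DataStream<String>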

2017-01-13 22:02 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Kat,
>
> I did not understand the difference between a case and a trace.
> If I got it right, the goal of your first job is to assemble the
> individual events into cases. Is a case here the last event for a case-id
> or all events of a case-id?
> If a case is the collection of all events (which I assume), what is the
> difference from a trace, which is also a list of events (if I got that right)?
>
> In any case, I think your first job can also be solved without a session
> window (which is quite complex internally).
> There are two options:
> 1) use a global window [1] with a custom trigger that fires for each
> arriving record. A global window never ends, which is fine since your
> cases never end either.
> 2) use a MapFunction with key-partitioned operator state [2]. The map
> function would simply update the state for every new event and emit a new
> result (a rough sketch follows below).
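
To make option 2 concrete, here is a rough sketch against the Flink 1.1
Java API. Event is a placeholder type and the class name is made up; state
access needs a Rich function, so a RichFlatMapFunction stands in for the
MapFunction mentioned above:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class CaseAssembler extends RichFlatMapFunction<Event, List<Event>> {

        private transient ValueState<List<Event>> caseState;

        @Override
        public void open(Configuration conf) {
            ValueStateDescriptor<List<Event>> descriptor = new ValueStateDescriptor<>(
                    "case-events",
                    TypeInformation.of(new TypeHint<List<Event>>() {}),
                    null); // null default = no events seen for this key yet
            caseState = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void flatMap(Event event, Collector<List<Event>> out) throws Exception {
            List<Event> events = caseState.value();
            if (events == null) {
                events = new ArrayList<>();
            }
            events.add(event);        // append the new event to the case
            caseState.update(events); // persist the updated case
            out.collect(events);      // emit the new version of the case
        }
    }

It would be used as events.keyBy("caseId").flatMap(new CaseAssembler()).
Option 1 would look similar, but with .window(GlobalWindows.create()) and a
trigger such as CountTrigger.of(1) in place of the stateful flatMap.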
>
> Regarding your concern about losing data when writing to Kafka: Flink's
> KafkaProducer provides at-least-once guarantees, which means that data
> might be written more than once in case of a failure but won't be lost. If
> the Kafka topic is partitioned by case-id and you only need the last record
> per case-id, Kafka's log compaction should give you upsert semantics.
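
For reference, compaction is a per-topic setting (cleanup.policy=compact),
and the upsert effect only applies if each record carries the case-id as its
Kafka message key. A sketch with the plain Kafka producer API (broker
address, topic, and the two variables are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);

    // On a compacted topic, Kafka eventually retains only the newest record
    // per key, so keying by case-id gives consumers the latest case state.
    producer.send(new ProducerRecord<>("cases", caseId, latestCaseJson));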
>
> Regarding your question "Is using state in this way a somewhat standard
> practice, or is state intended more for recovery?":
> Many streaming applications require state for their semantics (just like
> yours), i.e., they need to buffer data and wait for more data to arrive. In
> order to guarantee consistent result semantics of an application, the state
> must not be lost and must be recoverable in case of a failure. So state is
> not intended for recovery; rather, recovery is needed to guarantee the
> application's semantics.
>
> As I said before, I did not get the difference between cases and traces, so
> I cannot really comment on the job that analyzes the traces.
>
> Hope this helps,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/windows.html#global-windows
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html#using-the-keyvalue-state-interface
>
> 2017-01-13 11:04 GMT+01:00 Kathleen Sharp <kathleen.sh...@signavio.com>:
>
>> I have been playing around with Flink for a few weeks to try to
>> ascertain whether or not it meets our use cases, and also what best
>> practices we should be following. I have a few questions I would
>> appreciate answers to.
>>
>>
>> Our scenario is that we want to process a lot of event data into
>> cases. A case is an in-order sequence of events; this event data could
>> be quite old. We never know when a case is complete, so we just want
>> to have the most up-to-date picture of what a case looks like.
>>
>>
>> The in-order sequence of events of a case is called the trace. Many
>> cases could have an identical trace. We would like to construct these
>> traces and do some aggregations on them (case count, average/min/max
>> life-cycle time).
>>
>>
>> We then have further downstream processing to do on a case, some
>> of which would require additional inputs, either from side inputs or
>> by somehow joining data sources.
>>
>>
>> We don’t really care about event time at the moment, because we just
>> want to build cases and traces with all the data we have received.
>>
>>
>> The end results should be available to our web front end via a REST API.
>>
>>
>> Based on the above I have the following idea for a first implementation:
>>
>>
>> Kafka source -> key by case-id -> session window with RocksDB state
>> backend holding the case for that key -> Postgres sink
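
A rough sketch of that topology in the Flink 1.1 Java API (the checkpoint
URI, gap length, topic name, and the two custom functions are assumptions,
not part of the original plan):

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
    env.enableCheckpointing(60000); // checkpoint every minute (placeholder)

    DataStream<Event> events = env.addSource(new FlinkKafkaConsumer09<>(
            "events",                         // topic (placeholder)
            new EventDeserializationSchema(), // hypothetical schema
            kafkaProps));                     // standard Kafka properties

    events.keyBy("caseId") // assumes Event is a POJO with a caseId field
          .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
          .apply(new AssembleCaseWindowFunction()) // hypothetical WindowFunction
          .addSink(new PostgresUpsertSink());      // hypothetical upsert sink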
>>
>>
>> The reason for a session window is that, as I mentioned above, we just
>> want to build a group with all the data we have received into Kafka up
>> until that point in time. We would experiment with what this gap time
>> should be, and in future it might be specific to the type of group,
>> but for a start a naive approach is acceptable. I think this could
>> be better than just doing it, say, every 10 minutes, because we really
>> don't yet know the frequency of the data we receive. Also, some inputs
>> to Kafka come directly from a CSV upload, so we will get “firehose”
>> periods and periods of nothing.
>>
>> In short: I think what we have closely matches session behaviour.
>>
>>
>> We also have to implement a Postgres sink that is capable of doing
>> upserts. The reason for Postgres is to serve the REST front end.
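
A minimal sketch of such a sink (assumes Postgres 9.5+ for INSERT ... ON
CONFLICT; the table, columns, and CaseRecord type are placeholders, and
batching/retries are omitted for brevity):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class PostgresUpsertSink extends RichSinkFunction<CaseRecord> {

        private transient Connection conn;
        private transient PreparedStatement stmt;

        @Override
        public void open(Configuration conf) throws Exception {
            conn = DriverManager.getConnection(
                    "jdbc:postgresql://localhost/cases", "user", "secret"); // placeholders
            stmt = conn.prepareStatement(
                    "INSERT INTO cases (case_id, payload) VALUES (?, ?) " +
                    "ON CONFLICT (case_id) DO UPDATE SET payload = EXCLUDED.payload");
        }

        @Override
        public void invoke(CaseRecord rec) throws Exception {
            stmt.setString(1, rec.caseId);  // upsert key
            stmt.setString(2, rec.payload); // latest state of the case
            stmt.executeUpdate();
        }

        @Override
        public void close() throws Exception {
            if (stmt != null) stmt.close();
            if (conn != null) conn.close();
        }
    }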
>>
>>
>> We then have to build our traces and can see a few options for it:
>>
>>
>> 1) The most obvious solution would be to use a Kafka sink for the
>> keyed case stream, and to do the trace aggregations in a downstream
>> Flink job with this Kafka topic as a source. However, I have some
>> concerns about losing data (i.e., how do we know whether or not an
>> event has been successfully pushed into the Kafka topic?).
>>
>>
>> 2) Another approach might be to use some other type of sink (perhaps
>> Postgres) and to use this as the source for the traces job. This would
>> help us guarantee data consistency.
>>
>>
>> 3) Or, to somehow re-merge the keyed cases stream (is this a broadcast?),
>> so:
>>
>> Keyed cases stream -> broadcast -> key by tracehash with RocksDB
>> state backend holding the trace for that tracehash -> perform
>> aggregations -> Postgres sink
>>
>> Is broadcast an option here? How costly is it?
>>
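
A sketch of the re-merge step (Flink 1.1 Java API; Case, the hash logic,
and TraceAggregator are placeholders):

    // Re-keying by a hash of the trace sends all cases with the same trace
    // to the same parallel instance; keyBy repartitions by key, so no
    // broadcast is needed for this step.
    casesStream
        .keyBy(new KeySelector<Case, Integer>() {
            @Override
            public Integer getKey(Case c) {
                return c.getTrace().hashCode(); // placeholder tracehash
            }
        })
        .flatMap(new TraceAggregator())     // hypothetical: per-tracehash keyed state
        .addSink(new PostgresUpsertSink()); // hypothetical sink from above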
>>
>> Which of these approaches (or any other) would you recommend?
>>
>>
>> -------------------------------------
>>
>> Another question regarding the state:
>>
>> As we never know when a case is complete, the RocksDB state backend
>> could grow indefinitely (!). Obviously we would need to get a bit
>> smarter here.
>>
>>
>> Is using state in this way a somewhat standard practice, or is state
>> intended more for recovery?
>>
>> Managing growing state: I found some discussion regarding how to clear
>> state here:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Time-To-Live-Setting-for-State-StateDescriptor-td10391.html#a10402
>> which references https://issues.apache.org/jira/browse/FLINK-3946
>>
>> Thanks,
>>
>> Kat
>>
>
>
