Lukasz is of course correct in assuming that Flink does nothing to
synchronize accesses to Redis (or any other external system, for that
matter).
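
To make the lost-update case concrete, here is a minimal sketch in plain Java. It is only an illustration under stated assumptions: a ConcurrentHashMap stands in for Redis, no real Redis client or Beam API is used, and the class and method names (SharedStoreSketch, unsafeAppend, atomicAppend, recordOnce) are hypothetical. It contrasts the read-modify-write from the quoted processElement with a single atomic operation, and shows a write keyed by element id that stays the same when an element is replayed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SharedStoreSketch {
    // Stand-in for an external store such as Redis (assumption: no real client is used).
    static final Map<String, String> STORE = new ConcurrentHashMap<>();
    // Results keyed by element id, so replaying an element overwrites instead of appending.
    static final Map<String, String> RESULTS = new ConcurrentHashMap<>();

    // Separate read and write: two workers can read the same value and one append is lost.
    static void unsafeAppend(String key) {
        String value = STORE.getOrDefault(key, "");
        STORE.put(key, value + "a");
    }

    // One atomic operation per append: ConcurrentHashMap.merge runs atomically per key.
    static void atomicAppend(String key) {
        STORE.merge(key, "a", String::concat);
    }

    // Idempotent write: processing the same element twice leaves the same state.
    static int recordOnce(String elementId) {
        RESULTS.put(elementId, "a");
        return RESULTS.size();
    }

    // Runs `workers` threads that each append `perWorker` times; returns the final length.
    static int appendConcurrently(String key, int workers, int perWorker, boolean atomic) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.execute(() -> {
                for (int i = 0; i < perWorker; i++) {
                    if (atomic) {
                        atomicAppend(key);
                    } else {
                        unsafeAppend(key);
                    }
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return STORE.getOrDefault(key, "").length();
    }

    public static void main(String[] args) {
        // The unsafe length varies from run to run and can fall short of 4000.
        System.out.println("unsafe length: " + appendConcurrently("unsafe", 4, 1000, false));
        // The atomic variant applies every append exactly once per call.
        System.out.println("atomic length: " + appendConcurrently("atomic", 4, 1000, true));
        recordOnce("element-1");
        System.out.println("results after replay: " + recordOnce("element-1"));
    }
}
```

Note that the atomic append only protects against concurrent writers. If a bundle is retried, the append is still applied again, which is why the sketch also includes the keyed, overwrite-style write.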

On Wed, 22 Jun 2016 at 01:15 Lukasz Cwik <[email protected]> wrote:

> I can't imagine that Flink would synchronize writes to the Redis cluster
> in some way such that two competing writes don't impact each other but I
> would need to defer to Flink folks to answer that.
>
> For example if you wrote this code inside your DoFns:
> processElement(...) {
>   string value = read from redis
>   value += "a";
>   write to redis (value)
> }
>
> and were processing 100,000 elements, you would expect that value to be a
> string 100,000 characters long.
> However, processing a bundle may fail and be retried, and you could get a
> string greater than 100,000 characters long.
> Or you could process in parallel, where two DoFns both read "aaa" and both
> write back "aaaa", and then you have missed appending an "a".
>
> I could imagine that if you are very careful and use
> append/increment/check-and-set style operations you could maintain
> consistency, but if a bundle fails those effects would be applied multiple
> times.
>
>
>
> On Tue, Jun 21, 2016 at 12:38 PM, amir bahmanyari <[email protected]>
> wrote:
>
>> Thanks Lukasz.
>> I am executing my fat jar Beam app in a Flink cluster (2 nodes for now).
>> I assume the Job Manager <---> Task Manager(s) provide visibility of the
>> in-memory DB contents to all (ParDo) processes running on both nodes
>> executing separate DoFns at the same time.
>> Therefore, the "shared data" is synchronized/locked while one node's
>> process is making changes to it.
>> I use one instance of Redis for one set of data (accessed by both nodes'
>> DoFn processes) and a ConcurrentHashMap for another set of data.
>> I assume the Flink cluster maintains the thread safety of the Redis and
>> ConcurrentHashMap objects.
>> Is this the right assumption?
>> Thanks again.
>> Amir-
>>
>>
>> ------------------------------
>> *From:* Lukasz Cwik <[email protected]>
>> *To:* [email protected]; amir bahmanyari <
>> [email protected]>
>> *Sent:* Monday, June 20, 2016 4:10 PM
>>
>> *Subject:* Re: Multi-threading implementation equivalence in Beam
>>
>> You are correct: an in-memory database is outside the scope/knowledge of
>> the runner, and the runner will not be able to move any side effects over.
>> For example, let's say you're processing some elements X, Y, Z in one bundle
>> on one machine and processing Y fails. The bundle may be retried on another
>> machine where your changes to X may not exist. Or the bundle may be split
>> such that X and Z are processed on one machine and Y on yet another machine.
>>
>> If the reason for using an in-memory database is just caching, and you can
>> reload/recreate the cached entries, then this should be fine; you'll just
>> suffer cache misses elsewhere.
>> If the reason is to cache previously seen elements that you will output
>> later, or to hold write-like side effects, then this state can disappear if
>> the bundle processing is moved to another machine.
>>
>> It's not that in-memory databases can't be used; they just can't be relied
>> on for persistent state.
>>
>> On Mon, Jun 20, 2016 at 3:36 PM, amir bahmanyari <[email protected]>
>> wrote:
>>
>> Wonderful. Thanks Lukasz.
>> I have one question. That page states: "In addition, *your DoFn
>> should not rely on any persistent state from invocation to invocation.*"
>> I am using an in-memory DB such as Redis or Aerospike for intermediate
>> lookups etc.
>> Is this what the above statement is referring to: don't use in-memory DBs?
>> Thanks again.
>>
>>
>> ------------------------------
>> *From:* Lukasz Cwik <[email protected]>
>> *To:* [email protected]; amir bahmanyari <
>> [email protected]>
>> *Sent:* Monday, June 20, 2016 1:08 PM
>>
>> *Subject:* Re: Multi-threading implementation equivalence in Beam
>>
>> Threading/parallelism is up to the runner and does not map 1-1 to the
>> Java memory model, since most runners will execute in a distributed manner.
>> In general, runners will attempt to break up the work as evenly as possible
>> and schedule work across multiple machines / CPU cores at all times to
>> maximize throughput / minimize execution time for the job. This is largely
>> abstracted away by having users write DoFns that are applied with ParDo.
>> Please take a look at this explanation of ParDo (
>> https://cloud.google.com/dataflow/model/par-do) to get a better
>> understanding of its usage and to see some examples.
>>
>> On Mon, Jun 20, 2016 at 12:44 PM, amir bahmanyari <[email protected]>
>> wrote:
>>
>> Thanks JB.
>> I am using FlinkPipelineRunner, and later will experiment with the same on
>> SparkRunner. Any examples, please?
>> Cheers
>>
>> ------------------------------
>> *From:* Jean-Baptiste Onofré <[email protected]>
>> *To:* [email protected]
>> *Sent:* Monday, June 20, 2016 12:35 PM
>> *Subject:* Re: Multi-threading implementation equivalence in Beam
>>
>> Hi Amir,
>>
>> The DirectPipelineRunner, for instance, uses multiple threads to execute
>> ParDo.
>>
>> Do you mean an example of Beam pipeline code, or an example of a runner?
>>
>> Regards
>> JB
>>
>> On 06/20/2016 09:25 PM, amir bahmanyari wrote:
>> > Hi Colleagues,
>> > Hope you all had a great weekend. Another novice question :-)
>> > Is there a pipeline parallelism/threading model provided by Beam that
>> > equates to the multi-threading model in Java, for instance?
>> > Any examples if so?
>> > Thanks again,
>> > Amir
>>
>>
>> --
>> Jean-Baptiste Onofré
>> [email protected]
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>
