Thanks Lukasz & Aljoscha. Below is a somewhat old link, from July 2015. Has
there been any progress on out-of-core state? Supported in-memory state
management seems pretty compelling.
https://cwiki.apache.org/confluence/display/FLINK/Stateful+Stream+Processing

Is this specific to the Flink runner, or does Beam support this regardless of
the runner? I.e., can we achieve the same with the Spark runner as well?
Thanks + regards,
Amir-

      From: Aljoscha Krettek <[email protected]>
 To: [email protected]; amir bahmanyari <[email protected]> 
 Sent: Wednesday, June 22, 2016 2:18 AM
 Subject: Re: Multi-threading implementation equivalence in Beam
   
Lukasz is of course correct in assuming that Flink does nothing to synchronize 
accesses to Redis (or any other external system, for that matter).
On Wed, 22 Jun 2016 at 01:15 Lukasz Cwik <[email protected]> wrote:

I can't imagine that Flink would synchronize writes to the Redis cluster in
some way such that two competing writes don't impact each other, but I would
need to defer to Flink folks to answer that.
For example, if you wrote this code inside your DoFn:

  processElement(...) {
    string value = read from redis
    value += "a";
    write to redis (value)
  }

and were processing 100,000 elements, you might expect that value to end up as
a string 100,000 characters long. That is not guaranteed. For example,
processing a bundle may fail and be retried, and you could get a string greater
than 100,000 characters long. Or you could process in parallel where both DoFns
read "aaa" and write back "aaaa", and then you have missed appending an "a".
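To make the two failure modes concrete, here is a minimal plain-Java sketch. It
is not Beam or Redis code: the HashMap is a hypothetical stand-in for the
external store, and the interleaving and the retry are scripted by hand rather
than produced by a real runner.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an external store such as Redis (not a Redis client).
class LostUpdateSketch {
    static final Map<String, String> store = new HashMap<>();

    // Non-atomic read-modify-write, like the processElement pseudocode.
    static void append() {
        String value = store.get("value");
        store.put("value", value + "a");
    }

    // Two workers both read the value before either writes it back.
    static String interleavedAppend() {
        store.put("value", "aaa");
        String readByWorkerA = store.get("value");
        String readByWorkerB = store.get("value");
        store.put("value", readByWorkerA + "a");
        store.put("value", readByWorkerB + "a"); // overwrites worker A's write
        return store.get("value");
    }

    // A bundle fails part-way through and is then retried from its first element.
    static int lengthAfterRetry(int bundleSize, int processedBeforeFailure) {
        store.put("value", "");
        for (int i = 0; i < processedBeforeFailure; i++) append(); // failed attempt
        for (int i = 0; i < bundleSize; i++) append();             // retry
        return store.get("value").length();
    }

    public static void main(String[] args) {
        System.out.println(interleavedAppend());    // one append is lost
        System.out.println(lengthAfterRetry(5, 3)); // more appends than elements
    }
}
```

In the first case two appends leave only one extra character; in the second, a
bundle of 5 elements leaves more than 5 characters behind.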
I could imagine that if you are very careful and use append/increment/
check-and-set style operations you could maintain consistency, but if a bundle
fails those effects would be applied multiple times.
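As a rough illustration of the atomic-append idea, the sketch below uses
ConcurrentHashMap.merge in plain Java as a stand-in for an atomic append in the
external store. The class and method names are made up for this example, and a
real Redis client would use its own atomic commands instead.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AtomicAppendSketch {
    // Runs `workers` threads, each appending "a" `perWorker` times, and
    // returns the final length of the shared value.
    static int appendConcurrently(int workers, int perWorker) {
        ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.execute(() -> {
                for (int i = 0; i < perWorker; i++) {
                    // merge is atomic per key: no read-then-write gap for
                    // another thread to slip into.
                    store.merge("value", "a", String::concat);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return store.get("value").length();
    }

    public static void main(String[] args) {
        System.out.println(appendConcurrently(4, 250));
    }
}
```

Atomicity only removes the lost-update problem. It does nothing about retries:
a retried bundle would still repeat its appends.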


On Tue, Jun 21, 2016 at 12:38 PM, amir bahmanyari <[email protected]> wrote:

Thanks Lukasz.
I am executing my fat jar Beam app in a Flink cluster (2 nodes for now).
I assume the Job Manager <---> Task Manager(s) provide visibility of the
in-memory db contents to all (ParDo) processes running on both nodes executing
separate DoFns at the same time. Therefore, the "shared data" are
synchronized/locked while one node's process is making changes to it.
I use one instance of Redis for one set of data (accessed by both nodes' DoFn
processes) & a ConcurrentHashMap for another set of data.
I assume the Flink cluster maintains the thread safety of the Redis &
ConcurrentHashMap objects. Is this the right assumption?
Thanks again.
Amir-

      From: Lukasz Cwik <[email protected]>
 To: [email protected]; amir bahmanyari <[email protected]> 
 Sent: Monday, June 20, 2016 4:10 PM
 Subject: Re: Multi-threading implementation equivalence in Beam
   
You are correct, an in-memory database is outside the scope/knowledge of the
runner and it will not be able to move any side effects over. For example, let's
say you're processing some elements X, Y, Z in one bundle on one machine and
processing Y fails. The bundle may be retried on another machine where your
changes to X may not exist. Or the bundle may be split such that X and Z are
processed on one machine and Y on yet another machine.
If the reason for using an in-memory database is just caching and you can
reload/recreate the cached entries, then this should be fine; you'll just suffer
cache misses elsewhere.
If the reason is caching previously seen elements which you will output later,
or write-like side effects, then these can disappear if the bundle processing
is moved to another machine.
It's not that in-memory databases can't be used, they just can't be relied on
as persistent state.
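A small plain-Java sketch of the "cache only" case, under the assumption that
every entry can be recomputed from its key. Worker and slowLookup are
hypothetical names for this example and not part of Beam; each Worker instance
models the local cache on one machine.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class ReloadableCacheSketch {
    // One instance models the local, in-memory cache of one worker machine.
    static class Worker {
        private final Map<String, String> cache = new HashMap<>();
        private final Function<String, String> loader;
        int misses = 0;

        Worker(Function<String, String> loader) {
            this.loader = loader;
        }

        String lookup(String key) {
            String cached = cache.get(key);
            if (cached == null) {
                misses++;                   // entry not on this machine yet
                cached = loader.apply(key); // recreate it from the source of truth
                cache.put(key, cached);
            }
            return cached;
        }
    }

    // Hypothetical expensive lookup whose result depends only on the key.
    static String slowLookup(String key) {
        return key.toUpperCase();
    }

    public static void main(String[] args) {
        Worker first = new Worker(ReloadableCacheSketch::slowLookup);
        first.lookup("x");
        first.lookup("x");
        // The work moves to a second machine with an empty cache.
        Worker second = new Worker(ReloadableCacheSketch::slowLookup);
        System.out.println(second.lookup("x") + " misses=" + second.misses);
    }
}
```

The second worker returns the same answer as the first and only pays an extra
cache miss. If the map instead held data that exists nowhere else, that data
would be gone after the move.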

On Mon, Jun 20, 2016 at 3:36 PM, amir bahmanyari <[email protected]> wrote:

Wonderful. Thanks Lukasz. I have one question. That page states: "In
addition, your DoFn should not rely on any persistent state from invocation to
invocation."
I am using an in-memory db such as Redis or Aerospike for intermediate lookups
etc. Is this what the above statement is referring to: don't use in-memory dbs?
Thanks again.

      From: Lukasz Cwik <[email protected]>
 To: [email protected]; amir bahmanyari <[email protected]> 
 Sent: Monday, June 20, 2016 1:08 PM
 Subject: Re: Multi-threading implementation equivalence in Beam
   
Threading/parallelism is up to the runner and does not map 1-1 to the Java
memory model, since most runners will execute in a distributed manner. In
general, runners will attempt to break up the work as evenly as possible and
schedule work across multiple machines / CPU cores at all times to maximize
throughput / minimize the execution time of the job. Much of this is abstracted
away by having users write DoFns that are applied with ParDo. Please take a look
at this explanation of ParDo
(https://cloud.google.com/dataflow/model/par-do) to get a better understanding
of its usage and as a place to look at some examples.
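As a loose plain-Java analogy only (this is not the Beam API), the sketch below
applies a per-element function with no shared mutable state and lets the
library decide how to parallelize it. Java parallel streams stand in for a
runner here, and the names are made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class PerElementSketch {
    // Per-element logic with no shared mutable state, in the spirit of a DoFn body.
    static int processElement(int x) {
        return x * x;
    }

    // The caller only states what to do per element; whether the work is
    // split across threads is decided by the stream library.
    static List<Integer> run(boolean parallel) {
        IntStream input = IntStream.rangeClosed(1, 1000);
        if (parallel) {
            input = input.parallel();
        }
        return input.map(PerElementSketch::processElement)
                    .boxed()
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run(true).equals(run(false)));
    }
}
```

Because processElement touches nothing outside its argument, the result is the
same however the work is divided.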
On Mon, Jun 20, 2016 at 12:44 PM, amir bahmanyari <[email protected]> wrote:

Thanks JB. I am executing FlinkPipelineRunner... & later will experiment with
the same on SparkRunner.... Any examples pls? Cheers

      From: Jean-Baptiste Onofré <[email protected]>
 To: [email protected] 
 Sent: Monday, June 20, 2016 12:35 PM
 Subject: Re: Multi-threading implementation equivalence in Beam
  
Hi Amir,

the DirectPipelineRunner uses multi-threading to execute ParDo,
for instance.

You mean example of Beam pipeline code ? Or example of runner ?

Regards
JB

On 06/20/2016 09:25 PM, amir bahmanyari wrote:
> Hi Colleagues,
> Hope you all had a great weekend. Another novice question :-)
> Is there a pipeline parallelism/threading model provided by Beam that
> equates to the multi-threading model in Java for instance?
> Any examples if so?
> Thanks again,
> Amir

-- 
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com


   



   



   




  
