Re: Iterations vs. combo source/sink

2016-09-30 Thread Ken Krugler
Hi Fabian,

Thanks for responding. Comments and questions inline below.

Regards,

— Ken


> On Sep 29, 2016, at 6:10am, Fabian Hueske  wrote:
> 
> Hi Ken,
> 
> you can certainly have partitioned sources and sinks. You can control the 
> parallelism by calling .setParallelism() method.

So I assume I’d implement the ParallelSourceFunction interface.
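
Just to check my understanding, I'm picturing roughly this kind of parallel source (a rough sketch only; CrawlDbSource and fetchNextBatch are hypothetical placeholders, not a working design):

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

// Each parallel instance emits the "best" URLs for its own partition of the crawl DB.
class CrawlDbSource extends RichParallelSourceFunction[String] {

  @volatile private var running = true

  override def run(ctx: SourceContext[String]): Unit = {
    val subtask = getRuntimeContext.getIndexOfThisSubtask
    while (running) {
      // placeholder for the actual "best URLs to fetch" selection logic
      val urls: Seq[String] = fetchNextBatch(subtask)
      urls.foreach(ctx.collect)
      if (urls.isEmpty) Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    running = false
  }

  private def fetchNextBatch(partition: Int): Seq[String] = Seq.empty // stub
}

// parallelism would then be set explicitly on the source:
// env.addSource(new CrawlDbSource).setParallelism(10)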

> If you need a partitioned sink, you can call .keyBy() to hash partition.
> 
> I did not completely understand the requirements of your program. Can you 
> maybe provide pseudo code for what the program should look like?

Just for grins, I’m looking at re-implementing the Bixo web crawler (built on 
top of Cascading/Hadoop MR) as a continuous crawler on top of Flink.

The main issue is the “crawl DB” that has to maintain the state of every URL 
ever seen, and also provide a fast way to generate the “best” URLs to be 
fetched. The logic of figuring out the best URL is complex, depending on 
factors like the anticipated value of the page, refetch rates for pages that 
have already been seen, number of unique URLs per domain vs. the domain “rank”, 
etc.

And it has to scale to something like 30B+ URLs with a small (e.g. 10 
moderately big servers) cluster, so it needs to be very efficient in terms of 
memory/CPU usage.

An additional goal is to not require additional external infrastructure. That 
simplifies the operational overhead of running a continuous crawl.

So this “crawl DB” has to act as both a source (of the best URLs to fetch) and 
as a sink (for updates to fetched URLs, and as new URLs are 
discovered/injected). The state is a mix of in-memory and spilled to disk data.

Given what you mention below about iterative data flows not being fault 
tolerant, it seems like a combo source/sink (if possible) would be best.

Any guidance as to how to implement such a thing? I don’t know enough yet about 
Flink to determine if I can essentially have one task that’s acting as both the 
source & sink.

> Some general comments:
> - Flink's fault tolerance mechanism does not work with iterative data flows 
> yet. This is work in progress; see FLINK-3257 [1]

OK, good to know.

> - Flink's fault tolerance mechanism only works if you expose all(!) 
> internal operator state. So you would need to put your Java DB in Flink state 
> to have a recoverable job.

Yes.

> - Is the DB essential in your application? Could you use Flink's 
> key-partitioned state interface instead? That would help to make your job 
> fault-tolerant.

Yes, as per above.
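
If I follow [2] correctly, the per-URL status would live in keyed state, roughly like this (a sketch under my own assumptions; UrlStatus, UrlStateUpdater and the update logic are made up for illustration, not my real model):

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

case class UrlStatus(lastFetched: Long, score: Double)

// Keyed by URL; keeps the last known status of each URL in Flink's key-partitioned
// state instead of an external/embedded DB, so it is covered by checkpoints.
class UrlStateUpdater extends RichFlatMapFunction[(String, Long), (String, UrlStatus)] {

  private var status: ValueState[UrlStatus] = _

  override def open(parameters: Configuration): Unit = {
    status = getRuntimeContext.getState(
      new ValueStateDescriptor[UrlStatus]("url-status", classOf[UrlStatus], null))
  }

  override def flatMap(update: (String, Long), out: Collector[(String, UrlStatus)]): Unit = {
    val previous = Option(status.value()).getOrElse(UrlStatus(0L, 0.0))
    val updated = previous.copy(lastFetched = update._2)
    status.update(updated)
    out.collect((update._1, updated))
  }
}

// used after keying the stream by URL:
// urlUpdates.keyBy(_._1).flatMap(new UrlStateUpdater)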


> [1] https://issues.apache.org/jira/browse/FLINK-3257 
> 
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html#using-the-keyvalue-state-interface
>  
> 
> 
> 2016-09-29 1:15 GMT+02:00 Ken Krugler:
> Hi all,
> 
> I’ve got a very specialized DB (runs in the JVM) that I need to use to both 
> keep track of state and generate new records to be processed by my Flink 
> streaming workflow. Some of the workflow results are updates to be applied to 
> the DB.
> 
> And the DB needs to be partitioned.
> 
> My initial approach is to wrap it in a regular operator, and have subsequent 
> streams be inputs for updating state. So now I’ve got an IterativeDataStream, 
> which should work.
> 
> But I imagine I could also wrap this DB in a source and a sink, yes? Though 
> I’m not sure how I could partition it as a source, in that case.
> 
> If it is feasible to have a partitioned source/sink, are there general 
> pros/cons to either approach?
> 
> Thanks,
> 
> — Ken
> 
> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr





Re: How can I prove ....

2016-09-30 Thread amir bahmanyari
Hi Stephan,

This is from the dashboard. Total parallelism is set = 1024, with 259 tasks per TM. 
All say Running, but I get a *.out log on the beam4 server only (bottom of the 
servers list). Does this mean that all nodes are engaged in processing the data? 
Why are these encircled columns having 0's for their data exchange report?

Thanks + regards,
Amir




  From: Stephan Ewen 
 To: user@flink.apache.org; amir bahmanyari  
Cc: Felix Dreissig 
 Sent: Monday, September 26, 2016 2:18 AM
 Subject: Re: How can I prove 
   
You do not need to create any JSON.
Just click on "Running Jobs" in the UI, and then on the job. The parallelism is 
shown as a number in the boxes of the graph.
On Sat, Sep 24, 2016 at 6:28 PM, amir bahmanyari  wrote:

Thanks Felix. Interesting. I tried to create the JSON but it didn't work according 
to the sample code I found in the docs. Is there a way to get the same JSON from 
the command line? Is there an example?

Thanks + regards,
Amir

  From: Felix Dreissig 
 To: amir bahmanyari  
Cc: user@flink.apache.org
 Sent: Saturday, September 24, 2016 8:18 AM
 Subject: Re: How can I prove 
   
Hi Amir,

On 23 Sep 2016, at 19:57, amir bahmanyari  wrote:
> Currently running with 512 slots all taken as indicated by the dashboard.
> Are we talking about this? Then yes based on no available slots, I assume I 
> am at 512 .

I guess Stephan is referring to the parallelism of single operators as 
displayed in the operator graph, see e.g. 
https://ci.apache.org/projects/flink/flink-docs-release-0.10/page/img/webclient_plan_view.png

Regards,
Felix

   



   

Re: Using Flink and Cassandra with Scala

2016-09-30 Thread Stephan Ewen
How hard would it be to add case class support?

Internally, tuples and case classes are treated quite similar, so I think
it could be a quite simple extension...

On Fri, Sep 30, 2016 at 4:22 PM, Sanne de Roever 
wrote:

> Thanks Chesnay. Have a good weekend.
>
> On Thu, Sep 29, 2016 at 5:03 PM, Chesnay Schepler 
> wrote:
>
>> the cassandra sink only supports java tuples and POJO's.
>>
>>
>> On 29.09.2016 16:33, Sanne de Roever wrote:
>>
>>> Hi,
>>>
>>> Does the Cassandra sink support Scala and case classes? It looks like
>>> using Java is at the moment best practice.
>>>
>>> Cheers,
>>>
>>> Sanne
>>>
>>
>>
>


SVM classification problem.

2016-09-30 Thread Kürşat Kurt
Hi;

 

I am trying to train and predict with the same set. I expect that accuracy
should be 100%, am I wrong?

If I try to predict with the same set, it fails; it also classifies some samples
as "-1", which is not in the training set.

What is wrong with this code?

 

Code:

def main(args: Array[String]): Unit = {

  val env = ExecutionEnvironment.getExecutionEnvironment

  val training = Seq(
    new LabeledVector(1.0, new SparseVector(10, Array(0, 2, 3), Array(1.0, 1.0, 1.0))),
    new LabeledVector(1.0, new SparseVector(10, Array(0, 1, 5, 9), Array(1.0, 1.0, 1.0, 1.0))),
    new LabeledVector(0.0, new SparseVector(10, Array(0, 2), Array(0.0, 1.0))),
    new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))),
    new LabeledVector(0.0, new SparseVector(10, Array(0, 2), Array(0.0, 1.0))),
    new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))))

  val trainingDS = env.fromCollection(training)
  val testingDS = env.fromCollection(training)

  val svm = new SVM().setBlocks(env.getParallelism)
  svm.fit(trainingDS)

  val predictions = svm.evaluate(testingDS.map(x => (x.vector, x.label)))
  predictions.print()
}

 

Output:

(1.0,1.0)
(1.0,1.0)
(0.0,1.0)
(0.0,-1.0)
(0.0,1.0)
(0.0,-1.0)



Re: Error while adding data to RocksDB: No more bytes left

2016-09-30 Thread Shannon Carey
Implementing a custom serialization approach with Flink's CopyableValue 
(instead of relying on Flink to automatically use Kryo) solved the issue. As a 
side benefit, this also reduced the serialized size of my object by about half.
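
For anyone hitting the same issue, the value type ends up looking roughly like this (a simplified sketch; TimestampedCount and its two fields are illustrative, not my actual class):

import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flink.types.CopyableValue

// Hand-written binary serialization: Flink uses these methods directly instead of Kryo.
class TimestampedCount(var timestamp: Long, var count: Long)
    extends CopyableValue[TimestampedCount] {

  // no-arg constructor so Flink can instantiate the value type
  def this() = this(0L, 0L)

  override def write(out: DataOutputView): Unit = {
    out.writeLong(timestamp)
    out.writeLong(count)
  }

  override def read(in: DataInputView): Unit = {
    timestamp = in.readLong()
    count = in.readLong()
  }

  // fixed binary length: two longs
  override def getBinaryLength: Int = 16

  override def copyTo(target: TimestampedCount): Unit = {
    target.timestamp = timestamp
    target.count = count
  }

  override def copy(): TimestampedCount = new TimestampedCount(timestamp, count)

  override def copy(source: DataInputView, target: DataOutputView): Unit =
    target.write(source, getBinaryLength)
}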


From: Stephan Ewen
Date: Friday, September 30, 2016 at 3:58 AM
To:
Cc: Stephan Ewen
Subject: Re: Error while adding data to RocksDB: No more bytes left

Agree with Stefan, let's see if the fully async snapshot mode helps. It looks 
suspiciously RocksDB related...

On Fri, Sep 30, 2016 at 10:30 AM, Stefan Richter wrote:
Hi Shannon,

from your new stack trace and the bogus class names, I agree with Stephan that 
either serialization or the database itself is corrupted in some way. Could you 
please check if this problem only happens if checkpointing is enabled? If yes, 
does switching to fully async snapshots change the behavior?

Best,
Stefan

On 29.09.2016 at 21:12, Shannon Carey wrote:

Hi Stephan!

The failure appeared to occur every 10 minutes, which is also the interval for 
checkpointing. However, I agree with you that the stack trace appears to be 
independent. Could this perhaps be an issue with multithreading, where the 
checkpoint mechanism is somehow interfering with ongoing operation of the state 
backend? I've never seen this problem until now, so I am a little suspicious 
that it might be due to something in my code, but so far it's been difficult to 
figure out what that might be.

I am using the default, SemiAsync snapshot mode.

The classes of the data flow are a bit too large to put here in their entirety. 
We are using Scala case classes, Java classes generated by Avro, Tuples, Scala 
Option, java.util.UUID and Scala mutable.Map. The majority of these classes 
have been operational in other jobs before. I added a unit test for the class 
which contains a mutable.Map to see whether that was causing a problem. Does 
this look like a reasonable unit test to verify Flink serializability to you?

it("roundtrip serializes in Flink") {
  val millis: Long = TimeUnit.DAYS.toMillis(2)
  val original: PreferredAirportDailySum = new PreferredAirportDailySum(millis)
  original.add("a", TimestampedAirportCount(4, 6))
  original.add("b", TimestampedAirportCount(7, 8))

  val deserialized: PreferredAirportDailySum = serializationRoundTrip(original, 
100)

  deserialized.timestamp shouldBe millis
  deserialized.immutableItems("a") shouldBe TimestampedAirportCount(4, 6)
  deserialized.immutableItems("b") shouldBe TimestampedAirportCount(7, 8)
}

def serializationRoundTrip[T : ClassTag : TypeInformation](original: T, 
expectedMaxBytes: Int): T = {
  val typeInfo = implicitly[TypeInformation[T]]
  val serializer: TypeSerializer[T] = typeInfo.createSerializer(new 
ExecutionConfig)

  val out: ByteArrayOutputStream = new ByteArrayOutputStream(expectedMaxBytes)
  val outputView: DataOutputView = new DataOutputViewStreamWrapper(out)
  serializer.serialize(original, outputView)

  out.size() should be <= expectedMaxBytes

  val inputView: DataInputViewStreamWrapper =
new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray))
  val deserialized: T = serializer.deserialize(inputView)

  deserialized
}

I tried running my job in a local one-slot cluster with RocksDB enabled but 
checkpointing to local filesystem. Similar errors occur, but are more sporadic. 
I have not yet been able to capture the error while debugging, but if I do I 
will provide additional information.

I noticed that locally, execution only reaches 
DefaultClassResolver#readName(Input)->Class.forName() when a checkpoint 
completes. Also, the timing of checkpointing is a bit odd: in the example below 
the checkpoint takes 200s to complete after being triggered even though RocksDB 
reports that it only took ~100ms.

2016-09-29 12:56:17,619 INFO  CheckpointCoordinator - Triggering checkpoint 
2 @ 1475171777619
2016-09-29 12:59:38,079 INFO  RocksDBStateBackend  - RocksDB 
(/var/folders/…./WindowOperator_38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/db)
 backup (synchronous part) took 7 ms.
2016-09-29 12:59:38,214 INFO  RocksDBStateBackend  - RocksDB materialization 
from 
/var/folders/…/WindowOperator_38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/local-chk-2
 to file:/var/flinkstate/…/WindowOperator_38_0/dummy_state/chk-2 (asynchronous 
part) took 96 ms.
2016-09-29 12:59:38,333 INFO  CheckpointCoordinator - Completed checkpoint 
2 (in 200621 ms)

Do you have any other advice?

Exceptions from local execution:

java.lang.RuntimeException: Error while adding data to RocksDB
at 
org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:125)
at 

Re: Exceptions from collector.collect after cancelling job

2016-09-30 Thread Shannon Carey
My flat map function is catching & logging the exception. The try block happens 
to encompass the call to Collector#collect().

I will move the call to collect outside of the try. That should silence the log 
message.
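
For reference, the shape I'm moving to looks roughly like this (a sketch, not my actual function; the point is only that out.collect() is no longer inside the try block, so exceptions from a collector closed during cancellation aren't caught and logged by my code):

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.util.Collector

class ParseFlatMap extends FlatMapFunction[String, Int] {
  override def flatMap(value: String, out: Collector[Int]): Unit = {
    // parse inside try/catch, emit outside of it
    val parsed: Option[Int] =
      try {
        Some(value.trim.toInt)
      } catch {
        case e: NumberFormatException =>
          // log and drop the bad record
          None
      }
    parsed.foreach(p => out.collect(p))
  }
}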




On 9/30/16, 3:51 AM, "Ufuk Celebi"  wrote:

>On Thu, Sep 29, 2016 at 9:29 PM, Shannon Carey  wrote:
>> It looks like Flink is disabling the objects that the FlatMap collector
>> relies on before disabling the operator itself. Is that expected/normal? Is
>> there anything I should change in my FlatMap function or job code to account
>> for it?
>
>Hey Shannon,
>
>Flink actually does cancel the tasks *before* cleaning up the network
>resources that throw the root Exception here.
>
>We actually don't log any Exceptions that are thrown during
>cancellation, because it is possible that the user code/operator use
>the closed resources concurrently with cancellation (your stack traces
>essentially), but it looks like in some places we don't respect this.
>
>Can you tell which classes actually log this? Would be good to fix
>this if possible as it is very confusing and looks quite bad. I don't
>expect it to be an actual problem though.
>
>– Ufuk
>


Re: Controlling savepoints from inside an application

2016-09-30 Thread Astrac
Thanks for the answer,

the changes in the FLIP are quite interesting, are they coming in 1.2?

What I mean by "manually reading the savepoint" is that rather than
providing the savepoint path via "the --fromSavepoint
hdfs://some-path/to/savepoint" option I'd like to provide it in the code
that initialises the StreamExecutionEnvironment. This way I can use my
versioning strategy to load a savepoint that is compatible with the current
version of the application (or none if this is a new version of the state,
effectively rebuilding everything from Kafka).

On the other side, i.e. writing the savepoint somewhere, at the moment I
would be happy with triggering savepoints via CLI if it was possible to
configure the path where they are stored via the initialisation code where
we build the StreamExecutionEnvironment rather than via flink-conf.yml;
since I don't see mention of this in the FLIP, is this something you would
be happy to add as well?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Controlling-savepoints-from-inside-an-application-tp9273p9276.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Using Flink and Cassandra with Scala

2016-09-30 Thread Sanne de Roever
Thanks Chesnay. Have a good weekend.

On Thu, Sep 29, 2016 at 5:03 PM, Chesnay Schepler 
wrote:

> the cassandra sink only supports java tuples and POJO's.
>
>
> On 29.09.2016 16:33, Sanne de Roever wrote:
>
>> Hi,
>>
>> Does the Cassandra sink support Scala and case classes? It looks like
>> using Java is at the moment best practice.
>>
>> Cheers,
>>
>> Sanne
>>
>
>


Re: Controlling savepoints from inside an application

2016-09-30 Thread Ufuk Celebi
Hey Aldo,

On Fri, Sep 30, 2016 at 3:17 PM, Astrac  wrote:
> * Configure the savepoint path while we build the StreamExecutionEnvironment
> rather than in flink-conf.yml
> * Manually read a savepoint rather than passing it via the CLI

what you describe is not possible right now, but I'm working on
unifying savepoints and checkpoints [1]. With the upcoming changes for
this, it will be possible to provide the paths in the environment.
What do you mean with manually reading a savepoint?

I would really appreciate some feedback on the FLIP, too. There is a
corresponding [DISCUSS] thread for comments.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-10%3A+Unify+Checkpoints+and+Savepoints


Controlling savepoints from inside an application

2016-09-30 Thread Astrac
In the project I am working on we are versioning all our flink operators in
order to be able to re-build the state from external sources (i.e. Kafka) by
bumping that version number; this works pretty nicely so far, except that we
need to be aware of whether or not we need to load the savepoint before
deploying as we need to provide different command line arguments via our
deployment script.

The thing I'd like to do instead is versioning the savepoint storage
mechanism and store the savepoints in different folders depending on the
version of our application we are running. This way when we bump the version
number we really start from scratch and we don't risk any exception due to
state deserialisation; when we don't bump the number instead we keep the
state from the previous version of the application and start from there.

To do this I would need to control the storage path of the savepoints from
within our application code but I couldn't find a way to do it; if that's
relevant we run on Yarn, keep checkpoint on the FsStateBackend and keep both
savepoints and checkpoints on HDFS. Our main class looks something like
this:

val flinkEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stateBackend = new FsStateBackend(
  s"hdfs://${config.getString("utd.hdfs.namenode.host")}:8020/user/hadoop/flink/checkpoints")
flinkEnvironment.setStateBackend(stateBackend)
// ... define more configurations and the streaming jobs
flinkEnvironment.enableCheckpointing(60).execute()

Is there a way in this initialisation code to achieve the following?

* Configure the savepoint path while we build the StreamExecutionEnvironment
rather than in flink-conf.yml
* Manually read a savepoint rather than passing it via the CLI

Many Thanks!



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Controlling-savepoints-from-inside-an-application-tp9273.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Blobstorage Locally and on HDFS

2016-09-30 Thread Konstantin Knauf
Hi Ufuk,

thanks for your quick answer.

Setup: 2 Servers, each running a JM as well as TM

1) Removing all existing blobstores locally (/tmp) as well as on HDFS
2) Starting a flink streaming job

Now there are the following BLOBs:

Local:

*Leader JM:

4.0K/tmp/blobStore-563a8820-9617-4d89-97a7-fc3cc258dff4/incoming

64M /tmp/blobStore-563a8820-9617-4d89-97a7-fc3cc258dff4

64M /tmp/blobStore-563a8820-9617-4d89-97a7-fc3cc258dff4/cache

64M /tmp/blobStore-c6b93d41-8916-4a8d-b595-6e35f0b10401

64M /tmp/blobStore-c6b93d41-8916-4a8d-b595-6e35f0b10401/cache

*Standby JM:

64M /tmp/blobStore-4cbfd3c0-2a70-4485-8fc0-045ca7f08cea

64M /tmp/blobStore-4cbfd3c0-2a70-4485-8fc0-045ca7f08cea/cache

HDFS:

66595700 2016-09-30 13:03
<..>/flink/blob/cache/blob_da76e12b949a83404f97b6eb59416deaa31a907b


3) Cancelling both jobs via the command line:

Now there are the following BLOBs:

**same as above**

When starting the same job again, no new blobs are created.

Is it a problem to delete local blobStores of running jobs or will the
blobs just be downloaded again from HDFS if needed?

Cheers,

Konstantin




On 30.09.2016 10:28, Ufuk Celebi wrote:
> On Fri, Sep 30, 2016 at 9:12 AM, Konstantin Knauf
>  wrote:
>> we are running a Flink (1.1.2) Stand-Alone Cluster with JM HA, and HDFS
>> as checkpoint and recovery storage dir. What we see is that blobStores
>> are stored in HDFS as well as under the local Jobmanagers and
>> Taskmanagers /tmp directory.
>>
>> Is this the expected behaviour? Is there any documentation on which
>> blobs are stored locally and which are stored in HDFS in our case? In
>> particular, we would need to know when it is safe to delete blobs stored
>> locally because they are not cleaned up by Flink and fill up the /tmp
>> partition eventually.
> 
> BLOBs are copied to another directory in case of HA in order to be
> available for other job managers that might take over.
> 
> On regular termination (cancel, finish) all BLOBs should be cleaned
> up. With hard failures, it can happen that BLOBs are not cleaned up.
> 
> Do you know in which cases you see BLOBs not being cleaned up? If it
> is the first one, that sounds like a bug to me.
> 
> – Ufuk
> 

-- 
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082





Re: Error while adding data to RocksDB: No more bytes left

2016-09-30 Thread Stephan Ewen
@Shannon Concerning the issue with long checkpoints even though the
snapshot is very short:

I found a critical issue with the Flink Kafka 0.9 Consumer - on
low-throughput topics/partitions, it can lock up for a while, preventing
checkpoints from being triggered (barriers injected).

There is a fix going in now, probably a 1.1.3 release with that fix soon.

On Thu, Sep 29, 2016 at 9:12 PM, Shannon Carey  wrote:

> Hi Stephan!
>
> The failure appeared to occur every 10 minutes, which is also the interval
> for checkpointing. However, I agree with you that the stack trace appears
> to be independent. Could this perhaps be an issue with multithreading,
> where the checkpoint mechanism is somehow interfering with ongoing
> operation of the state backend? I've never seen this problem until now, so
> I am a little suspicious that it might be due to something in my code, but
> so far it's been difficult to figure out what that might be.
>
> I am using the default, SemiAsync snapshot mode.
>
> The classes of the data flow are a bit too large to put here in their
> entirety. We are using Scala case classes, Java classes generated by Avro,
> Tuples, Scala Option, java.util.UUID and Scala mutable.Map. The majority of
> these classes have been operational in other jobs before. I added a unit
> test for the class which contains a mutable.Map to see whether that was
> causing a problem. Does this look like a reasonable unit test to verify
> Flink serializability to you?
>
> it("roundtrip serializes in Flink") {
>   val millis: Long = TimeUnit.DAYS.toMillis(2)
>   val original: PreferredAirportDailySum = new 
> PreferredAirportDailySum(millis)
>   original.add("a", TimestampedAirportCount(4, 6))
>   original.add("b", TimestampedAirportCount(7, 8))
>
>   val deserialized: PreferredAirportDailySum = 
> serializationRoundTrip(original, 100)
>
>   deserialized.timestamp shouldBe millis
>   deserialized.immutableItems("a") shouldBe TimestampedAirportCount(4, 6)
>   deserialized.immutableItems("b") shouldBe TimestampedAirportCount(7, 8)
> }
>
> def serializationRoundTrip[T : ClassTag : TypeInformation](original: T, 
> expectedMaxBytes: Int): T = {
>   val typeInfo = implicitly[TypeInformation[T]]
>   val serializer: TypeSerializer[T] = typeInfo.createSerializer(new 
> ExecutionConfig)
>
>   val out: ByteArrayOutputStream = new ByteArrayOutputStream(expectedMaxBytes)
>   val outputView: DataOutputView = new DataOutputViewStreamWrapper(out)
>   serializer.serialize(original, outputView)
>
>   out.size() should be <= expectedMaxBytes
>
>   val inputView: DataInputViewStreamWrapper =
> new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray))
>   val deserialized: T = serializer.deserialize(inputView)
>
>   deserialized
> }
>
> I tried running my job in a local one-slot cluster with RocksDB enabled
> but checkpointing to local filesystem. Similar errors occur, but are more
> sporadic. I have not yet been able to capture the error while debugging,
> but if I do I will provide additional information.
>
> I noticed that locally, execution only reaches
> DefaultClassResolver#readName(Input)->Class.forName() when a checkpoint
> completes. Also, the timing of checkpointing is a bit odd: in the example
> below the checkpoint takes 200s to complete after being triggered even
> though RocksDB reports that it only took ~100ms.
>
> 2016-09-29 12:56:17,619 INFO  CheckpointCoordinator - Triggering
> checkpoint 2 @ 1475171777619
> 2016-09-29 12:59:38,079 INFO  RocksDBStateBackend  - RocksDB
> (/var/folders/…./WindowOperator_38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/db)
> backup (synchronous part) took 7 ms.
> 2016-09-29 12:59:38,214 INFO  RocksDBStateBackend  - RocksDB
> materialization from /var/folders/…/WindowOperator_
> 38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/local-chk-2 to
> file:/var/flinkstate/…/WindowOperator_38_0/dummy_state/chk-2
> (asynchronous part) took 96 ms.
> 2016-09-29 12:59:38,333 INFO  CheckpointCoordinator - Completed
> checkpoint 2 (in 200621 ms)
>
> Do you have any other advice?
>
> Exceptions from local execution:
>
> java.lang.RuntimeException: Error while adding data to RocksDB
> at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(
> RocksDBFoldingState.java:125)
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.processElement(WindowOperator.java:382)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.
> processInput(StreamInputProcessor.java:176)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:66)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:266)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class:
> 'CLE
> at com.esotericsoftware.kryo.util.DefaultClassResolver.
> 

Re: Counting latest state of stateful entities in streaming

2016-09-30 Thread Fabian Hueske
This works with event-time as well. You need to set the right
TimeCharacteristics on the exec env and assign timestamps + watermarks. The
only time-dependent operation is the window. YourWindowFunction assigns the
timestamp of the window. WindowFunction.apply() has a TimeWindow parameter
that gives access to the window's start and end time. See docs [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/windows.html#windowfunction-with-incremental-aggregation
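
A sketch of what YourWindowFunction could look like (assuming the keyBy/SumReducer setup from my earlier mail, so the iterable holds a single pre-aggregated record; not a tested implementation):

import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class YourWindowFunction
    extends WindowFunction[(Int, Int), (Int, Int, Long), Int, TimeWindow] {

  override def apply(
      key: Int,
      window: TimeWindow,
      input: Iterable[(Int, Int)],
      out: Collector[(Int, Int, Long)]): Unit = {
    val (state, cntUpdate) = input.head
    // attach the window's end time as the event-time timestamp of the result
    out.collect((state, cntUpdate, window.getEnd))
  }
}

// on the environment, before defining the job:
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// s.assignAscendingTimestamps(_._3)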

2016-09-30 11:00 GMT+02:00 Simone Robutti :

> I'm working with your suggestions, thank you very much. What I'm missing
> here is what YourWindowFunction should do. I have no notion of event time
> there and so I can't assign a timestamp. Also this solution seems to be
> working by processing time, while I care about event time. I couldn't make
> it run yet but for what I got, this is slightly different from what I need.
>
> 2016-09-30 10:04 GMT+02:00 Fabian Hueske :
>
>> Hi Simone,
>>
>> I think I have a solution for your problem:
>>
>> val s: DataStream[(Long, Int, ts)] = ??? // (id, state, time)
>>
>> val stateChanges: DataStream[(Int, Int)] = s // (state, cntUpdate)
>>   .keyBy(_._1) // key by id
>>   .flatMap(new StateUpdater) // StateUpdater is a stateful
>> FlatMapFunction. It has a keyed state that stores the last state of each
>> id. For each input record it returns two records: (oldState, -1),
>> (newState, +1)
>>
>> stateChanges ensures that counts of previous states are subtracted.
>>
>> val changesPerWindow: DataStream[(Int, Int, Long)] = stateChanges //
>> (state, cntUpdate, time)
>>   .keyBy(_._1) // key by state
>>   .window() // your window, should be non-overlapping, so go for instance
>> for Tumbling
>>   .apply(new SumReducer(), new YourWindowFunction()) // SumReducer sums
>> the cntUpdates and YourWindowFunction assigns the timestamp of your window
>>
>> this step aggregates all state changes for each state in a window
>>
>> val stateCnts: DataStream[(Int, Int, Long)] = changesPerWindow // (state, count,
>> time)
>>   .keyBy(_._1) // key by state again
>>   .map(new CountUpdater) // CountUpdater is a stateful MapFunction. It has
>> a keyed state that stores the current count. For each incoming record, the
>> count is adjusted and a record (state, newCount, time) is emitted.
>>
>> Now you have the new counts for your states in multiple records. If
>> possible, you can update your Elasticsearch index using these. Otherwise,
>> you have to collect them into one record using another window.
>>
>> Also note, that the state size of this program depends on the number of
>> unique ids. That might cause problems if the id space grows very fast.
>>
>> Please let me know, if you have questions or if that works ;-)
>>
>> Cheers, Fabian
>>
>>
>> 2016-09-30 0:32 GMT+02:00 Simone Robutti :
>>
>>> Hello,
>>>
>>> in the last few days I tried to create my first real-time analytics job
>>> in Flink. The approach is kappa-architecture-like, so I have my raw data on
>>> Kafka where we receive a message for every change of state of any entity.
>>>
>>> So the messages are of the form
>>>
>>> (id,newStatus, timestamp)
>>>
>>> We want to compute, for every time window, the count of items in a given
>>> status. So the output should be of the form
>>>
>>> (outputTimestamp, state1:count1,state2:count2 ...)
>>>
>>> or equivalent. These rows should contain, at any given time, the count
>>> of the items in a given status, where the status associated to an Id is the
>>> most recent message observed for that id. The status for an id should be
>>> counted in any case, even if the event is way older than those getting
>>> processed. So the sum of all the counts should be equal to the number of
>>> different IDs observed in the system. The following step could be
>>> forgetting about the items in a final item after a while, but this is not a
>>> strict requirement right now.
>>>
>>> This will be written on elasticsearch and then queried.
>>>
>>> I tried many different paths and none of them completely satisfied the
>>> requirement. Using a sliding window I could easily achieve the expected
>>> behaviour, except that when the beginning of the sliding window surpassed
>>> the timestamp of an event, it was lost for the count, as you may expect.
>>> Others approaches failed to be consistent when working with a backlog
>>> because I did some tricks with keys and timestamps that failed when the
>>> data was processed all at once.
>>>
>>> So I would like to know, even at a high level, how I should approach
>>> this problem. It looks like a relatively common use-case but the fact that
>>> the relevant information for a given ID must be retained indefinitely to
>>> count the entities correctly creates a lot of problems.
>>>
>>> Thank you in advance,
>>>
>>> Simone
>>>
>>>
>>
>


Re: Counting latest state of stateful entities in streaming

2016-09-30 Thread Simone Robutti
I'm working with your suggestions, thank you very much. What I'm missing
here is what YourWindowFunction should do. I have no notion of event time
there and so I can't assign a timestamp. Also this solution seems to be
working by processing time, while I care about event time. I couldn't make
it run yet but for what I got, this is slightly different from what I need.

2016-09-30 10:04 GMT+02:00 Fabian Hueske :

> Hi Simone,
>
> I think I have a solution for your problem:
>
> val s: DataStream[(Long, Int, ts)] = ??? // (id, state, time)
>
> val stateChanges: DataStream[(Int, Int)] = s // (state, cntUpdate)
>   .keyBy(_._1) // key by id
>   .flatMap(new StateUpdater) // StateUpdater is a stateful
> FlatMapFunction. It has a keyed state that stores the last state of each
> id. For each input record it returns two records: (oldState, -1),
> (newState, +1)
>
> stateChanges ensures that counts of previous states are subtracted.
>
> val changesPerWindow: DataStream[(Int, Int, Long)] = stateChanges //
> (state, cntUpdate, time)
>   .keyBy(_._1) // key by state
>   .window() // your window, should be non-overlapping, so go for instance
> for Tumbling
>   .apply(new SumReducer(), new YourWindowFunction()) // SumReducer sums
> the cntUpdates and YourWindowFunction assigns the timestamp of your window
>
> this step aggregates all state changes for each state in a window
>
> val stateCnts: DataStream[(Int, Int, Long)] = changesPerWindow // (state, count,
> time)
>   .keyBy(_._1) // key by state again
>   .map(new CountUpdater) // CountUpdater is a stateful MapFunction. It has
> a keyed state that stores the current count. For each incoming record, the
> count is adjusted and a record (state, newCount, time) is emitted.
>
> Now you have the new counts for your states in multiple records. If
> possible, you can update your Elasticsearch index using these. Otherwise,
> you have to collect them into one record using another window.
>
> Also note, that the state size of this program depends on the number of
> unique ids. That might cause problems if the id space grows very fast.
>
> Please let me know, if you have questions or if that works ;-)
>
> Cheers, Fabian
>
>
> 2016-09-30 0:32 GMT+02:00 Simone Robutti :
>
>> Hello,
>>
>> in the last few days I tried to create my first real-time analytics job
>> in Flink. The approach is kappa-architecture-like, so I have my raw data on
>> Kafka where we receive a message for every change of state of any entity.
>>
>> So the messages are of the form
>>
>> (id,newStatus, timestamp)
>>
>> We want to compute, for every time window, the count of items in a given
>> status. So the output should be of the form
>>
>> (outputTimestamp, state1:count1,state2:count2 ...)
>>
>> or equivalent. These rows should contain, at any given time, the count of
>> the items in a given status, where the status associated to an Id is the
>> most recent message observed for that id. The status for an id should be
>> counted in any case, even if the event is way older than those getting
>> processed. So the sum of all the counts should be equal to the number of
>> different IDs observed in the system. The following step could be
>> forgetting about the items in a final item after a while, but this is not a
>> strict requirement right now.
>>
>> This will be written on elasticsearch and then queried.
>>
>> I tried many different paths and none of them completely satisfied the
>> requirement. Using a sliding window I could easily achieve the expected
>> behaviour, except that when the beginning of the sliding window surpassed
>> the timestamp of an event, it was lost for the count, as you may expect.
>> Others approaches failed to be consistent when working with a backlog
>> because I did some tricks with keys and timestamps that failed when the
>> data was processed all at once.
>>
>> So I would like to know, even at a high level, how I should approach
>> this problem. It looks like a relatively common use-case but the fact that
>> the relevant information for a given ID must be retained indefinitely to
>> count the entities correctly creates a lot of problems.
>>
>> Thank you in advance,
>>
>> Simone
>>
>>
>


Re: Error while adding data to RocksDB: No more bytes left

2016-09-30 Thread Stephan Ewen
Agree with Stefan, let's see if the fully async snapshot mode helps. It
looks suspiciously RocksDB related...

On Fri, Sep 30, 2016 at 10:30 AM, Stefan Richter <
s.rich...@data-artisans.com> wrote:

> Hi Shannon,
>
> from your new stack trace and the bogus class names, I agree with Stephan
> that either serialization or the database itself is corrupted in some way.
> Could you please check if this problem only happens if checkpointing is
> enabled? If yes, does switching to fully async snapshots change the
> behavior?
>
> Best,
> Stefan
>
> On 29.09.2016 at 21:12, Shannon Carey wrote:
>
> Hi Stephan!
>
> The failure appeared to occur every 10 minutes, which is also the interval
> for checkpointing. However, I agree with you that the stack trace appears
> to be independent. Could this perhaps be an issue with multithreading,
> where the checkpoint mechanism is somehow interfering with ongoing
> operation of the state backend? I've never seen this problem until now, so
> I am a little suspicious that it might be due to something in my code, but
> so far it's been difficult to figure out what that might be.
>
> I am using the default, SemiAsync snapshot mode.
>
> The classes of the data flow are a bit too large to put here in their
> entirety. We are using Scala case classes, Java classes generated by Avro,
> Tuples, Scala Option, java.util.UUID and Scala mutable.Map. The majority of
> these classes have been operational in other jobs before. I added a unit
> test for the class which contains a mutable.Map to see whether that was
> causing a problem. Does this look like a reasonable unit test to verify
> Flink serializability to you?
>
> it("roundtrip serializes in Flink") {
>   val millis: Long = TimeUnit.DAYS.toMillis(2)
>   val original: PreferredAirportDailySum = new 
> PreferredAirportDailySum(millis)
>   original.add("a", TimestampedAirportCount(4, 6))
>   original.add("b", TimestampedAirportCount(7, 8))
>
>   val deserialized: PreferredAirportDailySum = 
> serializationRoundTrip(original, 100)
>
>   deserialized.timestamp shouldBe millis
>   deserialized.immutableItems("a") shouldBe TimestampedAirportCount(4, 6)
>   deserialized.immutableItems("b") shouldBe TimestampedAirportCount(7, 8)
> }
>
> def serializationRoundTrip[T : ClassTag : TypeInformation](original: T, 
> expectedMaxBytes: Int): T = {
>   val typeInfo = implicitly[TypeInformation[T]]
>   val serializer: TypeSerializer[T] = typeInfo.createSerializer(new 
> ExecutionConfig)
>
>   val out: ByteArrayOutputStream = new ByteArrayOutputStream(expectedMaxBytes)
>   val outputView: DataOutputView = new DataOutputViewStreamWrapper(out)
>   serializer.serialize(original, outputView)
>
>   out.size() should be <= expectedMaxBytes
>
>   val inputView: DataInputViewStreamWrapper =
> new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray))
>   val deserialized: T = serializer.deserialize(inputView)
>
>   deserialized
> }
>
> I tried running my job in a local one-slot cluster with RocksDB enabled
> but checkpointing to local filesystem. Similar errors occur, but are more
> sporadic. I have not yet been able to capture the error while debugging,
> but if I do I will provide additional information.
>
> I noticed that locally, execution only reaches
> DefaultClassResolver#readName(Input)->Class.forName() when a checkpoint
> completes. Also, the timing of checkpointing is a bit odd: in the example
> below the checkpoint takes 200s to complete after being triggered even
> though RocksDB reports that it only took ~100ms.
>
> 2016-09-29 12:56:17,619 INFO  CheckpointCoordinator - Triggering
> checkpoint 2 @ 1475171777619
> 2016-09-29 12:59:38,079 INFO  RocksDBStateBackend  - RocksDB
> (/var/folders/…./WindowOperator_38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/db)
> backup (synchronous part) took 7 ms.
> 2016-09-29 12:59:38,214 INFO  RocksDBStateBackend  - RocksDB
> materialization from /var/folders/…/WindowOperator_
> 38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/local-chk-2 to
> file:/var/flinkstate/…/WindowOperator_38_0/dummy_state/chk-2
> (asynchronous part) took 96 ms.
> 2016-09-29 12:59:38,333 INFO  CheckpointCoordinator - Completed
> checkpoint 2 (in 200621 ms)
>
> Do you have any other advice?
>
> Exceptions from local execution:
>
> java.lang.RuntimeException: Error while adding data to RocksDB
> at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(
> RocksDBFoldingState.java:125)
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.processElement(WindowOperator.java:382)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.
> processInput(StreamInputProcessor.java:176)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:66)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:266)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at 

Re: Exceptions from collector.collect after cancelling job

2016-09-30 Thread Ufuk Celebi
On Thu, Sep 29, 2016 at 9:29 PM, Shannon Carey  wrote:
> It looks like Flink is disabling the objects that the FlatMap collector
> relies on before disabling the operator itself. Is that expected/normal? Is
> there anything I should change in my FlatMap function or job code to account
> for it?

Hey Shannon,

Flink actually does cancel the tasks *before* cleaning up the network
resources that throw the root Exception here.

We actually don't log any Exceptions that are thrown during
cancellation, because it is possible that the user code/operator use
the closed resources concurrently with cancellation (your stack traces
essentially), but it looks like in some places we don't respect this.

Can you tell which classes actually log this? Would be good to fix
this if possible as it is very confusing and looks quite bad. I don't
expect it to be an actual problem though.

– Ufuk


Re: Error while adding data to RocksDB: No more bytes left

2016-09-30 Thread Stefan Richter
Hi Shannon,

from your new stack trace and the bogus class names, I agree with Stephan that 
either serialization or the database itself is corrupted in some way. Could you 
please check if this problem only happens if checkpointing is enabled? If yes, 
does switching to fully async snapshots change the behavior?
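
If it helps, fully async snapshots are switched on on the RocksDB backend itself, roughly like this (a sketch only; env stands for the StreamExecutionEnvironment and the checkpoint URI is just an example path):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend

val backend = new RocksDBStateBackend("hdfs:///flink/checkpoints")
// switch from the default semi-async mode to fully asynchronous snapshots
backend.enableFullyAsyncSnapshots()
env.setStateBackend(backend)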

Best,
Stefan

> On 29.09.2016 at 21:12, Shannon Carey wrote:
> 
> Hi Stephan!
> 
> The failure appeared to occur every 10 minutes, which is also the interval 
> for checkpointing. However, I agree with you that the stack trace appears to 
> be independent. Could this perhaps be an issue with multithreading, where the 
> checkpoint mechanism is somehow interfering with ongoing operation of the 
> state backend? I've never seen this problem until now, so I am a little 
> suspicious that it might be due to something in my code, but so far it's been 
> difficult to figure out what that might be.
> 
> I am using the default, SemiAsync snapshot mode.
> 
> The classes of the data flow are a bit too large to put here in their 
> entirety. We are using Scala case classes, Java classes generated by Avro, 
> Tuples, Scala Option, java.util.UUID and Scala mutable.Map. The majority of 
> these classes have been operational in other jobs before. I added a unit test 
> for the class which contains a mutable.Map to see whether that was causing a 
> problem. Does this look like a reasonable unit test to verify Flink 
> serializability to you?
> it("roundtrip serializes in Flink") {
>   val millis: Long = TimeUnit.DAYS.toMillis(2)
>   val original: PreferredAirportDailySum = new 
> PreferredAirportDailySum(millis)
>   original.add("a", TimestampedAirportCount(4, 6))
>   original.add("b", TimestampedAirportCount(7, 8))
> 
>   val deserialized: PreferredAirportDailySum = 
> serializationRoundTrip(original, 100)
> 
>   deserialized.timestamp shouldBe millis
>   deserialized.immutableItems("a") shouldBe TimestampedAirportCount(4, 6)
>   deserialized.immutableItems("b") shouldBe TimestampedAirportCount(7, 8)
> }
> 
> def serializationRoundTrip[T : ClassTag : TypeInformation](original: T, 
> expectedMaxBytes: Int): T = {
>   val typeInfo = implicitly[TypeInformation[T]]
>   val serializer: TypeSerializer[T] = typeInfo.createSerializer(new 
> ExecutionConfig)
> 
>   val out: ByteArrayOutputStream = new ByteArrayOutputStream(expectedMaxBytes)
>   val outputView: DataOutputView = new DataOutputViewStreamWrapper(out)
>   serializer.serialize(original, outputView)
> 
>   out.size() should be <= expectedMaxBytes
> 
>   val inputView: DataInputViewStreamWrapper =
> new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray))
>   val deserialized: T = serializer.deserialize(inputView)
> 
>   deserialized
> }
> I tried running my job in a local one-slot cluster with RocksDB enabled but 
> checkpointing to local filesystem. Similar errors occur, but are more 
> sporadic. I have not yet been able to capture the error while debugging, but 
> if I do I will provide additional information.
> 
> I noticed that locally, execution only reaches 
> DefaultClassResolver#readName(Input)->Class.forName() when a checkpoint 
> completes. Also, the timing of checkpointing is a bit odd: in the example below 
> the checkpoint takes 200s to complete after being triggered even though 
> RocksDB reports that it only took ~100ms.
> 
> 2016-09-29 12:56:17,619 INFO  CheckpointCoordinator - Triggering 
> checkpoint 2 @ 1475171777619
> 2016-09-29 12:59:38,079 INFO  RocksDBStateBackend  - RocksDB 
> (/var/folders/…./WindowOperator_38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/db)
>  backup (synchronous part) took 7 ms.
> 2016-09-29 12:59:38,214 INFO  RocksDBStateBackend  - RocksDB materialization 
> from 
> /var/folders/…/WindowOperator_38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/local-chk-2
>  to file:/var/flinkstate/…/WindowOperator_38_0/dummy_state/chk-2 
> (asynchronous part) took 96 ms.
> 2016-09-29 12:59:38,333 INFO  CheckpointCoordinator - Completed 
> checkpoint 2 (in 200621 ms)
> 
> Do you have any other advice?
> 
> Exceptions from local execution:
> 
> java.lang.RuntimeException: Error while adding data to RocksDB
> at 
> org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:125)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 'CLE
> at 
> 

Re: Blobstorage Locally and on HDFS

2016-09-30 Thread Ufuk Celebi
On Fri, Sep 30, 2016 at 9:12 AM, Konstantin Knauf
 wrote:
> we are running a Flink (1.1.2) Stand-Alone Cluster with JM HA, and HDFS
> as checkpoint and recovery storage dir. What we see is that blobStores
> are stored in HDFS as well as under the local Jobmanagers and
> Taskmanagers /tmp directory.
>
> Is this the expected behaviour? Is there any documentation on which
> blobs are stored locally and which are stored in HDFS in our case? In
> particular, we would need to know when it is safe to delete blobs stored
> locally because they are not cleaned up by Flink and fill up the /tmp
> partition eventually.

BLOBs are copied to another directory in case of HA in order to be
available for other job managers that might take over.

On regular termination (cancel, finish) all BLOBs should be cleaned
up. With hard failures, it can happen that BLOBs are not cleaned up.

Do you know in which cases you see BLOBs not being cleaned up? If it
is the first one, that sounds like a bug to me.

– Ufuk


Re: How to Broadcast a very large model object (used in iterative scoring in recommendation system) in Flink

2016-09-30 Thread rimin515
Your message is very short, so I can only guess. In Flink, the DataStream API is not
meant for iterative computation; the DataSet API would be a better fit. Also, Flink
suggests broadcasting small data, not large.

You can load your model data (it can come from a file or a table) before the main
function and assign it to a variable, e.g. yourModel. Then, for the DataStream of
unscored records (e.g. DataStream[String] or DataStream[YourClass]), score each
element in a map:

dataStream.map { x =>
  val score = computeScore(x, yourModel)
  score
}

object YourObject {

  // load your model once, outside of main
  val yourModel = ...

  def main(args: Array[String]): Unit = {
    // ...
    // read unscored records from a socket, Kafka, or ...
    dataStream.map { x =>
      val score = computeScore(x, yourModel)
      score
    }
    // ...
  }
}
- Original Message -
From: Anchit Jatana
To: user@flink.apache.org
Subject: How to Broadcast a very large model object (used in iterative scoring in 
recommendation system) in Flink
Date: 2016-09-30 14:15

Hi All,

I'm building a recommendation system streaming application for which I need 
to broadcast a very large model object (used in iterative scoring) among all 
the task managers performing the operation in parallel for the operator.
I'm doing this operation in map1 of a CoMapFunction. Please suggest 
some way to achieve the broadcasting of the large model variable (something 
similar to what Spark has with broadcast variables).

Thank you

Regards,
Anchit



Re: Counting latest state of stateful entities in streaming

2016-09-30 Thread Fabian Hueske
Hi Simone,

I think I have a solution for your problem:

val s: DataStream[(Long, Int, ts)] = ??? // (id, state, time)

val stateChanges: DataStream[(Int, Int)] = s // (state, cntUpdate)
  .keyBy(_._1) // key by id
  .flatMap(new StateUpdater) // StateUpdater is a stateful FlatMapFunction.
It has a keyed state that stores the last state of each id. For each input
record it returns two records: (oldState, -1), (newState, +1)

stateChanges ensures that counts of previous states are subtracted.

val changesPerWindow: DataStream[(Int, Int, Long)] = stateChanges //
(state, cntUpdate, time)
  .keyBy(_._1) // key by state
  .window() // your window, should be non-overlapping, so go for instance
for Tumbling
  .apply(new SumReducer(), new YourWindowFunction()) // SumReducer sums the
cntUpdates and YourWindowFunction assigns the timestamp of your window

this step aggregates all state changes for each state in a window

val stateCnts: DataStream[(Int, Int, Long)] = changesPerWindow // (state, count, time)
  .keyBy(_._1) // key by state again
  .map(new CountUpdater) // CountUpdater is a stateful MapFunction. It has a
keyed state that stores the current count. For each incoming record, the
count is adjusted and a record (state, newCount, time) is emitted.
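
To make the first step more concrete, StateUpdater could be sketched like this (just a sketch; the record types follow the pseudo code above, assuming the timestamp field is a Long):

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// keyed by id: remembers the last state per id and emits
// (oldState, -1) and (newState, +1) count updates
class StateUpdater extends RichFlatMapFunction[(Long, Int, Long), (Int, Int)] {

  private var lastState: ValueState[Integer] = _

  override def open(parameters: Configuration): Unit = {
    lastState = getRuntimeContext.getState(
      new ValueStateDescriptor[Integer]("last-state", classOf[Integer], null))
  }

  override def flatMap(in: (Long, Int, Long), out: Collector[(Int, Int)]): Unit = {
    val previous = lastState.value()
    if (previous != null) {
      out.collect((previous.intValue(), -1)) // retract the count of the old state
    }
    lastState.update(in._2)
    out.collect((in._2, +1)) // add the count of the new state
  }
}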

Now you have the new counts for your states in multiple records. If
possible, you can update your Elasticsearch index using these. Otherwise,
you have to collect them into one record using another window.

Also note, that the state size of this program depends on the number of
unique ids. That might cause problems if the id space grows very fast.

Please let me know, if you have questions or if that works ;-)

Cheers, Fabian

2016-09-30 0:32 GMT+02:00 Simone Robutti :

> Hello,
>
> in the last few days I tried to create my first real-time analytics job in
> Flink. The approach is kappa-architecture-like, so I have my raw data on
> Kafka where we receive a message for every change of state of any entity.
>
> So the messages are of the form
>
> (id,newStatus, timestamp)
>
> We want to compute, for every time window, the count of items in a given
> status. So the output should be of the form
>
> (outputTimestamp, state1:count1,state2:count2 ...)
>
> or equivalent. These rows should contain, at any given time, the count of
> the items in a given status, where the status associated to an Id is the
> most recent message observed for that id. The status for an id should be
> counted in any case, even if the event is way older than those getting
> processed. So the sum of all the counts should be equal to the number of
> different IDs observed in the system. The following step could be
> forgetting about the items in a final item after a while, but this is not a
> strict requirement right now.
>
> This will be written on elasticsearch and then queried.
>
> I tried many different paths and none of them completely satisfied the
> requirement. Using a sliding window I could easily achieve the expected
> behaviour, except that when the beginning of the sliding window surpassed
> the timestamp of an event, it was lost for the count, as you may expect.
> Others approaches failed to be consistent when working with a backlog
> because I did some tricks with keys and timestamps that failed when the
> data was processed all at once.
>
> So I would like to know, even at a high level, how I should approach this
> problem. It looks like a relatively common use-case but the fact that the
> relevant information for a given ID must be retained indefinitely to count
> the entities correctly creates a lot of problems.
>
> Thank you in advance,
>
> Simone
>
>


How to Broadcast a very large model object (used in iterative scoring in recommendation system) in Flink

2016-09-30 Thread Anchit Jatana
Hi All,

I'm building a recommendation system streaming application for which I need
to broadcast a very large model object (used in iterative scoring) among
all the task managers performing the operation in parallel for the operator.

I'm doing this operation in map1 of a CoMapFunction. Please suggest
some way to achieve the broadcasting of the large model variable (something
similar to what Spark has with broadcast variables).

Thank you

Regards,
Anchit