co-location groups vs slot sharing groups

2018-02-20 Thread Deepak Sharma
Hi,

I have a few questions regarding slot sharing and co-location:

The slotSharingGroup(String name) function sets the slot sharing group, is
there a function to set the co-location group?

Does setting a colocation group exclude tasks of other groups from the
co-located group's task slots?

Thank You,
Deepak
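
For reference, a minimal sketch of how a slot sharing group is set in the DataStream API; the source and mapper below are placeholders. As far as I know there is no comparable public setter for co-location groups, which are created internally by the runtime (for example for iterations), so the sketch only shows slotSharingGroup():

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(new MySource())        // placeholder source
   .map(new MyMapper())              // placeholder mapper
   .slotSharingGroup("group-a")      // this operator goes into slot sharing group "group-a";
   .print();                         // downstream operators inherit the group unless they set their own

env.execute("slot-sharing-example");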


Re: Regarding BucketingSink

2018-02-20 Thread Mu Kong
Hi Aljoscha,

Thanks for confirming the fact that Flink doesn't clean up pending files.
Is it safe to clean (remove) all the pending files after a cancel (with or
without savepointing) or a failover?
If we do that, will we lose some data?

Thanks!

Best,
Mu




On Wed, Feb 21, 2018 at 5:27 AM, Vishal Santoshi 
wrote:

> Sorry, but I just wanted to confirm: does the assertion of "at-least-once"
> delivery hold true if there is a dangling pending file?
>
> On Mon, Feb 19, 2018 at 2:21 PM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> That is fine, as long as Flink assures at-least-once semantics?
>>
>> If the contents of a .pending file, through the turbulence (restarts etc.),
>> are assured to be in another file, then anything starting with an "_"
>> underscore will by default be ignored by Hadoop (Hive, MR etc.).
>>
>>
>>
>> On Mon, Feb 19, 2018 at 11:03 AM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>>
>>> Sorry for the confusion. The framework (Flink) does currently not do any
>>> cleanup of pending files, yes.
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
>>> On 19. Feb 2018, at 17:01, Vishal Santoshi 
>>> wrote:
>>>
>>> >> You should only have these dangling pending files after a
>>> failure-recovery cycle, as you noticed. My suggestion would be to
>>> periodically clean up older pending files.
>>>
>>> A little confused. Is that something the framework should do, or something
>>> we should do as part of some cleanup job?
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek 
>>> wrote:
>>>
 Hi,

 The BucketingSink does not clean up pending files on purpose. In a
 distributed setting, and especially with rescaling of Flink operators, it
 is sufficiently hard to figure out which of the pending files you actually
 can delete and which of them you have to leave because they will get moved
 to "final" as part of recovering from a checkpoint on some other parallel
 instance of the sink.

 You should only have these dangling pending files after a
 failure-recovery cycle, as you noticed. My suggestion would be to
 periodically clean up older pending files.

 Best,
 Aljoscha
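
For what it's worth, a rough sketch of such a periodic cleanup job, assuming the buckets live on HDFS, the default ".pending" suffix of the BucketingSink, and an age threshold after which a pending file is considered safe to delete; the base path and threshold are illustrative and need to be adapted to the actual setup:

// Rough sketch: delete .pending files older than a threshold. Paths, suffix and
// threshold are assumptions; verify them against your BucketingSink configuration.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PendingFileCleaner {
    public static void main(String[] args) throws Exception {
        Path baseDir = new Path("hdfs:///kraken/sessions_uuid_csid_v3");
        long maxAgeMillis = 24L * 60 * 60 * 1000;                 // older than one day
        long cutoff = System.currentTimeMillis() - maxAgeMillis;

        FileSystem fs = baseDir.getFileSystem(new Configuration());
        for (FileStatus bucket : fs.listStatus(baseDir)) {
            if (!bucket.isDirectory()) {
                continue;                                          // only look into bucket directories
            }
            for (FileStatus file : fs.listStatus(bucket.getPath())) {
                String name = file.getPath().getName();
                if (name.endsWith(".pending") && file.getModificationTime() < cutoff) {
                    fs.delete(file.getPath(), false);              // non-recursive delete of the old pending file
                }
            }
        }
    }
}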


 On 19. Feb 2018, at 16:37, Till Rohrmann  wrote:

 Hi Vishal,

 the pending files should indeed eventually get finalized. This happens
 on a checkpoint-complete notification. Thus, what you report does not seem
 right. Maybe Aljoscha can shed a bit more light on the problem.

 In order to further debug the problem, it would be really helpful to
 get access to DEBUG log files of a TM which runs the BucketingSink.

 Cheers,
 Till

 On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong  wrote:

> Hi Vishal,
>
> I have the same concern about savepointing in BucketingSink.
> As for your question, I think the pending files are finalized in
> notifyCheckpointComplete before they get cleared in
> handleRestoredBucketState:
>
> https://github.com/apache/flink/blob/master/flink-connectors
> /flink-connector-filesystem/src/main/java/org/apache/flink/s
> treaming/connectors/fs/bucketing/BucketingSink.java#L628
>
> I'm looking into this part of the source code now, since we are
> experiencing some unclosed files after checkpointing.
> It would be great if you can share more if you find something new
> about your problem, which might help with our problem.
>
> Best regards,
> Mu
>
> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> -rw-r--r--   3 root hadoop 11 2018-02-14 18:48
>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
>> -rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15
>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
>> -rw-r--r--   3 root hadoop 11 2018-02-14 21:17
>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
>>
>>
>> This is strange: we had a few retries because of an OOM on one of the
>> TMs and we see this situation. Two files (on either side) were dealt
>> with fine, but there is a dangling .pending file. I am sure this is not
>> how it is meant to be. I think we have an edge condition, and looking at
>> the code it is not obvious. Maybe someone who wrote the code can shed
>> some light on how this can happen.
>>
>>
>>
>> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> without --allowNonRestoredState, on a suspend/resume we do see the
>>> length file along with the finalized file ( finalized during resume )
>>>
>>> -rw-r--r--   3 root hadoop 10 2018-02-09 13:57
>>> /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
>>>

Re: SQL materialized upsert tables

2018-02-20 Thread Elias Levy
[ Adding the list back in, as this clarifies my question ]

On Tue, Feb 20, 2018 at 3:42 PM, Darshan Singh 
wrote:

> I am no expert in Flink but I will try my best. The issue you mentioned
> exists in all streaming systems, even with Kafka KTables, which I use a lot
> for similar requirements.
>
> In Kafka, if you have a KTable on Telemetry with 3 records, join it with
> Scores (which could be a KTable or a KStream), and start your streaming
> query as mentioned above, it will give just 1 row as expected. However, if
> a new value for the same key, with a timestamp greater than the previous
> max, is added to Telemetry, it will output the new value as well; that is
> the main idea of streaming anyway, you want to see the changed value. So
> once you have started streaming you will get whatever is the outcome
> of your
>

Darshan,

Thanks for the reply.  I've already implemented this job using Kafka
Streams, so I am aware of how KTables behave.  It would have helped if I
had included some sample data in my post, so here it is.  If you have this
data coming into Telemetry:

ts, item, score, source
0, item1, 1, source1
1, item1, 1, source1
2, item1, 1, source1

And this comes into Scores:

ts, item, score
3, item1, 3

Flink will output 3 records from the queries I mentioned:

(3, item1, 3, source1)
(3, item1, 3, source1)
(3, item1, 3, source1)


In contrast, if you run the query in Kafka Streams, configuring Telemetry as
a KTable keyed by (item, source), the output will be a single record.  In
Telemetry, the record for key (item1, source1) at time 1 will overwrite the
record at time 0, and the record at time 2 will overwrite the one at time
1.  By the time the record at time 3 comes in via Scores, it will be joined
only with the record from time 2 in Telemetry.

Yes, it is possible for the Kafka Streams query to output multiple records
if the records from the different streams are not time-aligned, as Kafka
Streams only guarantees a best effort at aligning the streams. But in the
common case the output will be a single record.


I think in Flink you can do the same: from your Telemetry stream/table you
> can create the LatestTelemetry table using similar SQL (I am sure it should
> give you the latest timestamp's data), as you did with the RDBMS, and then
> join with the Scores table. You should get similar results to a KTable or
> any other streaming system.
>

Not sure if you missed it, but I actually defined the LatestTelemetry table
in Flink using that query and joined against it.  The output was the same
three records.


SQL materialized upsert tables

2018-02-20 Thread Elias Levy
I noticed that there has been significant work on the SQL / Table subsystem
and decided to evaluate it for one of our use cases.  The use case requires
joining two streams, each of which can be considered a stream of table
upserts.  Critically, when joining the streams, we only want to join against
the latest value per key in one of the tables/streams.

Simply performing a join between the streams/tables is not sufficient, as it
will generate results from records other than the latest one.  E.g. if you
have two streams/tables with the schemas:

Telemetry [
  tstamp: Long
  item: String
  score: Int
  source: String
]

Scores [
  tstamp: Long
  item: String
  score: Int
]

tableEnv.sqlQuery("""
SELECT s.tstamp, s.item, s.score, t.source
FROM Telemetry t INNER JOIN Scores s ON s.item = t.item
WHERE s.score <> t.score AND s.tstamp >= t.tstamp
""")

If the stream receives 3 records from the telemetry stream for the same
source and then a record that matches the item from the score stream that
updates the score, it will generate three output records, even though we
only want the latest record from the source to be considered.

If this were a regular database we could do the following to only get the
latest records with the telemetry table:

tableEnv.sqlQuery("""
SELECT a.tstamp, a.item, a.score, a.source
FROM Telemetry a
  INNER JOIN (
SELECT MAX(tstamp), item, source
FROM Telemetry
GROUP BY item, source
  ) b ON a.item = b.item AND a.source = b.source
""")

and then execute the previous query against this LatestTelemetry table
instead of Telemetry.  But that does not work.  The query executes within
Flink, but still outputs multiple records, regardless of the order in which
the records come into the source streams.
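
For clarity, here is roughly how the two steps above chain together in the Java Table API. Compared to the query above, this sketch adds an explicit equality on the max timestamp so the inner query actually restricts the join to the latest row per (item, source); table and column names follow the schemas given earlier, and whether this resolves the multiple-record output is exactly the open question here.

// Sketch only: register the "latest row per key" view and run the original
// join against it instead of against Telemetry directly.
Table latestTelemetry = tableEnv.sqlQuery(
    "SELECT a.tstamp, a.item, a.score, a.source " +
    "FROM Telemetry a INNER JOIN ( " +
    "  SELECT MAX(tstamp) AS maxTs, item, source " +
    "  FROM Telemetry GROUP BY item, source " +
    ") b ON a.item = b.item AND a.source = b.source AND a.tstamp = b.maxTs");
tableEnv.registerTable("LatestTelemetry", latestTelemetry);

Table joined = tableEnv.sqlQuery(
    "SELECT s.tstamp, s.item, s.score, t.source " +
    "FROM LatestTelemetry t INNER JOIN Scores s ON s.item = t.item " +
    "WHERE s.score <> t.score AND s.tstamp >= t.tstamp");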

I am wondering if there is a way to accomplish this within Flink's
SQL/Table abstractions.

Kafka Streams has the concept of a KTable, where records are considered
upserts and update previous records that have the same key.  Thus, when you
join against a KTable, you only join against the latest record for a given
key, rather than all previous records for the key.

Thoughts?


Re: Need to understand the execution model of the Flink

2018-02-20 Thread Darshan Singh
Are there any plans for this in the future? I looked at the execution plans,
and without these stats I am a bit lost on what to look for, e.g. what the
pain points are. I can see some very obvious things, but not much more from
these plans.

My question: is there a guide or document which describes what your plans
should look like and what to look for in them?

Also, I would like to know: if there is a very complex execution plan (maybe
not expensive, but very complex), is it usually beneficial to save the
intermediate datasets/tables, read them back, and do the next steps from there?

Thanks
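
One small aid while looking at plans: the optimizer's JSON plan can be printed and pasted into the plan visualizer that ships with Flink. A minimal sketch, with the actual program elided:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// ... build sources, joins, filters and sinks here ...

// Print the JSON execution plan; the output can be rendered with Flink's plan visualizer.
System.out.println(env.getExecutionPlan());

// env.execute("my job");  // run the job afterwards if desired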

On Tue, Feb 20, 2018 at 9:34 AM, Fabian Hueske  wrote:

> No, there is no size or cardinality estimation happening at the moment.
>
> Best, Fabian
>
> 2018-02-19 21:56 GMT+01:00 Darshan Singh :
>
>> Thanks, is there a metric or another way to know how much space each
>> task/job is taking? Does the execution plan have these details?
>>
>> Thanks
>>
>> On Mon, Feb 19, 2018 at 10:54 AM, Fabian Hueske 
>> wrote:
>>
>>> Hi,
>>>
>>> that's a difficult question without knowing the details of your job.
>>> A NoSpaceLeftOnDevice error occurs when a file system is full.
>>>
>>> This can happen if:
>>> - A Flink algorithm writes to disk, e.g., an external sort or the hash
>>> table of a hybrid hash join. This can happen for GroupBy, Join, Distinct,
>>> or any other operation that requires to group or join data. Filters will
>>> never spill to disk.
>>> - An OutputFormat writes to disk.
>>>
>>> The data is written to a temp directory, which can be configured in the
>>> ./conf/flink-conf.yaml file.
>>>
>>> Did you check how the tasks are distributed across the task managers?
>>> The web UI can help to diagnose such problems.
>>>
>>> Best, Fabian
>>>
>>> 2018-02-19 11:22 GMT+01:00 Darshan Singh :
>>>
 Thanks Fabian for such a detailed explanation.

 I am using a dataset in between, so I guess the CSV is read once. Now to my
 real issue: I have 6 task managers, each having 4 cores, and I have 2 slots
 per task manager.

 Now, my CSV file is just 1 GB; I create a table, transform it to a dataset,
 and then run 15 different filters and extra processing, which all run
 almost in parallel.

 However, it fails with a "no space left on device" error on one of the task
 managers. The space on each task manager is 32 GB in /tmp, so I am not sure
 why it is running out of space. I do use some joins with other tables, but
 those are a few megabytes.

 So I was assuming that somehow all parallel executions were storing
 data in /tmp and were filling it.

 So I would like to know what could be filling the space.

 Thanks

 On 19 Feb 2018 10:10 am, "Fabian Hueske"  wrote:

 Hi,

 this works as follows.

 - Table API and SQL queries are translated into regular DataSet jobs
 (assuming you are running in a batch ExecutionEnvironment).
 - A query is translated into a sequence of DataSet operators when you
 1) transform the Table into a DataSet or 2) write it to a TableSink. In
 both cases, the optimizer is invoked and recursively goes back from the
 converted/emitted Table back to its roots, i.e., a TableSource or a
 DataSet.

 This means, that if you create a Table from a TableSource and apply
 multiple filters on it and write each filter to a TableSink, the CSV file
 will be read 10 times, filtered 10 times and written 10 times. This is not
 efficient, because, you could also just read the file once and apply all
 filters in parallel.
 You can do this by converting the Table that you read with a
 TableSource into a DataSet and registering the DataSet again as a Table. In
 that case, the translations of all TableSinks will stop at the DataSet and
 not include the TableSource which reads the file.

 The following figures illustrate the difference:

 1) Without DataSet in the middle:

 TableSource -> Filter1 -> TableSink1
 TableSource -> Filter2 -> TableSink2
 TableSource -> Filter3 -> TableSink3

 2) With DataSet in the middle:

              /-> Filter1 -> TableSink1
 TableSource -+-> Filter2 -> TableSink2
              \-> Filter3 -> TableSink3
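
A minimal sketch of variant 2) in the Java batch Table API (Flink 1.4-era names); the CSV path, field names and filter predicates are illustrative:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

CsvTableSource source = CsvTableSource.builder()
    .path("hdfs:///data/input.csv")
    .field("item", BasicTypeInfo.STRING_TYPE_INFO)
    .field("score", BasicTypeInfo.INT_TYPE_INFO)
    .build();
tEnv.registerTableSource("RawInput", source);

// Convert once to a DataSet and register that DataSet as a new Table, so the
// translations of all sinks stop at the DataSet and the file is read only once.
DataSet<Row> materialized = tEnv.toDataSet(tEnv.scan("RawInput"), Row.class);
tEnv.registerDataSet("Input", materialized);

Table filter1 = tEnv.sqlQuery("SELECT * FROM Input WHERE score > 10");
Table filter2 = tEnv.sqlQuery("SELECT * FROM Input WHERE score <= 10");
// ... write filter1 / filter2 / ... to their respective TableSinks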

 I'll likely add a feature to internally translate an intermediate Table
 to make this a bit easier.
 The underlying problem is that the SQL optimizer cannot translate
 queries with multiple sinks.
 Instead, each sink is individually translated and the optimizer does
 not know that common execution paths could be shared.

 Best,
 Fabian


 2018-02-19 2:19 GMT+01:00 Darshan Singh :

> Thanks for the reply.
>
> I guess I am not looking for an alternative. I am trying to understand what
> Flink does in this scenario and if 10 tasks ar 

Re: Stopping a kafka consumer gracefully (no losing of inflight events, StoppableFunction)

2018-02-20 Thread Christophe Jolif
Hmm, I did not realize that.

I was planning, when upgrading a job (consuming from Kafka), to cancel it
with a savepoint and then start it back from the savepoint. But this
savepoint thing was giving me the apparently false feeling that I would not
lose anything. My understanding was that maybe I would process some events
twice in this case, but certainly not miss events entirely.

Did I misunderstand this thread?

If not, this sounds pretty annoying. Do people have some sort of
workaround for that?

Thanks,
--
Christophe



On Mon, Feb 19, 2018 at 5:50 PM, Till Rohrmann  wrote:

> Hi Bart,
>
> you're right that Flink currently does not support a graceful stop
> mechanism for the Kafka source. The community has already a good idea how
> to solve it in the general case and will hopefully soon add it to Flink.
>
> Concerning the StoppableFunction: This interface was introduced quite some
> time ago and currently only works for some batch sources. In order to make
> it work with streaming, we need to add some more functionality to the
> engine in order to properly stop and take a savepoint.
>
> Cheers,
> Till
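
For context, this is roughly what a source that supports graceful stopping looks like today: it implements the StoppableFunction interface in addition to the source function. The class name and record type below are made up, and as noted above the Kafka consumer does not implement this yet.

// Rough sketch of a source that supports graceful stop via `flink stop <jobID>`.
public class MyStoppableSource extends RichParallelSourceFunction<String>
        implements StoppableFunction {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect("some record");   // emit data here
            }
        }
    }

    @Override
    public void cancel() {   // hard cancel
        running = false;
    }

    @Override
    public void stop() {     // graceful stop requested by the user
        running = false;
    }
}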
>
> On Mon, Feb 19, 2018 at 3:36 PM, Bart Kastermans 
> wrote:
>
>> In https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html
>> it is shown that for gracefully stopping a job you need to implement the
>> StoppableFunction interface.  This appears not (yet) to be implemented for
>> Kafka consumers.  Am I missing something, or is there a different way to
>> gracefully stop a job using a Kafka source so we can restart it later
>> without losing any (in-flight) events?
>>
>> - bart
>>
>
>


Re: Stopping a kafka consumer gracefully (no losing of inflight events, StoppableFunction)

2018-02-20 Thread Bart Kastermans
Thanks for the reply; is there a FLIP for this?

- bart

On Mon, Feb 19, 2018, at 5:50 PM, Till Rohrmann wrote:
> Hi Bart,
> 
> you're right that Flink currently does not support a graceful stop
> mechanism for the Kafka source. The community has already a good
> idea how to solve it in the general case and will hopefully soon add
> it to Flink.
>
> Concerning the StoppableFunction: This interface was introduced quite
> some time ago and currently only works for some batch sources. In
> order to make it work with streaming, we need to add some more
> functionality to the engine in order to properly stop and take a
> savepoint.
>
> Cheers,
> Till
>
> On Mon, Feb 19, 2018 at 3:36 PM, Bart Kastermans
>  wrote:
>> In https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html
>> it is shown that for gracefully stopping a job you need to implement the
>> StoppableFunction interface.  This appears not (yet) implemented for
>> Kafka consumers.  Am I missing something, or is there a different way to
>> gracefully stop a job using a kafka source so we can restart it later
>> without losing any (in flight) events?
>>
>>  - bart



Re: Regarding BucketingSink

2018-02-20 Thread Vishal Santoshi
Sorry, but I just wanted to confirm: does the assertion of "at-least-once"
delivery hold true if there is a dangling pending file?

On Mon, Feb 19, 2018 at 2:21 PM, Vishal Santoshi 
wrote:

> That is fine, as long as Flink assures at-least-once semantics?
>
> If the contents of a .pending file, through the turbulence (restarts etc.),
> are assured to be in another file, then anything starting with an "_"
> underscore will by default be ignored by Hadoop (Hive, MR etc.).
>
>
>
> On Mon, Feb 19, 2018 at 11:03 AM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> Sorry for the confusion. The framework (Flink) does currently not do any
>> cleanup of pending files, yes.
>>
>> Best,
>> Aljoscha
>>
>>
>> On 19. Feb 2018, at 17:01, Vishal Santoshi 
>> wrote:
>>
>> >> You should only have these dangling pending files after a
>> failure-recovery cycle, as you noticed. My suggestion would be to
>> periodically clean up older pending files.
>>
>> A little confused. Is that something the framework should do, or something
>> we should do as part of some cleanup job?
>>
>>
>>
>>
>>
>> On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>>
>>> The BucketingSink does not clean up pending files on purpose. In a
>>> distributed setting, and especially with rescaling of Flink operators, it
>>> is sufficiently hard to figure out which of the pending files you actually
>>> can delete and which of them you have to leave because they will get moved
>>> to "final" as part of recovering from a checkpoint on some other parallel
>>> instance of the sink.
>>>
>>> You should only have these dangling pending files after a
>>> failure-recovery cycle, as you noticed. My suggestion would be to
>>> periodically clean up older pending files.
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
>>> On 19. Feb 2018, at 16:37, Till Rohrmann  wrote:
>>>
>>> Hi Vishal,
>>>
>>> the pending files should indeed eventually get finalized. This happens
>>> on a checkpoint-complete notification. Thus, what you report does not seem
>>> right. Maybe Aljoscha can shed a bit more light on the problem.
>>>
>>> In order to further debug the problem, it would be really helpful to get
>>> access to DEBUG log files of a TM which runs the BucketingSink.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong  wrote:
>>>
 Hi Vishal,

 I have the same concern about savepointing in BucketingSink.
 As for your question, I think the pending files are finalized in
 notifyCheckpointComplete before they get cleared in
 handleRestoredBucketState:

 https://github.com/apache/flink/blob/master/flink-connectors
 /flink-connector-filesystem/src/main/java/org/apache/flink/s
 treaming/connectors/fs/bucketing/BucketingSink.java#L628

 I'm looking into this part of the source code now, since we are
 experiencing some unclosed files after checkpointing.
 It would be great if you can share more if you find something new about
 your problem, which might help with our problem.

 Best regards,
 Mu

 On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> -rw-r--r--   3 root hadoop 11 2018-02-14 18:48
> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
> -rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15
> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
> -rw-r--r--   3 root hadoop 11 2018-02-14 21:17
> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
>
>
> This is strange: we had a few retries because of an OOM on one of the
> TMs and we see this situation. Two files (on either side) were dealt
> with fine, but there is a dangling .pending file. I am sure this is not
> how it is meant to be. I think we have an edge condition, and looking at
> the code it is not obvious. Maybe someone who wrote the code can shed
> some light on how this can happen.
>
>
>
> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> without --allowNonRestoredState, on a suspend/resume we do see the
>> length file along with the finalized file ( finalized during resume )
>>
>> -rw-r--r--   3 root hadoop 10 2018-02-09 13:57
>> /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
>>
>> that does make much more sense.
>>
>> I guess we should document --allowNonRestoredState better? It seems
>> it actually drops state?
>>
>>
>>
>> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> This is 1.4 BTW.  I am not sure that I am reading this correctly but
>>> the lifecycle of cancel/resume is 2 steps
>>>
>>>
>>>
>>> 1. Cancel job with SP

Re: Get which key groups are assigned to an operator

2018-02-20 Thread Gerard Garcia
You are right that probably the best solution would be to be able to use
different state backends for different operators; I hope it gets
implemented at some point. Meanwhile I'll take a look at the methods in
org.apache.flink.runtime.state.KeyGroupRangeAssignment, maybe I can find a
workaround good enough for me.

Thanks,

Gerard
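
For anyone else looking at this, a small sketch of those KeyGroupRangeAssignment methods in use; the parallelism values and the key are illustrative, and the class lives in flink-runtime, so it is internal API that may change:

import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupInspection {
    public static void main(String[] args) {
        int maxParallelism = 128;   // the operator's max parallelism
        int parallelism = 4;        // current parallelism
        int operatorIndex = 2;      // subtask index of interest

        // Key groups handled by a given subtask.
        KeyGroupRange range = KeyGroupRangeAssignment
            .computeKeyGroupRangeForOperatorIndex(maxParallelism, parallelism, operatorIndex);

        // Key group and subtask a concrete key is assigned to.
        String key = "some-key";
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
        int subtask = KeyGroupRangeAssignment
            .assignKeyToParallelOperator(key, maxParallelism, parallelism);

        System.out.println(range + " / keyGroup=" + keyGroup + " / subtask=" + subtask);
    }
}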

On Tue, Feb 20, 2018 at 3:56 PM, Stefan Richter  wrote:

> Hi,
>
> ok, now I understand your goal a bit better. I would still like to point
> out that it may take a bit more than it looks like. Just to name one
> example, you probably also want to support asynchronous snapshots which is
> most likely difficult when using just a hashmap. I think the proper
> solution for you (and also something that we are considering to support in
> the future) is that different backends could be supported for different
> operators in a job. But that is currently not possible. I still want to
> answer your other question: you could currently compute all things about
> key-groups and their assignment to operators by using the methods
> from org.apache.flink.runtime.state.KeyGroupRangeAssignment.
>
> Best,
> Stefan
>
>
> Am 20.02.2018 um 14:52 schrieb Gerard Garcia :
>
> Hi Stefan, thanks
>
> Yes, we are also using keyed state in other operators; the problem is that
> serialization is quite expensive, and in some of them we would prefer to
> avoid it by storing the state in memory (for our use case, one specific
> operator with in-memory state gives at least a 30% throughput improvement).
> When we are not operating on a keyed stream it is easy: basically all the
> operators have the same in-memory state. What we would like to do is the
> same, but when we are operating on a keyed stream. Does it make more sense
> now?
>
> We are using RocksDB as the state backend and, as far as I know, elements
> always get serialized when stored in the state, and I'm not sure if there is
> even some disk access (maybe not synchronously) that could hurt performance.
>
> Gerard
>
> On Tue, Feb 20, 2018 at 2:42 PM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> from what I read, I get the impression that you attempt to implement your
>> own "keyed state" with a hashmap? Why not use the keyed state that is
>> already provided by Flink and gives you efficient rescaling etc. out of the
>> box? Please see [1] for the details.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/
>> dev/stream/state/state.html#using-managed-keyed-state
>>
>> Am 20.02.2018 um 13:44 schrieb gerardg :
>>
>> Hello,
>>
>> To improve performance we have "keyed state" in the operator's memory,
>> basically we keep a Map which contains the state per each of the keys. The
>> problem comes when we want to restore the state after a failure or after
>> rescaling the operator. What we are doing is sending the concatenation of
>> all the state to every operator using a union redistribution and then we
>> restore the "in memory state" every time we see a new key. Then, after a
>> while, we just clear the redistributed state. This is somewhat complex and
>> prone to errors so we would like to find an alternative way of doing this.
>>
>> As far as I know Flink knows which keys belong to each operator
>> (distributing key groups) so I guess it would be possible to somehow
>> calculate the key id from each of the stored keys and restore the in
>> memory
>> state at once if we could access to the key groups mapping. Is that
>> possible? We could patch Flink if necessary to access that information.
>>
>> Thanks,
>>
>> Gerard
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>>
>>
>
>


Secured communication with Zookeeper without Kerberos

2018-02-20 Thread Edward Rojas
Hi,

I'm setting up a Flink cluster on kubernetes with High Availability using
Zookeeper. It's working well so far without the security configuration for
zookeeper.

I need to have secured communication between Flink and zookeeper but I'd
like to avoid the need to setup a Kerberos server only for this.

Is there a way to configure secured communication between Flink and
Zookeeper without using kerberos ? With SSL for example ?

Thanks in advance!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Retrieving name of last external checkpoint directory

2018-02-20 Thread Aljoscha Krettek
Hi,

I think there is currently no easy way of doing this. Things that come to mind 
are:
 - looking at the JM log
 - polling the JM REST interface for completed externalised checkpoints

The good news is that Flink 1.5 will rework how externalised checkpoints work a 
bit: basically, all checkpoints can now be considered externalised and the 
metadata will be stored in the root directory of the checkpoint, not in one 
global directory for all jobs. This way, the metadata for externalised 
checkpoints resides in the checkpoint directory of each job and it should be 
reasonably simple to restore from that.

Best,
Aljoscha
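
A rough sketch of the polling option, for reference. The endpoint path and the JSON field names of the response are assumptions and should be checked against the REST API documentation of the Flink version in use; the JSON parsing itself is left out:

// Sketch: poll the JobManager monitoring REST API for checkpoint statistics and
// read the external path of the latest completed checkpoint from the response.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

public class LatestCheckpointPoller {
    public static void main(String[] args) throws Exception {
        String jobId = args[0];   // job id of the job of interest
        URL url = new URL("http://jobmanager:8081/jobs/" + jobId + "/checkpoints");
        try (BufferedReader in = new BufferedReader(new InputStreamReader(url.openStream()))) {
            StringBuilder json = new StringBuilder();
            String line;
            while ((line = in.readLine()) != null) {
                json.append(line);
            }
            // Parse the JSON with a library of your choice and extract the external
            // path of the latest completed checkpoint (field names may differ per version).
            System.out.println(json);
        }
    }
}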

> On 15. Feb 2018, at 10:55, Dawid Wysakowicz  
> wrote:
> 
> Hi,
> 
> We are running few jobs on yarn and in case of some failure (that the job 
> could not recover from on its own) we want to use last successful external 
> checkpoint to restore the job from manually. The problem is that the
> ${state.checkpoints.dir} contains checkpoint directories for all jobs that we 
> are running. How can we find out the last successful external checkpoint for 
> some particular job? Will be grateful for any pointers.
> 
> Regards,
> Dawid



Re: Correlation between number of operators and Job manager memory requirements

2018-02-20 Thread Shailesh Jain
Hi Till,

Thanks for your reply.

>> My suggestion would be to split the different patterns up and run them
in different jobs.

I'm not able to understand how splitting up the jobs based on patterns
would be more efficient than based on the key. The total number of
operators would still be the same, right?

>> But splitting the input stream will generate many concurrent operators
which all run the same CEP operator.

Are you suggesting using the split transformation here? I also see a
similar thread [1] where you had suggested using split. When I generate a
watermark on SplitStream, will it be assigned only on that 'partition' of
the split stream? If so, will applying the CEP operator on the SplitStream
behave in the same way (i.e. like a KeyedCEPOperator) and NOT create
separate NFA instances for each partition (selection)?

>> CEP operators should be chainable if I'm not mistaken

I am not able to find any documentation on how I can explicitly chain 2 CEP
operators which are applied to the same data stream (not one after
another). It would be really helpful if you can point me to it.

Thanks,
Shailesh


[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Different-watermarks-on-keyed-stream-td14751.html
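
For readers following the thread, a sketch of the simple kind of pattern being discussed ("A followed by B within X minutes") in the CEP API; the Event type, its accessors and the key field are placeholders:

Pattern<Event, ?> aFollowedByB = Pattern.<Event>begin("a")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event e) {
            return "A".equals(e.getType());
        }
    })
    .followedBy("b")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event e) {
            return "B".equals(e.getType());
        }
    })
    .within(Time.minutes(10));

// One CEP operator over the keyed stream handles all keys, instead of one
// filtered stream plus one CEP operator per key.
PatternStream<Event> matches = CEP.pattern(events.keyBy("sourceId"), aFollowedByB);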
On Tue, Feb 20, 2018 at 4:46 PM, Till Rohrmann  wrote:

> Hi Shailesh,
>
> I fear that given your job topology, it is not that surprising that things
> break. The problem is that you might have M x N CEP operators concurrently
> active. This means that they have to keep their state in memory. Given
> that, 3.5 GB isn't much if you have more than 300 CEP NFAs running. This is
> roughly 10 MB per NFA. Depending on your time window, the size of the
> records and the stream throughput, this should be easily reachable.
>
> My suggestion would be to split the different patterns up and run them
> in different jobs. Then you should also give more resources to the TM.
> And ideally you don't do the filter operation on the stream, because this
> increases the number of CEP operators quite a bit and thus also the memory
> footprint.
>
> Concerning your questions:
> 1. CEP operators should be chainable, if I'm not mistaken
> 2. Per-key watermarks are indeed not supported in Flink. But splitting the
> input stream will generate many concurrent operators which all run the same
> CEP operator. Best would be to generate watermarks which work for all keys.
> 3. I think your assumption should be correct. I think monitoring the JM
> process via VisualVM should be quite good to see the memory requirements.
>
> Cheers,
> Till
>
> On Tue, Feb 20, 2018 at 11:23 AM, Shailesh Jain <
> shailesh.j...@stellapps.com> wrote:
>
>> Hi Till,
>>
>> When I'm submitting one big job, both JM and TM (sometimes just JM) are
>> crashing at the time of initialization itself (i.e. not all operators
>> switch to RUNNING) with OOM. The number of threads on the TM goes to almost 1000.
>>
>> But when I'm submitting multiple jobs, job submission is completed. But
>> when data starts coming in (its a live stream), the task managers memory
>> usage grows and eventually it crashes.
>>
>> The patterns I'm trying to match are simple (A followed by B, A followed
>> by B within X mins etc.), but the number of patterns is large (due to the
>> reason mentioned in my question 2 below).
>>
>> Configuration: 1 JM and 1 TM
>>
>> jobmanager.heap.mb: 512
>> taskmanager.heap.mb: 3596
>> taskmanager.numberOfTaskSlots: 5
>> parallelism.default: 1
>> jobmanager.rpc.port: 6123
>> state.backend: filesystem
>> taskmanager.debug.memory.startLogThread: true
>> taskmanager.debug.memory.logIntervalMs: 12
>> akka.ask.timeout: 2 min
>> akka.client.timeout: 5 min
>> akka.framesize: 404857600b
>> restart-strategy: fixed-delay
>> restart-strategy.fixed-delay.attempts: 3
>> restart-strategy.fixed-delay.delay: 10 s
>>
>> I'm submitting 5 jobs, and each job has ~80 operators.
>>
>> With the above configuration, the job submission is successful, but the
>> TM's eventually max out their heap usage.
>>
>> But, as mentioned earlier, when I change the number of slots to 1 and
>> submit 1 job with 300+ operators, the job submission fails with OOM.
>>
>> 3 questions here:
>>
>> 1. Is it possible to chain multiple CEP operators into a single task? So
>> that the number of threads is reduced. The reason here is that when I'm
>> submitting one big job, the OOM always occurs when JVM is trying to create
>> a new thread.
>>
>> 2. Instead of using a KeyedStream, I'm creating multiple streams per key
>> (using a filter operator) and then applying all N patterns to that stream.
>> So essentially it is resulting in M (number of patterns) x N (number of
>> keys) CEP operators/tasks. The reason behind creating this is that I need
>> to have different watermarks per key (a key represents a physical source,
>> and the source time could be different, resulting in events getting
>> dropped), and I believe generating watermarks per key is not supported yet.

Re: Get which key groups are assigned to an operator

2018-02-20 Thread Stefan Richter
Hi,

ok, now I understand your goal a bit better. I would still like to point out 
that it may take a bit more than it looks like. Just to name one example, you 
probably also want to support asynchronous snapshots which is most likely 
difficult when using just a hashmap. I think the proper solution for you (and 
also something that we are considering to support in the future) is that 
different backends could be supported for different operators in a job. But 
that is currently not possible. I still want to answer your other question: you 
could currently compute all things about key-groups and their assignment to 
operators by using the methods from 
org.apache.flink.runtime.state.KeyGroupRangeAssignment.

Best,
Stefan

> Am 20.02.2018 um 14:52 schrieb Gerard Garcia :
> 
> Hi Stefan, thanks 
> 
> Yes, we are also using keyed state in other operators; the problem is that
> serialization is quite expensive, and in some of them we would prefer to avoid
> it by storing the state in memory (for our use case, one specific operator
> with in-memory state gives at least a 30% throughput improvement). When we
> are not operating on a keyed stream it is easy: basically all the operators
> have the same in-memory state. What we would like to do is the same, but when
> we are operating on a keyed stream. Does it make more sense now?
> 
> We are using RocksDB as the state backend and, as far as I know, elements
> always get serialized when stored in the state, and I'm not sure if there is
> even some disk access (maybe not synchronously) that could hurt performance.
> 
> Gerard
> 
> On Tue, Feb 20, 2018 at 2:42 PM, Stefan Richter  > wrote:
> Hi,
> 
> from what I read, I get the impression that you attempt to implement your own
> "keyed state" with a hashmap? Why not use the keyed state that is already 
> provided by Flink and gives you efficient rescaling etc. out of the box? 
> Please see [1] for the details.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#using-managed-keyed-state
>  
> 
> 
>> Am 20.02.2018 um 13:44 schrieb gerardg > >:
>> 
>> Hello,
>> 
>> To improve performance we have "keyed state" in the operator's memory,
>> basically we keep a Map which contains the state per each of the keys. The
>> problem comes when we want to restore the state after a failure or after
>> rescaling the operator. What we are doing is sending the concatenation of
>> all the state to every operator using a union redistribution and then we
>> restore the "in memory state" every time we see a new key. Then, after a
>> while, we just clear the redistributed state. This is somewhat complex and
>> prone to errors so we would like to find an alternative way of doing this.
>> 
>> As far as I know Flink knows which keys belong to each operator
>> (distributing key groups) so I guess it would be possible to somehow
>> calculate the key id from each of the stored keys and restore the in memory
>> state at once if we could access to the key groups mapping. Is that
>> possible? We could patch Flink if necessary to access that information. 
>> 
>> Thanks, 
>> 
>> Gerard
>> 
>> 
>> 
>> --
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
>> 
> 
> 



Save the date: ApacheCon North America, September 24-27 in Montréal

2018-02-20 Thread Rich Bowen

Dear Apache Enthusiast,

(You’re receiving this message because you’re subscribed to a user@ or 
dev@ list of one or more Apache Software Foundation projects.)


We’re pleased to announce the upcoming ApacheCon [1] in Montréal, 
September 24-27. This event is all about you — the Apache project community.


We’ll have four tracks of technical content this time, as well as lots 
of opportunities to connect with your project community, hack on the 
code, and learn about other related (and unrelated!) projects across the 
foundation.


The Call For Papers (CFP) [2] and registration are now open. Register 
early to take advantage of the early bird prices and secure your place 
at the event hotel.


Important dates
March 30: CFP closes
April 20: CFP notifications sent
	August 24: Hotel room block closes (please do not wait until the last 
minute)


Follow @ApacheCon on Twitter to be the first to hear announcements about 
keynotes, the schedule, evening events, and everything you can expect to 
see at the event.


See you in Montréal!

Sincerely, Rich Bowen, V.P. Events,
on behalf of the entire ApacheCon team

[1] http://www.apachecon.com/acna18
[2] https://cfp.apachecon.com/conference.html?apachecon-north-america-2018


Re: Get which key groups are assigned to an operator

2018-02-20 Thread Gerard Garcia
Hi Stefan, thanks

Yes, we are also using keyed state in other operators; the problem is that
serialization is quite expensive, and in some of them we would prefer to
avoid it by storing the state in memory (for our use case, one specific
operator with in-memory state gives at least a 30% throughput improvement).
When we are not operating on a keyed stream it is easy: basically all the
operators have the same in-memory state. What we would like to do is the
same, but when we are operating on a keyed stream. Does it make more sense
now?

We are using RocksDB as the state backend and, as far as I know, elements
always get serialized when stored in the state, and I'm not sure if there is
even some disk access (maybe not synchronously) that could hurt performance.

Gerard

On Tue, Feb 20, 2018 at 2:42 PM, Stefan Richter  wrote:

> Hi,
>
> from what I read, I get the impression that you attempt to implement your
> own "keyed state" with a hashmap? Why not use the keyed state that is
> already provided by Flink and gives you efficient rescaling etc. out of the
> box? Please see [1] for the details.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> master/dev/stream/state/state.html#using-managed-keyed-state
>
> Am 20.02.2018 um 13:44 schrieb gerardg :
>
> Hello,
>
> To improve performance we have "keyed state" in the operator's memory,
> basically we keep a Map which contains the state per each of the keys. The
> problem comes when we want to restore the state after a failure or after
> rescaling the operator. What we are doing is sending the concatenation of
> all the state to every operator using a union redistribution and then we
> restore the "in memory state" every time we see a new key. Then, after a
> while, we just clear the redistributed state. This is somewhat complex and
> prone to errors so we would like to find an alternative way of doing this.
>
> As far as I know Flink knows which keys belong to each operator
> (distributing key groups) so I guess it would be possible to somehow
> calculate the key id from each of the stored keys and restore the in memory
> state at once if we could access to the key groups mapping. Is that
> possible? We could patch Flink if necessary to access that information.
>
> Thanks,
>
> Gerard
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>
>
>


Re: Get which key groups are assigned to an operator

2018-02-20 Thread Stefan Richter
Hi,

from what I read, I get the impression that you attempt to implement your own
"keyed state" with a hashmap? Why not use the keyed state that is already 
provided by Flink and gives you efficient rescaling etc. out of the box? Please 
see [1] for the details.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#using-managed-keyed-state
 


> Am 20.02.2018 um 13:44 schrieb gerardg :
> 
> Hello,
> 
> To improve performance we have "keyed state" in the operator's memory,
> basically we keep a Map which contains the state per each of the keys. The
> problem comes when we want to restore the state after a failure or after
> rescaling the operator. What we are doing is sending the concatenation of
> all the state to every operator using a union redistribution and then we
> restore the "in memory state" every time we see a new key. Then, after a
> while, we just clear the redistributed state. This is somewhat complex and
> prone to errors so we would like to find an alternative way of doing this.
> 
> As far as I know Flink knows which keys belong to each operator
> (distributing key groups) so I guess it would be possible to somehow
> calculate the key id from each of the stored keys and restore the in memory
> state at once if we could access to the key groups mapping. Is that
> possible? We could patch Flink if necessary to access that information. 
> 
> Thanks, 
> 
> Gerard
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Flink Batch job: All slots for groupReduce task scheduled on same machine

2018-02-20 Thread Aljoscha Krettek
Hmm, that seems weird. Could you please also post the code of the complete 
program? Only the parts that build the program graph should be enough. And 
maybe a screenshot of the complete graph from the dashboard.

--
Aljoscha

> On 20. Feb 2018, at 11:36, Aneesha Kaushal  
> wrote:
> 
> 
> (All times below are on 2018-02-20.)
>
> Map (Map at com.rfk.dataplatform.batch.jobs.topk.TopkOperations$$anonfun$4.apply(TopkOperations.scala:128))
> Start: 14:31:58 | End: 14:52:28 | Duration: 20m 30s | Received: 10.8 GB / 130,639,359 records | Sent: 10.8 GB / 130,639,359 records | Subtasks: 16 | Status: FINISHED
>
> Start | End | Duration | Bytes received | Records received | Bytes sent | Records sent | Attempt | Host | Status
> 14:43:05 | 14:52:28 | 9m 22s | 693 MB | 8,169,369 | 693 MB | 8,169,369 | 1 | ip-10-17-10-20:46079 | FINISHED
> 14:31:58 | 14:32:35 | 37s | 692 MB | 8,164,898 | 692 MB | 8,164,898 | 1 | ip-10-17-10-20:46079 | FINISHED
> 14:45:52 | 14:52:25 | 6m 32s | 692 MB | 8,160,648 | 692 MB | 8,160,648 | 1 | ip-10-17-10-20:46079 | FINISHED
> 14:32:53 | 14:33:30 | 36s | 692 MB | 8,164,117 | 692 MB | 8,164,117 | 1 | ip-10-17-11-156:53921 | FINISHED
> 14:39:05 | 14:39:43 | 37s | 692 MB | 8,168,042 | 692 MB | 8,168,042 | 1 | ip-10-17-11-156:53921 | FINISHED
> 14:42:12 | 14:46:57 | 4m 45s | 692 MB | 8,161,923 | 692 MB | 8,161,923 | 1 | ip-10-17-11-156:53921 | FINISHED
> 14:38:13 | 14:38:47 | 34s | 692 MB | 8,163,351 | 692 MB | 8,163,351 | 1 | ip-10-17-8-168:54366 | FINISHED
> 14:39:34 | 14:40:08 | 33s | 692 MB | 8,163,694 | 692 MB | 8,163,694 | 1 | ip-10-17-8-168:54366 | FINISHED
> 14:32:09 | 14:32:42 | 33s | 692 MB | 8,165,675 | 692 MB | 8,165,675 | 1 | ip-10-17-8-168:54366 | FINISHED
> 14:41:34 | 14:46:52 | 5m 17s | 692 MB | 8,165,679 | 692 MB | 8,165,679 | 1 | ip-10-17-8-193:33639 | FINISHED
> 14:44:03 | 14:47:10 | 3m 6s | 692 MB | 8,165,245 | 692 MB | 8,165,245 | 1 | ip-10-17-8-193:33639 | FINISHED
> 14:41:20 | 14:41:54 | 34s | 692 MB | 8,168,041 | 692 MB | 8,168,041 | 1 | ip-10-17-8-193:33639 | FINISHED
> 14:40:55 | 14:41:32 | 36s | 692 MB | 8,167,142 | 692 MB | 8,167,142 | 1 | ip-10-17-9-52:36094 | FINISHED
> 14:41:35 | 14:46:54 | 5m 18s | 692 MB | 8,161,355 | 692 MB | 8,161,355 | 1 | ip-10-17-9-52:36094 | FINISHED
> 14:40:08 | 14:40:52 | 44s | 692 MB | 8,166,737 | 692 MB | 8,166,737 | 1 | ip-10-17-9-52:36094 | FINISHED
> 14:44:23 | 14:47:12 | 2m 48s | 692 MB | 8,163,443 | 692 MB | 8,163,443 | 1 | ip-10-17-9-52:36094 | FINISHED
>
> GroupReduce (topk.IntermsToTopkEntityOp.reduceGroup)
> Start: 14:31:58 | End: 14:59:18 | Duration: 27m 19s | Received: 10.8 GB / 130,639,359 records | Sent: 3.53 GB / 5,163,805 records | Subtasks: 16 | Status: FINISHED
>
> Start | End | Duration | Bytes received | Records received | Bytes sent | Records sent | Attempt | Host | Status
> 14:31:58 | 14:58:49 | 26m 51s | 684 MB | 8,098,138 | 226 MB | 323,203 | 1 | ip-10-17-10-20:46079 | FINISHED
> 14:31:58 | 14:59:01 | 27m 3s | 690 MB | 8,210,429 | 226 MB | 322,178 | 1 | ip-10-17-10-20:46079 | FINISHED
> 14:31:58 | 14:59:06 | 27m 8s | 714 MB | 8,483,239 | 226 MB | 322,797 | 1 | ip-10-17-10-20:46079 | FINISHED
> 14:31:58 | 14:58:57 | 26m 58s | 694 MB | 8,176,076 | 226 MB | 322,600 | 1 | ip-10-17-10-20:46079 | FINISHED
> 14:31:58 | 14:59:02 | 27m 4s | 680 MB | 8,005,934 | 226 MB | 323,506 | 1 | ip-10-17-10-20:46079 | FINISHED
> 14:31:58 | 14:59:15 | 27m 16s | 739 MB | 8,708,468 | 227 MB | 323,087 | 1 | ip-10-17-10-20:46079 | FINISHED
> 14:31:58 | 14:58:39 | 26m 41s | 682 MB | 8,015,473 | 225 MB | 322,401 | 1 | ip-10-17-10-20:46079 | FINISHED
> 14:31:58 | 14:58:51 | 26m 53s | 674 MB | 7,994,360 | 226 MB | 323,354 | 1 | ip-10-17-10-20:46079 | FINISHED
> 14:31:58 | 14:59:18 | 27m 19s | 715 MB | 8,581,459 | 226 MB | 322,303 | 1 | ip-10-17-10-20:46079 | FINISHED
> 14:31:58 | 14:58:44 | 26m 45s | 682 MB | 7,912,704 | 228 MB | 322,915 | 1 | ip-10-17-10-20:46079 | FINISHED
> 14:31:58 | 14:59:07 | 27m 8s | 706 MB

Get which key groups are assigned to an operator

2018-02-20 Thread gerardg
Hello,

To improve performance we have "keyed state" in the operator's memory,
basically we keep a Map which contains the state per each of the keys. The
problem comes when we want to restore the state after a failure or after
rescaling the operator. What we are doing is sending the concatenation of
all the state to every operator using a union redistribution and then we
restore the "in memory state" every time we see a new key. Then, after a
while, we just clear the redistributed state. This is somewhat complex and
prone to errors so we would like to find an alternative way of doing this.

As far as I know Flink knows which keys belong to each operator
(distributing key groups) so I guess it would be possible to somehow
calculate the key id from each of the stored keys and restore the in memory
state at once if we could access to the key groups mapping. Is that
possible? We could patch Flink if necessary to access that information. 

Thanks, 

Gerard



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Managed State Custom Serializer with Avro

2018-02-20 Thread Arvid Heise
Hi Aljoscha,

I opened https://issues.apache.org/jira/browse/FLINK-8715 for the
RocksDB issue with pointers to the code. Let me know if you need more
details.

Best,

Arvid

On Tue, Feb 20, 2018 at 1:04 PM, Arvid Heise  wrote:
> Hi Aljoscha, hi Till,
>
> @Aljoscha, the new AvroSerializer is almost what I wanted except that
> it does not use the schema of the snapshot while reading. In fact,
> this version will fail with the same error as before when a field is
> added or removed.
> https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L265
> needs to use the schema from
> https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L188
> as the first parameter. Accordingly, a readSchema field needs to be set
> in #ensureCompatibility and relayed in #duplicate.
> Should I add a ticket for that as well?
>
> @Till concerning the poor man's migration. The doc of
> #ensureCompatibility in 1.3.2 states:
>
> {@link CompatibilityResult#compatible()}: this signals Flink that
> this serializer is compatible, or
> * has been reconfigured to be compatible, to continue reading
> previous data, and that the
> * serialization schema remains the same. No migration needs to be
> performed.
>
> The important part is the reconfiguration, which is also mentioned on
> the big documentation. The default avro and kryo serializers actually
> try to reconfigure themselves.
>
> @Aljoscha, I will open a ticket for the RocksDB thingy. I pinned the
> problem down and will try to come up with an easy solution. It's a tad
> hard to compare the different versions (since I'm deep into the
> debugger), so I just might write a 1.3.2 ticket.
>
> @Till, thanks for reminding me that we are not talking about
> incremental checkpoints ;) That makes it indeed much easier to
> understand the whole state recovery with evolution.
>
> Best,
>
> Arvid
>
> On Tue, Feb 20, 2018 at 12:27 PM, Aljoscha Krettek  
> wrote:
>> Hi Arvid,
>>
>> Did you check out the most recent AvroSerializer code?
>> https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L185
>> I think this does what you're suggesting.
>>
>> Regarding the integration tests, if this is in fact the case it is not good
>> and I would be very happy about a Jira Issue/PR there.
>>
>> Regarding your last point, I think that the RockDB backend stores the
>> metadata, which includes the type serialiser snapshot once, and not for all
>> keys or key groups.
>>
>> Best,
>> Aljoscha
>>
>>
>> On 20. Feb 2018, at 11:40, Arvid Heise  wrote:
>>
>> Hi guys,
>>
>> just wanted to write about that topic on my own.
>>
>> The FF talk of Tzu-Li also gave me the impression that by just using
>> AvroSerializer, we get some kind of state evolution for free.
>> https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-tzuli-gordon-tai-managing-state-in-apache-flink
>>
>> However, I discovered two issues on 1.3.2:
>>
>> 1. The AvroSerializer does not use read/write schema. The snapshot
>> stores type information instead of the more plausible schema
>> information.
>> However, the actual type should not matter as long as a compatible
>> type is used for state restoration.
>> I have rewritten the AvroSerializer to store the schema in the
>> snapshot config and actually uses it as a read schema during the
>> initialization of the DatumReader.
>>
>> 2. During integration tests, it turns out that the current
>> implementation of the StateDescriptor always returns copies of the
>> serializer through #getSerializer. So #ensureCompatibility is invoked
>> on a different serializer than the actual #deserialize method. So
>> although my AvroSerializer sets the correct read schema, it is not
>> used, since it is set on the wrong instance.
>> I propose to make sure that #ensureCompatibility is invoked on the
>> original serializer in the state descriptor. Otherwise all adjustments
>> to the serializer are lost.
>>
>> I can provide tests and patches if needed.
>>
>> One related question:
>>
>> If I do an incremental snapshot with RocksDB backend and keyed state
>> backend, is the snapshot config attached to all keys? So would the
>> following work:
>> * Write (key1, value1) and (key2, value2) with schema1. Do cancel with
>> snapshot.
>> * Read (key1, value1) with schema1->schema2 and write with (key1,
>> value1). Do cancel with snapshot.
>> 
>> * Read (key1, value1) with schema2 and read with (key2, value2) with
>> schema1->schema2.
>>
>> Thanks for any feedback
>>
>> Arvid
>>
>> On Mon, Feb 19, 2018 at 7:17 PM, Niels Denissen 
>> wrote:
>>
>> Hi Till,
>>
>> Thanks for the 

Re: Managed State Custom Serializer with Avro

2018-02-20 Thread Arvid Heise
Hi Aljoscha, hi Till,

@Aljoscha, the new AvroSerializer is almost what I wanted except that
it does not use the schema of the snapshot while reading. In fact,
this version will fail with the same error as before when a field is
added or removed.
https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L265
needs to use the schema from
https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L188
as the first parameter. Accordingly, a readSchema field needs to be set
in #ensureCompatibility and relayed in #duplicate.
Should I add a ticket for that as well?
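
To make the read/write-schema point concrete, this is what plain Avro schema resolution looks like with separate writer and reader schemas; it is not the AvroSerializer code itself, and writerSchemaJson, serializedBytes and MyRecord are placeholders:

// Plain-Avro sketch: deserialize bytes written with the (older) writer schema
// into the current class by resolving against the reader schema.
Schema writerSchema = new Schema.Parser().parse(writerSchemaJson);  // schema stored in the snapshot
Schema readerSchema = MyRecord.getClassSchema();                    // schema of the current class

SpecificDatumReader<MyRecord> datumReader =
    new SpecificDatumReader<>(writerSchema, readerSchema);
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(serializedBytes, null);
MyRecord record = datumReader.read(null, decoder);  // added/removed fields are resolved against the reader schema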

@Till concerning the poor man's migration. The doc of
#ensureCompatibility in 1.3.2 states:

{@link CompatibilityResult#compatible()}: this signals Flink that
this serializer is compatible, or
* has been reconfigured to be compatible, to continue reading
previous data, and that the
* serialization schema remains the same. No migration needs to be
performed.

The important part is the reconfiguration, which is also mentioned on
the big documentation. The default avro and kryo serializers actually
try to reconfigure themselves.

@Aljoscha, I will open a ticket for the RocksDB thingy. I pinned the
problem down and will try to come up with an easy solution. It's a tad
hard to compare the different versions (since I'm deep into the
debugger), so I just might write a 1.3.2 ticket.

@Till, thanks for reminding me that we are not talking about
incremental checkpoints ;) That makes it indeed much easier to
understand the whole state recovery with evolution.

Best,

Arvid

On Tue, Feb 20, 2018 at 12:27 PM, Aljoscha Krettek  wrote:
> Hi Arvid,
>
> Did you check out the most recent AvroSerializer code?
> https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L185
> I think this does what you're suggesting.
>
> Regarding the integration tests, if this is in fact the case it is not good
> and I would be very happy about a Jira Issue/PR there.
>
> Regarding your last point, I think that the RockDB backend stores the
> metadata, which includes the type serialiser snapshot once, and not for all
> keys or key groups.
>
> Best,
> Aljoscha
>
>
> On 20. Feb 2018, at 11:40, Arvid Heise  wrote:
>
> Hi guys,
>
> just wanted to write about that topic on my own.
>
> The FF talk of Tzu-Li also gave me the impression that by just using
> AvroSerializer, we get some kind of state evolution for free.
> https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-tzuli-gordon-tai-managing-state-in-apache-flink
>
> However, I discovered two issues on 1.3.2:
>
> 1. The AvroSerializer does not use read/write schema. The snapshot
> stores type information instead of the more plausible schema
> information.
> However, the actual type should not matter as long as a compatible
> type is used for state restoration.
> I have rewritten the AvroSerializer to store the schema in the
> snapshot config and to actually use it as a read schema during the
> initialization of the DatumReader.
>
> 2. During integration tests, it turns out that the current
> implementation of the StateDescriptor always returns copies of the
> serializer through #getSerializer. So #ensureCompatibility is invoked
> on a different serializer than the actual #deserialize method. So
> although my AvroSerializer sets the correct read schema, it is not
> used, since it is set on the wrong instance.
> I propose to make sure that #ensureCompatibility is invoked on the
> original serializer in the state descriptor. Otherwise all adjustments
> to the serializer are lost.
>
> I can provide tests and patches if needed.
>
> One related question:
>
> If I do an incremental snapshot with RocksDB backend and keyed state
> backend, is the snapshot config attached to all keys? So would the
> following work:
> * Write (key1, value1) and (key2, value2) with schema1. Do cancel with
> snapshot.
> * Read (key1, value1) with schema1->schema2 and write with (key1,
> value1). Do cancel with snapshot.
> 
> * Read (key1, value1) with schema2 and read with (key2, value2) with
> schema1->schema2.
>
> Thanks for any feedback
>
> Arvid
>
> On Mon, Feb 19, 2018 at 7:17 PM, Niels Denissen 
> wrote:
>
> Hi Till,
>
> Thanks for the quick reply, I'm using 1.3.2 atm.
>
> Cheers,
> Niels
>
> On Feb 19, 2018 19:10, "Till Rohrmann"  wrote:
>
>
> Hi Niels,
>
> which version of Flink are you using? Currently, Flink does not support to
> upgrade the TypeSerializer itself, if I'm not mistaken. As you've described,
> it will try to use the old serializer stored in the checkpoint stream to
> restore state.
>
> 

Re: Managed State Custom Serializer with Avro

2018-02-20 Thread Till Rohrmann
A small addition,

currently savepoints are always full checkpoints. Thus, you should not have
the problem when calling cancel with savepoint.

Concerning 2, I think the idea was to only check for compatibility at
restore time. The check will either say it's compatible or not. If it's not
compatible, then the program will fail because Flink does not support
migration yet. Therefore, there should be no need to call ensureCompatibility
on the StateDescriptor's serializer if I'm not mistaken.

Cheers,
Till

On Tue, Feb 20, 2018 at 12:27 PM, Aljoscha Krettek 
wrote:

> Hi Arvid,
>
> Did you check out the most recent AvroSerializer code?
> https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L185
> I think this does what you're suggesting.
>
> Regarding the integration tests, if this is in fact the case it is not
> good and I would be very happy about a Jira Issue/PR there.
>
> Regarding your last point, I think that the RocksDB backend stores the
> metadata, which includes the type serialiser snapshot, only once and not for
> all keys or key groups.
>
> Best,
> Aljoscha
>
>
> On 20. Feb 2018, at 11:40, Arvid Heise  wrote:
>
> Hi guys,
>
> just wanted to write about that topic on my own.
>
> The FF talk of Tzu-Li gave me also the impression that by just using
> AvroSerializer, we get some kind of state evolution for free.
> https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-tzuli-gordon-tai-managing-state-in-apache-flink
>
> However, I discovered two issues on 1.3.2:
>
> 1. The AvroSerializer does not use read/write schema. The snapshot
> stores type information instead of the more plausible schema
> information.
> However, the actual type should not matter as long as a compatible
> type is used for state restoration.
> I have rewritten the AvroSerializer to store the schema in the
> snapshot config and actually uses it as a read schema during the
> initialization of the DatumReader.
>
> 2. During integration tests, it turns out that the current
> implementation of the StateDescriptor always returns copies of the
> serializer through #getSerializer. So #ensureCompatibility is invoked
> on a different serializer than the actual #deserialize method. So
> although my AvroSerializer sets the correct read schema, it is not
> used, since it is set on the wrong instance.
> I propose to make sure that #ensureCompatibility is invoked on the
> original serializer in the state descriptor. Otherwise all adjustments
> to the serializer are lost.
>
> I can provide tests and patches if needed.
>
> One related question:
>
> If I do an incremental snapshot with RocksDB backend and keyed state
> backend, is the snapshot config attached to all keys? So would the
> following work:
> * Write (key1, value1) and (key2, value2) with schema1. Do cancel with
> snapshot.
> * Read (key1, value1) with schema1->schema2 and write with (key1,
> value1). Do cancel with snapshot.
> 
> * Read (key1, value1) with schema2 and read with (key2, value2) with
> schema1->schema2.
>
> Thanks for any feedback
>
> Arvid
>
> On Mon, Feb 19, 2018 at 7:17 PM, Niels Denissen 
> wrote:
>
> Hi Till,
>
> Thanks for the quick reply, I'm using 1.3.2 atm.
>
> Cheers,
> Niels
>
> On Feb 19, 2018 19:10, "Till Rohrmann"  wrote:
>
>
> Hi Niels,
>
> which version of Flink are you using? Currently, Flink does not support to
> upgrade the TypeSerializer itself, if I'm not mistaken. As you've
> described,
> it will try to use the old serializer stored in the checkpoint stream to
> restore state.
>
> I've pulled Gordon into the conversation who can tell you a little bit
> more about the current capability and limitations of state evolution.
>
> Cheers,
> Till
>
> On Mon, Feb 19, 2018 at 4:14 PM, Niels <[hidden email]> wrote:
>
>
> Hi all,
>
> I'm currently trying to use Avro in order to evolve our data present in
> Flink's Managed State. I've extended the TypeSerializer class
> successfully
> for this purpose, but still have issues using Schema Evolution.
>
> *The problem:*
> When we try to read data (deserialize from savepoint) with a new
> serialiser
> and a new schema, Flink seems to use the old schema of the old serializer
> (written to the savepoint). This results in an old GenericRecord that
> doesn't adhere to the new Avro schema.
>
> *What seems to happen to me is the following* (Say we evolve from dataV1
> to
> dataV2):
> - State containing dataV1 is serialized with avro schema V1 to a
> check/savepoint. Along with the data, the serializer itself is written.
> - Upon restore, the old serializer is retrieved from the data (therefore
> needs to be on the classpath). Data is restored using this old
> serializer.
> The new serializer provided is only used for writes.
>
> If this is indeed the case it explains our aforementioned problem. 

Re: Managed State Custom Serializer with Avro

2018-02-20 Thread Aljoscha Krettek
Hi Arvid,

Did you check out the most recent AvroSerializer code?
https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L185
I think this does what you're suggesting.

Regarding the integration tests, if this is in fact the case it is not good and 
I would be very happy about a Jira Issue/PR there.

Regarding your last point, I think that the RocksDB backend stores the metadata,
which includes the type serialiser snapshot, only once and not for all keys or key
groups.

Best,
Aljoscha

> On 20. Feb 2018, at 11:40, Arvid Heise  wrote:
> 
> Hi guys,
> 
> just wanted to write about that topic on my own.
> 
> The FF talk of Tzu-Li gave me also the impression that by just using
> AvroSerializer, we get some kind of state evolution for free.
> https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-tzuli-gordon-tai-managing-state-in-apache-flink
> 
> However, I discovered two issues on 1.3.2:
> 
> 1. The AvroSerializer does not use read/write schema. The snapshot
> stores type information instead of the more plausible schema
> information.
> However, the actual type should not matter as long as a compatible
> type is used for state restoration.
> I have rewritten the AvroSerializer to store the schema in the
> snapshot config and actually uses it as a read schema during the
> initialization of the DatumReader.
> 
> 2. During integration tests, it turns out that the current
> implementation of the StateDescriptor always returns copies of the
> serializer through #getSerializer. So #ensureCompatibility is invoked
> on a different serializer than the actual #deserialize method. So
> although my AvroSerializer sets the correct read schema, it is not
> used, since it is set on the wrong instance.
> I propose to make sure that #ensureCompatibility is invoked on the
> original serializer in the state descriptor. Otherwise all adjustments
> to the serializer are lost.
> 
> I can provide tests and patches if needed.
> 
> One related question:
> 
> If I do an incremental snapshot with RocksDB backend and keyed state
> backend, is the snapshot config attached to all keys? So would the
> following work:
> * Write (key1, value1) and (key2, value2) with schema1. Do cancel with 
> snapshot.
> * Read (key1, value1) with schema1->schema2 and write with (key1,
> value1). Do cancel with snapshot.
> 
> * Read (key1, value1) with schema2 and read with (key2, value2) with
> schema1->schema2.
> 
> Thanks for any feedback
> 
> Arvid
> 
> On Mon, Feb 19, 2018 at 7:17 PM, Niels Denissen  
> wrote:
>> Hi Till,
>> 
>> Thanks for the quick reply, I'm using 1.3.2 atm.
>> 
>> Cheers,
>> Niels
>> 
>> On Feb 19, 2018 19:10, "Till Rohrmann"  wrote:
>>> 
>>> Hi Niels,
>>> 
>>> which version of Flink are you using? Currently, Flink does not support to
>>> upgrade the TypeSerializer itself, if I'm not mistaken. As you've described,
>>> it will try to use the old serializer stored in the checkpoint stream to
>>> restore state.
>>> 
>>> I've pulled Gordon into the conversation who can tell you a little bit
>>> more about the current capability and limitations of state evolution.
>>> 
>>> Cheers,
>>> Till
>>> 
>>> On Mon, Feb 19, 2018 at 4:14 PM, Niels <[hidden email]> wrote:
 
 Hi all,
 
 I'm currently trying to use Avro in order to evolve our data present in
 Flink's Managed State. I've extended the TypeSerializer class
 successfully
 for this purpose, but still have issues using Schema Evolution.
 
 *The problem:*
 When we try to read data (deserialize from savepoint) with a new
 serialiser
 and a new schema, Flink seems to use the old schema of the old serializer
 (written to the savepoint). This results in an old GenericRecord that
 doesn't adhere to the new Avro schema.
 
 *What seems to happen to me is the following* (Say we evolve from dataV1
 to
 dataV2):
 - State containing dataV1 is serialized with avro schema V1 to a
 check/savepoint. Along with the data, the serializer itself is written.
 - Upon restore, the old serializer is retrieved from the data (therefore
 needs to be on the classpath). Data is restored using this old
 serializer.
 The new serializer provided is only used for writes.
 
 If this is indeed the case it explains our aforementioned problem. If you
 have any pointers as to whether this is true and what a possible solution
 would be that would be very much appreciated!
 
 Thanks!
 Niels
 
 
 
 --
 Sent from:
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>> 
>>> 
>>> 
>>> 

Re: Correlation between number of operators and Job manager memory requirements

2018-02-20 Thread Till Rohrmann
Hi Shailesh,

I fear that given your job topology, it is not that surprising that things
break. The problem is that you might have M x N CEP operators concurrently
active, which means they all have to keep their state in memory. 3.5 GB
isn't that much if you have more than 300 CEP NFAs running; that leaves
roughly 10 MB per NFA. Depending on the time window, the size of records
and the stream throughput, this limit should be easily reachable.

My suggestion would be to split the different patterns up and run them in
different jobs. Then you should also give more resources to the TM. And
ideally you don't do the filter operation on the stream, because this
increases the number of CEP operators quite a bit and thus also the memory
footprint.

Concerning your questions:
1. CEP operators should be chainable, if I'm not mistaken
2. Per-key watermarks are indeed not supported in Flink. But splitting the
input stream will generate many concurrent operators which all run the same
CEP operator. Best would be to generate watermarks which work for all keys.
3. I think your assumption should be correct. I think monitoring the JM
process via VisualVM should be quite good to see the memory requirements.
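
For illustration, a minimal sketch of point 2 (the Event POJO with getType()
and getSourceId() is a hypothetical placeholder, Flink CEP 1.3/1.4 API): a
single "A followed by B within 10 minutes" pattern applied once to a keyed
stream, instead of one pattern per filtered per-key stream.

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class KeyedCepSketch {

    // Sketch only: Event, getType() and getSourceId() are made-up names.
    public static PatternStream<Event> aFollowedByB(DataStream<Event> events) {
        Pattern<Event, ?> pattern = Pattern.<Event>begin("a")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event e) {
                        return "A".equals(e.getType());
                    }
                })
                .followedBy("b")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event e) {
                        return "B".equals(e.getType());
                    }
                })
                .within(Time.minutes(10));

        // One CEP operator with one NFA per key, instead of M x N filtered streams.
        return CEP.pattern(events.keyBy(Event::getSourceId), pattern);
    }
}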

Cheers,
Till

On Tue, Feb 20, 2018 at 11:23 AM, Shailesh Jain  wrote:

> Hi Till,
>
> When I'm submitting one big job, both JM and TM (sometimes just JM) are
> crashing at the time of initialization itself (i.e. not all operators
> switch to RUNNING) with OOM. The number of threads on TM go to almost 1000.
>
> But when I'm submitting multiple jobs, job submission is completed. But
> when data starts coming in (its a live stream), the task managers memory
> usage grows and eventually it crashes.
>
> The patterns I'm trying to match are simple (A followed by B, A followed
> by B within X mins etc.), but the number of patterns is large (due to the
> reason mentioned in my question 2 below).
>
> Configuration: 1 JM and 1 TM
>
> jobmanager.heap.mb: 512
> taskmanager.heap.mb: 3596
> taskmanager.numberOfTaskSlots: 5
> parallelism.default: 1
> jobmanager.rpc.port: 6123
> state.backend: filesystem
> taskmanager.debug.memory.startLogThread: true
> taskmanager.debug.memory.logIntervalMs: 12
> akka.ask.timeout: 2 min
> akka.client.timeout: 5 min
> akka.framesize: 404857600b
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 3
> restart-strategy.fixed-delay.delay: 10 s
>
> I'm submitting 5 jobs, and each job has ~80 operators.
>
> With the above configuration, the job submission is successful, but the
> TM's eventually max out their heap usage.
>
> But, as mentioned earlier, when I change the number of slots to 1 and
> submit 1 job with 300+ operators, the job submission fails with OOM.
>
> 3 questions here:
>
> 1. Is it possible to chain multiple CEP operators into a single task? So
> that the number of threads is reduced. The reason here is that when I'm
> submitting one big job, the OOM always occurs when JVM is trying to create
> a new thread.
>
> 2. Instead of using a KeyedStream, I'm creating multiple streams per key
> (using a filter operator) and then applying all N patterns to that stream.
> So essentially it is resulting in M (number of patterns) x N (number of
> keys) CEP operators/tasks. The reason behind creating this is that I need
> to have different watermarks per key (a key represents a physical source,
> and the source time could be different, resulting in events getting
> dropped), and I believe generating watermarks per key is not supported yet.
> Is this understanding correct? Do you have any ideas/recommendations to
> address this use case?
>
> 3. How can we benchmark the resources required by JM? Is it OK to assume
> that the amount of memory required by JM grows linearly with the total
> number of operators deployed?
>
> Thanks,
> Shailesh
>
>
> On Mon, Feb 19, 2018 at 10:18 PM, Till Rohrmann 
> wrote:
>
>> Hi Shailesh,
>>
>> my question would be where do you see the OOM happening? Does it happen
>> on the JM or the TM.
>>
>> The memory requirements for each operator strongly depend on the operator
>> and it is hard to give a general formula for that. It mostly depends on the
>> user function. Flink itself should not need too much extra memory for the
>> framework specific code.
>>
>> CEP, however, can easily add a couple of hundred megabytes to your memory
>> requirements. This depends strongly on the pattern you're matching and
>> which state backend you're using.
>>
>> Concerning your question one big job vs. multiple jobs, I could see that
>> this helps if not all jobs are executed at the same time. Especially if you
>> only have a single TM with a limited number of slots, I think that you
>> effectively queue up jobs. That should reduce the required amount of
>> resources for each individual job.
>>
>> Cheers,
>> Till
>>
>> On Mon, Feb 19, 2018 at 11:35 AM, Shailesh Jain <
>> shailesh.j...@stellapps.com> wrote:
>>
>>> 

Re: flink read hdfs file error

2018-02-20 Thread Aljoscha Krettek
Hi,

What is the exact problem you're seeing? Could you please post your POM file 
and also a listing of all the files in your user jar?

Best,
Aljoscha

> On 15. Feb 2018, at 11:12, Or Sher  wrote:
> 
> Hi,
> Did you ever get to solve this issue? I'm getting the same error.
> 
> On 1.3.2 I used to run the fat jar as a standalone without any job submission 
> and it worked just fine
> It looked like it used an embedded MiniCluster.
> 
> After just changing the dependencies to 1.4.0 we started getting these errors.
> 
> Should this execution type also be supported in 1.4.0?
> Couldn't find anything in the docs about it.
> The docs always say to run the job using "flink run", which depends on an already
> running cluster.
> 
> On Fri, Jan 26, 2018 at 12:59 PM Aljoscha Krettek  > wrote:
> Hi,
> 
> It seems you are using Akka in your user program and the reference.conf in 
> your Jar is clashing with the same File from Flink. What is the reason for 
> having Akka (or reference.conf) in your user jar?
> 
> Best,
> Aljoscha
> 
> 
>> On 22. Jan 2018, at 11:09, 韩宁宁 <453673...@qq.com > 
>> wrote:
>> 
>> Dear All,
>> I have a question about Flink.
>> I want to read files on HDFS with Flink, but I encountered the error
>> below. Can you please advise on a solution to this problem? It will be
>> much appreciated:
>> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>> SLF4J: Defaulting to no-operation (NOP) logger implementation
>> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder 
>>  for further details.
>> Exception in thread "main" 
>> com.typesafe.config.ConfigException$UnresolvedSubstitution: reference.conf @ 
>> jar:file:/data/home/fbi/hanningning/flink-hdfs/target/flink-hdfs.jar!/reference.conf:
>>  804: Could not resolve substitution to a value: ${akka.stream.materializer}
>>  at 
>> com.typesafe.config.impl.ConfigReference.resolveSubstitutions(ConfigReference.java:108)
>>  at 
>> com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
>>  at 
>> com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
>>  at 
>> com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
>>  at 
>> com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
>>  at 
>> com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
>>  at 
>> com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
>>  at 
>> com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
>>  at 
>> com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
>>  at 
>> com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
>>  at 
>> com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
>>  at 
>> com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
>>  at 
>> com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
>>  at 
>> com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
>>  at 
>> com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
>>  at 
>> com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
>>  at 
>> com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
>>  at 
>> com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
>>  at 
>> com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
>>  at 
>> com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
>>  at 
>> com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
>>  at 
>> com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
>>  at 
>> com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
>>  at 
>> com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
>> .
>> 
>> 
>> my code===
>> public class App {
>> 
>> public static void main(String[] args) throws Exception {
>> 
>> final String inputPath = args[0]; // hdfs file path
>> final ExecutionEnvironment env =
>> ExecutionEnvironment.getExecutionEnvironment();
>> 
>> HadoopInputFormat<LongWritable, Text> hadoopInputFormat =
>> new HadoopInputFormat<>(new
>> TextInputFormat(), LongWritable.class,
>>  

Re: Flink Batch job: All slots for groupReduce task scheduled on same machine

2018-02-20 Thread Aneesha Kaushal

Map (Map at com.rfk.dataplatform.batch.jobs.topk.TopkOperations$$anonfun$4.apply(TopkOperations.scala:128))
2018-02-20, 14:31:58 -> 14:52:28 (20m 30s) | received 10.8 GB / 130,639,359 records | sent 10.8 GB / 130,639,359 records | 16 subtasks | FINISHED

Subtasks (Start | End | Duration | Bytes/Records received | Bytes/Records sent | Attempt | Host | Status):
14:43:05 -> 14:52:28 | 9m 22s | 693 MB / 8,169,369 | 693 MB / 8,169,369 | 1 | ip-10-17-10-20:46079  | FINISHED
14:31:58 -> 14:32:35 | 37s    | 692 MB / 8,164,898 | 692 MB / 8,164,898 | 1 | ip-10-17-10-20:46079  | FINISHED
14:45:52 -> 14:52:25 | 6m 32s | 692 MB / 8,160,648 | 692 MB / 8,160,648 | 1 | ip-10-17-10-20:46079  | FINISHED
14:32:53 -> 14:33:30 | 36s    | 692 MB / 8,164,117 | 692 MB / 8,164,117 | 1 | ip-10-17-11-156:53921 | FINISHED
14:39:05 -> 14:39:43 | 37s    | 692 MB / 8,168,042 | 692 MB / 8,168,042 | 1 | ip-10-17-11-156:53921 | FINISHED
14:42:12 -> 14:46:57 | 4m 45s | 692 MB / 8,161,923 | 692 MB / 8,161,923 | 1 | ip-10-17-11-156:53921 | FINISHED
14:38:13 -> 14:38:47 | 34s    | 692 MB / 8,163,351 | 692 MB / 8,163,351 | 1 | ip-10-17-8-168:54366  | FINISHED
14:39:34 -> 14:40:08 | 33s    | 692 MB / 8,163,694 | 692 MB / 8,163,694 | 1 | ip-10-17-8-168:54366  | FINISHED
14:32:09 -> 14:32:42 | 33s    | 692 MB / 8,165,675 | 692 MB / 8,165,675 | 1 | ip-10-17-8-168:54366  | FINISHED
14:41:34 -> 14:46:52 | 5m 17s | 692 MB / 8,165,679 | 692 MB / 8,165,679 | 1 | ip-10-17-8-193:33639  | FINISHED
14:44:03 -> 14:47:10 | 3m 6s  | 692 MB / 8,165,245 | 692 MB / 8,165,245 | 1 | ip-10-17-8-193:33639  | FINISHED
14:41:20 -> 14:41:54 | 34s    | 692 MB / 8,168,041 | 692 MB / 8,168,041 | 1 | ip-10-17-8-193:33639  | FINISHED
14:40:55 -> 14:41:32 | 36s    | 692 MB / 8,167,142 | 692 MB / 8,167,142 | 1 | ip-10-17-9-52:36094   | FINISHED
14:41:35 -> 14:46:54 | 5m 18s | 692 MB / 8,161,355 | 692 MB / 8,161,355 | 1 | ip-10-17-9-52:36094   | FINISHED
14:40:08 -> 14:40:52 | 44s    | 692 MB / 8,166,737 | 692 MB / 8,166,737 | 1 | ip-10-17-9-52:36094   | FINISHED
14:44:23 -> 14:47:12 | 2m 48s | 692 MB / 8,163,443 | 692 MB / 8,163,443 | 1 | ip-10-17-9-52:36094   | FINISHED


GroupReduce (topk.IntermsToTopkEntityOp.reduceGroup)
2018-02-20, 14:31:58 -> 14:59:18 (27m 19s) | received 10.8 GB / 130,639,359 records | sent 3.53 GB / 5,163,805 records | 16 subtasks | FINISHED

Subtasks (all attempt 1, all on host ip-10-17-10-20:46079, all FINISHED; Start | End | Duration | Bytes/Records received | Bytes/Records sent):
14:31:58 -> 14:58:49 | 26m 51s | 684 MB / 8,098,138 | 226 MB / 323,203
14:31:58 -> 14:59:01 | 27m 3s  | 690 MB / 8,210,429 | 226 MB / 322,178
14:31:58 -> 14:59:06 | 27m 8s  | 714 MB / 8,483,239 | 226 MB / 322,797
14:31:58 -> 14:58:57 | 26m 58s | 694 MB / 8,176,076 | 226 MB / 322,600
14:31:58 -> 14:59:02 | 27m 4s  | 680 MB / 8,005,934 | 226 MB / 323,506
14:31:58 -> 14:59:15 | 27m 16s | 739 MB / 8,708,468 | 227 MB / 323,087
14:31:58 -> 14:58:39 | 26m 41s | 682 MB / 8,015,473 | 225 MB / 322,401
14:31:58 -> 14:58:51 | 26m 53s | 674 MB / 7,994,360 | 226 MB / 323,354
14:31:58 -> 14:59:18 | 27m 19s | 715 MB / 8,581,459 | 226 MB / 322,303
14:31:58 -> 14:58:44 | 26m 45s | 682 MB / 7,912,704 | 228 MB / 322,915
14:31:58 -> 14:59:07 | 27m 8s  | 706 MB / 8,288,227 | 226 MB / 322,480
14:31:58 -> 14:58:59 | 27m 1s  | 698 MB / 8,152,011 | 225 MB / 322,836
14:31:58 -> 14:58:04 | 26m 5s  | 646 MB / 7,598,798 | 226 MB / 322,270
14:31:58 -> 14:58:22 | 26m 24s | 656 MB ...

Re: Managed State Custom Serializer with Avro

2018-02-20 Thread Arvid Heise
Hi guys,

just wanted to write about that topic on my own.

The FF talk of Tzu-Li also gave me the impression that by just using
AvroSerializer, we get some kind of state evolution for free.
https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-tzuli-gordon-tai-managing-state-in-apache-flink

However, I discovered two issues on 1.3.2:

1. The AvroSerializer does not use a read/write schema. The snapshot
stores type information instead of the more plausible schema
information.
However, the actual type should not matter as long as a compatible
type is used for state restoration.
I have rewritten the AvroSerializer to store the schema in the
snapshot config and to actually use it as a read schema during the
initialization of the DatumReader.

2. During integration tests, it turned out that the current
implementation of the StateDescriptor always returns copies of the
serializer through #getSerializer. So #ensureCompatibility is invoked
on a different serializer instance than the one on which #deserialize
is later called. So although my AvroSerializer sets the correct read
schema, it is not used, since it is set on the wrong instance.
I propose to make sure that #ensureCompatibility is invoked on the
original serializer in the state descriptor. Otherwise all adjustments
to the serializer are lost.

I can provide tests and patches if needed.
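
As a minimal sketch of the read-schema idea from point 1 (illustrative only,
not the actual patch): the writer schema would be the one carried in the
serializer snapshot stored in the savepoint, and the reader schema the one of
the current, evolved record.

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

public class AvroEvolvingRead {

    // Deserialize bytes written with writerSchema into records of readerSchema,
    // letting Avro's schema resolution handle the evolution.
    public static GenericRecord read(byte[] bytes, Schema writerSchema, Schema readerSchema)
            throws IOException {
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return reader.read(null, decoder);
    }
}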

One related question:

If I do an incremental snapshot with RocksDB backend and keyed state
backend, is the snapshot config attached to all keys? So would the
following work:
* Write (key1, value1) and (key2, value2) with schema1. Do cancel with snapshot.
* Read (key1, value1) with schema1->schema2 and write with (key1,
value1). Do cancel with snapshot.

* Read (key1, value1) with schema2 and read with (key2, value2) with
schema1->schema2.

Thanks for any feedback

Arvid

On Mon, Feb 19, 2018 at 7:17 PM, Niels Denissen  wrote:
> Hi Till,
>
> Thanks for the quick reply, I'm using 1.3.2 atm.
>
> Cheers,
> Niels
>
> On Feb 19, 2018 19:10, "Till Rohrmann"  wrote:
>>
>> Hi Niels,
>>
>> which version of Flink are you using? Currently, Flink does not support to
>> upgrade the TypeSerializer itself, if I'm not mistaken. As you've described,
>> it will try to use the old serializer stored in the checkpoint stream to
>> restore state.
>>
>> I've pulled Gordon into the conversation who can tell you a little bit
>> more about the current capability and limitations of state evolution.
>>
>> Cheers,
>> Till
>>
>> On Mon, Feb 19, 2018 at 4:14 PM, Niels <[hidden email]> wrote:
>>>
>>> Hi all,
>>>
>>> I'm currently trying to use Avro in order to evolve our data present in
>>> Flink's Managed State. I've extended the TypeSerializer class
>>> successfully
>>> for this purpose, but still have issues using Schema Evolution.
>>>
>>> *The problem:*
>>> When we try to read data (deserialize from savepoint) with a new
>>> serialiser
>>> and a new schema, Flink seems to use the old schema of the old serializer
>>> (written to the savepoint). This results in an old GenericRecord that
>>> doesn't adhere to the new Avro schema.
>>>
>>> *What seems to happen to me is the following* (Say we evolve from dataV1
>>> to
>>> dataV2):
>>> - State containing dataV1 is serialized with avro schema V1 to a
>>> check/savepoint. Along with the data, the serializer itself is written.
>>> - Upon restore, the old serializer is retrieved from the data (therefore
>>> needs to be on the classpath). Data is restored using this old
>>> serializer.
>>> The new serializer provided is only used for writes.
>>>
>>> If this is indeed the case it explains our aforementioned problem. If you
>>> have any pointers as to whether this is true and what a possible solution
>>> would be that would be very much appreciated!
>>>
>>> Thanks!
>>> Niels
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>>
>>
>>
>> 
>> If you reply to this email, your message will be added to the discussion
>> below:
>>
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Managed-State-Custom-Serializer-with-Avro-tp18419p18437.html
>> To unsubscribe from Managed State Custom Serializer with Avro, click here.
>> NAML


Re: sink with BucketingSink to S3 files override

2018-02-20 Thread Aljoscha Krettek
Hi,

I'm afraid the BucketingSink does not work well with S3 because of the 
eventually-consistent nature of S3. As you noticed in the code snippet you 
sent, the sink relies on the fact that directory listings are accurate, which 
is not the case with S3.

The Flink community is aware of this problem and it's one of the top priorities 
for the next release after Flink 1.5.

Best,
Aljoscha

> On 19. Feb 2018, at 17:39, galantaa  wrote:
> 
> Hey,
> I have some kind of a concurrency problem with Bucketing sink when I write
> to S3.
> I use the AvroKeyValueSinkWriter.
> The problem is that when I send events that are supposed to be written to the same
> directory, but to different part files (because of different event types),
> the files overwrite each other.
> The problem occurs only when I sink the files to S3. 
> When I write the files to the local storage it does not happen, but I think
> that only because there's this loop in openNewPartFile:
> 
> // The following loop tries different partCounter values in ascending order
> until it reaches the minimum
> // that is not yet used. This works since there is only one parallel subtask
> that tries names with this
> // subtask id. Otherwise we would run into concurrency issues here. This is
> aligned with the way we now
> // clean the base directory in case of rescaling.
> 
> int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
> while (fs.exists(partPath) ||
>         fs.exists(getPendingPathFor(partPath)) ||
>         fs.exists(getInProgressPathFor(partPath))) {
>     bucketState.partCounter++;
>     partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
> }
> That makes sense. But on S3 the files do not exist until checkpointing, so
> the loop won't find the files.
> 
> After debugging, I've noticed that in the invoke method, in
> state.getBucketState(), the first time I try to write an event to the bucket, it
> creates a new bucketState in the HashMap, but the second time I try to write
> to the same bucket (with a different event), it does find this new
> bucketState.
> 
> Thanks for the help! 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Correlation between number of operators and Job manager memory requirements

2018-02-20 Thread Shailesh Jain
Hi Till,

When I'm submitting one big job, both JM and TM (sometimes just JM) are
crashing at the time of initialization itself (i.e. not all operators
switch to RUNNING) with OOM. The number of threads on TM go to almost 1000.

But when I'm submitting multiple jobs, job submission completes. But
when data starts coming in (it's a live stream), the task managers' memory
usage grows and eventually they crash.

The patterns I'm trying to match are simple (A followed by B, A followed by
B within X mins etc.), but the number of patterns is large (due to the
reason mentioned in my question 2 below).

Configuration: 1 JM and 1 TM

jobmanager.heap.mb: 512
taskmanager.heap.mb: 3596
taskmanager.numberOfTaskSlots: 5
parallelism.default: 1
jobmanager.rpc.port: 6123
state.backend: filesystem
taskmanager.debug.memory.startLogThread: true
taskmanager.debug.memory.logIntervalMs: 12
akka.ask.timeout: 2 min
akka.client.timeout: 5 min
akka.framesize: 404857600b
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

I'm submitting 5 jobs, and each job has ~80 operators.

With the above configuration, the job submission is successful, but the
TM's eventually max out their heap usage.

But, as mentioned earlier, when I change the number of slots to 1 and
submit 1 job with 300+ operators, the job submission fails with OOM.

3 questions here:

1. Is it possible to chain multiple CEP operators into a single task, so
that the number of threads is reduced? The reason here is that when I'm
submitting one big job, the OOM always occurs when the JVM is trying to create
a new thread.

2. Instead of using a KeyedStream, I'm creating multiple streams per key
(using a filter operator) and then applying all N patterns to that stream.
So essentially it is resulting in M (number of patterns) x N (number of
keys) CEP operators/tasks. The reason behind creating this is that I need
to have different watermarks per key (a key represents a physical source,
and the source time could be different, resulting in events getting
dropped), and I believe generating watermarks per key is not supported yet.
Is this understanding correct? Do you have any ideas/recommendations to
address this use case?

3. How can we benchmark the resources required by JM? Is it OK to assume
that the amount of memory required by JM grows linearly with the total
number of operators deployed?

Thanks,
Shailesh


On Mon, Feb 19, 2018 at 10:18 PM, Till Rohrmann 
wrote:

> Hi Shailesh,
>
> my question would be where do you see the OOM happening? Does it happen on
> the JM or the TM.
>
> The memory requirements for each operator strongly depend on the operator
> and it is hard to give a general formula for that. It mostly depends on the
> user function. Flink itself should not need too much extra memory for the
> framework specific code.
>
> CEP, however, can easily add a couple of hundred megabytes to your memory
> requirements. This depends strongly on the pattern you're matching and
> which state backend you're using.
>
> Concerning your question one big job vs. multiple jobs, I could see that
> this helps if not all jobs are executed at the same time. Especially if you
> only have a single TM with a limited number of slots, I think that you
> effectively queue up jobs. That should reduce the required amount of
> resources for each individual job.
>
> Cheers,
> Till
>
> On Mon, Feb 19, 2018 at 11:35 AM, Shailesh Jain <
> shailesh.j...@stellapps.com> wrote:
>
>> Actually, there are too many hyperparameters to experiment with, that is
>> why I'm trying to understand if there is any particular way in which a
>> cluster could be benchmarked.
>>
>> Another strange behaviour I am observing is: Delaying the operator
>> creation (by distributing the operators across jobs, and submitting
>> multiple jobs to the same cluster instead of one) is helping in creating
>> more operators. Any ideas on why that is happening?
>>
>> Shailesh
>>
>>
>> On Sun, Feb 18, 2018 at 11:16 PM, Pawel Bartoszek <
>> pawelbartosze...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> You could definitely try to find a formula for heap size, but isn't it
>>> easier just to try out different memory settings and see which works best
>>> for you?
>>>
>>> Thanks,
>>> Pawel
>>>
>>> 17 lut 2018 12:26 "Shailesh Jain" 
>>> napisał(a):
>>>
>>> Oops, hit send by mistake.
>>>
>>> In the configuration section, it is mentioned that for "many operators"
>>> heap size should be increased.
>>>
>>> "JVM heap size (in megabytes) for the JobManager. You may have to
>>> increase the heap size for the JobManager if you are running very large
>>> applications (with many operators), or if you are keeping a long history of
>>> them."
>>>
>>> Is there any recommendation on the heap space required when there are
>>> around 200 CEP operators, and close 80 Filter operators?
>>>
>>> Any other leads on calculating the expected heap space 

Re: CoProcess() VS union.Process() & Timers in them

2018-02-20 Thread m@xi
OK man! Thanks a lot.

To tell you the truth, the documentation did not explain it in a way that
convinced me to consider it an important operator to use in my
applications.

Thanks for mentioning.

Best,
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: CoProcess() VS union.Process() & Timers in them

2018-02-20 Thread Fabian Hueske
I don't think that the mapping is that sophisticated.
I'd assume it is a bit simpler and just keeps one local pipeline (the one
with the same subtask index) which will run in the same slot (unless
explicitly configured differently).

TBH, I would not rely on this behavior. rescale() is rather an artifact of
the first version of the DataStream API.

Best, Fabian

2018-02-20 11:00 GMT+01:00 m@xi :

> Hey Fabian!
>
> Thanks for the comprehensive replies. Now I understand those concepts
> properly.
>
> Regarding .rescale() , it does not receive any arguments. Thus, I assume
> that the way it does the shuffling from operator A to operator B instances
> is a black box for the programmer and probably has to do with the number of
> slots in each taskmanager. It strives to favour local data exchange (aka
> *intra-exchange* : between slot of the same taskmanager) instead of
> *inter-exchange* of data between different taskmanagers (that burdens the
> network).
>
> Am I correct?
>
> Thanks in advance.
>
> Best,
> Max
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Flink Batch job: All slots for groupReduce task scheduled on same machine

2018-02-20 Thread Aljoscha Krettek
Could you please send a screenshot?

> On 20. Feb 2018, at 11:09, Aneesha Kaushal  
> wrote:
> 
> Hello Aljoscha
> 
> I looked into the Subtasks section on the Flink Dashboard, for the above two
> tasks.
> 
> Thanks
> Aneesha
> 
>> On 20-Feb-2018, at 3:32 PM, Aljoscha Krettek > > wrote:
>> 
>> Hi,
>> 
>> Could you please also post where/how you see which tasks are mapped to which 
>> slots/TaskManagers?
>> 
>> Best,
>> Aljoscha
>> 
>>> On 20. Feb 2018, at 10:50, Aneesha Kaushal >> > wrote:
>>> 
>>> Hello, 
>>> 
>>> I have a fink batch job, where I am grouping dataset on some keys, and then 
>>> using group reduce. Parallelism is set to 16. 
>>> The slots for the Map task is distributed across all the machines, but for 
>>> GroupReduce all the slots are being assigned to the same machine. Can you 
>>> help me understand why/when this can happen? 
>>> Code looks something like: 
>>> dataset.map(MapFunction())
>>>   .groupBy()
>>>   .sortGroup(, Order.DESCENDING)
>>>   .reduceGroup(GroupReduceFunction()).name("Group reduce")
>>> From flink dashboard: 
>>> 
>>> 
>>> 
>>> 
>>> Thanks in advance
>>> Aneesha
>>> 
>>> 
>>> 
>>> 
>> 
> 



Re: Manipulating Processing elements of Network Buffers

2018-02-20 Thread m@xi
Hi Till!

Thanks a lot for your useful reply. 

So now I get it. I should not manipulate or disturb the network buffer
contents, as this will trigger other problematic behaviours. On the other
hand, there is a price to buffering the data in my operator first, e.g. sorting
it based on some criterion and then processing it... what is its impact on
the efficiency/effectiveness of a streaming algorithm?

I mean, Flink is "pure" streaming, but not-so-pure due to the network
buffers, so if I add another layer of buffering inside each operator, this will
make my application slower, and it is also no longer really streaming but
rather batch.

Thanks in advance.

Best,
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink Batch job: All slots for groupReduce task scheduled on same machine

2018-02-20 Thread Aneesha Kaushal
Hello Aljoscha

I looked into the Subtasks section on the Flink Dashboard, for the above two tasks.

Thanks
Aneesha

> On 20-Feb-2018, at 3:32 PM, Aljoscha Krettek  wrote:
> 
> Hi,
> 
> Could you please also post where/how you see which tasks are mapped to which 
> slots/TaskManagers?
> 
> Best,
> Aljoscha
> 
>> On 20. Feb 2018, at 10:50, Aneesha Kaushal > > wrote:
>> 
>> Hello, 
>> 
>> I have a fink batch job, where I am grouping dataset on some keys, and then 
>> using group reduce. Parallelism is set to 16. 
>> The slots for the Map task is distributed across all the machines, but for 
>> GroupReduce all the slots are being assigned to the same machine. Can you 
>> help me understand why/when this can happen? 
>> Code looks something like: 
>> dataset.map(MapFunction())
>>   .groupBy()
>>   .sortGroup(, Order.DESCENDING)
>>   .reduceGroup(GroupReduceFunction()).name("Group reduce")
>> From flink dashboard: 
>> 
>> 
>> 
>> 
>> Thanks in advance
>> Aneesha
>> 
>> 
>> 
>> 
> 



Re: Flink Batch job: All slots for groupReduce task scheduled on same machine

2018-02-20 Thread Aljoscha Krettek
Hi,

Could you please also post where/how you see which tasks are mapped to which 
slots/TaskManagers?

Best,
Aljoscha

> On 20. Feb 2018, at 10:50, Aneesha Kaushal  
> wrote:
> 
> Hello, 
> 
> I have a fink batch job, where I am grouping dataset on some keys, and then 
> using group reduce. Parallelism is set to 16. 
> The slots for the Map task is distributed across all the machines, but for 
> GroupReduce all the slots are being assigned to the same machine. Can you 
> help me understand why/when this can happen? 
> Code looks something like: 
> dataset.map(MapFunction())
>   .groupBy()
>   .sortGroup(, Order.DESCENDING)
>   .reduceGroup(GroupReduceFunction()).name("Group reduce")
> From flink dashboard: 
> 
> 
> 
> 
> Thanks in advance
> Aneesha
> 
> 
> 
> 



Re: CoProcess() VS union.Process() & Timers in them

2018-02-20 Thread m@xi
Hey Fabian!

Thanks for the comprehensive replies. Now I understand those concepts
properly.

Regarding .rescale() , it does not receive any arguments. Thus, I assume
that the way it does the shuffling from operator A to operator B instances
is a black box for the programmer and probably has to do with the number of
slots in each taskmanager. It strives to favour local data exchange (aka
*intra-exchange*: between slots of the same taskmanager) instead of
*inter-exchange* of data between different taskmanagers (that burdens the
network).

Am I correct?

Thanks in advance.

Best,
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: # of active session windows of a streaming job

2018-02-20 Thread Fabian Hueske
Hi Dongwon Kim,

That's an interesting question.

I don't have a solution blueprint for you, but a few ideas that should help
to solve the problem.

I would start with a separate job first and later try to integrate it with
the other job.
You could implement a Trigger that fires when a new window is created and
when the window is closed. A ProcessWindowFunction would emit a +1 if the
window was created and a -1 when the window closes.
Session windows are a bit special, because you also need to handle the case
of merging windows, i.e., two opened windows can be merged and only one
(the merged) window is closed. So you would need to emit a -2 if a merged
window closes (assuming only two windows were merged).
In order to do that, you'd need to carry the merging information forward.
The Trigger.onMerge method cannot trigger the window function, but it could
store the merging information in state that is later accessed.
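
As a very rough illustration of that firing scheme only (it ignores window
merging completely; a trigger used with session windows would additionally
need canMerge()/onMerge()), a trigger could FIRE once when a window sees its
first element and once when the watermark passes the window end. The window
function then has to tell the two firings apart, e.g. via its own state, and
map the first one to +1 and the closing one to -1.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Sketch only: "opened"/"closed" firings, no support for merging session windows.
public class OpenCloseTrigger extends Trigger<Object, TimeWindow> {

    private final ValueStateDescriptor<Boolean> openedDesc =
            new ValueStateDescriptor<>("opened", Boolean.class);

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        ValueState<Boolean> opened = ctx.getPartitionedState(openedDesc);
        if (opened.value() == null) {
            opened.update(true);
            return TriggerResult.FIRE;          // first firing: window function emits +1
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return time == window.maxTimestamp()
                ? TriggerResult.FIRE_AND_PURGE  // closing firing: window function emits -1
                : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(openedDesc).clear();
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}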

Hope this helps,
Fabian

2018-02-20 9:54 GMT+01:00 Dongwon Kim :

> Hi,
>
> It could be a totally stupid question but I currently have no idea how to
> get the number of active session windows from a running job.
>
> Our traffic trajectory application (which handles up to 10,000 tps) uses
> event-time session window on KeyedStream (keyed by userID).
>
> Should I write another Flink job for the purpose?
>
> Cheers,
>
> Dongwon Kim


Flink Batch job: All slots for groupReduce task scheduled on same machine

2018-02-20 Thread Aneesha Kaushal
Hello, 

I have a Flink batch job, where I am grouping a dataset on some keys, and then
using group reduce. Parallelism is set to 16.
The slots for the Map task are distributed across all the machines, but for 
GroupReduce all the slots are being assigned to the same machine. Can you help 
me understand why/when this can happen? 
Code looks something like: 
dataset.map(MapFunction())
  .groupBy()
  .sortGroup(, Order.DESCENDING)
  .reduceGroup(GroupReduceFunction()).name("Group reduce")
From flink dashboard: 




Thanks in advance
Aneesha






Re: Iterating over state entries

2018-02-20 Thread Fabian Hueske
Hi Ken,

That's correct. The iterator will become invalid once you leave the method.
If you are only interested in a few specific entries, then index access is
probably the most efficient approach.
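
For completeness, a small sketch of that index-access pattern (hypothetical
MyValue type, a RichFlatMapFunction used on a keyed stream, RocksDB state
backend assumed). It allows resuming the scan at an arbitrary index across
invocations instead of traversing everything at once.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class IndexedEntries extends RichFlatMapFunction<MyValue, MyValue> {

    private transient MapState<Integer, MyValue> entries; // list index -> value
    private transient ValueState<Integer> size;           // number of appended entries

    @Override
    public void open(Configuration parameters) {
        entries = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("entries", Integer.class, MyValue.class));
        size = getRuntimeContext().getState(
                new ValueStateDescriptor<>("size", Integer.class));
    }

    @Override
    public void flatMap(MyValue value, Collector<MyValue> out) throws Exception {
        int n = size.value() == null ? 0 : size.value();
        entries.put(n, value);                 // append under the next index
        size.update(n + 1);
    }

    // Incremental scan: process at most batchSize entries starting at 'from' and
    // return the index to continue from on a later invocation.
    private int scan(int from, int batchSize, Collector<MyValue> out) throws Exception {
        int n = size.value() == null ? 0 : size.value();
        int to = Math.min(n, from + batchSize);
        for (int i = from; i < to; i++) {
            out.collect(entries.get(i));
        }
        return to;
    }
}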

Best, Fabian

2018-02-20 1:03 GMT+01:00 Ken Krugler :

> Hi Till,
>
> On Feb 19, 2018, at 8:14 AM, Till Rohrmann  wrote:
>
> Hi Ken,
>
> just for my clarification, the `RocksDBMapState#entries` method does not
> satisfy your requirements? This method does not allow you to iterate across
> different keys of your keyed stream of course. But it should allow you to
> iterate over the different entries for a given key of your keyed stream.
>
>
> As per my email to Fabian, I should have been more precise in my
> requirements.
>
> I need to do incremental iteration of the entries, versus a complete
> iteration.
>
> And I'm assuming I can't keep the iterator around across calls to the
> function.
>
> Regards,
>
> — Ken
>
>
> On Mon, Feb 19, 2018 at 12:10 AM, Ken Krugler  > wrote:
>
>> Hi there,
>>
>> I’ve got a MapState where I need to iterate over the entries.
>>
>> This currently isn’t supported (at least for Rocks DB), AFAIK, though
>> there is an issue/PR  to
>> improve this.
>>
>> The best solution I’ve seen is what Fabian proposed, which involves
>> keeping a ValueState with a count of entries, and then having the key for
>> the MapState be the index.
>>
>> I cannot comment on the internal design, but you could put the data into a
>> RocksDBStateBackend MapState where the value X is your data
>> type and the key is the list index. You would need another ValueState for
>> the current number of elements that you put into the MapState.
>> A MapState allows to fetch and traverse the key, value, or entry set of
>> the
>> Map without loading it completely into memory.
>> The sets are traversed in sort order of the key, so should be in insertion
>> order (given that you properly increment the list index).
>>
>>
>> This effectively lets you iterate over all of the map entries for a given
>> (keyed) state - though it doesn’t solve the “I have to iterate over _every_
>> entry” situation.
>>
>> Is this currently the best option?
>>
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>
>


Re: Need to understand the execution model of the Flink

2018-02-20 Thread Fabian Hueske
No, there is no size or cardinality estimation happening at the moment.

Best, Fabian
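
As a side note, a minimal sketch of the "DataSet in the middle" workaround
described further down in the quoted thread (Flink 1.3/1.4 batch Table API;
the table name "csvTable", the numeric field "f1" and the output paths are
made-up examples):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.types.Row;

public class DataSetInTheMiddle {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // "csvTable" is assumed to be a registered CSV TableSource with a numeric column f1.
        Table source = tEnv.scan("csvTable");

        // Materialize the source once as a DataSet and register it again, so each
        // sink's translation stops at the DataSet instead of re-reading the CSV.
        DataSet<Row> cached = tEnv.toDataSet(source, Row.class);
        tEnv.registerDataSet("cachedTable", cached);

        Table cachedTable = tEnv.scan("cachedTable");
        cachedTable.filter("f1 > 10").writeToSink(new CsvTableSink("/tmp/out1", ","));
        cachedTable.filter("f1 > 100").writeToSink(new CsvTableSink("/tmp/out2", ","));

        env.execute();
    }
}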

2018-02-19 21:56 GMT+01:00 Darshan Singh :

> Thanks , is there a metric or other way to know how much space each
> task/job is taking? Does execution plan has these details?
>
> Thanks
>
> On Mon, Feb 19, 2018 at 10:54 AM, Fabian Hueske  wrote:
>
>> Hi,
>>
>> that's a difficult question without knowing the details of your job.
>> A NoSpaceLeftOnDevice error occurs when a file system is full.
>>
>> This can happen if:
>> - A Flink algorithm writes to disk, e.g., an external sort or the hash
>> table of a hybrid hash join. This can happen for GroupBy, Join, Distinct,
>> or any other operation that requires to group or join data. Filters will
>> never spill to disk.
>> - An OutputFormat writes to disk.
>>
>> The data is written to a temp directory, that can be configured in the
>> ./conf/flink-conf.yaml file.
>>
>> Did you check how the tasks are distributed across the task managers?
>> The web UI can help to diagnose such problems.
>>
>> Best, Fabian
>>
>> 2018-02-19 11:22 GMT+01:00 Darshan Singh :
>>
>>> Thanks Fabian for such detailed explanation.
>>>
>>> I am using a dataset in between so I guess the CSV is read once. Now to my
>>> real issue: I have 6 task managers, each having 4 cores, and I have 2 slots
>>> per task manager.
>>>
>>> Now my CSV file is just 1 GB and I create a table, transform it to a dataset
>>> and then run 15 different filters and extra processing, which all run
>>> almost in parallel.
>>>
>>> However it fails with a "no space left on device" error on one of the task
>>> managers. Space on each task manager is 32 GB in /tmp. So I am not sure why
>>> it is running out of space. I do use some joins with other tables but those
>>> are a few megabytes.
>>>
>>> So I was assuming that somehow all parallel executions were storing data
>>> in /tmp and were filling it.
>>>
>>> So I would like to know what could be filling the space.
>>>
>>> Thanks
>>>
>>> On 19 Feb 2018 10:10 am, "Fabian Hueske"  wrote:
>>>
>>> Hi,
>>>
>>> this works as follows.
>>>
>>> - Table API and SQL queries are translated into regular DataSet jobs
>>> (assuming you are running in a batch ExecutionEnvironment).
>>> - A query is translated into a sequence of DataSet operators when you 1)
>>> transform the Table into a DataSet or 2) write it to a TableSink. In both
>>> cases, the optimizer is invoked and recursively goes back from the
>>> converted/emitted Table back to its roots, i.e., a TableSource or a
>>> DataSet.
>>>
>>> This means, that if you create a Table from a TableSource and apply
>>> multiple filters on it and write each filter to a TableSink, the CSV file
>>> will be read 10 times, filtered 10 times and written 10 times. This is not
>>> efficient, because, you could also just read the file once and apply all
>>> filters in parallel.
>>> You can do this by converting the Table that you read with a TableSource
>>> into a DataSet and register the DataSet again as a Table. In that case, the
>>> translations of all TableSinks will stop at the DataSet and not include the
>>> TableSource which reads the file.
>>>
>>> The following figures illustrate the difference:
>>>
>>> 1) Without DataSet in the middle:
>>>
>>> TableSource -> Filter1 -> TableSink1
>>> TableSource -> Filter2 -> TableSink2
>>> TableSource -> Filter3 -> TableSink3
>>>
>>> 2) With DataSet in the middle:
>>>
>>> /-> Filter1 -> TableSink1
>>> TableSource -<-> Filter2 -> TableSink2
>>> \-> Filter3 -> TableSink3
>>>
>>> I'll likely add a feature to internally translate an intermediate Table
>>> to make this a bit easier.
>>> The underlying problem is that the SQL optimizer cannot translate
>>> queries with multiple sinks.
>>> Instead, each sink is individually translated and the optimizer does not
>>> know that common execution paths could be shared.
>>>
>>> Best,
>>> Fabian
>>>
>>>
>>> 2018-02-19 2:19 GMT+01:00 Darshan Singh :
>>>
 Thanks for reply.

 I guess I am not looking for an alternative. I am trying to understand what
 Flink does in this scenario: if 10 tasks are going in parallel I am sure
 they will all be reading the CSV, as there is no other way.

 Thanks

 On Mon, Feb 19, 2018 at 12:48 AM, Niclas Hedhman 
 wrote:

>
> Do you really need the large single table created in step 2?
>
> If not, what you typically do is that the Csv source first do the
> common transformations. Then depending on whether the 10 outputs have
> different processing paths or not, you either do a split() to do 
> individual
> processing depending on some criteria, or you just have the sink put each
> record in separate tables.
> You have full control, at each step along the transformation path
> whether it can be parallelized or not, and if there are no 

# of active session windows of a streaming job

2018-02-20 Thread Dongwon Kim
Hi,

It could be a totally stupid question but I currently have no idea how to get 
the number of active session windows from a running job.

Our traffic trajectory application (which handles up to 10,000 tps) uses 
event-time session window on KeyedStream (keyed by userID).

Should I write another Flink job for the purpose? 

Cheers,

Dongwon Kim

Re: A "per operator instance" window all ?

2018-02-20 Thread Julien

Hi Xingcan, Ken and Till,

OK, thank you. It is clear.

I have various options then:

 * the one suggested by Ken, where I find a way to build a key that
   will be well distributed (1 key per task)
   - it relies on the way Flink partitions the key, but it will do
     the job (see the sketch just below this list)
 * or I can also go with another way to build my key where I will have
   more keys than the parallelism, so the distribution will be better
   - I will still have a small number of requests (much less than the
     number of resource ids, as 1 key will cover multiple resource ids)
   - I will potentially do multiple requests on the same task, but that
     may be acceptable, especially if I go with AsyncIO
 * or I can go with the OperatorState and implement my own firing logic
   - I am in a case where the memory-based mechanism should be fine
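
A sketch of the first option (it relies on a Flink-internal class and only
holds as long as parallelism and max parallelism stay fixed; the default max
parallelism of 128 is assumed, and getResourceId() is a made-up accessor):

import java.util.Arrays;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyPerSubtask {

    // Precompute one artificial integer key per parallel subtask, using the same
    // key-group assignment that Flink applies on keyBy.
    public static int[] computeKeys(int parallelism, int maxParallelism) {
        int[] keyForSubtask = new int[parallelism];
        Arrays.fill(keyForSubtask, -1);
        int found = 0;
        for (int k = 0; found < parallelism; k++) {
            int operator = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                    k, maxParallelism, parallelism);
            if (keyForSubtask[operator] == -1) {
                keyForSubtask[operator] = k;
                found++;
            }
        }
        return keyForSubtask;
    }
}

// Later, e.g.: alerts.keyBy(a -> keys[Math.abs(a.getResourceId().hashCode()) % parallelism])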


Thanks again,
Regards.


Julien.


On 20/02/2018 02:48, Xingcan Cui wrote:

Hi Julien,

you could use the OperatorState to
cache the data in a window and the last time your window fired. Then
you check the ctx.timerService().currentProcessingTime() in
processElement() and once it exceeds the next window boundary, all the
cached data should be processed as if the window had fired.


Note that currently, there are only memory-based operator states provided.
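
A rough sketch of that approach (hypothetical Alert/Enriched types and a
made-up queryExternalSystem() call; the buffer is not checkpointed here, and a
flush only happens when a new element arrives):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class BatchingLookup extends ProcessFunction<Alert, Enriched> {

    private static final long FLUSH_INTERVAL_MS = 500;

    private transient List<Alert> buffer;
    private transient long nextFlushTime;

    @Override
    public void open(Configuration parameters) {
        buffer = new ArrayList<>();
        nextFlushTime = Long.MIN_VALUE;
    }

    @Override
    public void processElement(Alert alert, Context ctx, Collector<Enriched> out) throws Exception {
        buffer.add(alert);
        long now = ctx.timerService().currentProcessingTime();
        if (nextFlushTime == Long.MIN_VALUE) {
            nextFlushTime = now + FLUSH_INTERVAL_MS;
        }
        if (now >= nextFlushTime) {
            // one batched lookup for everything collected in this interval
            for (Enriched e : queryExternalSystem(buffer)) {
                out.collect(e);
            }
            buffer.clear();
            nextFlushTime = now + FLUSH_INTERVAL_MS;
        }
    }

    private List<Enriched> queryExternalSystem(List<Alert> alerts) {
        return Collections.emptyList(); // placeholder for the real batched query
    }
}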

Hope this helps,
Xingcan

On 19 Feb 2018, at 4:34 PM, Julien > wrote:


Hello,

I've already tried to key my stream with 
"resourceId.hashCode%parallelism" (with parallelism of 4 in my example).
So all my keys will be either 0,1, 2 or 3. I can then benefit from a 
time window on this keyed stream and do only 4 queries to my external 
system.
But it is not well distributed with the default partitioner on a keyed 
stream (keys 0, 1, 2 and 3 only go to operator indexes 2 and 3).


I think I should explore the custom partitioner, as you suggested, 
Xingcan.
Maybe my last question on this will be: "can you give me more details 
on this point "and simulate a window operation by yourself in a 
ProcessFunction" ?


When I look at the documentation about the custom partitioner, I can 
see that the result of partitionCustom is a DataStream.

It is not a KeyedStream.
So the only window I have will be windowAll (which will bring me back 
to a parallelism of 1, no ?).


And if I do something like "myStream.partitionCustom(partitioner>,).keyBy().window(...)", will it preserve 
my custom partitioner ?
When looking at the "KeyedStream" class, it seems that it will go 
back to the "KeyGroupStreamPartitioner" and forget my custom 
partitioner ?


Thanks again for your feedback,

Julien.


On 19/02/2018 03:45, 周思华 wrote:

Hi Julien,
    If I am not misunderstanding, I think you can key your stream on
`Random.nextInt() % parallelism`; this way you can "group" together
alerts from different resource ids and benefit from multiple parallel instances.



Sent from NetEase Mail Master

On 02/19/2018 09:08,Xingcan Cui> wrote:

Hi Julien,

sorry for my misunderstanding before. For now, the window can only 
be defined on a KeyedStream or an ordinary DataStream but with 
parallelism = 1. I’d like to provide three options for your scenario.


1. If your external data is static and fits into memory, you can use
ManagedStates to cache it without worrying about the querying problem.
2. Or you can use a CustomPartitioner to manually distribute your
alert data and simulate a window operation by yourself in a
ProcessFunction.
3. You may also choose to use some external system such as an
in-memory store, which can work as a cache for your queries.


Best,
Xingcan

On 19 Feb 2018, at 5:55 AM, Julien > wrote:


Hi Xingcan,

Thanks for your answer.
Yes, I understand that point:
• if I have 100 resource IDs with parallelism of 4, then each 
operator instance will handle about 25 keys



The issue I have is that I want, on a given operator instance, to 
group those 25 keys together in order to do only 1 query to an 
external system per operator instance:


• on a given operator instance, I will do 1 query for my 25 keys
• so with the 4 operator instances, I will do 4 queries in parallel 
(with about 25 keys per query)


I do not know how I can do that.

If I define a window on my keyed stream (with for 
example stream.keyBy(_.resourceId).window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))), then 
my understanding is that the window is "associated" to the key. So 
in this case, on a given operator instance, I will have 25 of those 
windows (one per key), and I will do 25 queries (instead of 1).


Do you understand my point ?
Or maybe am I missing something ?

I'd like to find a way on operator instance 1 to group all the 
alerts received on those 25 resource ids and do 1 query for those 
25 resource ids.

Same thing for