Re: A "per operator instance" window all ?

2018-02-19 Thread Xingcan Cui
Hi Julien,

you could use the OperatorState to cache the data in a window and the last
time your window fired. Then you check ctx.timerService().currentProcessingTime()
in processElement() and once it exceeds the next window boundary, all the
cached data should be processed as if the window had fired.

Note that currently only memory-based operator state is provided.
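
For illustration, here is a minimal sketch of what that could look like (my own sketch, not code from this thread): a ProcessFunction on the non-keyed (custom-partitioned) stream that buffers elements in memory, checkpoints the buffer as operator ListState via CheckpointedFunction, and emits the whole batch once processing time passes the simulated window boundary. The Alert type and the window size are placeholders.

public class SimulatedWindowFunction extends ProcessFunction<Alert, List<Alert>>
        implements CheckpointedFunction {

    private final long windowSizeMs;

    private transient ListState<Alert> checkpointedBuffer;   // operator (non-keyed) state
    private final List<Alert> buffer = new ArrayList<>();    // in-memory cache
    private long nextWindowEnd = Long.MIN_VALUE;

    public SimulatedWindowFunction(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    @Override
    public void processElement(Alert value, Context ctx, Collector<List<Alert>> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        if (nextWindowEnd == Long.MIN_VALUE) {
            nextWindowEnd = now - (now % windowSizeMs) + windowSizeMs;
        }
        if (now >= nextWindowEnd && !buffer.isEmpty()) {
            out.collect(new ArrayList<>(buffer));             // "fire" the simulated window
            buffer.clear();
            nextWindowEnd = now - (now % windowSizeMs) + windowSizeMs;
        }
        buffer.add(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedBuffer.clear();
        for (Alert a : buffer) {
            checkpointedBuffer.add(a);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedBuffer = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("buffer", Alert.class));
        if (context.isRestored()) {
            for (Alert a : checkpointedBuffer.get()) {
                buffer.add(a);
            }
        }
    }
}

Applied e.g. as myStream.partitionCustom(myPartitioner, myKeySelector).process(new SimulatedWindowFunction(500)). Note that, as with any processing-time check done inside processElement(), the batch is only emitted when a new element arrives.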

Hope this helps,
Xingcan

> On 19 Feb 2018, at 4:34 PM, Julien  wrote:
> 
> Hello,
> 
> I've already tried to key my stream with "resourceId.hashCode%parallelism" 
> (with parallelism of 4 in my example).
> So all my keys will be either 0,1, 2 or 3. I can then benefit from a time 
> window on this keyed stream and do only 4 queries to my external system.
> But it is not well distributed with the default partitioner on keyed stream. 
> (keys 0, 1, 2 and 3 only goes to operator idx 2, 3).
> 
> I think I should explore the custom partitioner, as you suggested Xingcan.
> Maybe my last question on this will be: "can you give me more details on this 
> point "and simulate a window operation by yourself in a ProcessFunction" ?
> 
> When I look at the documentation about the custom partitioner, I can see that 
> the result of partitionCustom is a DataStream.
> It is not a KeyedStream.
> So the only window I have will be windowAll (which will bring me back to a 
> parallelism of 1, no ?).
> 
> And if I do something like "myStream.partitionCustom(<my partitioner>, <my key>).keyBy(<my key>).window(...)", will it preserve my custom partitioner ?
> When looking at the "KeyedStream" class, it seems that it will go back to the 
> "KeyGroupStreamPartitioner" and forget my custom partitioner ?
> 
> Thanks again for your feedback,
> 
> Julien.
> 
> 
> On 19/02/2018 03:45, 周思华 wrote:
>> Hi Julien,
>> If I am not misunderstanding, I think you can key your stream on a 
>> `Random.nextInt() % parallelism`; this way you can "group" together alerts 
>> from different keys and benefit from multiple parallel instances.
>> 
>> 
>> Sent from NetEase Mail Master
>> 
>> On 02/19/2018 09:08,Xingcan Cui wrote: 
>> Hi Julien,
>> 
>> sorry for my misunderstanding before. For now, the window can only be 
>> defined on a KeyedStream or an ordinary DataStream but with parallelism = 1. 
>> I’d like to provide three options for your scenario.
>> 
>> 1. If your external data is static and can be fit into the memory, you can 
>> use ManagedStates to cache them without considering the querying problem.
>> 2. Or you can use a CustomPartitioner to manually distribute your alert data 
>> and simulate an window operation by yourself in a ProcessFuncton.
>> 3. You may also choose to use some external systems such as in-memory store, 
>> which can work as a cache for your queries.
>> 
>> Best,
>> Xingcan
>> 
>>> On 19 Feb 2018, at 5:55 AM, Julien  wrote:
>>> 
>>> Hi Xingcan,
>>> 
>>> Thanks for your answer.
>>> Yes, I understand that point:
>>> • if I have 100 resource IDs with parallelism of 4, then each operator 
>>> instance will handle about 25 keys
>>> 
>>> 
>>> The issue I have is that I want, on a given operator instance, to group 
>>> those 25 keys together in order to do only 1 query to an external system 
>>> per operator instance:
>>> 
>>> • on a given operator instance, I will do 1 query for my 25 keys
>>> • so with the 4 operator instances, I will do 4 query in parallel (with 
>>> about 25 keys per query)
>>> 
>>> I do not know how I can do that.
>>> 
>>> If I define a window on my keyed stream (with for example 
>>> stream.key(_.resourceId).window(TumblingProcessingTimeWindows.of(Time.milliseconds(500))),
>>>  then my understanding is that the window is "associated" to the key. So in 
>>> this case, on a given operator instance, I will have 25 of those windows 
>>> (one per key), and I will do 25 queries (instead of 1).
>>> 
>>> Do you understand my point ?
>>> Or maybe am I missing something ?
>>> 
>>> I'd like to find a way on operator instance 1 to group all the alerts 
>>> received on those 25 resource ids and do 1 query for those 25 resource ids.
>>> Same thing for operator instance 2, 3 and 4.
>>> 
>>> 
>>> Thank you,
>>> Regards.
>>> 
>>> 
>>> On 18/02/2018 14:43, Xingcan Cui wrote:
 Hi Julien,
 
 the cardinality of your keys (e.g., resource ID) will not be restricted to 
 the parallelism. For instance, if you have 100 resource IDs processed by 
 KeyedStream with parallelism 4, each operator instance will handle about 
 25 keys. 
 
 Hope that helps.
 
 Best,
 Xingcan
 
> On 18 Feb 2018, at 8:49 PM, Julien  wrote:
> 
> Hi,
> 
> I am pretty new to flink and I don't know what will be the best way to 
> deal with the following use case:
> 
>   • as an input, I receive some alerts from a kafka topic
>   • an alert is linked to a network resource (like router-1, 
> router-2, switch-1, switch-2, ...)
>>>

Re: Iterating over state entries

2018-02-19 Thread Ken Krugler
Hi Till,

> On Feb 19, 2018, at 8:14 AM, Till Rohrmann  wrote:
> 
> Hi Ken,
> 
> just for my clarification, the `RocksDBMapState#entries` method does not 
> satisfy your requirements? This method does not allow you to iterate across 
> different keys of your keyed stream of course. But it should allow you to 
> iterate over the different entries for a given key of your keyed stream.

As per my email to Fabian, I should have been more precise in my requirements.

I need to do incremental iteration of the entries, versus a complete iteration.

And I'm assuming I can't keep the iterator around across calls to the function.

Regards,

— Ken


> On Mon, Feb 19, 2018 at 12:10 AM, Ken Krugler  > wrote:
> Hi there,
> 
> I’ve got a MapState where I need to iterate over the entries.
> 
> This currently isn’t supported (at least for Rocks DB), AFAIK, though there 
> is an issue/PR  to improve 
> this.
> 
> The best solution I’ve seen is what Fabian proposed, which involves keeping a 
> ValueState with a count of entries, and then having the key for the MapState 
> be the index.
> 
>> I cannot comment on the internal design, but you could put the data into a
>> RocksDBStateBackend MapState where the value X is your data
>> type and the key is the list index. You would need another ValueState for
>> the current number of elements that you put into the MapState.
>> A MapState allows to fetch and traverse the key, value, or entry set of the
>> Map without loading it completely into memory.
>> The sets are traversed in sort order of the key, so should be in insertion
>> order (given that you properly increment the list index).
> 
> 
> This effectively lets you iterate over all of the map entries for a given 
> (keyed) state - though it doesn’t solve the “I have to iterate over _every_ 
> entry” situation.
> 
> Is this currently the best option?

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Re: Iterating over state entries

2018-02-19 Thread Ken Krugler
Hi Fabian,

> I'd like to clarify what I said before.
> 
> By using MapState you mainly gain two things:
> - position access by index
> - the full list does not need to be deserialized to read values (which is how 
> ListState works).
> 
> Point access should obviously be done by get(index). 
> However, iterating over the list should be done by iterating over the entry 
> (or value) set. The entry set iterator will prefetch multiple entries and 
> only deserialize the key / values when you access them. This reduces the 
> number of RocksDB look-ups.

Sorry, I should have been more precise in my description below. I have to do 
incremental iteration (e.g. process the next 10 entries).

I’m assuming I can’t hold onto the iterator across calls to a function, right?

If so, then making get(index) calls via the technique described below is 
currently the most efficient approach, yes?
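
For what it's worth, a rough sketch of that approach as I understand it (my own illustration, not from the thread): the read position is itself kept in keyed state, and each invocation processes the next batch via get(index). MyValue, the state names and BATCH_SIZE are placeholders.

// keyed state, declared as fields of a RichFlatMapFunction / keyed ProcessFunction
private transient ValueState<Long> cursor;           // next index to process
private transient ValueState<Long> size;             // number of entries appended so far
private transient MapState<Long, MyValue> entries;   // list index -> value

private static final int BATCH_SIZE = 10;

@Override
public void open(Configuration parameters) {
    cursor = getRuntimeContext().getState(new ValueStateDescriptor<>("cursor", Long.class));
    size = getRuntimeContext().getState(new ValueStateDescriptor<>("size", Long.class));
    entries = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("entries", Long.class, MyValue.class));
}

// called from the processing method, i.e. always for the current key
private void processNextBatch() throws Exception {
    long from = cursor.value() == null ? 0L : cursor.value();
    long total = size.value() == null ? 0L : size.value();
    long to = Math.min(from + BATCH_SIZE, total);

    for (long i = from; i < to; i++) {
        MyValue v = entries.get(i);   // one RocksDB lookup per index
        // ... process v ...
    }
    cursor.update(to);
}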

Thanks,

— Ken


> 2018-02-19 0:10 GMT+01:00 Ken Krugler  >:
> Hi there,
> 
> I’ve got a MapState where I need to iterate over the entries.
> 
> This currently isn’t supported (at least for Rocks DB), AFAIK, though there 
> is an issue/PR  to improve 
> this.
> 
> The best solution I’ve seen is what Fabian proposed, which involves keeping a 
> ValueState with a count of entries, and then having the key for the MapState 
> be the index.
> 
>> I cannot comment on the internal design, but you could put the data into a
>> RocksDBStateBackend MapState where the value X is your data
>> type and the key is the list index. You would need another ValueState for
>> the current number of elements that you put into the MapState.
>> A MapState allows to fetch and traverse the key, value, or entry set of the
>> Map without loading it completely into memory.
>> The sets are traversed in sort order of the key, so should be in insertion
>> order (given that you properly increment the list index).
> 
> 
> This effectively lets you iterate over all of the map entries for a given 
> (keyed) state - though it doesn’t solve the “I have to iterate over _every_ 
> entry” situation.
> 
> Is this currently the best option?
> 
> Thanks,
> 
> — Ken
> 
> 
> http://about.me/kkrugler 
> +1 530-210-6378 
> 


http://about.me/kkrugler
+1 530-210-6378



Re: Need to understand the execution model of the Flink

2018-02-19 Thread Darshan Singh
Thanks, is there a metric or another way to know how much space each
task/job is taking? Does the execution plan have these details?

Thanks

On Mon, Feb 19, 2018 at 10:54 AM, Fabian Hueske  wrote:

> Hi,
>
> that's a difficult question without knowing the details of your job.
> A NoSpaceLeftOnDevice error occurs when a file system is full.
>
> This can happen if:
> - A Flink algorithm writes to disk, e.g., an external sort or the hash
> table of a hybrid hash join. This can happen for GroupBy, Join, Distinct,
> or any other operation that requires to group or join data. Filters will
> never spill to disk.
> - An OutputFormat writes to disk.
>
> The data is written to a temp directory, that can be configured in the
> ./conf/flink-conf.yaml file.
>
> Did you check how the tasks are distributed across the task managers?
> The web UI can help to diagnose such problems.
>
> Best, Fabian
>
> 2018-02-19 11:22 GMT+01:00 Darshan Singh :
>
>> Thanks Fabian for such detailed explanation.
>>
>> I am using a dataset in between so I guess the csv is read once. Now to my
>> real issue: I have 6 task managers, each having 4 cores, and I have 2 slots
>> per task manager.
>>
>> Now my csv file is just 1 GB and I create a table, transform it to a dataset
>> and then run 15 different filters and extra processing which all run almost
>> in parallel.
>>
>> However it fails with the error "no space left on device" on one of the task
>> managers. Space on each task manager is 32 GB in /tmp. So I am not sure why
>> it is running out of space. I do use some joins with other tables but those
>> are a few megabytes.
>>
>> So I was assuming that somehow all parallel executions were storing data
>> in /tmp and were filling it.
>>
>> So I would like to know what could be filling the space.
>>
>> Thanks
>>
>> On 19 Feb 2018 10:10 am, "Fabian Hueske"  wrote:
>>
>> Hi,
>>
>> this works as follows.
>>
>> - Table API and SQL queries are translated into regular DataSet jobs
>> (assuming you are running in a batch ExecutionEnvironment).
>> - A query is translated into a sequence of DataSet operators when you 1)
>> transform the Table into a DataSet or 2) write it to a TableSink. In both
>> cases, the optimizer is invoked and recursively goes back from the
>> converted/emitted Table back to its roots, i.e., a TableSource or a
>> DataSet.
>>
>> This means, that if you create a Table from a TableSource and apply
>> multiple filters on it and write each filter to a TableSink, the CSV file
>> will be read 10 times, filtered 10 times and written 10 times. This is not
>> efficient, because, you could also just read the file once and apply all
>> filters in parallel.
>> You can do this by converting the Table that you read with a TableSource
>> into a DataSet and register the DataSet again as a Table. In that case, the
>> translations of all TableSinks will stop at the DataSet and not include the
>> TableSource which reads the file.
>>
>> The following figures illustrate the difference:
>>
>> 1) Without DataSet in the middle:
>>
>> TableSource -> Filter1 -> TableSink1
>> TableSource -> Filter2 -> TableSink2
>> TableSource -> Filter3 -> TableSink3
>>
>> 2) With DataSet in the middle:
>>
>>              /-> Filter1 -> TableSink1
>> TableSource -<-> Filter2 -> TableSink2
>>              \-> Filter3 -> TableSink3
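
A rough sketch of variant 2 in code (this is only an illustration of Fabian's figures under the 1.4-era batch Table API; the registered table, field and output names are made up):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

Table source = tEnv.scan("csv_input");   // TableSource registered beforehand

// Materialize the scan once as a DataSet and re-register it, so that the
// translation of each sink stops here instead of going back to the CSV file.
DataSet<Row> shared = tEnv.toDataSet(source, Row.class);
Table sharedTable = tEnv.fromDataSet(shared);

tEnv.toDataSet(sharedTable.filter("amount > 10"), Row.class).writeAsText("/tmp/out-1");
tEnv.toDataSet(sharedTable.filter("amount > 100"), Row.class).writeAsText("/tmp/out-2");
tEnv.toDataSet(sharedTable.filter("amount > 1000"), Row.class).writeAsText("/tmp/out-3");

env.execute();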
>>
>> I'll likely add a feature to internally translate an intermediate Table
>> to make this a bit easier.
>> The underlying problem is that the SQL optimizer cannot translate queries
>> with multiple sinks.
>> Instead, each sink is individually translated and the optimizer does not
>> know that common execution paths could be shared.
>>
>> Best,
>> Fabian
>>
>>
>> 2018-02-19 2:19 GMT+01:00 Darshan Singh :
>>
>>> Thanks for reply.
>>>
>>> I guess I am not looking for an alternative. I am trying to understand what
>>> Flink does in this scenario, and if 10 tasks are going in parallel I am sure
>>> they will be reading the csv as there is no other way.
>>>
>>> Thanks
>>>
>>> On Mon, Feb 19, 2018 at 12:48 AM, Niclas Hedhman 
>>> wrote:
>>>

 Do you really need the large single table created in step 2?

 If not, what you typically do is that the Csv source first does the
 common transformations. Then depending on whether the 10 outputs have
 different processing paths or not, you either do a split() to do individual
 processing depending on some criteria, or you just have the sink put each
 record in separate tables.
 You have full control, at each step along the transformation path
 whether it can be parallelized or not, and if there are no sequential
 constraints on your model, then you can easily fill all cores on all hosts
 quite easily.

 Even if you need the step 2 table, I would still just treat that as a
 split(), a branch ending in a Sink that does the storage there. No need to
 read records from file over and over again, nor to store them first in step
 2 table and read them out again.

>>>

Re: Regarding BucketingSink

2018-02-19 Thread Vishal Santoshi
That is fine, as long as Flink assures at-least-once semantics?

If the contents of a .pending file, through the turbulence (restarts etc.),
are assured to end up in another file, then anything starting with an
underscore ("_") will by default be ignored by Hadoop (Hive or MR etc.).



On Mon, Feb 19, 2018 at 11:03 AM, Aljoscha Krettek 
wrote:

> Hi,
>
> Sorry for the confusion. The framework (Flink) does currently not do any
> cleanup of pending files, yes.
>
> Best,
> Aljoscha
>
>
> On 19. Feb 2018, at 17:01, Vishal Santoshi 
> wrote:
>
> >> You should only have these dangling pending files after a
> failure-recovery cycle, as you noticed. My suggestion would be to
> periodically clean up older pending files.
>
> A little confused. Is that what the framework should do, or us as part of
> some cleanup job ?
>
>
>
>
>
> On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> The BucketingSink does not clean up pending files on purpose. In a
>> distributed setting, and especially with rescaling of Flink operators, it
>> is sufficiently hard to figure out which of the pending files you actually
>> can delete and which of them you have to leave because they will get moved
>> to "final" as part of recovering from a checkpoint on some other parallel
>> instance of the sink.
>>
>> You should only have these dangling pending files after a
>> failure-recovery cycle, as you noticed. My suggestion would be to
>> periodically clean up older pending files.
>>
>> Best,
>> Aljoscha
>>
>>
>> On 19. Feb 2018, at 16:37, Till Rohrmann  wrote:
>>
>> Hi Vishal,
>>
>> what pending files should indeed get eventually finalized. This happens
>> on a checkpoint complete notification. Thus, what you report seems not
>> right. Maybe Aljoscha can shed a bit more light into the problem.
>>
>> In order to further debug the problem, it would be really helpful to get
>> access to DEBUG log files of a TM which runs the BucketingSink.
>>
>> Cheers,
>> Till
>>
>> On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong  wrote:
>>
>>> Hi Vishal,
>>>
>>> I have the same concern about save pointing in BucketingSink.
>>> As for your question, I think before the pending files get cleared in
>>> handleRestoredBucketState .
>>> They are finalized in notifyCheckpointComplete
>>>
>>> https://github.com/apache/flink/blob/master/flink-connectors
>>> /flink-connector-filesystem/src/main/java/org/apache/flink/
>>> streaming/connectors/fs/bucketing/BucketingSink.java#L628
>>>
>>> I'm looking into this part of the source code now, since we are
>>> experiencing some unclosed files after check pointing.
>>> It would be great if you can share more if you find something new about
>>> your problem, which might help with our problem.
>>>
>>> Best regards,
>>> Mu
>>>
>>> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 -rw-r--r--   3 root hadoop 11 2018-02-14 18:48
 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
 -rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15
 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
 -rw-r--r--   3 root hadoop 11 2018-02-14 21:17
 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length


 This is strange, we had a few retries b'coz of an OOM on one of the TMs
 and we see this situation. 2 files ( on either sides )  that were dealt
 with fine but a dangling .pending file. I am sure this is not what is meant
 to be.   We I think have an edge condition and looking at the code it is
 not obvious. May be some one who wrote the code can shed some light as to
 how can this happen.



 On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> without --allowNonRestoredState, on a suspend/resume we do see the
> length file along with the finalized file ( finalized during resume )
>
> -rw-r--r--   3 root hadoop 10 2018-02-09 13:57
> /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
>
> that does makes much more sense.
>
> I guess we should document --allowNonRestoredState better ? It seems
> it actually drops state ?
>
>
>
> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> This is 1.4 BTW.  I am not sure that I am reading this correctly but
>> the lifecycle of cancel/resume is 2 steps
>>
>>
>>
>> 1. Cancel job with SP
>>
>>
>> closeCurrentPartFile
>>
>> https://github.com/apache/flink/blob/master/flink-connectors
>> /flink-connector-filesystem/src/main/java/org/apache/flink/s
>> treaming/connectors/fs/bucketing/BucketingSink.java#L549
>>
>> is called from close()
>>
>>
>> https://github.com/apache/flink/blob/master/flink-connectors
>> /flink-connector-filesystem/src/main/java/org/apache/flink/s
>> treaming/connectors/fs/bucketing/Bucketin

Re: Managed State Custom Serializer with Avro

2018-02-19 Thread Niels Denissen
Hi Till,

Thanks for the quick reply, I'm using 1.3.2 atm.

Cheers,
Niels

On Feb 19, 2018 19:10, "Till Rohrmann"  wrote:

> Hi Niels,
>
> which version of Flink are you using? Currently, Flink does not support
> upgrading the TypeSerializer itself, if I'm not mistaken. As you've
> described, it will try to use the old serializer stored in the checkpoint
> stream to restore state.
>
> I've pulled Gordon into the conversation who can tell you a little bit
> more about the current capability and limitations of state evolution.
>
> Cheers,
> Till
>
> On Mon, Feb 19, 2018 at 4:14 PM, Niels <[hidden email]
> > wrote:
>
>> Hi all,
>>
>> I'm currently trying to use Avro in order to evolve our data present in
>> Flink's Managed State. I've extended the TypeSerializer class successfully
>> for this purpose, but still have issues using Schema Evolution.
>>
>> *The problem:*
>> When we try to read data (deserialize from savepoint) with a new
>> serialiser
>> and a new schema, Flink seems to use the old schema of the old serializer
>> (written to the savepoint). This results in an old GenericRecord that
>> doesn't adhere to the new Avro schema.
>>
>> *What seems to happen to me is the following* (Say we evolve from dataV1
>> to
>> dataV2):
>> - State containing dataV1 is serialized with avro schema V1 to a
>> check/savepoint. Along with the data, the serializer itself is written.
>> - Upon restore, the old serializer is retrieved from the data (therefore
>> needs to be on the classpath). Data is restored using this old serializer.
>> The new serializer provided is only used for writes.
>>
>> If this is indeed the case it explains our aforementioned problem. If you
>> have any pointers as to whether this is true and what a possible solution
>> would be that would be very much appreciated!
>>
>> Thanks!
>> Niels
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>
>
>


Re: Managed State Custom Serializer with Avro

2018-02-19 Thread Till Rohrmann
Hi Niels,

which version of Flink are you using? Currently, Flink does not support
upgrading the TypeSerializer itself, if I'm not mistaken. As you've
described, it will try to use the old serializer stored in the checkpoint
stream to restore state.

I've pulled Gordon into the conversation who can tell you a little bit more
about the current capability and limitations of state evolution.

Cheers,
Till

On Mon, Feb 19, 2018 at 4:14 PM, Niels  wrote:

> Hi all,
>
> I'm currently trying to use Avro in order to evolve our data present in
> Flink's Managed State. I've extended the TypeSerializer class successfully
> for this purpose, but still have issues using Schema Evolution.
>
> *The problem:*
> When we try to read data (deserialize from savepoint) with a new serialiser
> and a new schema, Flink seems to use the old schema of the old serializer
> (written to the savepoint). This results in an old GenericRecord that
> doesn't adhere to the new Avro schema.
>
> *What seems to happen to me is the following* (Say we evolve from dataV1 to
> dataV2):
> - State containing dataV1 is serialized with avro schema V1 to a
> check/savepoint. Along with the data, the serializer itself is written.
> - Upon restore, the old serializer is retrieved from the data (therefore
> needs to be on the classpath). Data is restored using this old serializer.
> The new serializer provided is only used for writes.
>
> If this is indeed the case it explains our aforementioned problem. If you
> have any pointers as to whether this is true and what a possible solution
> would be that would be very much appreciated!
>
> Thanks!
> Niels
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Stopping a kafka consumer gracefully (no losing of inflight events, StoppableFunction)

2018-02-19 Thread Till Rohrmann
Hi Bart,

you're right that Flink currently does not support a graceful stop
mechanism for the Kafka source. The community already has a good idea of how
to solve it in the general case and will hopefully soon add it to Flink.

Concerning the StoppableFunction: This interface was introduced quite some
time ago and currently only works for some batch sources. In order to make
it work with streaming, we need to add some more functionality to the
engine in order to properly stop and take a savepoint.

Cheers,
Till

On Mon, Feb 19, 2018 at 3:36 PM, Bart Kastermans  wrote:

> In https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/ops/cli.html it is shown that
> for gracefully stopping a job you need to implement the StoppableFunction
> interface.  This
> appears not (yet) implemented for Kafka consumers.  Am I missing
> something, or is there a
> different way to gracefully stop a job using a kafka source so we can
> restart it later without
> losing any (in flight) events?
>
> - bart
>


Re: Correlation between number of operators and Job manager memory requirements

2018-02-19 Thread Till Rohrmann
Hi Shailesh,

my question would be where do you see the OOM happening? Does it happen on
the JM or the TM.

The memory requirements for each operator strongly depend on the operator
and it is hard to give a general formula for that. It mostly depends on the
user function. Flink itself should not need too much extra memory for the
framework specific code.

CEP, however, can easily add a couple of hundred megabytes to your memory
requirements. This depends strongly on the pattern you're matching and
which state backend you're using.

Concerning your question about one big job vs. multiple jobs, I could see that
this helps if not all jobs are executed at the same time. Especially if you
only have a single TM with a limited number of slots, I think that you
effectively queue up jobs. That should reduce the required amount of
resources for each individual job.

Cheers,
Till

On Mon, Feb 19, 2018 at 11:35 AM, Shailesh Jain  wrote:

> Actually, there are too many hyperparameters to experiment with, that is
> why I'm trying to understand if there is any particular way in which a
> cluster could be benchmarked.
>
> Another strange behaviour I am observing is: Delaying the operator
> creation (by distributing the operators across jobs, and submitting
> multiple jobs to the same cluster instead of one) is helping in creating
> more operators. Any ideas on why that is happening?
>
> Shailesh
>
>
> On Sun, Feb 18, 2018 at 11:16 PM, Pawel Bartoszek <
> pawelbartosze...@gmail.com> wrote:
>
>> Hi,
>>
>> You could definitely try to find a formula for the heap size, but isn't it
>> easier just to try out different memory settings and see which works best
>> for you?
>>
>> Thanks,
>> Pawel
>>
>> 17 lut 2018 12:26 "Shailesh Jain" 
>> napisał(a):
>>
>> Oops, hit send by mistake.
>>
>> In the configuration section, it is mentioned that for "many operators"
>> heap size should be increased.
>>
>> "JVM heap size (in megabytes) for the JobManager. You may have to
>> increase the heap size for the JobManager if you are running very large
>> applications (with many operators), or if you are keeping a long history of
>> them."
>>
>> Is there any recommendation on the heap space required when there are
>> around 200 CEP operators, and close to 80 Filter operators?
>>
>> Any other leads on calculating the expected heap space allocation to
>> start the job would be really helpful.
>>
>> Thanks,
>> Shailesh
>>
>>
>>
>> On Sat, Feb 17, 2018 at 5:53 PM, Shailesh Jain <
>> shailesh.j...@stellapps.com> wrote:
>>
>>> Hi,
>>>
>>> I have flink job with almost 300 operators, and every time I'm trying to
>>> submit the job, the cluster crashes with OutOfMemory exception.
>>>
>>> I have 1 job manager and 1 task manager with 2 GB heap space allocated
>>> to both.
>>>
>>> In the configuration section of the documentation
>>>
>>>
>>>
>>>
>>
>>
>


Re: A "per operator instance" window all ?

2018-02-19 Thread Till Rohrmann
Hi Julien,

at the moment Flink only supports parallel windows which are keyed. What
you would need is something like a per-partition window which is currently
not supported. The problem with that is that it is not clear how to rescale
a per-partition window because it effectively means that you have only as
many key groups as you have partitions. What you can also do is to key by a
prefix of your resource id. That way you will group more resource ids into
the same window. Choosing a prefix which gives you enough groups to evenly
utilize your workers as well as higher granularity for your external
requests should then be doable.
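
For example (a sketch only; the Alert type, getResourceId() and the group count are placeholders, 128 simply being a value comfortably larger than the parallelism):

public static WindowedStream<Alert, Integer, TimeWindow> groupForBatching(DataStream<Alert> alerts) {
    final int numGroups = 128;   // many more groups than parallel subtasks -> even distribution

    return alerts
        .keyBy(new KeySelector<Alert, Integer>() {
            @Override
            public Integer getKey(Alert alert) {
                // coarser key: many resource ids fall into the same group
                return Math.abs(alert.getResourceId().hashCode()) % numGroups;
            }
        })
        .window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)));
}

A window function applied on the result then issues one external query per group and window.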

Ken's solution should work for your use case. However, be aware that this
will break as soon as Flink changes its internal key to key-group mapping.

Cheers,
Till

On Mon, Feb 19, 2018 at 5:27 PM, Ken Krugler 
wrote:

> Hi Julien,
>
> I'd run into a similar situation, where I need to have a keyed stream, but
> I want (effectively) one key per task.
>
> It’s possible to generate keys that will get distributed as you need,
> though it does require making assumptions about how Flink generates
> hashes/key groups.
>
> And once you start talking about state, then it gets a bit harder, as you
> need to know the max parallelism, which is used to calculate “key groups”.
>
> Below is a cheesy function I wrote to make an Integer that (if used as the
> key) will partition the record to the target operator.
>
> I use it in a custom Map function to add a key field.
>
> — Ken
>
> /**
> * Return an integer value that will get partitioned to the target <operatorIndex>, given the
> * workflow's <maxParallelism> (for key groups) and the operator <parallelism>.
> *
> * @param maxParallelism
> * @param parallelism
> * @param operatorIndex
> * @return Integer suitable for use in a record as the key.
> */
> public static Integer makeKeyForOperatorIndex(int maxParallelism, int parallelism, int operatorIndex) {
>     if (maxParallelism == ExecutionJobVertex.VALUE_NOT_SET) {
>         maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);
>     }
>
>     for (int i = 0; i < maxParallelism * 2; i++) {
>         Integer key = new Integer(i);
>         int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
>         int index = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
>         if (index == operatorIndex) {
>             return key;
>         }
>     }
>
>     throw new RuntimeException(String.format("Unable to find key for target operator index %d (max parallelism = %d, parallelism = %d",
>             operatorIndex, maxParallelism, parallelism));
> }
>
>
> On Feb 19, 2018, at 12:34 AM, Julien  wrote:
>
> Hello,
>
> I've already tried to key my stream with "resourceId.hashCode%parallelism"
> (with parallelism of 4 in my example).
> So all my keys will be either 0,1, 2 or 3. I can then benefit from a time
> window on this keyed stream and do only 4 queries to my external system.
> But it is not well distributed with the default partitioner on keyed
> stream. (keys 0, 1, 2 and 3 only goes to operator idx 2, 3).
>
> I think I should explore the custom partitioner, as you suggested
> Xingcan.
> Maybe my last question on this will be: "can you give me more details on
> this point "and simulate a window operation by yourself in a
> ProcessFunction" ?
>
> When I look at the documentation about the custom partitioner, I can see
> that the result of partitionCustom is a DataStream.
> It is not a KeyedStream.
> So the only window I have will be windowAll (which will bring me back to a
> parallelism of 1, no ?).
>
> And if I do something like "myStream.partitionCustom(<my partitioner>, <my key>).keyBy(<my key>).window(...)",
> will it preserve my custom partitioner ?
> When looking at the "KeyedStream" class, it seems that it will go back to
> the "KeyGroupStreamPartitioner" and forget my custom partitioner ?
>
> Thanks again for your feedback,
>
> Julien.
>
>
> On 19/02/2018 03:45, 周思华 wrote:
>
> Hi Julien,
> If I am not misunderstanding, I think you can key your stream on a
> `Random.nextInt() % parallelism`; this way you can "group" together alerts
> from different keys and benefit from multiple parallel instances.
>
>
> Sent from NetEase Mail Master
>
> On 02/19/2018 09:08,Xingcan Cui 
> wrote:
>
> Hi Julien,
>
> sorry for my misunderstanding before. For now, the window can only be
> defined on a KeyedStream or an ordinary DataStream but with parallelism =
> 1. I’d like to provide three options for your scenario.
>
> 1. If your external data is static and can be fit into the memory, you can
> use ManagedStates
> 
>  to
> cache them without considering the querying problem.
> 2. Or you can use a CustomPartitioner
> 
>  to
> manually distribute your alert data and simulate an window operation by
> yourself in a ProcessFuncton.
> 3. You may also choose to use some external s

sink with BucketingSink to S3 files override

2018-02-19 Thread galantaa
Hey,
I have some kind of a concurrency problem with Bucketing sink when I write
to S3.
I use the AvroKeyValueSinkWriter.
The problem is that when I send events that are supposed to be written to the same
directory, but to different part files (because of different event types),
the files override each other.
The problem occurs only when I sink the files to S3. 
When I write the files to local storage it does not happen, but I think
that's only because there's this loop in openNewPartFile:

// The following loop tries different partCounter values in ascending order until it reaches the minimum
// that is not yet used. This works since there is only one parallel subtask that tries names with this
// subtask id. Otherwise we would run into concurrency issues here. This is aligned with the way we now
// clean the base directory in case of rescaling.

int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
while (fs.exists(partPath) ||
        fs.exists(getPendingPathFor(partPath)) ||
        fs.exists(getInProgressPathFor(partPath))) {
    bucketState.partCounter++;
    partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
}
That makes sense. But on S3 the files do not exist until checkpointing, so
the loop won't find the files.

After debugging, I've noticed that in the invoke method, in
state.getBucketState(), the first time I try to write an event to the bucket it
creates a new bucketState in the HashMap, but the second time I try to write
to the same bucket (with the different event type), it does find this new
bucketState.

Thanks for the help! 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: A "per operator instance" window all ?

2018-02-19 Thread Ken Krugler
Hi Julien,

I'd run into a similar situation, where I need to have a keyed stream, but I 
want (effectively) one key per task.

It’s possible to generate keys that will get distributed as you need, though it 
does require making assumptions about how Flink generates hashes/key groups.

And once you start talking about state, then it gets a bit harder, as you need 
to know the max parallelism, which is used to calculate “key groups”.

Below is a cheesy function I wrote to make an Integer that (if used as the key) 
will partition the record to the target operator.

I use it in a custom Map function to add a key field.

— Ken

/**
 * Return an integer value that will get partitioned to the target <operatorIndex>, given the
 * workflow's <maxParallelism> (for key groups) and the operator <parallelism>.
 * 
 * @param maxParallelism
 * @param parallelism
 * @param operatorIndex
 * @return Integer suitable for use in a record as the key.
 */
public static Integer makeKeyForOperatorIndex(int maxParallelism, int parallelism, int operatorIndex) {
    if (maxParallelism == ExecutionJobVertex.VALUE_NOT_SET) {
        maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);
    }

    for (int i = 0; i < maxParallelism * 2; i++) {
        Integer key = new Integer(i);
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
        int index = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
        if (index == operatorIndex) {
            return key;
        }
    }

    throw new RuntimeException(String.format("Unable to find key for target operator index %d (max parallelism = %d, parallelism = %d",
            operatorIndex, maxParallelism, parallelism));
}
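
For context, here is roughly how such a key could be attached to records (my own sketch, not from the original mail; the Alert type is a placeholder, and maxParallelism/parallelism are passed in explicitly rather than read from the runtime context to keep the example version-agnostic):

public static class AddRoutingKey extends RichMapFunction<Alert, Tuple2<Integer, Alert>> {

    private final int maxParallelism;
    private final int parallelism;
    private transient Integer key;

    public AddRoutingKey(int maxParallelism, int parallelism) {
        this.maxParallelism = maxParallelism;
        this.parallelism = parallelism;
    }

    @Override
    public void open(Configuration parameters) {
        // pick the key that routes records from this subtask to the downstream
        // subtask with the same index
        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        key = makeKeyForOperatorIndex(maxParallelism, parallelism, subtaskIndex);
    }

    @Override
    public Tuple2<Integer, Alert> map(Alert value) {
        return Tuple2.of(key, value);
    }
}

Used as stream.map(new AddRoutingKey(maxParallelism, parallelism)).keyBy(0).timeWindow(...), with the caveat mentioned above that it relies on Flink's current key-to-key-group assignment.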


> On Feb 19, 2018, at 12:34 AM, Julien  wrote:
> 
> Hello,
> 
> I've already tried to key my stream with "resourceId.hashCode%parallelism" 
> (with parallelism of 4 in my example).
> So all my keys will be either 0,1, 2 or 3. I can then benefit from a time 
> window on this keyed stream and do only 4 queries to my external system.
> But it is not well distributed with the default partitioner on keyed stream. 
> (keys 0, 1, 2 and 3 only goes to operator idx 2, 3).
> 
> I think I should explore the custom partitioner, as you suggested Xingcan.
> Maybe my last question on this will be: "can you give me more details on this 
> point "and simulate a window operation by yourself in a ProcessFunction" ?
> 
> When I look at the documentation about the custom partitioner, I can see that 
> the result of partitionCustom is a DataStream.
> It is not a KeyedStream.
> So the only window I have will be windowAll (which will bring me back to a 
> parallelism of 1, no ?).
> 
> And if I do something like "myStream.partitionCustom(<my partitioner>, <my key>).keyBy(<my key>).window(...)", will it preserve my custom partitioner ?
> When looking at the "KeyedStream" class, it seems that it will go back to the 
> "KeyGroupStreamPartitioner" and forget my custom partitioner ?
> 
> Thanks again for your feedback,
> 
> Julien.
> 
> 
> On 19/02/2018 03:45, 周思华 wrote:
>> Hi Julien,
>> If I am not misunderstanding, I think you can key your stream on a 
>> `Random.nextInt() % parallelism`; this way you can "group" together alerts 
>> from different keys and benefit from multiple parallel instances.
>> 
>> 
>> Sent from NetEase Mail Master
>> 
>> On 02/19/2018 09:08,Xingcan Cui 
>>  wrote: 
>> Hi Julien,
>> 
>> sorry for my misunderstanding before. For now, the window can only be 
>> defined on a KeyedStream or an ordinary DataStream but with parallelism = 1. 
>> I’d like to provide three options for your scenario.
>> 
>> 1. If your external data is static and can be fit into the memory, you can 
>> use ManagedStates 
>> 
>>  to cache them without considering the querying problem.
>> 2. Or you can use a CustomPartitioner 
>> 
>>  to manually distribute your alert data and simulate an window operation by 
>> yourself in a ProcessFuncton.
>> 3. You may also choose to use some external systems such as in-memory store, 
>> which can work as a cache for your queries.
>> 
>> Best,
>> Xingcan
>> 
>>> On 19 Feb 2018, at 5:55 AM, Julien >> > wrote:
>>> 
>>> Hi Xingcan,
>>> 
>>> Thanks for your answer.
>>> Yes, I understand that point:
>>> if I have 100 resource IDs with parallelism of 4, then each operator 
>>> instance will handle about 25 keys
>>> 
>>> 
>>> The issue I have is that I want, on a given operator instance, to

Re: Iterating over state entries

2018-02-19 Thread Till Rohrmann
Hi Ken,

just for my clarification, the `RocksDBMapState#entries` method does not
satisfy your requirements? This method does not allow you to iterate across
different keys of your keyed stream of course. But it should allow you to
iterate over the different entries for a given key of your keyed stream.
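
For reference, a minimal sketch of that per-key iteration (my illustration; MyValue and the state name are placeholders):

// registered in open() of a function on a keyed stream
MapState<Long, MyValue> entries = getRuntimeContext().getMapState(
        new MapStateDescriptor<>("entries", Long.class, MyValue.class));

// inside the processing method -- this only sees the entries of the current key
for (Map.Entry<Long, MyValue> entry : entries.entries()) {
    Long index = entry.getKey();
    MyValue value = entry.getValue();
    // ... process ...
}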

Cheers,
Till

On Mon, Feb 19, 2018 at 12:10 AM, Ken Krugler 
wrote:

> Hi there,
>
> I’ve got a MapState where I need to iterate over the entries.
>
> This currently isn’t supported (at least for Rocks DB), AFAIK, though
> there is an issue/PR  to
> improve this.
>
> The best solution I’ve seen is what Fabian proposed, which involves
> keeping a ValueState with a count of entries, and then having the key for
> the MapState be the index.
>
> I cannot comment on the internal design, but you could put the data into a
> RocksDBStateBackend MapState where the value X is your data
> type and the key is the list index. You would need another ValueState for
> the current number of elements that you put into the MapState.
> A MapState allows to fetch and traverse the key, value, or entry set of the
> Map without loading it completely into memory.
> The sets are traversed in sort order of the key, so should be in insertion
> order (given that you properly increment the list index).
>
>
> This effectively lets you iterate over all of the map entries for a given
> (keyed) state - though it doesn’t solve the “I have to iterate over _every_
> entry” situation.
>
> Is this currently the best option?
>
> Thanks,
>
> — Ken
>
> 
> http://about.me/kkrugler
> +1 530-210-6378
>
>


Re: Iterating over state entries

2018-02-19 Thread Fabian Hueske
Hi Ken,

I'd like to clarify what I said before.

By using MapState you mainly gain two things:
- position access by index
- the full list does not need to be deserialized to read values (which is
how ListState works).

Point access should obviously be done by get(index).
However, iterating over the list should be done by iterating over the entry
(or value) set. The entry set iterator will prefetch multiple entries and
only deserialize the key / values when you access them. This reduces the
number of RocksDB look-ups.

Best,
Fabian


2018-02-19 0:10 GMT+01:00 Ken Krugler :

> Hi there,
>
> I’ve got a MapState where I need to iterate over the entries.
>
> This currently isn’t supported (at least for Rocks DB), AFAIK, though
> there is an issue/PR  to
> improve this.
>
> The best solution I’ve seen is what Fabian proposed, which involves
> keeping a ValueState with a count of entries, and then having the key for
> the MapState be the index.
>
> I cannot comment on the internal design, but you could put the data into a
> RocksDBStateBackend MapState where the value X is your data
> type and the key is the list index. You would need another ValueState for
> the current number of elements that you put into the MapState.
> A MapState allows to fetch and traverse the key, value, or entry set of the
> Map without loading it completely into memory.
> The sets are traversed in sort order of the key, so should be in insertion
> order (given that you properly increment the list index).
>
>
> This effectively lets you iterate over all of the map entries for a given
> (keyed) state - though it doesn’t solve the “I have to iterate over _every_
> entry” situation.
>
> Is this currently the best option?
>
> Thanks,
>
> — Ken
>
> 
> http://about.me/kkrugler
> +1 530-210-6378
>
>


Re: Regarding BucketingSink

2018-02-19 Thread Aljoscha Krettek
Hi,

Sorry for the confusion. The framework (Flink) does currently not do any 
cleanup of pending files, yes.

Best,
Aljoscha

> On 19. Feb 2018, at 17:01, Vishal Santoshi  wrote:
> 
> >> You should only have these dangling pending files after a failure-recovery 
> >> cycle, as you noticed. My suggestion would be to periodically clean up 
> >> older pending files.
> 
> A little confused. Is that what the framework should do, or us as part of 
> some cleanup job ?
> 
> 
> 
> 
> 
> On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek  > wrote:
> Hi,
> 
> The BucketingSink does not clean up pending files on purpose. In a 
> distributed setting, and especially with rescaling of Flink operators, it is 
> sufficiently hard to figure out which of the pending files you actually can 
> delete and which of them you have to leave because they will get moved to 
> "final" as part of recovering from a checkpoint on some other parallel 
> instance of the sink.
> 
> You should only have these dangling pending files after a failure-recovery 
> cycle, as you noticed. My suggestion would be to periodically clean up older 
> pending files.
> 
> Best,
> Aljoscha
> 
> 
>> On 19. Feb 2018, at 16:37, Till Rohrmann > > wrote:
>> 
>> Hi Vishal,
>> 
>> what pending files should indeed get eventually finalized. This happens on a 
>> checkpoint complete notification. Thus, what you report seems not right. 
>> Maybe Aljoscha can shed a bit more light into the problem.
>> 
>> In order to further debug the problem, it would be really helpful to get 
>> access to DEBUG log files of a TM which runs the BucketingSink.
>> 
>> Cheers,
>> Till
>> 
>> On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong > > wrote:
>> Hi Vishal,
>> 
>> I have the same concern about save pointing in BucketingSink.
>> As for your question, I think before the pending files get cleared in 
>> handleRestoredBucketState .
>> They are finalized in notifyCheckpointComplete
>> 
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L628
>>  
>> 
>> 
>> I'm looking into this part of the source code now, since we are experiencing 
>> some unclosed files after check pointing.
>> It would be great if you can share more if you find something new about your 
>> problem, which might help with our problem.
>> 
>> Best regards,
>> Mu
>> 
>> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi > > wrote:
>> -rw-r--r--   3 root hadoop 11 2018-02-14 18:48 
>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
>> -rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15 
>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
>> -rw-r--r--   3 root hadoop 11 2018-02-14 21:17 
>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
>> 
>> 
>> This is strange, we had a few retries b'coz of an OOM on one of the TMs and 
>> we see this situation. 2 files ( on either sides )  that were dealt with 
>> fine but a dangling .pending file. I am sure this is not what is meant to 
>> be.   We I think have an edge condition and looking at the code it is not 
>> obvious. May be some one who wrote the code can shed some light as to how 
>> can this happen.
>> 
>> 
>> 
>> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi > > wrote:
>> without --allowNonRestoredState, on a suspend/resume we do see the length 
>> file along with the finalized file ( finalized during resume ) 
>> 
>> -rw-r--r--   3 root hadoop 10 2018-02-09 13:57 
>> /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
>> 
>> that does makes much more sense. 
>> 
>> I guess we should document --allowNonRestoredState better ? It seems it 
>> actually drops state ?
>> 
>> 
>> 
>> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi > > wrote:
>> This is 1.4 BTW.  I am not sure that I am reading this correctly but the 
>> lifecycle of cancel/resume is 2 steps
>> 
>> 
>> 
>> 1. Cancel job with SP
>> 
>> 
>> closeCurrentPartFile
>> 
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L549
>>  
>> 
>> 
>> is called from close()
>> 
>> 
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L416
>>  
>> 

Re: Regarding BucketingSink

2018-02-19 Thread Vishal Santoshi
>> You should only have these dangling pending files after a
failure-recovery cycle, as you noticed. My suggestion would be to
periodically clean up older pending files.

A little confused. Is that what the framework should do, or us as part of
some cleanup job ?





On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek 
wrote:

> Hi,
>
> The BucketingSink does not clean up pending files on purpose. In a
> distributed setting, and especially with rescaling of Flink operators, it
> is sufficiently hard to figure out which of the pending files you actually
> can delete and which of them you have to leave because they will get moved
> to "final" as part of recovering from a checkpoint on some other parallel
> instance of the sink.
>
> You should only have these dangling pending files after a failure-recovery
> cycle, as you noticed. My suggestion would be to periodically clean up
> older pending files.
>
> Best,
> Aljoscha
>
>
> On 19. Feb 2018, at 16:37, Till Rohrmann  wrote:
>
> Hi Vishal,
>
> what pending files should indeed get eventually finalized. This happens on
> a checkpoint complete notification. Thus, what you report seems not right.
> Maybe Aljoscha can shed a bit more light into the problem.
>
> In order to further debug the problem, it would be really helpful to get
> access to DEBUG log files of a TM which runs the BucketingSink.
>
> Cheers,
> Till
>
> On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong  wrote:
>
>> Hi Vishal,
>>
>> I have the same concern about save pointing in BucketingSink.
>> As for your question, I think before the pending files get cleared in
>> handleRestoredBucketState .
>> They are finalized in notifyCheckpointComplete
>>
>> https://github.com/apache/flink/blob/master/flink-connectors
>> /flink-connector-filesystem/src/main/java/org/apache/
>> flink/streaming/connectors/fs/bucketing/BucketingSink.java#L628
>>
>> I'm looking into this part of the source code now, since we are
>> experiencing some unclosed files after check pointing.
>> It would be great if you can share more if you find something new about
>> your problem, which might help with our problem.
>>
>> Best regards,
>> Mu
>>
>> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> -rw-r--r--   3 root hadoop 11 2018-02-14 18:48
>>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
>>> -rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15
>>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
>>> -rw-r--r--   3 root hadoop 11 2018-02-14 21:17
>>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
>>>
>>>
>>> This is strange, we had a few retries b'coz of an OOM on one of the TMs
>>> and we see this situation. 2 files ( on either sides )  that were dealt
>>> with fine but a dangling .pending file. I am sure this is not what is meant
>>> to be.   We I think have an edge condition and looking at the code it is
>>> not obvious. May be some one who wrote the code can shed some light as to
>>> how can this happen.
>>>
>>>
>>>
>>> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 without --allowNonRestoredState, on a suspend/resume we do see the
 length file along with the finalized file ( finalized during resume )

 -rw-r--r--   3 root hadoop 10 2018-02-09 13:57
 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length

 that does makes much more sense.

 I guess we should document --allowNonRestoredState better ? It seems
 it actually drops state ?



 On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> This is 1.4 BTW.  I am not sure that I am reading this correctly but
> the lifecycle of cancel/resume is 2 steps
>
>
>
> 1. Cancel job with SP
>
>
> closeCurrentPartFile
>
> https://github.com/apache/flink/blob/master/flink-connectors
> /flink-connector-filesystem/src/main/java/org/apache/flink/s
> treaming/connectors/fs/bucketing/BucketingSink.java#L549
>
> is called from close()
>
>
> https://github.com/apache/flink/blob/master/flink-connectors
> /flink-connector-filesystem/src/main/java/org/apache/flink/s
> treaming/connectors/fs/bucketing/BucketingSink.java#L416
>
>
> and that moves files to pending state.  That I would presume is called
> when one does a cancel.
>
>
>
> 2. The restore on resume
>
> https://github.com/apache/flink/blob/master/flink-connectors
> /flink-connector-filesystem/src/main/java/org/apache/flink/s
> treaming/connectors/fs/bucketing/BucketingSink.java#L369
>
> calls
>
> handleRestoredBucketState
>
> https://github.com/apache/flink/blob/master/flink-connectors
> /flink-connector-filesystem/src/main/java/org/apache/flink/s
> treaming/connectors/fs/bucketing/BucketingSink.java#L704
>
> cl

Re: Regarding Task Slots allocation

2018-02-19 Thread Till Rohrmann
Hi Vinay,

try to set the parallelism to 2 for the job you are executing via the
RemoteExecutionEnvironment.
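
For example (host, port and jar path are placeholders):

StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
        "jobmanager-host", 6123, "/path/to/your-job.jar");
env.setParallelism(2);   // the job should then occupy only 2 slots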

Where have you specified the number of TaskManager slots? In the
flink-conf.yaml file which you used to deploy the remote Flink cluster?

Cheers,
Till

On Fri, Feb 16, 2018 at 7:14 PM, Vinay Patil 
wrote:

> Hi,
>
> I am trying to deploy a flink job to remote cluster using
> remoteExecutionEnvironment, I have specified the number of task slots for
> Task Manager to 2 , so it should have ideally taken 2 slots only, however
> all the slots are getting utilized. Is there any other configuration I have
> to do ?
>
> Regards,
> Vinay Patil
>


Re: Regarding BucketingSink

2018-02-19 Thread Aljoscha Krettek
Hi,

The BucketingSink does not clean up pending files on purpose. In a distributed 
setting, and especially with rescaling of Flink operators, it is sufficiently 
hard to figure out which of the pending files you actually can delete and which 
of them you have to leave because they will get moved to "final" as part of 
recovering from a checkpoint on some other parallel instance of the sink.

You should only have these dangling pending files after a failure-recovery 
cycle, as you noticed. My suggestion would be to periodically clean up older 
pending files.

Best,
Aljoscha

> On 19. Feb 2018, at 16:37, Till Rohrmann  wrote:
> 
> Hi Vishal,
> 
> what pending files should indeed get eventually finalized. This happens on a 
> checkpoint complete notification. Thus, what you report seems not right. 
> Maybe Aljoscha can shed a bit more light into the problem.
> 
> In order to further debug the problem, it would be really helpful to get 
> access to DEBUG log files of a TM which runs the BucketingSink.
> 
> Cheers,
> Till
> 
> On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong  > wrote:
> Hi Vishal,
> 
> I have the same concern about save pointing in BucketingSink.
> As for your question, I think before the pending files get cleared in 
> handleRestoredBucketState .
> They are finalized in notifyCheckpointComplete
> 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L628
>  
> 
> 
> I'm looking into this part of the source code now, since we are experiencing 
> some unclosed files after check pointing.
> It would be great if you can share more if you find something new about your 
> problem, which might help with our problem.
> 
> Best regards,
> Mu
> 
> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi  > wrote:
> -rw-r--r--   3 root hadoop 11 2018-02-14 18:48 
> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
> -rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15 
> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
> -rw-r--r--   3 root hadoop 11 2018-02-14 21:17 
> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
> 
> 
> This is strange, we had a few retries b'coz of an OOM on one of the TMs and 
> we see this situation. 2 files ( on either sides )  that were dealt with fine 
> but a dangling .pending file. I am sure this is not what is meant to be.   We 
> I think have an edge condition and looking at the code it is not obvious. May 
> be some one who wrote the code can shed some light as to how can this happen.
> 
> 
> 
> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi  > wrote:
> without --allowNonRestoredState, on a suspend/resume we do see the length 
> file along with the finalized file ( finalized during resume ) 
> 
> -rw-r--r--   3 root hadoop 10 2018-02-09 13:57 
> /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
> 
> that does makes much more sense. 
> 
> I guess we should document --allowNonRestoredState better ? It seems it 
> actually drops state ?
> 
> 
> 
> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi  > wrote:
> This is 1.4 BTW.  I am not sure that I am reading this correctly but the 
> lifecycle of cancel/resume is 2 steps
> 
> 
> 
> 1. Cancel job with SP
> 
> 
> closeCurrentPartFile
> 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L549
>  
> 
> 
> is called from close()
> 
> 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L416
>  
> 
> 
> 
> and that moves files to pending state.  That I would presume is called when 
> one does a cancel.
> 
> 
> 
> 2. The restore on resume 
> 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L369
>  
> 
> 
> calls 
> 
> handleRestoredBucketState
> 
> htt

Re: Discarding bad data in Stream

2018-02-19 Thread Niclas Hedhman
Thanks Fabian,

I have seen Side Outputs and OutputTags but not fully understood the
mechanics yet. In my case, I don't need to keep bad records... And I think
I will end up with flatMap() after all, it just becomes an internal
documentation issue to provide relevant information...
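
In case it helps others reading along, a minimal sketch of the side-output variant Fabian mentions (my own illustration; MyType, parse() and the tag name are placeholders):

final OutputTag<String> badRecords = new OutputTag<String>("bad-records") {};

SingleOutputStreamOperator<MyType> parsed = env
    .addSource(source)
    .process(new ProcessFunction<String, MyType>() {
        @Override
        public void processElement(String raw, Context ctx, Collector<MyType> out) {
            try {
                out.collect(parse(raw));        // good records go to the main output
            } catch (Exception e) {
                ctx.output(badRecords, raw);    // bad records go to the side output
            }
        }
    });

DataStream<String> rejected = parsed.getSideOutput(badRecords);   // keep, log or simply drop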

Thanks for your response.
Niclas

On Mon, Feb 19, 2018 at 8:46 PM, Fabian Hueske  wrote:

> Hi Niclas,
>
> I'd either add a Filter to directly discard bad records. That should make
> the behavior explicit.
> If you need to do complex transformations that you don't want to do twice,
> the FlatMap approach would be the most efficient.
> If you'd like to keep the bad records, you can implement a ProcessFunction
> and add a side output [1] that collects bad records.
>
> Hope this helps,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/dev/stream/side_output.html
>
> 2018-02-19 10:29 GMT+01:00 Niclas Hedhman :
>
>> Hi again,
>>
>> something that I don't find (easily) in the documentation is what the
>> recommended method is to discard data from the stream.
>>
>> On one hand, I could always use flatMap(), even if it is "per message"
>> since that allows me to return zero or one objects.
>>
>> DataStream stream =
>> env.addSource( source )
>>.flatMap( new MyFunction() )
>>
>>
>> But that seems a bit misleading, as the casual observer will get the idea
>> that MyFunction 'branches' out, but it doesn't.
>>
>> The other "obvious" choice is to return null and follow with a filter...
>>
>> DataStream stream =
>> env.addSource( source )
>>.map( new MyFunction() )
>>.filter( Objects::nonNull )
>>
>> BUT, that doesn't work with Java 8 method references like above, so I
>> have to create my own filter to get the type information correct to Flink;
>>
>> DataStream stream =
>> env.addSource( source )
>>.map( new MyFunction() )
>>.filter( new DiscardNullFilter<>() )
>>
>>
>> And in my opinion, that ends up looking ugly as the streams/pipeline (not
>> used to terminology yet) quickly have many transformations and branches,
>> and having a null check after each seems to put the burden of knowledge in
>> the wrong spot ("Can this function return null?")
>>
>> Throwing an exception is shutting down the entire stream, which seems
>> overly aggressive for many data related discards.
>>
>> Any other choices?
>>
>> Cheers
>> --
>> Niclas Hedhman, Software Developer
>> http://zest.apache.org - New Energy for Java
>>
>
>


-- 
Niclas Hedhman, Software Developer
http://zest.apache.org - New Energy for Java


Re: Question about the need of consumer groups from kafka

2018-02-19 Thread Till Rohrmann
Hi Ricardo,

could you please give a bit more detail about what you mean by "not using its
own mechanism"? Flink's Kafka connector uses the Kafka consumer and (to some
extent) producer API to talk to Kafka. Consumer groups are a central concept of
Kafka and, as such, the Flink Kafka connector has to know the group in order to
consume from the right one.
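
Just to sketch where that group fits in (broker, topic and group name below are
placeholders, and a StreamExecutionEnvironment env is assumed to exist):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker1:9092");
props.setProperty("group.id", "my-flink-consumer-group");   // the Kafka consumer group

FlinkKafkaConsumer011<String> consumer =
        new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), props);
DataStream<String> stream = env.addSource(consumer);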

Creating a fat-jar will make sure that the jar includes additional
dependencies such as the flink-kafka-connector which is not present on the
Flink cluster.

Cheers,
Till

On Fri, Feb 16, 2018 at 2:05 PM, Ricardo Dinis  wrote:

> Hi,
>
> If Flink can manage Kafka without using Kafka's own mechanism (Kafka
> consumers), does it still need the consumer group defined in Kafka?
>
> And why do the examples need to use the shading plugin to create a
> fat jar?
>
> Thanks
>


Re: Concurrent modification Exception when submitting multiple jobs

2018-02-19 Thread Till Rohrmann
Hi Vinay,

could you try to create a dedicated RemoteEnvironment for each parallel
thread? I think that the StreamExecutionEnvironment is not thread-safe and,
if that is the case, it should not be shared across multiple threads.
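
A rough sketch of that, with placeholder host/port/jar and user functions:

Runnable submitJob = () -> {
    // every submitting thread builds its own environment instead of sharing one
    StreamExecutionEnvironment env = StreamExecutionEnvironment
            .createRemoteEnvironment("jobmanager-host", 6123, "/path/to/job.jar");
    env.addSource(new MySource())       // placeholder source, map and sink
       .map(new MyMapper())
       .addSink(new MySink());
    try {
        env.execute("job-" + Thread.currentThread().getName());
    } catch (Exception e) {
        // handle the failed submission of this thread's job
    }
};
new Thread(submitJob).start();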

Getting a glimpse at your code would also help to further understand the
problem.

Cheers,
Till

On Fri, Feb 16, 2018 at 5:39 AM, Vinay Patil 
wrote:

> Hi,
>
> I am submitting jobs to the cluster (using the remote execution env) from
> multiple threads. I am getting the following exception
>
>
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(
> ArrayList.java:909)
> at java.util.ArrayList$Itr.next(ArrayList.java:859)
> at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.
> generateInternal(StreamGraphGenerator.java:128)
> at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(
> StreamGraphGenerator.java:121)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
> getStreamGraph(StreamExecutionEnvironment.java:1526)
> at
> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.
> execute(RemoteStreamEnvironment.java:173)
> at
> com.test.executors.FlinkExecutor.submitJobToCluster(FlinkExecutor.java:67)
>
>
> I am using Flink 1.3.2, and I am making sure that the job name is different
> for each job.
> Can you please let me know if I am doing something wrong.
>
> Regards,
> Vinay Patil
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Regarding BucketingSink

2018-02-19 Thread Till Rohrmann
Hi Vishal,

the pending files should indeed get finalized eventually. This happens on
a checkpoint-complete notification. Thus, what you report does not seem right.
Maybe Aljoscha can shed a bit more light into the problem.

In order to further debug the problem, it would be really helpful to get
access to DEBUG log files of a TM which runs the BucketingSink.

Cheers,
Till

On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong  wrote:

> Hi Vishal,
>
> I have the same concern about save pointing in BucketingSink.
> As for your question, I think before the pending files get cleared in
> handleRestoredBucketState .
> They are finalized in notifyCheckpointComplete
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L628
>
> I'm looking into this part of the source code now, since we are
> experiencing some unclosed files after checkpointing.
> It would be great if you can share more if you find something new about
> your problem, which might help with our problem.
>
> Best regards,
> Mu
>
> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> -rw-r--r--   3 root hadoop 11 2018-02-14 18:48
>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
>>
>> -rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15
>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
>>
>> -rw-r--r--   3 root hadoop 11 2018-02-14 21:17
>> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
>>
>>
>> This is strange: we had a few retries because of an OOM on one of the TMs
>> and now we see this situation. The two files on either side were dealt
>> with fine, but there is a dangling .pending file. I am sure this is not what
>> is meant to happen. I think we have an edge condition, and looking at the
>> code it is not obvious. Maybe someone who wrote the code can shed some light
>> on how this can happen.
>>
>>
>>
>>
>> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> without --allowNonRestoredState, on a suspend/resume we do see the
>>> length file along with the finalized file ( finalized during resume )
>>>
>>> -rw-r--r--   3 root hadoop 10 2018-02-09 13:57
>>> /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
>>>
>>> That does make much more sense.
>>>
>>> I guess we should document --allowNonRestoredState better? It seems it
>>> actually drops state?
>>>
>>>
>>>
>>> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 This is 1.4 BTW.  I am not sure that I am reading this correctly but
 the lifecycle of cancel/resume is 2 steps



 1. Cancel job with SP


 closeCurrentPartFile

 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L549

 is called from close()


 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L416


 and that moves files to pending state.  That I would presume is called
 when one does a cancel.



 2. The restore on resume

 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L369

 calls

 handleRestoredBucketState

 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L704

 clears the pending files from state without finalizing them?



 That does not seem to be right. I must be reading the code totally
 wrong ?

 I am also not sure whether --allowNonRestoredState is skipping getting
 the state. At least
 https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints
 is not exactly clear about what it does if we add an operator (GDF, I think,
 will add a new operator in the DAG without state even if stateful; in my case
 the Map operator is not even stateful).


 Thanks and please bear with me if this is all something pretty simple.

 Vishal












 On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> What should be the behavior of BucketingSink vis a vis state ( pending
> , inprogess and finalization ) when we suspend and resume ?
>
> So I did this
>
> * I had a pipe writing to hdfs suspend and resume using
>
> --allowNonRestoredState as in I 

Window with recent messages

2018-02-19 Thread Krzysztof Białek
Hi,

My app calculates company scores from ratings given by users.
Only ratings from the last 90 days should be considered.

1. Is it possible to construct a window that processes the ratings from the last 90 days?
I've started by *misusing* countWindow, but this solution looks ugly to
me.

ratingStream
  .filter(new OutdatedRatingsFilter(maxRatingAge))
  .keyBy(_.companyId)
  .countWindow(0L).trigger(new OnEventTrigger).evictor(new
OutdatedRatingsEvictor(maxRatingAge))
  .process(ratingFunction)


2. How can I recalculate the score once a rating expires (after 90 days)?
I don't want to put artificial ratings into the stream to trigger the
recalculation.

Any idea how can I do it better?

Regards,
Krzysztof


Managed State Custom Serializer with Avro

2018-02-19 Thread Niels
Hi all, 

I'm currently trying to use Avro in order to evolve our data present in
Flink's Managed State. I've extended the TypeSerializer class successfully
for this purpose, but still have issues using Schema Evolution. 

*The problem:*
When we try to read data (deserialize from savepoint) with a new serialiser
and a new schema, Flink seems to use the old schema of the old serializer
(written to the savepoint). This results in an old GenericRecord that
doesn't adhere to the new Avro schema. 

*What seems to happen is the following* (say we evolve from dataV1 to
dataV2): 
- State containing dataV1 is serialized with avro schema V1 to a
check/savepoint. Along with the data, the serializer itself is written. 
- Upon restore, the old serializer is retrieved from the data (therefore
needs to be on the classpath). Data is restored using this old serializer.
The new serializer provided is only used for writes. 

If this is indeed the case it explains our aforementioned problem. If you
have any pointers as to whether this is true and what a possible solution
would be that would be very much appreciated! 

Thanks! 
Niels



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Stopping a kafka consumer gracefully (no losing of inflight events, StoppableFunction)

2018-02-19 Thread Bart Kastermans
In https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html it
is shown that for gracefully stopping a job you need to implement the
StoppableFunction interface. This appears not (yet) to be implemented for Kafka
consumers. Am I missing something, or is there a different way to gracefully
stop a job using a Kafka source so we can restart it later without losing any
(in-flight) events?

- bart


Re: Discarding bad data in Stream

2018-02-19 Thread Fabian Hueske
Hi Niclas,

I'd either add a Filter to directly discard bad records. That should make
the behavior explicit.
If you need to do complex transformations that you don't want to do twice,
the FlatMap approach would be the most efficient.
If you'd like to keep the bad records, you can implement a ProcessFunction
and add a side output [1] that collects bad records.
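
A rough sketch of the side-output variant (rawStream is a DataStream<String>;
GoodEvent and parse() are placeholder names):

final OutputTag<String> badRecords = new OutputTag<String>("bad-records") {};

SingleOutputStreamOperator<GoodEvent> parsed = rawStream
        .process(new ProcessFunction<String, GoodEvent>() {
            @Override
            public void processElement(String value, Context ctx, Collector<GoodEvent> out) {
                try {
                    out.collect(parse(value));         // good records go to the main output
                } catch (Exception e) {
                    ctx.output(badRecords, value);     // bad records go to the side output
                }
            }
        });

DataStream<String> badStream = parsed.getSideOutput(badRecords);   // e.g. write to a dead-letter sink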

Hope this helps,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/side_output.html

2018-02-19 10:29 GMT+01:00 Niclas Hedhman :

> Hi again,
>
> something that I don't find (easily) in the documentation is what the
> recommended method is to discard data from the stream.
>
> On one hand, I could always use flatMap(), even if it is "per message"
> since that allows me to return zero or one objects.
>
> DataStream stream =
> env.addSource( source )
>.flatMap( new MyFunction() )
>
>
> But that seems a bit misleading, as the casual observer will get the idea
> that MyFunction 'branches' out, but it doesn't.
>
> The other "obvious" choice is to return null and follow with a filter...
>
> DataStream stream =
> env.addSource( source )
>.map( new MyFunction() )
>.filter( Objects::nonNull )
>
> BUT, that doesn't work with Java 8 method references like above, so I have
> to create my own filter to get the type information correct to Flink;
>
> DataStream stream =
> env.addSource( source )
>.map( new MyFunction() )
>.filter( new DiscardNullFilter<>() )
>
>
> And in my opinion, that ends up looking ugly as the streams/pipeline (not
> used to terminology yet) quickly have many transformations and branches,
> and having a null check after each seems to put the burden of knowledge in
> the wrong spot ("Can this function return null?")
>
> Throwing an exception is shutting down the entire stream, which seems
> overly aggressive for many data related discards.
>
> Any other choices?
>
> Cheers
> --
> Niclas Hedhman, Software Developer
> http://zest.apache.org - New Energy for Java
>


Re: Unexpected hop start & end timestamps after stream SQL join

2018-02-19 Thread Fabian Hueske
Hi Juho,

sorry for the late response. I found time to look into this issue.
I agree, that the start and end timestamps of the HOP window should be 1
hour apart from each other. I tried to reproduce the issue, but was not
able to do so.
Can you maybe open a JIRA and provide a simple test case (collection data
source, no Kafka) that reproduces the issue?

Regarding the task that you are trying to solve, have you looked into OVER
windows?

The following query would count for each record, how often a record with
the same ID combination was observed in the last hour based on its
timestamp:

SELECT
  s_aid1,
  s_cid,
  COUNT(*) OVER (PARTITION BY s_aid1, s_cid ORDER BY rowtime RANGE BETWEEN
INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS occurrence,
  rowtime
FROM events
WHERE s_aid1 IS NOT NULL

If occurrence is 1, the current record is the only record within the last
hour with that combination of aid and cid.
The query does not batch the stream by 10 seconds, but rather produces the
results in real time. If the batching is not required, you should be good
by adding a filter on occurrence = 1.
Otherwise, you could add the filter and wrap it in a 10-second tumbling window.

Hope this helps,
Fabian


2018-02-14 15:30 GMT+01:00 Juho Autio :

> I'm joining a tumbling & hopping window in Flink 1.5-SNAPSHOT. The result
> is unexpected. Am I doing something wrong? Maybe this is just not a
> supported join type at all? Anyway, here goes:
>
> I first register these two tables:
>
> 1. new_ids: a tumbling window of seen ids within the last 10 seconds:
>
> SELECT
>   s_aid1,
>   s_cid,
>   TS_MIN(rowtime) AS first_seen,
>   CAST(DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND),
> '%Y%m%d/%H/%i/%S') AS VARCHAR) AS processdate,
>   TUMBLE_START(rowtime, INTERVAL '10' SECOND) AS tumble_start,
>   TUMBLE_END(rowtime, INTERVAL '10' SECOND) AS tumble_end
> FROM events
> WHERE s_aid1 IS NOT NULL
> GROUP BY
>   s_aid1,
>   s_cid,
>   TUMBLE(rowtime, INTERVAL '10' SECOND)
>
> 2. seen_ids: a sliding window of seen ids 1 hour backwards, 10 second hop:
>
> SELECT
>   s_aid1,
>   s_cid,
>   TS_MIN(rowtime) AS first_seen,
>   CAST(HOP_START(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS
> DATE) AS processdate,
>   HOP_START(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS HOP_start,
>   HOP_END(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS HOP_end
> FROM events
> WHERE s_aid1 IS NOT NULL
> GROUP BY
>   s_aid1,
>   s_cid,
>   HOP(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR)
>
> If I write the results of the "seen_ids" table, the difference between
> HOP_start and HOP_end is always 1 hour, as expected.
>
> Then I register another query that joins the 2 tables:
>
> unique_ids (mostly including fields for debugging - what I need is the
> unique, new combinations of s_cid x s_aid1):
>
> SELECT
>new_ids.s_cid,
>new_ids.s_aid1,
>new_ids.processdate AS processdate,
>seen_ids.processdate AS seen_ids_processdate,
>new_ids.first_seen AS new_ids_first_seen,
>seen_ids.first_seen AS seen_ids_first_seen,
>tumble_start,
>HOP_start,
>tumble_end,
>HOP_end
> FROM new_ids, seen_ids
> WHERE new_ids.s_cid = seen_ids.s_cid
>   AND new_ids.s_aid1 = seen_ids.s_aid1
>   AND (new_ids.first_seen <= seen_ids.first_seen OR seen_ids.first_seen IS
> NULL)
>
> I print the results of this table, and surprisingly the HOP_start &
> HOP_end are only separated by 10 seconds. Is this a bug?
>
> {
>   "s_cid": "appsimulator_236e5fb7",
> "s_aid1": "L1GENe52d723b-b563-492f-942d-3dc1a31d7e26",
>
> "seen_ids_processdate": "2018-02-14",
>
> "seen_ids_first_seen": "2018-02-14 11:37:59.0",
> "new_ids_first_seen":  "2018-02-14 11:34:33.0",
> "tumble_start": "2018-02-14 11:34:30.0",
> "tumble_end": "2018-02-14 11:34:40.0",
>
> "HOP_start": "2018-02-14 11:37:50.0",
> "HOP_end": "2018-02-14 11:38:00.0"
> }
>
> What I'm trying to do is exclude the id from the current "new_ids" window
> if it was already seen before (within the 1 hour scope of "seen_ids"), but
> that doesn't work either. This example result row also shows that
> "seen_ids.first_seen" is bigger than it should be.
>
>
> Even if I can find a fix to this to get what I need, this strategy seems
> overly complicated. If anyone can suggest a better way, I'd be glad to
> hear. If this was a batch job, it could be defined simply as:
>
> SELECT DISTINCT s_cid, s_aid1, DATE_FORMAT(rowtime, '%Y%m%d/%H')
>
> + when streaming this query, the new distinct values should be written out
> every 10 seconds (ONLY the new ones - within that wrapping 1 hour window).
> So far I haven't been able to figure out how to do that in a simple way
> with Flink.
>
>
> *) TS_MIN is a custom function, but it's just a mapping of Flink's
> MinAggFunction:
>
> import java.sql.Timestamp
>
> import com.rovio.ds.flink.common.udaf.ImplicitOrdering.ordered
>
> import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
> import org.apache.flink.table.functions.aggfunctions.MaxAggFunction
> import

Re: Need to understand the execution model of the Flink

2018-02-19 Thread Fabian Hueske
Hi,

that's a difficult question without knowing the details of your job.
A NoSpaceLeftOnDevice error occurs when a file system is full.

This can happen if:
- A Flink algorithm writes to disk, e.g., an external sort or the hash
table of a hybrid hash join. This can happen for GroupBy, Join, Distinct,
or any other operation that requires to group or join data. Filters will
never spill to disk.
- An OutputFormat writes to disk.

The data is written to a temp directory, that can be configured in the
./conf/flink-conf.yaml file.

Did you check how the tasks are distributed across the task managers?
The web UI can help to diagnose such problems.

Best, Fabian

2018-02-19 11:22 GMT+01:00 Darshan Singh :

> Thanks Fabian for such a detailed explanation.
>
> I am using a dataset in between, so I guess the CSV is read once. Now to my real
> issue: I have 6 task managers, each having 4 cores, and I have 2 slots per
> task manager.
>
> My CSV file is just 1 GB. I create a table, transform it to a dataset,
> and then run 15 different filters and extra processing, which all run
> almost in parallel.
>
> However, it fails with a "no space left on device" error on one of the task
> managers. The space on each task manager is 32 GB in /tmp, so I am not sure why
> it is running out of space. I do use some joins with other tables, but those
> are a few megabytes.
>
> So I was assuming that somehow all the parallel executions were storing data
> in /tmp and filling it.
>
> I would like to know what could be filling the space.
>
> Thanks
>
> On 19 Feb 2018 10:10 am, "Fabian Hueske"  wrote:
>
> Hi,
>
> this works as follows.
>
> - Table API and SQL queries are translated into regular DataSet jobs
> (assuming you are running in a batch ExecutionEnvironment).
> - A query is translated into a sequence of DataSet operators when you 1)
> transform the Table into a DataSet or 2) write it to a TableSink. In both
> cases, the optimizer is invoked and recursively goes back from the
> converted/emitted Table back to its roots, i.e., a TableSource or a
> DataSet.
>
> This means, that if you create a Table from a TableSource and apply
> multiple filters on it and write each filter to a TableSink, the CSV file
> will be read 10 times, filtered 10 times and written 10 times. This is not
> efficient, because, you could also just read the file once and apply all
> filters in parallel.
> You can do this by converting the Table that you read with a TableSource
> into a DataSet and register the DataSet again as a Table. In that case, the
> translations of all TableSinks will stop at the DataSet and not include the
> TableSource which reads the file.
>
> The following figures illustrate the difference:
>
> 1) Without DataSet in the middle:
>
> TableSource -> Filter1 -> TableSink1
> TableSource -> Filter2 -> TableSink2
> TableSource -> Filter3 -> TableSink3
>
> 2) With DataSet in the middle:
>
> /-> Filter1 -> TableSink1
> TableSource -<-> Filter2 -> TableSink2
> \-> Filter3 -> TableSink3
>
> I'll likely add a feature to internally translate an intermediate Table to
> make this a bit easier.
> The underlying problem is that the SQL optimizer cannot translate queries
> with multiple sinks.
> Instead, each sink is individually translated and the optimizer does not
> know that common execution paths could be shared.
>
> Best,
> Fabian
>
>
> 2018-02-19 2:19 GMT+01:00 Darshan Singh :
>
>> Thanks for the reply.
>>
>> I guess I am not looking for an alternative. I am trying to understand what
>> Flink does in this scenario, and if 10 tasks are going in parallel I am sure
>> they will be reading the CSV, as there is no other way.
>>
>> Thanks
>>
>> On Mon, Feb 19, 2018 at 12:48 AM, Niclas Hedhman 
>> wrote:
>>
>>>
>>> Do you really need the large single table created in step 2?
>>>
>>> If not, what you typically do is that the Csv source first do the common
>>> transformations. Then depending on whether the 10 outputs have different
>>> processing paths or not, you either do a split() to do individual
>>> processing depending on some criteria, or you just have the sink put each
>>> record in separate tables.
>>> You have full control, at each step along the transformation path
>>> whether it can be parallelized or not, and if there are no sequential
>>> constraints on your model, then you can easily fill all cores on all hosts
>>> quite easily.
>>>
>>> Even if you need the step 2 table, I would still just treat that as a
>>> split(), a branch ending in a Sink that does the storage there. No need to
>>> read records from file over and over again, nor to store them first in step
>>> 2 table and read them out again.
>>>
>>> Don't ask *me* about what happens in failure scenarios... I have myself
>>> not figured that out yet.
>>>
>>> HTH
>>> Niclas
>>>
>>> On Mon, Feb 19, 2018 at 3:11 AM, Darshan Singh 
>>> wrote:
>>>
 Hi I would like to understand the execution model.

 1. I have a csv files which is say 10 GB.
 2. I created a table f

Re: Correlation between number of operators and Job manager memory requirements

2018-02-19 Thread Shailesh Jain
Actually, there are too many hyperparameters to experiment with, that is
why I'm trying to understand if there is any particular way in which a
cluster could be benchmarked.

Another strange behaviour I am observing is: Delaying the operator creation
(by distributing the operators across jobs, and submitting multiple jobs to
the same cluster instead of one) is helping in creating more operators. Any
ideas on why that is happening?

Shailesh

On Sun, Feb 18, 2018 at 11:16 PM, Pawel Bartoszek <
pawelbartosze...@gmail.com> wrote:

> Hi,
>
> You could definitely try to find formula for heap size, but isnt's it
> easier just to try out different memory settings and see which works best
> for you?
>
> Thanks,
> Pawel
>
> On 17 Feb 2018 at 12:26, "Shailesh Jain"  wrote:
>
> Oops, hit send by mistake.
>
> In the configuration section, it is mentioned that for "many operators"
> heap size should be increased.
>
> "JVM heap size (in megabytes) for the JobManager. You may have to increase
> the heap size for the JobManager if you are running very large applications
> (with many operators), or if you are keeping a long history of them."
>
> Is there any recommendation on the heap space required when there are
> around 200 CEP operators and close to 80 Filter operators?
>
> Any other leads on calculating the expected heap space allocation to start
> the job would be really helpful.
>
> Thanks,
> Shailesh
>
>
>
> On Sat, Feb 17, 2018 at 5:53 PM, Shailesh Jain <
> shailesh.j...@stellapps.com> wrote:
>
>> Hi,
>>
>> I have flink job with almost 300 operators, and every time I'm trying to
>> submit the job, the cluster crashes with OutOfMemory exception.
>>
>> I have 1 job manager and 1 task manager with 2 GB heap space allocated to
>> both.
>>
>> In the configuration section of the documentation
>>
>>
>>
>>
>
>


Re: [ANNOUNCE] Apache Flink 1.4.1 released

2018-02-19 Thread Stephan Ewen
Great, thanks a lot for being the release manager, Gordon!

On Fri, Feb 16, 2018 at 12:54 AM, Hao Sun  wrote:

> This is great!
>
> On Thu, Feb 15, 2018 at 2:50 PM Bowen Li  wrote:
>
>> Congratulations everyone!
>>
>> On Thu, Feb 15, 2018 at 10:04 AM, Tzu-Li (Gordon) Tai <
>> tzuli...@apache.org> wrote:
>>
>>> The Apache Flink community is very happy to announce the release of
>>> Apache Flink 1.4.1, which is the first bugfix release for the Apache Flink
>>> 1.4 series.
>>>
>>>
>>> Apache Flink® is an open-source stream processing framework for
>>> distributed, high-performing, always-available, and accurate data streaming
>>> applications.
>>>
>>>
>>> The release is available for download at:
>>>
>>> https://flink.apache.org/downloads.html
>>>
>>>
>>> Please check out the release blog post for an overview of the
>>> improvements for this bugfix release:
>>>
>>> https://flink.apache.org/news/2018/02/15/release-1.4.1.html
>>>
>>>
>>> The full release notes are available in Jira:
>>>
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?
>>> projectId=12315522&version=12342212
>>>
>>>
>>> We would like to thank all contributors of the Apache Flink community
>>> who made this release possible!
>>>
>>>
>>> Cheers,
>>>
>>> Gordon
>>>
>>>
>>


Re: Need to understand the execution model of the Flink

2018-02-19 Thread Darshan Singh
Thanks Fabian for such a detailed explanation.

I am using a dataset in between, so I guess the CSV is read once. Now to my real
issue: I have 6 task managers, each having 4 cores, and I have 2 slots per
task manager.

My CSV file is just 1 GB. I create a table, transform it to a dataset, and
then run 15 different filters and extra processing, which all run almost in
parallel.

However, it fails with a "no space left on device" error on one of the task
managers. The space on each task manager is 32 GB in /tmp, so I am not sure why
it is running out of space. I do use some joins with other tables, but those
are a few megabytes.

So I was assuming that somehow all the parallel executions were storing data in
/tmp and filling it.

I would like to know what could be filling the space.

Thanks

On 19 Feb 2018 10:10 am, "Fabian Hueske"  wrote:

Hi,

this works as follows.

- Table API and SQL queries are translated into regular DataSet jobs
(assuming you are running in a batch ExecutionEnvironment).
- A query is translated into a sequence of DataSet operators when you 1)
transform the Table into a DataSet or 2) write it to a TableSink. In both
cases, the optimizer is invoked and recursively goes back from the
converted/emitted Table back to its roots, i.e., a TableSource or a
DataSet.

This means, that if you create a Table from a TableSource and apply
multiple filters on it and write each filter to a TableSink, the CSV file
will be read 10 times, filtered 10 times and written 10 times. This is not
efficient, because, you could also just read the file once and apply all
filters in parallel.
You can do this by converting the Table that you read with a TableSource
into a DataSet and register the DataSet again as a Table. In that case, the
translations of all TableSinks will stop at the DataSet and not include the
TableSource which reads the file.

The following figures illustrate the difference:

1) Without DataSet in the middle:

TableSource -> Filter1 -> TableSink1
TableSource -> Filter2 -> TableSink2
TableSource -> Filter3 -> TableSink3

2) With DataSet in the middle:

/-> Filter1 -> TableSink1
TableSource -<-> Filter2 -> TableSink2
\-> Filter3 -> TableSink3

I'll likely add a feature to internally translate an intermediate Table to
make this a bit easier.
The underlying problem is that the SQL optimizer cannot translate queries
with multiple sinks.
Instead, each sink is individually translated and the optimizer does not
know that common execution paths could be shared.

Best,
Fabian


2018-02-19 2:19 GMT+01:00 Darshan Singh :

> Thanks for the reply.
>
> I guess I am not looking for an alternative. I am trying to understand what
> Flink does in this scenario, and if 10 tasks are going in parallel I am sure
> they will be reading the CSV, as there is no other way.
>
> Thanks
>
> On Mon, Feb 19, 2018 at 12:48 AM, Niclas Hedhman 
> wrote:
>
>>
>> Do you really need the large single table created in step 2?
>>
>> If not, what you typically do is that the Csv source first do the common
>> transformations. Then depending on whether the 10 outputs have different
>> processing paths or not, you either do a split() to do individual
>> processing depending on some criteria, or you just have the sink put each
>> record in separate tables.
>> You have full control, at each step along the transformation path whether
>> it can be parallelized or not, and if there are no sequential constraints
>> on your model, then you can easily fill all cores on all hosts quite easily.
>>
>> Even if you need the step 2 table, I would still just treat that as a
>> split(), a branch ending in a Sink that does the storage there. No need to
>> read records from file over and over again, nor to store them first in step
>> 2 table and read them out again.
>>
>> Don't ask *me* about what happens in failure scenarios... I have myself
>> not figured that out yet.
>>
>> HTH
>> Niclas
>>
>> On Mon, Feb 19, 2018 at 3:11 AM, Darshan Singh 
>> wrote:
>>
>>> Hi I would like to understand the execution model.
>>>
>>> 1. I have a CSV file which is, say, 10 GB.
>>> 2. I created a table from this file.
>>>
>>> 3. Now I have created filtered tables on this, say 10 of them.
>>> 4. Now I created a writeToSink for all these 10 filtered tables.
>>>
>>> Now my question is: will these 10 filtered tables be written in
>>> parallel (suppose I have 40 cores and set the parallelism to, say, 40 as well)?
>>>
>>> The next question I have is: the table which I created from the CSV file,
>>> which is common, won't be persisted by Flink internally; rather, for all 10
>>> filtered tables it will read the CSV file and then apply the filter and write
>>> to the sink.
>>>
>>> I think that for all 10 filtered tables it will read the CSV again and again;
>>> in this case it will be read 10 times. Is my understanding correct, or am I
>>> missing something?
>>>
>>> What if in step 2 I change the table to a dataset and back?
>>>
>>> Thanks
>>>
>>
>>
>>
>> --
>> Niclas Hedhman, Software Developer
>>

Re: Need to understand the execution model of the Flink

2018-02-19 Thread Fabian Hueske
Hi,

this works as follows.

- Table API and SQL queries are translated into regular DataSet jobs
(assuming you are running in a batch ExecutionEnvironment).
- A query is translated into a sequence of DataSet operators when you 1)
transform the Table into a DataSet or 2) write it to a TableSink. In both
cases, the optimizer is invoked and recursively goes back from the
converted/emitted Table back to its roots, i.e., a TableSource or a
DataSet.

This means, that if you create a Table from a TableSource and apply
multiple filters on it and write each filter to a TableSink, the CSV file
will be read 10 times, filtered 10 times and written 10 times. This is not
efficient, because, you could also just read the file once and apply all
filters in parallel.
You can do this by converting the Table that you read with a TableSource
into a DataSet and register the DataSet again as a Table. In that case, the
translations of all TableSinks will stop at the DataSet and not include the
TableSource which reads the file.

The following figures illustrate the difference:

1) Without DataSet in the middle:

TableSource -> Filter1 -> TableSink1
TableSource -> Filter2 -> TableSink2
TableSource -> Filter3 -> TableSink3

2) With DataSet in the middle:

/-> Filter1 -> TableSink1
TableSource -<-> Filter2 -> TableSink2
\-> Filter3 -> TableSink3

I'll likely add a feature to internally translate an intermediate Table to
make this a bit easier.
The underlying problem is that the SQL optimizer cannot translate queries
with multiple sinks.
Instead, each sink is individually translated and the optimizer does not
know that common execution paths could be shared.
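
For illustration, a rough sketch of that pattern with the Java Table API could
look like this (imports omitted; paths, field names, and sinks are just
placeholders):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

CsvTableSource source = CsvTableSource.builder()
        .path("/path/to/file.csv")
        .field("id", BasicTypeInfo.STRING_TYPE_INFO)
        .field("value", BasicTypeInfo.LONG_TYPE_INFO)
        .build();
tEnv.registerTableSource("events", source);

// materialize the scan once as a DataSet and register it again as a Table
DataSet<Row> cached = tEnv.toDataSet(tEnv.scan("events"), Row.class);
tEnv.registerDataSet("eventsCached", cached);

// both sinks now branch off the DataSet instead of re-reading the file
tEnv.scan("eventsCached").filter("value > 10").writeToSink(new CsvTableSink("/out/a", ","));
tEnv.scan("eventsCached").filter("value <= 10").writeToSink(new CsvTableSink("/out/b", ","));

env.execute();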

Best,
Fabian

2018-02-19 2:19 GMT+01:00 Darshan Singh :

> Thanks for the reply.
>
> I guess I am not looking for an alternative. I am trying to understand what
> Flink does in this scenario, and if 10 tasks are going in parallel I am sure
> they will be reading the CSV, as there is no other way.
>
> Thanks
>
> On Mon, Feb 19, 2018 at 12:48 AM, Niclas Hedhman 
> wrote:
>
>>
>> Do you really need the large single table created in step 2?
>>
>> If not, what you typically do is that the Csv source first do the common
>> transformations. Then depending on whether the 10 outputs have different
>> processing paths or not, you either do a split() to do individual
>> processing depending on some criteria, or you just have the sink put each
>> record in separate tables.
>> You have full control, at each step along the transformation path whether
>> it can be parallelized or not, and if there are no sequential constraints
>> on your model, then you can easily fill all cores on all hosts quite easily.
>>
>> Even if you need the step 2 table, I would still just treat that as a
>> split(), a branch ending in a Sink that does the storage there. No need to
>> read records from file over and over again, nor to store them first in step
>> 2 table and read them out again.
>>
>> Don't ask *me* about what happens in failure scenarios... I have myself
>> not figured that out yet.
>>
>> HTH
>> Niclas
>>
>> On Mon, Feb 19, 2018 at 3:11 AM, Darshan Singh 
>> wrote:
>>
>>> Hi I would like to understand the execution model.
>>>
>>> 1. I have a CSV file which is, say, 10 GB.
>>> 2. I created a table from this file.
>>>
>>> 3. Now I have created filtered tables on this, say 10 of them.
>>> 4. Now I created a writeToSink for all these 10 filtered tables.
>>>
>>> Now my question is: will these 10 filtered tables be written in
>>> parallel (suppose I have 40 cores and set the parallelism to, say, 40 as well)?
>>>
>>> The next question I have is: the table which I created from the CSV file,
>>> which is common, won't be persisted by Flink internally; rather, for all 10
>>> filtered tables it will read the CSV file and then apply the filter and write
>>> to the sink.
>>>
>>> I think that for all 10 filtered tables it will read the CSV again and again;
>>> in this case it will be read 10 times. Is my understanding correct, or am I
>>> missing something?
>>>
>>> What if in step 2 I change the table to a dataset and back?
>>>
>>> Thanks
>>>
>>
>>
>>
>> --
>> Niclas Hedhman, Software Developer
>> http://polygene.apache.org - New Energy for Java
>>
>
>


Re: Manipulating Processing elements of Network Buffers

2018-02-19 Thread Till Rohrmann
Hi Max,

the network buffer order is quite important at the moment, because the
network stream does not only transport data but also control events such as
the checkpoint barriers. In order to guarantee that you don't lose data in
case of a failure it is (at the moment) strictly necessary that checkpoint
barriers don't overtake data records for example. Moreover, records might
span multiple memory buffers if they are large. Therefore, it might not be
all that useful to do this ordering on the network buffer level.

Instead, what you can always do is to sort elements in your user function.
The price you have to pay for this is that you have to buffer elements in
between and also checkpoint them.

Cheers,
Till

On Thu, Feb 15, 2018 at 3:13 PM, m@xi  wrote:

> Hello Flinker!
>
> I know that one should appropriately set the number of Network Buffers (NB)
> that a Flink deployment will use. Apart from that, I am wondering if one
> might change/manipulate the specific sequence of data records in the NB in
> order to optimize the performance of the application.
>
> For instance, let's assume that an NB currently has 3 elements {a,b,c} in this
> specific order. The data is going to be shipped to a task manager for further
> processing. But maybe if the aforementioned elements were to be shipped
> in another order, e.g. {b,c,a}, then a specific task would run faster.
>
> Is there any way to manipulate the ordering in the NB, or the ordering
> of the arrival of tuples at the input of an operator?
>
> Thanks in advance.
>
> Best,
> Max
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: How to find correct "imports"

2018-02-19 Thread Timo Walther

Hi Esa,

the easiest and recommended way is:

- Create your Flink project with the provided quickstart scripts [1]
- Visit the documentation about a feature you want to use. E.g. for the 
Table & SQL API [2]


Usually it is described which modules you need.

I hope this helps.

Regards,
Timo


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/


On 2/19/18 at 10:36 AM, Niclas Hedhman wrote:
It is called "declared dependencies", and Flink has a huge number of 
artifacts, and they have also changed name over time. But Maven 
Central provides a search facility.


Try 
http://search.maven.org/#search%7Cga%7C5%7Cg%3Aorg.apache.flink%20AND%20v%3A1.4.0 



And it will give you all artifacts from Flink 1.4.0

On Mon, Feb 19, 2018 at 4:56 PM, Esa Heikkinen wrote:


Hi

I am quite new to Flink and Scala. I have had a bit of trouble
finding the correct "imports".

What would be the best way to find them?

For example, the imports for StreamTableEnvironment and CsvTableSource.

And how do I know if I should put something in pom.xml?

Esa




--
Niclas Hedhman, Software Developer
http://zest.apache.org - New Energy for Java





Re: Retrieve written records of a sink after job

2018-02-19 Thread Flavio Pompermaier
Hi Fabian,
thanks for the feedback. Right now I'm doing exactly as you said.
Since I was seeing this as a useful API extension I just proposed this
addition and so I asked for feedback.
Of course, it doesn't make much sense if I'm the only one asking for it :)

Best,
Flavio

On Mon, Feb 19, 2018 at 10:31 AM, Fabian Hueske  wrote:

> Hi Flavio,
>
> Not sure if I would add this functionality to the sinks.
> You could also add a MapFunction with a counting Accumulator right before
> each sink.
>
> Best, Fabian
>
>
> 2018-02-14 14:11 GMT+01:00 Flavio Pompermaier :
>
>> So, if I'm not wrong, the right way to do this is using
>> accumulators..what do you think about my proposal to add an easy way to add
>> to a sink an accumulator for the written/outputed records?
>>
>> On Wed, Feb 14, 2018 at 1:08 PM, Chesnay Schepler 
>> wrote:
>>
>>> Technically yes, a subset of metrics is stored in the ExecutionGraph
>>> when the job finishes. (This is for example where the webUI derives the
>>> values from for finished jobs). However these are on the task level, and
>>> will not contain the number of incoming records if your sink is chained to
>>> another operator. Changing this would be a larger endeavor, and tbh i don't
>>> see this happening soon.
>>>
>>> I'm afraid for now you're stuck with the REST API for finished jobs.
>>> (Correction for my previous mail: The metrics REST API cannot be used for
>>> finished jobs)
>>>
>>> Alternatively, if you rather want to work on files/json you can enable
>>> job archiving by configuring the jobmanager.archive.fs.dir directory.
>>> When the job finishes this will contain a big JSON file for each job
>>> containing all responses that the UI would return for finished jobs.
>>>
>>>
>>> On 14.02.2018 12:50, Flavio Pompermaier wrote:
>>>
>>> The problem here is that I don't know the vertex id of the sink..would
>>> it be possible to access the sink info by id?
>>> And couldn't be all those info attached to the JobExecutionResult
>>> (avoiding to set up all the rest connection etc)?
>>>
>>> On Wed, Feb 14, 2018 at 12:44 PM, Chesnay Schepler 
>>> wrote:
>>>
 The only way to access this info from the client is the REST API
 or the Metrics REST API.



 On 14.02.2018 12:38, Flavio Pompermaier wrote:

 Actually I'd like to get this number from my Java class in order to
 update some external dataset "catalog",
 so I'm asking if there's some programmatic way to access this info
 (from JobExecutionResult for example).

 On Wed, Feb 14, 2018 at 12:25 PM, Chesnay Schepler 
 wrote:

> Do you want to know how many records the sink received, or how many
> the sink wrote to the DB?
> If it's the first you're in luck because we measure that already,
> check out the metrics documentation.
> If it's the latter, then this issue is essentially covered by
> FLINK-7286 which aims at allowing functions
> to modify the numRecordsIn/numRecordsOut counts.
>
>
> On 14.02.2018 12:22, Flavio Pompermaier wrote:
>
> Hi to all,
> I have a (batch) job that writes to 1 or more sinks.
> Is there a way to retrieve, once the job has terminated, the number of
> records written to each sink?
> Is there any better way than than using an accumulator for each sink?
> If that is the only way to do that, the Sink API could be enriched in
> order to automatically create an accumulator when required. E.g.
>
> dataset.output(JDBCOutputFormat.buildJDBCOutputFormat()
> .setDrivername(...)
> .setDBUrl(...)
> .setQuery(...)
> *.addRecordsCountAccumulator("some-name")*
> .finish())
>
> Best,
> Flavio
>
>
>


 --
 Flavio Pompermaier
 Development Department

 OKKAM S.r.l.
 Tel. +(39) 0461 041809 <+39%200461%20041809>



>>>
>>>
>>> --
>>> Flavio Pompermaier
>>> Development Department
>>>
>>> OKKAM S.r.l.
>>> Tel. +(39) 0461 041809 <+39%200461%20041809>
>>>
>>>
>>>
>>
>>
>> --
>> Flavio Pompermaier
>> Development Department
>>
>> OKKAM S.r.l.
>> Tel. +(39) 0461 041809 <+39%200461%20041809>
>>
>
>


-- 
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809


Re: Architecture question

2018-02-19 Thread Fabian Hueske
Hi,

What you are looking for is a BucketingSink that works on event time (the
timestamp is encoded in your data).
AFAIK, Flink's BucketingSink has been designed to work in processing time,
but you can implement a Bucketer that creates buckets based on a timestamp
in the data.
You might need to play around with the parameters for closing open buckets
for a good behavior (similar to watermark tuning).
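
A minimal sketch of such a Bucketer, assuming a record type MyRecord with a
getEventTimestamp() accessor (both are placeholders):

import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.hadoop.fs.Path;

import java.text.SimpleDateFormat;
import java.util.Date;

public class EventTimeBucketer implements Bucketer<MyRecord> {

    private static final long serialVersionUID = 1L;

    @Override
    public Path getBucketPath(Clock clock, Path basePath, MyRecord element) {
        // bucket by the timestamp carried in the record, not by the wall clock
        String day = new SimpleDateFormat("yyyyMMdd").format(new Date(element.getEventTimestamp()));
        return new Path(basePath + "/" + day);
    }
}

// usage sketch:
// BucketingSink<MyRecord> sink = new BucketingSink<>("/data/out");
// sink.setBucketer(new EventTimeBucketer());
// sink.setInactiveBucketThreshold(60 * 60 * 1000L);   // close buckets that have been idle for 1 hour
// stream.addSink(sink);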

Best, Fabian

2018-02-14 22:18 GMT+01:00 robert :

> I need to grab Avro data from a Kafka topic and write it to the local file
> system.
>
> Inside the Avro record there is a date-time field. From that field I need to
> name the file accordingly, (20180103) as an example.
>
>
> I was thinking of using Flink to read and unpack this generic record, then put
> it to a sink that will sort it to make sure it goes into the right file.
>
> Does anyone have a high-level approach for this?
>
> The bucketing sink looks promising. Any examples of this type of problem for
> Flink to solve?
>
> Thanks
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: How to find correct "imports"

2018-02-19 Thread Niclas Hedhman
It is called "declared dependencies", and Flink has a huge number of
artifacts, and they have also changed names over time. But Maven Central
provides a search facility.

Try http://search.maven.org/#search%7Cga%7C5%7Cg%3Aorg.apache.flink%20AND%20v%3A1.4.0

And it will give you all artifacts from Flink 1.4.0

On Mon, Feb 19, 2018 at 4:56 PM, Esa Heikkinen  wrote:

> Hi
>
>
>
> I am quite new to Flink and Scala. I have had a bit of trouble finding the
> correct "imports".
>
> What would be the best way to find them ?
>
>
>
> For example the imports for StreamTableEnvironment and CsvTableSource.
>
>
>
> And how do I know if I should put something in pom.xml?
>
>
>
> Esa
>



-- 
Niclas Hedhman, Software Developer
http://zest.apache.org - New Energy for Java


Re: Retrieve written records of a sink after job

2018-02-19 Thread Fabian Hueske
Hi Flavio,

Not sure if I would add this functionality to the sinks.
You could also add a MapFunction with a counting Accumulator right before
each sink.
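
For example, a generic counting mapper along those lines (the accumulator name
is up to you):

import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class CountingMapper<T> extends RichMapFunction<T, T> {

    private final LongCounter counter = new LongCounter();
    private final String accumulatorName;

    public CountingMapper(String accumulatorName) {
        this.accumulatorName = accumulatorName;
    }

    @Override
    public void open(Configuration parameters) {
        getRuntimeContext().addAccumulator(accumulatorName, counter);
    }

    @Override
    public T map(T value) {
        counter.add(1L);   // count every record that flows towards the sink
        return value;
    }
}

You would put it right in front of the sink, e.g.
dataset.map(new CountingMapper<Row>("sink-1-records")).output(sink);
and after env.execute() the count should be available via
JobExecutionResult#getAccumulatorResult("sink-1-records").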

Best, Fabian


2018-02-14 14:11 GMT+01:00 Flavio Pompermaier :

> So, if I'm not wrong, the right way to do this is using accumulators..what
> do you think about my proposal to add an easy way to add to a sink an
> accumulator for the written/outputed records?
>
> On Wed, Feb 14, 2018 at 1:08 PM, Chesnay Schepler 
> wrote:
>
>> Technically yes, a subset of metrics is stored in the ExecutionGraph when
>> the job finishes. (This is for example where the webUI derives the values
>> from for finished jobs). However these are on the task level, and will not
>> contain the number of incoming records if your sink is chained to another
>> operator. Changing this would be a larger endeavor, and tbh i don't see
>> this happening soon.
>>
>> I'm afraid for now you're stuck with the REST API for finished jobs.
>> (Correction for my previous mail: The metrics REST API cannot be used for
>> finished jobs)
>>
>> Alternatively, if you rather want to work on files/json you can enable
>> job archiving by configuring the jobmanager.archive.fs.dir directory.
>> When the job finishes this will contain a big JSON file for each job
>> containing all responses that the UI would return for finished jobs.
>>
>>
>> On 14.02.2018 12:50, Flavio Pompermaier wrote:
>>
>> The problem here is that I don't know the vertex id of the sink..would it
>> be possible to access the sink info by id?
>> And couldn't be all those info attached to the JobExecutionResult
>> (avoiding to set up all the rest connection etc)?
>>
>> On Wed, Feb 14, 2018 at 12:44 PM, Chesnay Schepler 
>> wrote:
>>
>>> The only way to access this info from the client is the REST API
>>> or the Metrics REST API.
>>>
>>>
>>>
>>> On 14.02.2018 12:38, Flavio Pompermaier wrote:
>>>
>>> Actually I'd like to get this number from my Java class in order to
>>> update some external dataset "catalog",
>>> so I'm asking if there's some programmatic way to access this info
>>> (from JobExecutionResult for example).
>>>
>>> On Wed, Feb 14, 2018 at 12:25 PM, Chesnay Schepler 
>>> wrote:
>>>
 Do you want to know how many records the sink received, or how many the
 sink wrote to the DB?
 If it's the first you're in luck because we measure that already, check
 out the metrics documentation.
 If it's the latter, then this issue is essentially covered by
 FLINK-7286 which aims at allowing functions
 to modify the numRecordsIn/numRecordsOut counts.


 On 14.02.2018 12:22, Flavio Pompermaier wrote:

 Hi to all,
 I have a (batch) job that writes to 1 or more sinks.
 Is there a way to retrieve, once the job has terminated, the number of
 records written to each sink?
 Is there any better way than than using an accumulator for each sink?
 If that is the only way to do that, the Sink API could be enriched in
 order to automatically create an accumulator when required. E.g.

 dataset.output(JDBCOutputFormat.buildJDBCOutputFormat()
 .setDrivername(...)
 .setDBUrl(...)
 .setQuery(...)
 *.addRecordsCountAccumulator("some-name")*
 .finish())

 Best,
 Flavio



>>>
>>>
>>> --
>>> Flavio Pompermaier
>>> Development Department
>>>
>>> OKKAM S.r.l.
>>> Tel. +(39) 0461 041809 <+39%200461%20041809>
>>>
>>>
>>>
>>
>>
>> --
>> Flavio Pompermaier
>> Development Department
>>
>> OKKAM S.r.l.
>> Tel. +(39) 0461 041809 <+39%200461%20041809>
>>
>>
>>
>
>
> --
> Flavio Pompermaier
> Development Department
>
> OKKAM S.r.l.
> Tel. +(39) 0461 041809 <+39%200461%20041809>
>


Discarding bad data in Stream

2018-02-19 Thread Niclas Hedhman
Hi again,

something that I don't find (easily) in the documentation is what the
recommended method is to discard data from the stream.

On one hand, I could always use flatMap(), even if it is "per message"
since that allows me to return zero or one objects.

DataStream stream =
env.addSource( source )
   .flatMap( new MyFunction() )


But that seems a bit misleading, as the casual observer will get the idea
that MyFunction 'branches' out, but it doesn't.

The other "obvious" choice is to return null and follow with a filter...

DataStream stream =
env.addSource( source )
   .map( new MyFunction() )
   .filter( Objects::nonNull )

BUT, that doesn't work with Java 8 method references like above, so I have
to create my own filter to get the type information correct to Flink;

DataStream stream =
env.addSource( source )
   .map( new MyFunction() )
   .filter( new DiscardNullFilter<>() )


And in my opinion, that ends up looking ugly as the streams/pipeline (not
used to terminology yet) quickly have many transformations and branches,
and having a null check after each seems to put the burden of knowledge in
the wrong spot ("Can this function return null?")

Throwing an exception is shutting down the entire stream, which seems
overly aggressive for many data related discards.

Any other choices?

Cheers
-- 
Niclas Hedhman, Software Developer
http://zest.apache.org - New Energy for Java


Re: CoProcess() VS union.Process() & Timers in them

2018-02-19 Thread Fabian Hueske
Changing the parallelism works in Flink by taking a savepoint, shutting
down the job, and restarting it from the savepoint with another parallelism.

The rescale() operator defines how records are exchanged between two
operators with different parallelism.
Rescale prefers local data exchange over uniform distribution (which would
be rebalance()).

For example if you have a pipeline A -rescale-> B, where operator A has 2
tasks and operator B 4 tasks, then A(1) would send data to B(1) and B(3)
and A(2) to B(2) and B(4).
Since A(1) / B(1) and A(2) / B(2) run on the same machine (unless
explicitly differently scheduled), the data exchange between them is local.
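
A tiny usage sketch of that example (env, MySource and MyMapper are placeholders):

DataStream<String> a = env.addSource(new MySource()).setParallelism(2);

a.rescale()                               // local, round-robin exchange to the next operator
 .map(new MyMapper()).setParallelism(4)
 .print();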

Best, Fabian

2018-02-13 16:22 GMT+01:00 m@xi :

> Thanks a lot Fabian and Xingcan!
>
> @Fabian: Nice answer. It enhanced my intuition on the topic. So, how can one
> change the parallelism while the Flink job is running, e.g. lower the
> parallelism during the weekend?
>
>
> Also, it is not clear to me how to use the rescale() operator. Could you
> provide a more thorough example? The one in the documentation is not
> so good, in my humble opinion. Some code/pseudocode would be
> great.
>
> Thanks in advance.
>
> Best,
> Max
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Only a single message processed

2018-02-19 Thread Fabian Hueske
Hi Niclas,

Glad that you got it working!
Thanks for sharing the problem and solution.

Best, Fabian

2018-02-19 9:29 GMT+01:00 Niclas Hedhman :

>
> (Sorry for the incoherent order and ramblings. I am writing this as I am
> trying to sort out what is going on...)
>
> 1. It is the first message to be processed in the Kafka topic. If I set
> the offset manually, it will pick up the message at that point, process it,
> and ignore all following messages.
>
> 2. Yes, the Kafka console consumer tool is spitting out the messages
> without problem. Btw, they are plain Strings, well, serialized JSON objects.
>
> 3. Code is a bit messy, but I have copied out the relevant parts below.
>
> I also noticed that a LOT of exceptions are thrown ("breakpoint on any
> exception"), mostly ClassCastException, classes not found and
> NoSuchMethodException, but nothing that bubbles up out of the internals. Is
> this just Scala hammering the JVM, or the normal JVM class loading
> sequence (no wonder it is so slow)? Is that expected?
>
> I have tried to use both the ObjectMapper from Jackson proper, as well as
> the shadowed ObjectMapper in flink. No difference.
>
> Recap: positioning the Kafka consumer at the 8th message from the end. Only that
> message is consumed; the remaining 7 are ignored/swallowed.
>
>
> Ok, so I think I have traced this down to something happening in the
> CassandraSink. There is a Exception being thrown somewhere, which I see as
> the Kafka09Fetcher.runFetchLoop()'s finally clause is called.
>
> Found it (hours later in debugging), on this line (Flink 1.4.1)
>
> org/apache/flink/cassandra/shaded/com/google/common/util/concurrent/Futures.class:258
>
> which contains
>
> future.addListener(callbackListener, executor);  // IDEA says 'future' is 
> of type DefaultResultSetFuture
>
> throws an Exception without stepping into the addListener() method. There
> is nothing catching the Exception (and I don't want to go down the rabbit
> hole of building from source), so I can't really say what Exception is
> being thrown. IDEA doesn't seem to report it, and the catch clauses in
> OperatorChain.pushToOperator() (ClassCastException and Exception) are in
> the call stack, but doesn't catch it, which could suggest an
> java.lang.Error, and NoClassDefFoundError comes to mind, since there are SO
> MANY classloading exception going on all the time.
>
> Hold on a second... There are TWO 
> com.datastax.driver.core.DefaultResultSetFuture
> types in the classpath. One from the Cassandra client that I declared, and
> one from inside the flink-connector-cassandra_2.11 artifact...
>
> So will it work if I remove my own dependency declaration and that's it?
>
>
> YES!!! Finally.
>
>
> SOLVED!
>
> -o-o-o-o-o-
>
> public static void main( String[] args )
> throws Exception
> {
> cli = CommandLine.populateCommand( new ServerCliOptions(), args );
> initializeCassandra( cli );
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment().setMaxParallelism( 32768 
> );
> //createPollDocPipeline( env );
> createAdminPipeline( env );
> env.execute( "schedule.poll" );
> }
>
>
>
> private static void createAdminPipeline( StreamExecutionEnvironment env )
> {
> try
> {
> FlinkKafkaConsumer011 adminSource = createKafkaAdminSource();
> SplitStream adminStream =
> env.addSource( adminSource )
>.name( "scheduler.admin" )
>.map( value -> {
>try
>{
>return mapper.readValue( value, AdminCommand.class );
>}
>catch( Throwable e )
>{
>LOG.error( "Unexpected error deserializing 
> AdminCommand", e );
>return null;
>}
>} )
>.name( "admin.command.read" )
>.split( value -> singletonList( value.action() ) );
>
> SingleOutputStreamOperator, String, String>> 
> insertStream =
> adminStream.select( AdminCommand.CMD_SCHEDULE_INSERT )
>.map( new GetPollDeclaration() )
>.name( "scheduler.admin.insert" )
>.map( new PollDeclarationToTuple3Map() )
>.name( "scheduler.pollDeclToTuple3" )
>.filter( tuple -> tuple != null );
>
> SingleOutputStreamOperator, String, String>> 
> deleteStream =
> adminStream.select( AdminCommand.CMD_SCHEDULE_DELETE )
>.map( new GetPollDeclaration() )
>.name( "scheduler.admin.delete" )
>.map( new PollDeclarationToTuple3Map() )
>.name( "scheduler.pollDeclToTuple3" )
>.filter( tuple -> tuple != null );
>
> CassandraSink.addSink( insertStream )
>  .setHost( cli.prima

How to find correct "imports"

2018-02-19 Thread Esa Heikkinen
Hi

I am quite new to Flink and Scala. I have had a bit of trouble finding the
correct "imports".
What would be the best way to find them?

For example the imports for StreamTableEnvironment and CsvTableSource.

And how do I know if I should put something in pom.xml?

Esa


Re: A "per operator instance" window all ?

2018-02-19 Thread Julien

Hello,

I've already tried to key my stream with
"resourceId.hashCode % parallelism" (with a parallelism of 4 in my example).
So all my keys will be either 0, 1, 2 or 3. I can then benefit from a
time window on this keyed stream and do only 4 queries to my external
system.
But it is not well distributed with the default partitioner on the keyed
stream (keys 0, 1, 2 and 3 only go to operator instances 2 and 3).
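This skew is expected: keyBy() does not use the key value directly but
murmur-hashes it into key groups (the KeyGroupStreamPartitioner mentioned
further down), so a key space of only {0, 1, 2, 3} can easily collapse onto
two of the four instances. A small illustrative sketch, relying on Flink's
internal KeyGroupRangeAssignment class from flink-runtime (an internal API,
so this is for inspection only, not production code):

// Shows which operator instance each "pre-hashed" key would be routed to.
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyDistributionSketch
{
    public static void main( String[] args )
    {
        int parallelism = 4;
        int maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism( parallelism );

        for( int key = 0; key < 4; key++ )
        {
            int operatorIndex =
                KeyGroupRangeAssignment.assignKeyToParallelOperator( key, maxParallelism, parallelism );
            System.out.println( "key " + key + " -> operator instance " + operatorIndex );
        }
        // With only four distinct keys the hashed key groups can easily collide,
        // so the keys may end up on just two of the four instances.
    }
}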


I think I should explore the custom partitioner, as you suggested, Xingcan.
Maybe my last question on this will be: can you give me more details on
this point, "and simulate a window operation by yourself in a
ProcessFunction"? (see the sketch below)


When I look at the documentation about the custom partitioner, I can see 
that the result of partitionCustom is a DataStream.

It is not a KeyedStream.
So the only window I have will be windowAll (which will bring me back to 
a parallelism of 1, no ?).


And if I do something like "myStream.partitionCustom(<my partitioner>, <my
key>).keyBy(<my key>).window(...)", will it preserve my
custom partitioner ?
When looking at the "KeyedStream" class, it seems that it will go back 
to the "KeyGroupStreamPartitioner" and forget my custom partitioner ?


Thanks again for your feedback,

Julien.
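A minimal sketch of that "simulate a window operation by yourself in a
ProcessFunction" idea, under a few stated assumptions: a hypothetical Alert
POJO, a hypothetical queryExternalSystem() batch call, a fixed 500 ms flush
interval, and a plain in-memory buffer (checkpointing it as operator state is
left out for brevity). It is meant to run after a partitionCustom() step, i.e.
on a non-keyed stream:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class BatchingSketch
{
    // Hypothetical alert type; the real job would use its own POJO.
    public static class Alert
    {
        public String alertId;
        public String resourceId;
    }

    public static class BatchingEnricher extends ProcessFunction<Alert, String>
    {
        private static final long FLUSH_INTERVAL_MS = 500L;   // assumed "window" length

        private transient List<Alert> buffer;
        private transient long nextFlushTime;

        @Override
        public void open( Configuration parameters )
        {
            buffer = new ArrayList<>();
            nextFlushTime = 0L;
        }

        @Override
        public void processElement( Alert alert, Context ctx, Collector<String> out ) throws Exception
        {
            buffer.add( alert );
            long now = ctx.timerService().currentProcessingTime();
            if( nextFlushTime == 0L )
            {
                nextFlushTime = now + FLUSH_INTERVAL_MS;
            }
            // Timers cannot be registered on a non-keyed stream, so the "window"
            // boundary is simply checked on every incoming element.
            if( now >= nextFlushTime )
            {
                // One external query for all resource ids buffered on this instance.
                for( String enriched : queryExternalSystem( buffer ) )
                {
                    out.collect( enriched );
                }
                buffer.clear();
                nextFlushTime = now + FLUSH_INTERVAL_MS;
            }
        }

        // Placeholder for the single batched call to the external system.
        private List<String> queryExternalSystem( List<Alert> alerts )
        {
            List<String> enriched = new ArrayList<>();
            for( Alert a : alerts )
            {
                enriched.add( a.alertId + " enriched for " + a.resourceId );
            }
            return enriched;
        }
    }
}

The wiring could then look like
alertStream.partitionCustom(myPartitioner, a -> a.resourceId).process(new BatchingEnricher());
and indeed, partitionCustom() returns a plain DataStream, so a subsequent
keyBy() would install the KeyGroupStreamPartitioner again and discard the
custom partitioning, as suspected above.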


On 19/02/2018 03:45, 周思华 wrote:

Hi Julien,
    If I am not misunderstanding, I think you can key your stream on a
`Random.nextInt() % parallelism`; this way you can "group" together
alerts from different resources and benefit from multiple parallel instances.



Sent from NetEase Mail Master

On 02/19/2018 09:08, Xingcan Cui wrote:


Hi Julien,

sorry for my misunderstanding before. For now, the window can only
be defined on a KeyedStream or an ordinary DataStream but with
parallelism = 1. I’d like to provide three options for your scenario.

1. If your external data is static and can fit into memory, you can
use ManagedStates to cache it without considering the querying problem
(see the sketch below).
2. Or you can use a CustomPartitioner to manually distribute your alert
data and simulate a window operation by yourself in a ProcessFunction.
3. You may also choose to use some external systems such as an
in-memory store, which can work as a cache for your queries.

Best,
Xingcan
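A minimal sketch of option 1 above, assuming the stream has been keyed by
resource id and that a hypothetical lookupResource() call fetches the (static)
reference data on a cache miss; the cached value simply lives in keyed managed
state:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Keyed managed state as a per-resource cache: the external system is queried
// at most once per resourceId, afterwards the value is served from state.
public class CachedEnricher extends RichFlatMapFunction<String, String>
{
    private transient ValueState<String> cachedInfo;

    @Override
    public void open( Configuration parameters )
    {
        cachedInfo = getRuntimeContext().getState(
            new ValueStateDescriptor<>( "resource-info", String.class ) );
    }

    @Override
    public void flatMap( String resourceId, Collector<String> out ) throws Exception
    {
        String info = cachedInfo.value();
        if( info == null )
        {
            info = lookupResource( resourceId );   // hypothetical single-key query
            cachedInfo.update( info );
        }
        out.collect( resourceId + " -> " + info );
    }

    // Placeholder for the real call to the external system.
    private String lookupResource( String resourceId )
    {
        return "info-for-" + resourceId;
    }
}

It would be applied after a keyBy, e.g. something like
alerts.keyBy(id -> id).flatMap(new CachedEnricher()), since keyed state is only
available on a KeyedStream; with memory-backed state the cached data must of
course fit on the heap.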


On 19 Feb 2018, at 5:55 AM, Julien <jmassio...@gmail.com> wrote:

Hi Xingcan,

Thanks for your answer.
Yes, I understand that point:

  * if I have 100 resource IDs with parallelism of 4, then each
operator instance will handle about 25 keys


The issue I have is that I want, on a given operator instance, to
group those 25 keys together in order to do only 1 query to an
external system per operator instance:

  * on a given operator instance, I will do 1 query for my 25 keys
  * so with the 4 operator instances, I will do 4 queries in
parallel (with about 25 keys per query)


I do not know how I can do that.

If I define a window on my keyed stream (with for example
stream.keyBy(_.resourceId).window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))),
then my understanding is that the window is "associated" to the
key. So in this case, on a given operator instance, I will have
25 of those windows (one per key), and I will do 25 queries
(instead of 1).

Do you understand my point ?
Or maybe am I missing something ?

I'd like to find a way on operator instance 1 to group all the
alerts received on those 25 resource ids and do 1 query for those
25 resource ids.
Same thing for operator instance 2, 3 and 4.


Thank you,
Regards.


On 18/02/2018 14:43, Xingcan Cui wrote:

Hi Julien,

the cardinality of your keys (e.g., resource ID) will not be
restricted to the parallelism. For instance, if you have 100
resource IDs processed by KeyedStream with parallelism 4, each
operator instance will handle about 25 keys.

Hope that helps.

Best,
Xingcan


On 18 Feb 2018, at 8:49 PM, Julien <jmassio...@gmail.com> wrote:

Hi,

I am pretty new to Flink and I don't know what would be the best
way to deal with the following use case:

  * as an input, I receive some alerts from a kafka topic
  o an alert is linked to a network resource (like
router-1, router-2, switch-1, switch-2, ...)
  o so an alert has two main pieces of information (the alert id and
the resource id of the resource on which this alert has
been raised)
  * then I need to do a query to an external system in order to
enrich the alert with additional information on the resource


(A "natural" candidate for the key on this stream will be the
resource id)

The issue I have is that regarding the query to t

Re: Only a single message processed

2018-02-19 Thread Niclas Hedhman
(Sorry for the incoherent order and ramblings. I am writing this as I am
trying to sort out what is going on...)

1. It is the first message to be processed in the Kafka topic. If I set the
offset manually, it will pick up the message at that point, process it, and
ignore all following messages.

2. Yes, the Kafka console consumer tool is spitting out the messages
without problem. Btw, they are plain Strings, well, serialized JSON objects.

3. Code is a bit messy, but I have copied out the relevant parts below.

I also noticed that a LOT of exceptions are thrown ("breakpoint on any
exception"), mostly ClassCastException, classes not found and
NoSuchMethodException, but nothing that bubbles up out of the internals. Is
this just Scala hammering the JVM, or the normal JVM class loading
sequence (no wonder it is so slow)? Is that expected?

I have tried to use both the ObjectMapper from Jackson proper, as well as
the shadowed ObjectMapper in flink. No difference.

Recap: positioning the Kafka consumer at the 8th message from the end. Only that
message is consumed; the remaining 7 are ignored/swallowed.


Ok, so I think I have traced this down to something happening in the
CassandraSink. There is an Exception being thrown somewhere, which I can see
because the Kafka09Fetcher.runFetchLoop()'s finally clause is called.

Found it (hours later in debugging), on this line (Flink 1.4.1)

org/apache/flink/cassandra/shaded/com/google/common/util/concurrent/Futures.class:258

which contains

future.addListener(callbackListener, executor);  // IDEA says
'future' is of type DefaultResultSetFuture

throws an Exception without stepping into the addListener() method. There
is nothing catching the Exception (and I don't want to go down the rabbit
hole of building from source), so I can't really say what Exception is
being thrown. IDEA doesn't seem to report it, and the catch clauses in
OperatorChain.pushToOperator() (ClassCastException and Exception) are in
the call stack but don't catch it, which could suggest a
java.lang.Error; NoClassDefFoundError comes to mind, since there are SO
MANY classloading exceptions going on all the time.

Hold on a second... There are TWO
com.datastax.driver.core.DefaultResultSetFuture types in the classpath. One
from the Cassandra client that I declared, and one from inside the
flink-connector-cassandra_2.11 artifact...
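One way to confirm a duplicate-class situation like this (a sketch, not part
of the original post) is to ask the JVM where the class was actually loaded
from, running with the same classpath as the job:

// Prints the jar that com.datastax.driver.core.DefaultResultSetFuture was loaded
// from, which makes a user-declared driver and the copy bundled inside
// flink-connector-cassandra easy to tell apart. getCodeSource() can be null for
// bootstrap classes, hence the guard.
public class WhichJar
{
    public static void main( String[] args ) throws ClassNotFoundException
    {
        Class<?> clazz = Class.forName( "com.datastax.driver.core.DefaultResultSetFuture" );
        java.security.CodeSource source = clazz.getProtectionDomain().getCodeSource();
        System.out.println( source != null ? source.getLocation() : "bootstrap / unknown" );
    }
}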

So will it work if I remove my own dependency declaration and that's it?


YES!!! Finally.


SOLVED!

-o-o-o-o-o-

public static void main( String[] args )
throws Exception
{
cli = CommandLine.populateCommand( new ServerCliOptions(), args );
initializeCassandra( cli );
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment().setMaxParallelism(
32768 );
//createPollDocPipeline( env );
createAdminPipeline( env );
env.execute( "schedule.poll" );
}



private static void createAdminPipeline( StreamExecutionEnvironment env )
{
try
{
FlinkKafkaConsumer011 adminSource = createKafkaAdminSource();
SplitStream adminStream =
env.addSource( adminSource )
   .name( "scheduler.admin" )
   .map( value -> {
   try
   {
   return mapper.readValue( value, AdminCommand.class );
   }
   catch( Throwable e )
   {
   LOG.error( "Unexpected error deserializing
AdminCommand", e );
   return null;
   }
   } )
   .name( "admin.command.read" )
   .split( value -> singletonList( value.action() ) );

SingleOutputStreamOperator, String,
String>> insertStream =
adminStream.select( AdminCommand.CMD_SCHEDULE_INSERT )
   .map( new GetPollDeclaration() )
   .name( "scheduler.admin.insert" )
   .map( new PollDeclarationToTuple3Map() )
   .name( "scheduler.pollDeclToTuple3" )
   .filter( tuple -> tuple != null );

SingleOutputStreamOperator, String,
String>> deleteStream =
adminStream.select( AdminCommand.CMD_SCHEDULE_DELETE )
   .map( new GetPollDeclaration() )
   .name( "scheduler.admin.delete" )
   .map( new PollDeclarationToTuple3Map() )
   .name( "scheduler.pollDeclToTuple3" )
   .filter( tuple -> tuple != null );

CassandraSink.addSink( insertStream )
 .setHost( cli.primaryCassandraHost(),
cli.primaryCassandraPort() )
 .setQuery( String.format( INSERT_SCHEDULE,
cli.cassandraKeyspace ) )
 .build();

CassandraSink.addSink( deleteStream )
 .setHost( cli.primaryCassandraHost(),
cli.primaryCassandraPort() )
 .setQuery( String.format( DELETE_SCHEDULE,