Re: Queryable state and TTL

2019-07-06 Thread Eron Wright
Here's a PR for queryable state TLS that I closed because I didn't have
time, and because I get the impression that the queryable state feature is
not used very often. Feel free to take it up, if you like.
https://github.com/apache/flink/pull/6626

-Eron

On Wed, Jul 3, 2019 at 11:21 PM Avi Levi  wrote:

> Hi Yu,
> Our sink is actually Kafka, hence we cannot query it properly; from there
> we distribute it to different consumers. We keep info in our state, such as
> entry time, some accumulated data, etc.; this data is not kept elsewhere,
> hence we need to query our state.
>
> Best regards
> Avi
>
>
> On Thu, Jul 4, 2019 at 7:20 AM Yu Li  wrote:
>
>> *This Message originated outside your organization.*
>> --
>> Thanks for the ping Andrey.
>>
>> For me the general answer is yes, but TBH it will probably not be added
>> in the foreseeable future due to lack of committer bandwidth (not only
>> QueryableState with TTL but the whole QueryableState module), as Aljoscha
>> pointed out in another thread [1].
>>
>> Although we have seen emerging requirements and proposals on
>> QueryableState recently, prioritizing is important for every open source
>> project. Personally, I think it would help if we could gather more and
>> clearly describe the other-than-debugging use cases of QueryableState in
>> production [2]. Could you share your case with us, and why QueryableState is
>> necessary rather than querying the data from the sink, @Avi? Thanks.
>>
>> [1] https://s.apache.org/MaOl
>> [2] https://s.apache.org/hJDA
>>
>> Best Regards,
>> Yu
>>
>>
>> On Wed, 3 Jul 2019 at 23:13, Andrey Zagrebin 
>> wrote:
>>
>>> Hi Avi,
>>>
>>> It is on the roadmap but I am not aware of plans of any contributor
>>> to work on it for the next releases.
>>> I think the community will firstly work on the event time support for
>>> TTL.
>>> I will loop Yu in, maybe he has some plans to work on TTL for the
>>> queryable state.
>>>
>>> Best,
>>> Andrey
>>>
>>> On Wed, Jul 3, 2019 at 3:17 PM Avi Levi  wrote:
>>>
 Hi,
 Adding queryable state to state with TTL is not supported in 1.8.0
 (it throws java.lang.IllegalArgumentException: Queryable state is currently
 not supported with TTL).

 I saw in a previous mailing thread that it is on the roadmap. Is it still
 on the roadmap?

 * There is a workaround, which is using timers to clear the state, but in
 our case it means firing billions of timers on a daily basis, all at the
 same time, which does not seem very efficient and might cause some resource
 issues.
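
 A minimal sketch of such a timer-based cleanup (the element type and the
 24h retention are placeholders; assumes a keyed stream with event-time
 timestamps assigned):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class StateCleanupFunction extends KeyedProcessFunction<String, String, String> {

    private static final long RETENTION_MS = 24 * 60 * 60 * 1000L;

    private transient ValueState<Long> lastSeen;

    @Override
    public void open(Configuration parameters) {
        lastSeen = getRuntimeContext().getState(
            new ValueStateDescriptor<>("lastSeen", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        lastSeen.update(ctx.timestamp());
        // one timer per element; this is exactly the "billions of timers" concern
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + RETENTION_MS);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        Long last = lastSeen.value();
        // only clear if no newer element refreshed this key in the meantime
        if (last != null && timestamp >= last + RETENTION_MS) {
            lastSeen.clear();
        }
    }
}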

 Cheers
 Avi





Re: Problem with querying state on Flink 1.6.

2018-08-30 Thread Eron Wright
I took a brief look at why the queryable state server would bind to the
loopback address.   Both the QS server and the
org.apache.flink.runtime.io.network.netty.NettyServer bind the local
address based on the TM address.  That address is based on the
"taskmanager.hostname" configuration override and, by default, the
RpcService address.

A possible explanation is that, on Joe's machine, Java's
`InetAddress.getLocalHost()` resolves to the loopback address.  I believe
there's some variation in Java's behavior in that regard.
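
If that turns out to be the cause, a possible workaround (a sketch, using the
configuration key mentioned above) is to pin the TM address explicitly in
flink-conf.yaml:

taskmanager.hostname: <externally-resolvable-hostname-or-ip>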

Hope this helps!

On Thu, Aug 30, 2018 at 1:27 AM Till Rohrmann  wrote:

> Hi Joe,
>
> it looks as if the queryable state server binds to the local loopback
> address. This looks like a bug to me. Could you maybe share the complete
> cluster entrypoint and the task manager logs with me?
>
> In the meantime you could try to do the following: Change
> AbstractServerBase.java:227 into `.localAddress(port)`. This should bind to
> any local address. Now you need to build your own Flink distribution by
> running `mvn clean package -DskipTests` and then go to either build-target
> or flink-dist/target/flink-1.7-SNAPSHOT-bin/flink-1.7-SNAPSHOT to find the
> distribution.
>
> Cheers,
> Till
>
> On Thu, Aug 30, 2018 at 12:12 AM Joe Olson  wrote:
>
>> I'm having a problem with querying state on Flink 1.6.
>>
>> I put a project in Github that is my best representation of the very
>> simple client example outlined in the 'querying state' section of the 1.6
>> documentation at
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
>> . The Github project is at https://github.com/jolson787/qs
>>
>> My problem: I know the query server and proxy server have started on my 1
>> job manager / 1 task manager Flink 1.6 test rig, because I see the 'Started
>> Queryable State Server' and 'Started Queryable State Proxy Server' in the
>> task manager logs. I know the ports are open on the local machine, because
>> I can telnet to them.
>>
>> From a remote machine, I implemented the QueryableStateClient as in the
>> example, and made a getKvState call. Nothing I do around that getKvState
>> call seems to register: no response, no errors thrown, no lines in the log,
>> no returned futures, no timeouts, etc. I know the proxy server and state
>> server ports are NOT open to the remote machine, yet the client still
>> doesn't seem to react.
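>>
>> For reference, a minimal client sketch against the 1.6 API (host, port, job
>> ID, state name, and key are placeholders; imports omitted):
>>
>> QueryableStateClient client = new QueryableStateClient("tm-host", 9069);
>>
>> ValueStateDescriptor<Long> descriptor =
>>     new ValueStateDescriptor<>("my-state", Long.class);
>>
>> CompletableFuture<ValueState<Long>> future = client.getKvState(
>>     JobID.fromHexString("<job-id>"),     // the running job that registered the state
>>     "my-queryable-state-name",           // name passed to .asQueryableState(...)
>>     "some-key",                          // key to look up
>>     BasicTypeInfo.STRING_TYPE_INFO,      // type information of the key
>>     descriptor);
>>
>> future.thenAccept(state -> {
>>     try {
>>         System.out.println("value = " + state.value());
>>     } catch (Exception e) {
>>         e.printStackTrace();
>>     }
>> });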
>>
>> Can someone take a quick look at my very simple Github project and see if
>> anything jumps out at them? Beer is on me at Flink Forward if someone can
>> help me work through this
>>
>


Re: coordinate watermarks between jobs?

2018-05-04 Thread Eron Wright
It might be possible to apply backpressure to the channels that are
significantly ahead in event time.  Tao, it would not be trivial, but if
you'd like to investigate more deeply, take a look at the Flink runtime's
`StatusWatermarkValve` and the associated stream input processors to see
how an operator integrates incoming watermarks.   A key challenge would be
to apply backpressure to the upstream channel for reasons other than the
availability of network buffers.  Take a look at FLINK-7282 which
introduced a credit system that may be useful here.

On Fri, May 4, 2018 at 10:07 AM, Tao Xia  wrote:

> Without throttling, it will eventually run out of memory.
> I think this is a very common use case for Flink users during stream
> replay or re-processing.
> Do we have any feature planned for it? I would like to contribute to the
> initiative.
>
> On Wed, May 2, 2018 at 2:43 AM, Fabian Hueske  wrote:
>
>> Hi Tao,
>>
>> The watermarks of operators that consume from two (or more) streams are
>> always synced to the lowest watermark.
>> This behavior guarantees that data won't be late (unless it was late when
>> watermarks were assigned). However, the operator will most likely need to
>> buffer more events from the "faster" streams.
>>
>> Right now, it is not possible to throttle faster streams to the pace of
>> the slowest stream.
>>
>> Best, Fabian
>>
>> 2018-04-27 1:05 GMT+02:00 Tao Xia :
>>
>>> Hi All,
>>>   I am trying to replay events from 3 different sources, hopefully in
>>> time sequence, say Stream1, Stream2, Stream3. Since their sizes vary a lot,
>>> the watermark on one stream advances much faster than on the other streams.
>>> Is there any way to coordinate the watermarks between different input streams?
>>> Thanks,
>>> Tao
>>>
>>
>>
>


Re: Issue in Flink/Zookeeper authentication via Kerberos

2018-04-15 Thread Eron Wright
I believe that the solution here is to ensure that the znodes created by
Flink have an ACL that allows access only to the original creator.   For
example, if a given Flink job has a Kerberos identity of "us...@example.com",
it should set the znode ACL appropriately to disallow access to any client
that doesn't successfully authenticate as that user.  This may be
accomplished with the following Flink configuration setting:

high-availability.zookeeper.client.acl: creator

Some code links:
-
https://github.com/apache/flink/blob/release-1.4.2/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java#L171
-
https://github.com/apache/flink/blob/release-1.4.2/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java#L93
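
For completeness, a sketch of the related flink-conf.yaml settings (keytab
path, principal, and quorum are placeholders; assumes ZooKeeper HA with
SASL/Kerberos enabled):

security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink-user@EXAMPLE.COM
security.kerberos.login.contexts: Client
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.zookeeper.client.acl: creator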

Hope this helps!
Eron

On Sun, Apr 15, 2018 at 2:16 AM, Sahu, Sarthak 1. (Nokia - IN/Bangalore) <
sarthak.1.s...@nokia.com> wrote:

> Glad to get the reply. With wrong Kerberos information I am expecting an
> ‘access denied’.
>
>
>
> As per the Flink log, it clearly states that authentication failed due to
> wrong Kerberos information and that it then tries to connect to ZooKeeper in
> unauthorised mode if ZooKeeper allows it.
>
> And then it connected successfully!
>
>
>
> Am I missing any configuration on the Flink/ZooKeeper side?
>
> Expecting your suggestion here.
>
>
>
> Regards
>
> Sarthak Sahu
>
>
>
> *From:* Eron Wright [mailto:eronwri...@gmail.com]
> *Sent:* Tuesday, April 3, 2018 3:07 AM
> *To:* Sahu, Sarthak 1. (Nokia - IN/Bangalore) <sarthak.1.s...@nokia.com>
> *Cc:* suez1...@gmail.com; Timo Walther <twal...@apache.org>
>
> *Subject:* Re: Issue in Flink/Zookeeper authentication via Kerberos
>
>
>
> Hello, I'm happy to help.  Could you elaborate on the issue that you see?
> Are you saying that you expect to get 'access denied' but Zookeeper is
> allowing the connection anyway?
>
>
>
> My first thought is, maybe ZK allows unauthenticated connections but
> relies on the authorization layer to deny access to nodes based on the
> ACL.   Flink has a configuration setting to set the 'owner' of the znode.
>
>
>
> -Eron
>
>
>
> On Mon, Apr 2, 2018 at 1:50 AM, Sahu, Sarthak 1. (Nokia - IN/Bangalore) <
> sarthak.1.s...@nokia.com> wrote:
>
> Hi Eron/Shuyi
>
>
>
> Could you please help me on this below issue.
>
>
>
> Regards
>
> Sarthak Sahu
>
>
>
> *From:* Timo Walther [mailto:twal...@apache.org]
> *Sent:* Monday, March 26, 2018 3:05 PM
> *To:* user@flink.apache.org
> *Cc:* eronwri...@gmail.com; suez1...@gmail.com
> *Subject:* Re: Issue in Flink/Zookeeper authentication via Kerberos
>
>
>
> Hi Sarthak,
>
> I'm not a Kerberos expert but maybe Eron or Shuyi are more familiar with
> the details?
>
> Would be great if somebody could help.
>
> Thanks,
> Timo
>
> Am 22.03.18 um 10:16 schrieb Sahu, Sarthak 1. (Nokia - IN/Bangalore):
>
> Hi Folks,
>
>
>
>   *Environment Setup:*
>
>1. I have configured a KDC 5 server.
>2. Configured Kerberos in zookeeper-3.4.10, wherein I am able to
>connect ZooKeeper server/client via Kerberos authentication.
>3. Now flink-1.4.0 has been configured for Kerberos authentication as per
>the instructions below.
>
> ·   https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/ops/config.html#kerberos-based-security
>
> ·   https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/ops/config.html#kerberos-based-security-1
>
>   *Success Scenario:*
>
>1. All Kerberos configuration parameters are correct and Flink/ZooKeeper
>are able to connect through TGT.
>
>  *Problem:*
>
>1. Even if wrong Kerberos credentials are given, Flink is able to connect to
>ZooKeeper.
>
>
>
> Please find the taskmanager/jobmanager logs and Flink config file for both
> scenarios attached.
>
>
>
> Hoping for quick resolution.
>
>
>
> Regards
>
> Sarthak Sahu
>
>
>
>
>
>
>


Re: Far too few watermarks getting generated with Kafka source

2018-01-22 Thread Eron Wright
I think there's a misconception about `setAutoWatermarkInterval`.   It
establishes the rate at which your periodic watermark generator is polled
for the current watermark.   Like most generators,
`BoundedOutOfOrdernessTimestampExtractor` produces a watermark based solely
on observed elements.   Therefore, `setAutoWatermarkInterval` does not
compensate for idle sources (see FLINK-5479 and FLINK-5018).

Keep in mind that sources do not re-order emitted elements into event time
order; depending on the source's internals, it might emit elements in a
highly unordered fashion with respect to event time.   For example, the
Kafka consumer processes elements across numerous partitions
simultaneously, and the resultant ordering is highly variable.   When you
use the generic `assignTimestampsAndWatermarks` facility, the assigner is
challenged to make sense of this multiplexed stream of elements.   For this
reason, I would strongly suggest you make use of the Kafka consumer's
support for per-partition assigners, to be able to reason about the
progression of time in each partition independently.
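
A rough sketch of wiring the assigner into the consumer itself so that
watermarks are tracked per partition (the element type, its deserialization
schema, the topic name, kafkaProps, and env are assumed to exist):

FlinkKafkaConsumer010<Event> consumer =
    new FlinkKafkaConsumer010<>("my-topic", new EventDeserializationSchema(), kafkaProps);

// assign the extractor on the consumer, not on the resulting DataStream, so
// that each Kafka partition gets its own watermark, which are then merged
consumer.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(20)) {
        @Override
        public long extractTimestamp(Event element) {
            return element.getTimestamp();
        }
    });

DataStream<Event> stream = env.addSource(consumer);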

Here's a good diagram of the phenomemon that I'm describing.  Observe how
some elements seem to 'move upward' together, and imagine that they
correspond to one partition.
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102#FIG12

Hope this helps!
Eron



On Mon, Jan 22, 2018 at 2:24 AM, Fabian Hueske  wrote:

> Hi William,
>
> The TsExtractor looks good.
> This sounds like a strange behavior and should not (or only indirectly) be
> related to the Kafka source since the WMs are generated by a separate
> extractor.
>
> - Did you compare the first (and only) generated watermark to the
> timestamps of the records that are produced by the sources?
> It might be far ahead of the timestamps in the records and won't be
> updated because the timestamps of the records are smaller.
>
> - What is the parallelism of the file sources / Kafka source and following
> operators?
> Watermarks can only advance if they advance in all parallel instance of
> the timestamp extractor.
>
> Best, Fabian
>
> 2018-01-18 16:09 GMT+01:00 William Saar :
>
>> Hi,
>> The watermark does not seem to get updated at all after the first one is
>> emitted. We used to get out-of-order warnings, but we changed the job to
>> use a bounded timestamp extractor so we no longer get those warnings.
>>
>> Our timestamp extractor looks like this
>>
>> class TsExtractor[T](time : Time) extends 
>> BoundedOutOfOrdernessTimestampExtractor[Timestamped[T]](time : Time) {
>> override def extractTimestamp(element: Timestamped[T]): Long = 
>> element.timestamp
>> }
>>
>> Our stream topology starts with a single stream, then we do two separate 
>> flat map and filtering operations on the initial stream to transform data 
>> batches
>> into streams of two different event types. We then 
>> assignTimestampsAndWatermarks(new TsExtractor[EventType](Time.seconds(20))) 
>> for each event type on both
>> branches before unioning the two branches to a single stream again (the 
>> reason for the split is that the data used to come from two different 
>> topics).
>>
>> William
>>
>>
>>
>>
>> - Original Message -
>> From:
>> "Gary Yao" 
>>
>> To:
>> "William Saar" 
>> Cc:
>> "user" 
>> Sent:
>> Thu, 18 Jan 2018 11:11:17 +0100
>> Subject:
>> Re: Far too few watermarks getting generated with Kafka source
>>
>>
>>
>> Hi William,
>>
>> How often does the Watermark get updated? Can you share your code that
>> generates
>> the watermarks? Watermarks should be strictly ascending. If your code
>> produces
>> watermarks that are not ascending, smaller ones will be discarded. Could
>> it be
>> that the events in Kafka are more "out of order" with respect to event
>> time than
>> in your file?
>>
>> You can assign timestamps in the Kafka source or later. The Flink
>> documentation
>> has a section on why it could be beneficial to assign Watermarks in the
>> Kafka
>> source:
>>
>>   https://ci.apache.org/projects/flink/flink-docs-release-1.4/
>> dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
>>
>> Best,
>> Gary
>>
>> On Wed, Jan 17, 2018 at 5:15 PM, William Saar  wrote:
>>
>>> Hi,
>>> I have a job where we read data from either Kafka or a file (for
>>> testing), decode the entries and flat map them into events, and then add a
>>> timestamp and watermark assigner to the events in a later operation. This
>>> seems to generate periodic watermarks when running from a file, but when
>>> Kafka is the source we barely get any watermark updates. What could be
>>> causing this? (the environment has setAutowatermarkInterval(1000))
>>>
>>> Do we need to do all the timestamp and watermark assignment in the Kafka
>>> source? or should it work to do it in later operations? The events do seem
>>> to get propagated through the pipeline, we're just not getting 

Re: how to run flink project built with maven

2018-01-19 Thread Eron Wright
You must specify the full class name, in this case `org.example.WordCount`,
for the `--class` argument.
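
For the quickstart example as generated, that would be along these lines (the
package may differ depending on the groupId you chose when creating the
project):

$ ./bin/flink run --class org.example.WordCount ~/src/flink-gelly-test-maven/44/target/44-1.0-SNAPSHOT.jar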

On Fri, Jan 19, 2018 at 9:35 AM, Jesse Lacika  wrote:

> I feel like this is probably the simplest thing, but I can't seem to
> figure it out, and I've searched and searched and can't find the answer, so
> I thought I'd try the mailing list. I'm using maven to build the sample
> flink project using the scala API as described here:
>
> https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/quickstart/scala_api_quickstart.html
>
> I've successfully built the project exactly as described. Now how do I run
> it?
>
> Everything I've tried so far fails. For example:
>
> $ ./bin/flink run --class WordCount ~/src/flink-gelly-test-maven/
> 44/target/44-1.0-SNAPSHOT.jar
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The program's
> entry point class 'WordCount' was not found in the jar file.
> at org.apache.flink.client.program.PackagedProgram.
> loadMainClass(PackagedProgram.java:613)
> at org.apache.flink.client.program.PackagedProgram.
> (PackagedProgram.java:197)
> at org.apache.flink.client.CliFrontend.buildProgram(
> CliFrontend.java:866)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:252)
> at org.apache.flink.client.CliFrontend.parseParameters(
> CliFrontend.java:1054)
> at org.apache.flink.client.CliFrontend$1.call(
> CliFrontend.java:1101)
> at org.apache.flink.client.CliFrontend$1.call(
> CliFrontend.java:1098)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1807)
> at org.apache.flink.runtime.security.HadoopSecurityContext.
> runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
> Caused by: java.lang.ClassNotFoundException: WordCount
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.flink.client.program.PackagedProgram.
> loadMainClass(PackagedProgram.java:610)
> ... 11 more
>
> What am I doing wrong here?
>
> Thanks,
> Jesse
>


Re: History Server

2018-01-16 Thread Eron Wright
As a follow-up question, how well does the history server work for
observing a running job?   I'm trying to understand whether, in the
cluster-per-job model, a user would be expected to hop from the Web UI to
the History Server once the job completed.

Thanks

On Wed, Oct 4, 2017 at 3:49 AM, Stephan Ewen  wrote:

> To add to this:
>
> The History Server is mainly useful in cases where one runs a
> Flink-cluster-per-job. Once the job finishes, the processes disappear. The
> History Server should be longer lived to make past executions' stats
> available.
>
> On Mon, Sep 25, 2017 at 3:44 PM, Nico Kruber 
> wrote:
>
>> Hi Elias,
>> in theory, it could be integrated into a single web interface, but this
>> was
>> not done so far.
>> I guess the main reason for keeping it separate was probably to have a
>> better
>> separation of concerns as the history server is actually independent of
>> the
>> current JobManager execution and merely displays previous job results
>> which
>> may also come from different or previously existing JobManager instances
>> which
>> stored history data in its storage directory.
>>
>> Chesnay (cc'd) may elaborate a bit more in case you'd like to change that
>> and
>> integrate the history server (interface) into the JobManager.
>>
>>
>> Nico
>>
>> On Sunday, 24 September 2017 02:48:40 CEST Elias Levy wrote:
>> > I am curious, why is the History Server a separate process and Web UI
>> > instead of being part of the Web Dashboard within the Job Manager?
>>
>>
>>
>


Re: Timestamps and watermarks in CoProcessFunction function

2018-01-16 Thread Eron Wright
Consider the watermarks that are generated by your chosen watermark
generator as an +assertion+ about the progression of time, based on domain
knowledge, observation of elements, and connector specifics.  The generator
is asserting that any elements observed after a given watermark will come
later in event time, e.g. "we've reached 12:00 PM; subsequent events will
have a timestamp greater than 12:00 PM".

Your specific output seems fine to me.  It reads like, "event @ 11:59,
watermark @ 12:00, event @ 12:02, watermark @ 12:01, event @ 12:03".  The
watermark assertion wasn't violated in this situation.

Some operators provide special "late event" handling logic for the
situation that the assertion is violated.   The process function is quite
flexible, providing timers to observe the progression of time (due to
watermarks), and making it possible to handle late events as you see fit.
Often a process function will buffer events until a certain time is reached.
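
As a rough illustration (element type String for brevity; shown with a plain
ProcessFunction on a keyed stream, but the same timer service is available in
a CoProcessFunction on keyed streams), this is how timers let you observe the
watermark "clock":

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class WatermarkObserver extends ProcessFunction<String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        out.collect(value);
        // ask for a callback once event time (the watermark) reaches this element's timestamp
        ctx.timerService().registerEventTimeTimer(ctx.timestamp());
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // here ctx.timerService().currentWatermark() >= timestamp, i.e. the
        // operator now considers 'timestamp' to be in the past; elements
        // buffered up to this point could be released here
        System.out.println("watermark has passed " + timestamp);
    }
}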

Hope this helps!
Eron

On Tue, Jan 16, 2018 at 8:51 AM, William Saar  wrote:

>
> Hi,
> I have added the code below to the start of processElement2 in
> CoProcessFunction. It prints timestamps and watermarks for the first 3
> elements for each new watermark. Shouldn't the timestamp always be lower
> than the next watermark? The 3 timestamps before the last watermark are all
> larger than the watermark time
>
> The output I get is
> wm -9223372036854775808
> ts 1478815851242
> ts 1478816075096
> ts 1478816114186
> wm 1478822353934
> ts 1478835814359
> ts 1478835083219
> ts 1478836126621
> wm 1478827220420
> ts 1478836408336
> ts 1478836469247
> ts 1478836759959
>
> if (getRuntimeContext.getIndexOfThisSubtask == 0) {
>   if (context.timerService().currentWatermark() != printedWatermark) {
>     printedWatermark = context.timerService().currentWatermark()
>     println("wm " + printedWatermark)
>     n = 0
>   } else {
>     n += 1
>   }
>   if (n < 3) {
>     println("ts " + context.timestamp())
>   }
> }
>
>
>
>
>


Re: Two issues when deploying Flink on DC/OS

2018-01-13 Thread Eron Wright
Hello Dongwon,

Flink doesn't support a 'unique host' constraint at this time; it simply
accepts adequate offers without any such consideration.   Flink does
support a 'host attributes' constraint to filter certain hosts, but that's
not applicable here.

Under the hood, Flink uses a library called Netflix Fenzo to optimize
placement, and a uniqueness constraint could be added by more deeply
leveraging Fenzo's constraint system.   You mentioned that you're trying to
make good use of your GPU resources, which could also be achieved by
treating GPU as a scalar resource (similar to how memory and cores are
treated).   Mesos does support that, but Fenzo may require some
enhancement.   So, these are two potential ways to enhance Flink to support
your scenario.  I'm happy to help; reach out to me.

The obvious, ugly workaround is to configure your TMs to be large enough to
consume the whole host.
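
For example (a sketch; the numbers are illustrative and would need to match
your agents' actual capacity), sizing each TM to roughly a whole GPU host in
flink-conf.yaml:

mesos.initial-tasks: 5
mesos.resourcemanager.tasks.cpus: 40
mesos.resourcemanager.tasks.mem: 102400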

Eron





On Thu, Jan 11, 2018 at 7:18 AM, Gary Yao <g...@data-artisans.com> wrote:

> Hi Dongwon,
>
> I am not familiar with the deployment on DC/OS. However, Eron Wright and
> Jörg
> Schad (cc'd), who have worked on the Mesos integration, might be able to
> help
> you.
>
> Best,
> Gary
>
> On Tue, Jan 9, 2018 at 10:29 AM, Dongwon Kim <eastcirc...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I've launched JobManager and TaskManager on DC/OS successfully.
>> Now I have two new issues:
>>
>> 1) All TaskManagers are scheduled on a single node.
>> - Is it intended to maximize data locality and minimize network
>> communication cost?
>> - Is there an option in Flink to adjust the behavior of JobManager when
>> it considers multiple resource offers from different Mesos agents?
>> - I want to schedule TaskManager processes on different GPU servers so
>> that each TaskManager process can use its own GPU cards exclusively.
>> - Below is a part of JobManager log that is occurring while JobManager is
>> negotiating resources with the Mesos master:
>>
>> 2018-01-09 07:34:54,872 INFO  
>> org.apache.flink.mesos.runtime.clusterframework.MesosJobManager  - 
>> JobManager akka.tcp://flink@dnn-g08-233:18026/user/jobmanager was granted 
>> leadership with leader session ID Some(----).
>> 2018-01-09 07:34:55,889 INFO  
>> org.apache.flink.mesos.scheduler.ConnectionMonitor- Connecting 
>> to Mesos...
>> 2018-01-09 07:34:55,962 INFO  
>> org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - 
>> Trying to associate with JobManager leader 
>> akka.tcp://flink@dnn-g08-233:18026/user/jobmanager
>> 2018-01-09 07:34:55,977 INFO  
>> org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - 
>> Resource Manager associating with leading JobManager 
>> Actor[akka://flink/user/jobmanager#-1481183359] - leader session 
>> ----
>> 2018-01-09 07:34:56,479 INFO  
>> org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - 
>> Scheduling Mesos task taskmanager-1 with (10240.0 MB, 8.0 cpus).
>> 2018-01-09 07:34:56,481 INFO  
>> org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - 
>> Scheduling Mesos task taskmanager-2 with (10240.0 MB, 8.0 cpus).
>> 2018-01-09 07:34:56,481 INFO  
>> org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - 
>> Scheduling Mesos task taskmanager-3 with (10240.0 MB, 8.0 cpus).
>> 2018-01-09 07:34:56,481 INFO  
>> org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - 
>> Scheduling Mesos task taskmanager-4 with (10240.0 MB, 8.0 cpus).
>> 2018-01-09 07:34:56,481 INFO  
>> org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - 
>> Scheduling Mesos task taskmanager-5 with (10240.0 MB, 8.0 cpus).
>> 2018-01-09 07:34:56,483 INFO  
>> org.apache.flink.mesos.scheduler.LaunchCoordinator- Now 
>> gathering offers for at least 5 task(s).
>> 2018-01-09 07:34:56,484 INFO  
>> org.apache.flink.mesos.scheduler.ConnectionMonitor- Connected to 
>> Mesos as framework ID 59b85b42-a4a2-4632-9578-9e480585ecdc-0004.
>> 2018-01-09 07:34:56,690 INFO  
>> org.apache.flink.mesos.scheduler.LaunchCoordinator- Received 
>> offer(s) of 606170.0 MB, 234.2 cpus:
>> 2018-01-09 07:34:56,692 INFO  
>> org.apache.flink.mesos.scheduler.LaunchCoordinator-   
>> 59b85b42-a4a2-4632-9578-9e480585ecdc-O2174 from 50.1.100.233 of 86.0 MB, 
>> 45.9 cpus for [*]
>> 2018-01-09 07:34:56,692 INFO  
>> org.apache.flink.mesos.scheduler.LaunchCoordinator-   
>>

Re: How to stop FlinkKafkaConsumer and make job finished?

2017-12-28 Thread Eron Wright
I believe you can extend the `KeyedDeserializationSchema` that you pass to
the consumer to check for end-of-stream markers.

https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.html#isEndOfStream-T-
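
A rough sketch of what such a schema could look like (the element type and
the EOF marker are placeholders):

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class EofAwareSchema implements KeyedDeserializationSchema<String> {

    @Override
    public String deserialize(byte[] messageKey, byte[] message,
                              String topic, int partition, long offset) {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // returning true signals the consumer to stop, allowing the source
        // (and eventually the job) to finish
        return "EOF".equals(nextElement);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}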

Eron

On Wed, Dec 27, 2017 at 12:35 AM, Ufuk Celebi  wrote:

> Hey Jaxon,
>
> I don't think it's possible to control this via the life-cycle methods
> of your functions.
>
> Note that Flink currently does not support graceful stop in a
> meaningful manner and you can only cancel running jobs. What comes to
> my mind to cancel on EOF:
>
> 1) Extend the Kafka consumer to stop emitting records after your EOF
> record. Look at the flink-connector-kafka-base module. This is
> probably not feasible and takes some work to get familiar with the code.
> Just putting it out there.
>
> 2) Throw a "SuccessException" that fails the job. Easy, but not nice.
>
> 3) Use an Http client and cancel your job via the Http endpoint
> (https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/monitoring/rest_api.html#job-cancellation).
> Easy, but not nice, since you need quite some logic in your function
> (e.g. ignore records after EOF record until cancellation, etc.).
>
> Maybe Aljoscha (cc'd) has an idea how to do this in a better way.
>
> – Ufuk
>
>
> On Mon, Dec 25, 2017 at 8:59 AM, Jaxon Hu  wrote:
> > I would like to stop FlinkKafkaConsumer consuming data from Kafka
> > manually. But I find it won't be closed when I invoke the "cancel()"
> > method. What I am trying to do is add an EOF symbol meaning the end of my
> > Kafka data, and when the FlatMap operator reads the symbol it will invoke
> > the FlinkKafkaConsumer "cancel()" method. It doesn't work. The Flink
> > streaming job won't finish unless it gets canceled or fails, when I use
> > Kafka as the source.
> >
> > Could somebody give me some help? Thx~~
>


Re: Job-level close()?

2017-12-18 Thread Eron Wright
Since the connection pool is per-TM, and a given job may span numerous TMs,
I think you're imagining a per-TM cleanup.

Your idea of using reference counting seems like the best option.  Maintain
a static count of open instances.  Use a synchronization block to manage
the count and to conditionally open and close the connection pool.
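
A rough sketch of that pattern (the pool type and its create/write/shutdown
calls are hypothetical stand-ins for whatever your library exposes):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class PooledSink extends RichSinkFunction<String> {

    // shared by all subtasks running in the same task manager JVM
    private static ConnectionPool pool;            // hypothetical pool type
    private static int refCount = 0;
    private static final Object lock = new Object();

    @Override
    public void open(Configuration parameters) {
        synchronized (lock) {
            if (refCount == 0) {
                pool = ConnectionPool.create();    // hypothetical factory
            }
            refCount++;
        }
    }

    @Override
    public void invoke(String value) throws Exception {
        pool.write(value);                         // hypothetical call
    }

    @Override
    public void close() {
        synchronized (lock) {
            refCount--;
            if (refCount == 0) {
                pool.shutdown();                   // hypothetical call
                pool = null;
            }
        }
    }
}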


On Mon, Dec 18, 2017 at 1:58 AM, Fabian Hueske  wrote:

> Hi Andrew,
>
> I'm not aware of such a plan.
> Another way to address such issues is to run multiple TaskManagers with a
> single slot. In that case, only one subtask is executed per TM processes.
>
> Best, Fabian
>
> 2017-12-15 22:23 GMT+01:00 Andrew Roberts :
>
>> Hello,
>>
>> I’m writing a Flink operator that connects to a database, and running it
>> in parallel results in issues due to the singleton nature of the connection
>> pool in the library I’m working with. The operator needs to close the
>> connection pool when it’s done, but only when ALL parallel instances are
>> done. If one subtask finishes first, then it closes the pool out from under
>> the subtasks that are still working. Currently, I’m using a
>> reference-counting hack that’s pretty brittle and unsatisfying. Are there
>> any plans to add a user-definable job-level cleanup interface?
>>
>> Thanks
>>
>> Andrew
>>
>
>


Re: flink eventTime, lateness, maxoutoforderness

2017-12-16 Thread Eron Wright
Take a look at the section of Flink documentation titled "Event Time and
Watermarks":
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_time.html#event-time-and-watermarks

Also read the excellent series "Streaming 101" and "102", has useful
animations depicting the flow of time:
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102

Think of the watermark as a clock, ticking along due to information from
the connector or from a watermark generator.
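
As an illustration of how the three relate (reusing the names from the
snippet quoted below; the timestamp field index is a placeholder): both
maxOutOfOrderness and allowedLateness are measured in event time, i.e.
against record timestamps and the watermark derived from them, not against
wall-clock time.

DataStream<Row> withTimestamps = dataStream
    .assignTimestampsAndWatermarks(
        // the watermark trails the largest seen record timestamp by 5s of event time
        new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(Row row) {
                return (Long) row.getField(1);   // hypothetical timestamp field
            }
        });

withTimestamps
    .keyBy(row -> (String) row.getField(0))
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    // the window's state is kept (and late firings produced) until the
    // watermark passes the window end by more than 5s of event time
    .allowedLateness(Time.seconds(5))
    .fold(initRow(), new MyFoldFunction());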

Hope this helps,
Eron


On Sat, Dec 16, 2017 at 8:07 AM, chen  wrote:

> eventTime, lateness, and maxOutOfOrderness are all about time.
> Event time is the watermark time on the record.
> Is lateness record time or real-world time?
> Is maxOutOfOrderness record time or real-world time?
>
> dataStream.keyBy(row -> (String) row.getField(0))
>     .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>     .allowedLateness(Time.seconds(5))
>     .fold(initRow(), new MyFoldFunction())
>
> public Watermark getCurrentWatermark() {
>     return new Watermark(currentTime - 5000);
> }
>
> Could anyone explain the time semantics of eventTime, lateness, and
> maxOutOfOrderness?
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Flink long-running streaming job, Keytab authentication

2017-12-14 Thread Eron Wright
To my knowledge the various RPC clients take care of renewal (whether
reactively or using a renewal thread).  Some examples:
https://github.com/apache/hadoop/blob/release-2.7.3-RC2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java#L638
https://github.com/apache/kafka/blob/0.10.2/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java#L139

So I don't think Flink needs a renewal thread but the overall situation is
complex.  Some stack traces and logs may be needed to understand the issue.

Eron

On Thu, Dec 14, 2017 at 8:17 AM, Oleksandr Nitavskyi  wrote:

> Hello all,
>
>
>
> I have a question about Kerberos authentication in Yarn environment for
> long running streaming job. According to the documentation (
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/security-
> kerberos.html#yarnmesos-mode ) Flink’s solution is to use keytab in order
> to perform authentication in YARN perimeter.
>
>
>
> If keytab is configured, Flink uses
> *UserGroupInformation#loginUserFromKeytab* method in order to perform
> authentication. In the YARN Security documentation (
>
> https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-
> project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/
> YarnApplicationSecurity.md#keytabs-for-am-and-containers-
> distributed-via-yarn ) mentioned that it should be enough:
>
>
>
> *Launched containers must themselves log in
> via UserGroupInformation.loginUserFromKeytab(). UGI handles the login, and
> schedules a background thread to relogin the user periodically.*
>
>
>
> But in reality if we check the Source code of UGI, we can see that no
> background Thread is created: https://github.com/apache/
> hadoop/blob/trunk/hadoop-common-project/hadoop-common/
> src/main/java/org/apache/hadoop/security/UserGroupInformation.java#L1153.
> There are just created javax.security.auth.login.LoginContext
>
> and performed authentication. Looks like it is true for different Hadoop
> branches - 2.7, 2.8, 3.0, trunk. So Flink also doesn’t create any
> background Threads: https://github.com/apache/flink/blob/master/flink-
> runtime/src/main/java/org/apache/flink/runtime/security/
> modules/HadoopModule.java#L69. So in my case job loses credentials for
> ResourceManager and HDFS after some time (12 hours in my case).
>
>
>
> Looks like UGI’s code is not aligned with the documentation and it
> doesn’t relogin periodically.
>
> But do you think patching with background Thread which performs
> UGI#reloginUserFromKeytab can be a solution?
>
>
>
> P.S. We are running Flink as a single job on Yarn.
>
>
>
>
>


Re: Fenzo NoClassDefFoundError: ObjectMapper

2017-12-14 Thread Eron Wright
Jared, I think you're correct that the shaded `ObjectMapper` is missing.
 Based on the above details and a quick look at the Fenzo code, it appears
that this bug is expressed when you've configured some hard constraints
that Mesos couldn't satisfy.  Do you agree?  Thanks also for suggesting a
fix, would you mind annotating FLINK-8265?

https://issues.apache.org/jira/browse/FLINK-8265

Sorry about this!

Eron


On Thu, Dec 14, 2017 at 12:17 PM, Jared Stehler <
jared.steh...@intellifylearning.com> wrote:

> Possibly missing an include for jackson here in the flink-mesos pom?
>
> <artifactSet>
>   <includes>
>     <include>com.google.protobuf:protobuf-java</include>
>     <include>org.apache.mesos:mesos</include>
>     <include>com.netflix.fenzo:fenzo-core</include>
>   </includes>
> </artifactSet>
> <relocations>
>   <relocation>
>     <pattern>com.google.protobuf</pattern>
>     <shadedPattern>org.apache.flink.mesos.shaded.com.google.protobuf</shadedPattern>
>   </relocation>
>   <relocation>
>     <pattern>com.fasterxml.jackson</pattern>
>     <shadedPattern>org.apache.flink.mesos.shaded.com.fasterxml.jackson</shadedPattern>
>   </relocation>
> </relocations>
>
> --
> Jared Stehler
> Chief Architect - Intellify Learning
> o: 617.701.6330 x703 <(617)%20701-6330>
>
>
>
> On Dec 14, 2017, at 3:10 PM, Jared Stehler  intellifylearning.com> wrote:
>
> I see a shaded jackson class with *jackson2* in the package, but none
> with the path shown below.
>
> --
> Jared Stehler
> Chief Architect - Intellify Learning
> o: 617.701.6330 x703 <(617)%20701-6330>
>
>
>
> On Dec 14, 2017, at 3:05 PM, Jared Stehler  intellifylearning.com> wrote:
>
> Getting the following error on app master startup with flink-mesos 1.4.0:
>
> ExecutionException: java.lang.NoClassDefFoundError: 
> org/apache/flink/mesos/shaded/com/fasterxml/jackson/databind/ObjectMapper
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at com.netflix.fenzo.TaskScheduler.doSchedule(TaskScheduler.java:678)
> at com.netflix.fenzo.TaskScheduler.scheduleOnce(TaskScheduler.java:600)
> at 
> org.apache.flink.mesos.scheduler.LaunchCoordinator$$anonfun$5.applyOrElse(LaunchCoordinator.scala:173)
> ...
> (17 additional frame(s) were not displayed)
>
> NoClassDefFoundError: 
> org/apache/flink/mesos/shaded/com/fasterxml/jackson/databind/ObjectMapper
> at com.netflix.fenzo.ConstraintFailure.(ConstraintFailure.java:35)
> at 
> com.netflix.fenzo.AssignableVirtualMachine.findFailedHardConstraints(AssignableVirtualMachine.java:784)
> at 
> com.netflix.fenzo.AssignableVirtualMachine.tryRequest(AssignableVirtualMachine.java:581)
> at com.netflix.fenzo.TaskScheduler.evalAssignments(TaskScheduler.java:796)
> at com.netflix.fenzo.TaskScheduler.access$1500(TaskScheduler.java:70)
> ...
> (6 additional frame(s) were not displayed)
>
> ClassNotFoundException: 
> org.apache.flink.mesos.shaded.com.fasterxml.jackson.databind.ObjectMapper
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at com.netflix.fenzo.ConstraintFailure.(ConstraintFailure.java:35)
> ...
> (10 additional frame(s) were not displayed)
>
>
>
> --
> Jared Stehler
> Chief Architect - Intellify Learning
> o: 617.701.6330 x703 <(617)%20701-6330>
>
>
>
>
>
>


Re: hadoop error with flink mesos on startup

2017-12-12 Thread Eron Wright
Thanks for investigating this, Jared.  I would summarize it as
Flink-on-Mesos cannot be used in Hadoop-free mode in Flink 1.4.0.  I filed
an improvement bug to support this scenario: FLINK-8247



On Tue, Dec 12, 2017 at 11:46 AM, Jared Stehler <
jared.steh...@intellifylearning.com> wrote:

> I had been excluding all transitive dependencies from the lib dir; it
> seems to be working when I added the following deps:
>
> <dependency>
>   <groupId>commons-configuration</groupId>
>   <artifactId>commons-configuration</artifactId>
>   <version>1.7</version>
> </dependency>
>
> <dependency>
>   <groupId>commons-lang</groupId>
>   <artifactId>commons-lang</artifactId>
>   <version>2.6</version>
> </dependency>
>
>
> --
> Jared Stehler
> Chief Architect - Intellify Learning
> o: 617.701.6330 x703 <(617)%20701-6330>
>
>
>
> On Dec 12, 2017, at 2:10 PM, Chesnay Schepler  wrote:
>
> Could you look into the flink-shaded-hadoop jar to check whether the
> missing class is actually contained?
>
> Where did the flink-shaded-hadoop jar come from? I'm asking because when
> building flink-dist from source the jar is called
> flink-shaded-hadoop2-uber-1.4.0.jar, which does indeed contain the jar.
> (the uber jar is created by building flink-shaded-hadoop2*-uber*)
>
> On 12.12.2017 19:28, Jared Stehler wrote:
>
> After upgrading to flink 1.4.0 using the hadoop-free build option, I’m
> seeing the following error on startup in the app master:
>
> 2017-12-12 18:23:15.473 [main] ERROR 
> o.a.f.m.r.clusterframework.MesosApplicationMasterRunner
> - Mesos JobManager initialization failed
>
> 
> java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.hadoop.security.UserGroupInformation
>
> 
> at org.apache.flink.runtime.clusterframework.overlays.
> HadoopUserOverlay$Builder.fromEnvironment(HadoopUserOverlay.java:74)
>
> 
> at org.apache.flink.mesos.entrypoint.MesosEntrypointUtils.applyOverlays(
> MesosEntrypointUtils.java:145)
> Looking at the code, it appears that the HadoopUserOverlay always tries to
> init the UserGroupInformation class, and is failing. Same error with or
> without the flink-shaded-hadoop2 library included.
>
> This is my lib dir:
>
> flink-appmaster-1.0-SNAPSHOT.jar   flink-s3-fs-presto-1.4.0.jar
> jul-to-slf4j-1.7.25.jarsentry-1.5.3.jar
> flink-dist_2.11-1.4.0.jar  flink-shaded-hadoop2-1.4.0.jar
> log4j-over-slf4j-1.7.25.jarsentry-logback-1.5.3.jar
> flink-metrics-prometheus-1.4.0.jar jackson-core-2.8.10.jar
> logback-classic-1.1.11.jar
> flink-python_2.11-1.4.0.jarjcl-over-slf4j-1.7.25.jar
> logback-core-1.1.11.jar
>
>
> --
> Jared Stehler
> Chief Architect - Intellify Learning
> o: 617.701.6330 x703 <(617)%20701-6330>
>
>
>
>
>
>


Re: what's the best practice to determine event-time watermark for unordered stream?

2017-12-12 Thread Eron Wright
A couple of other details worth mentioning:

1. Your choice of connector matters a lot.   Some connectors provide
limited ordering guarantees, especially across keys, which leads to a
highly disordered stream of events (with respect to event time) from the
perspective of the watermark generator.   Some connectors provide
mitigations; for example, the Kafka connector supports a per-partition
watermark generator.

2. When using `assignTimestampsAndWatermarks`, Flink creates a separate
instance of your watermark generator for each instance of the operator to
which it is assigned.

3. Watermark generators may contain private fields but their state isn't
checkpointed (to my knowledge).

Hope this helps,
Eron

On Tue, Dec 12, 2017 at 7:53 AM, Vishal Santoshi 
wrote:

> To add to Fabian's comment,
>
> What can be done ( and that may not be the norm ) is keep
> a 95-99% quantile ( using an Approximate Histogram such that the execution
> is not heavy )  of the diff between server ( or  ingestion time ) and event
> time and use it as a max out of order ness.  We keep the last n of these
> quantile values each representing  x elements and chose the least. What we
> figure is that giving a max upper bound on real distribution is more
> palatable than some arbitrary value.  The late data becomes inconsequential
> as the lateness has been incorporated in the delayed watermark generation.
>
> Vishal.
>
>
>
>
>
>
>
> On Tue, Dec 12, 2017 at 7:46 AM, Fabian Hueske  wrote:
>
>> As I said before, you can solve that with a custom WatermarkAssigner.
>> Collect a histogram, take the median out of X samples, ignore outliers,
>> etc.
>>
>> 2017-12-12 13:37 GMT+01:00 Jinhua Luo :
>>
>>> Think about we have a normal ordered stream, if an abnormal event A
>>> appears and thus advances the watermark, making all subsequent normal
>>> events (earlier than A) late, I think it's a mistake.
>>> The ways you listed cannot help this mistake. The normal events cannot
>>> be dropped, and the lateness may be hard to determine (it depends on
>>> the timestamp of the abnormal event) and re-triggered the window to
>>> downstream brings in side-effect.
>>> If the abnormal event appears in the middle of stream, then maybe we
>>> could filter out this event checking the delta with the last element,
>>> but what if the abnormal event is the first event emitted by the
>>> source?
>>>
>>>
>>> 2017-12-12 19:25 GMT+08:00 Fabian Hueske :
>>> > Early events are usually not an issue because the can be kept in state
>>> until
>>> > they are ready to be processed.
>>> > Also, depending on the watermark assigner often push the watermark
>>> ahead
>>> > such that they are not early but all other events are late.
>>> >
>>> > Handling of late events depends on your use case and there are the
>>> three
>>> > options that I already listed:
>>> >
>>> > 1) dropping
>>> > 2) keeping state of "completed" computations for some time (allowed
>>> > lateness). If a late event arrives, you can update the result and emit
>>> an
>>> > update. In this case your downstream operators systems have to be able
>>> to
>>> > deal with updates.
>>> > 3) send the late events to a different channel via side outputs and
>>> handle
>>> > them later.
>>> >
>>> >
>>> >
>>> > 2017-12-12 12:14 GMT+01:00 Jinhua Luo :
>>> >>
>>> >> Yes, I know flink is flexible.
>>> >>
>>> >> But I am thinking when the event sequence is mess (e,g, branches of
>>> >> time-series events interleaved, but each branch has completely
>>> >> different time periods), then it's hard to apply them into streaming
>>> >> api, because no matter which way you generate watermark, the watermark
>>> >> cannot be backward or branching.
>>> >>
>>> >> Is there any best practice to handle late event and/or early event?
>>> >>
>>> >>
>>> >> 2017-12-12 18:24 GMT+08:00 Fabian Hueske :
>>> >> > Hi,
>>> >> >
>>> >> > this depends on how you generate watermarks [1].
>>> >> > You could generate watermarks with a four hour delay and be fine
>>> (at the
>>> >> > cost of a four hour latency) or have some checks that you don't
>>> >> > increment a
>>> >> > watermark by more than x minutes at a time.
>>> >> > These considerations are quite use case specific, so it's hard to
>>> give
>>> >> > an
>>> >> > advice that applies to all cases.
>>> >> >
>>> >> > There are also different strategies for how to handle late data in
>>> >> > windows.
>>> >> > You can drop it (default behavior), you can update previously
>>> emitted
>>> >> > results (allowed lateness) [2], or emit them to a side output [3].
>>> >> >
>>> >> > Flink is quite flexible when dealing with watermarks and late data.
>>> >> >
>>> >> > Best, Fabian
>>> >> >
>>> >> > [1]
>>> >> >
>>> >> > https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>>> dev/event_timestamps_watermarks.html
>>> >> > [2]
>>> >> >
>>> >> > 

Re: Problems with taskmanagers in Mesos Cluster

2017-10-23 Thread Eron Wright
If I understand you correctly, the high-availability path isn't being
changed but other TM-related settings are, and the recovered TMs aren't
picking up the new configuration.   I don't think that Flink supports
on-the-fly reconfiguration of a Task Manager at this time.

As a workaround, to achieve a clean new session when you reconfigure Flink
via Marathon, update the HA path accordingly.

Would that work for you?
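
For example (illustrative value), bumping the ZooKeeper root path in the
Marathon JSON alongside the other changes would force a fresh session:

"flink_high-availability.zookeeper.path.root": "/flink-v2",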



On Wed, Oct 18, 2017 at 6:52 AM, Manuel Montesino <
manuel.montes...@piksel.com> wrote:

> Hi,
>
> We have deployed a Mesos cluster with Marathon, and we deploy Flink sessions
> through Marathon with multiple taskmanagers configured. Sometimes in earlier
> stages we change configuration in the Marathon JSON, e.g. memory and other
> settings, but when we redeploy the Flink session the jobmanagers stop and
> start with the new configuration, while the taskmanagers do not pick up what
> was configured. So we have to kill/stop the Docker containers of each
> taskmanager task.
>
> Is there a way to kill or stop the taskmanagers when the session is
> redeployed?
>
> Some environment configuration from marathon json file related to
> taskmanagers:
>
> ```
> "flink_akka.ask.timeout": "1min",
> "flink_akka.framesize": "102400k",
> "flink_high-availability": "zookeeper",
> "flink_high-availability.zookeeper.path.root": "/flink",
> "flink_jobmanager.web.history": "200",
> "flink_mesos.failover-timeout": "86400",
> "flink_mesos.initial-tasks": "16",
> "flink_mesos.maximum-failed-tasks": "-1",
> "flink_mesos.resourcemanager.tasks.container.type": "docker",
> "flink_mesos.resourcemanager.tasks.mem": "6144",
> "flink_metrics.reporters": "jmx",
> "flink_metrics.reporter.jmx.class": "org.apache.flink.metrics.jmx.
> JMXReporter",
> "flink_state.backend": "org.apache.flink.contrib.streaming.state.
> RocksDBStateBackendFactory",
> "flink_taskmanager.maxRegistrationDuration": "10 min",
> "flink_taskmanager.network.numberOfBuffers": "8192",
> "flink_jobmanager.heap.mb": "768",
> "flink_taskmanager.debug.memory.startLogThread": "true",
> "flink_mesos.resourcemanager.tasks.cpus": "1.3",
> "flink_env.java.opts.taskmanager": "-XX:+UseG1GC -XX:MaxGCPauseMillis=200
> -XX:ConcGCThreads=1 -XX:InitiatingHeapOccupancyPercent=35
> -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50
> -XX:MaxMetaspaceFreeRatio=80 -XX:+DisableExplicitGC
> -Djava.awt.headless=true -XX:+PrintGCDetails -XX:+PrintGCDateStamps
> -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=10M",
> "flink_containerized.heap-cutoff-ratio": "0.67"
> ```
>
> Thanks in advance and kind regards,
>
> *Manuel Montesino*
> Devops Engineer
>
> *E* *manuel.montesino@piksel(dot)com*
>
> Marie Curie,1. Ground Floor. Campanillas, Malaga 29590
> *liberating viewing* | *piksel.com *
>
>


Re: Classloader error after SSL setup

2017-10-04 Thread Eron Wright
By following Chesnay's recommendation we will hopefully uncover an SSL
error that is being masked.  Another thing to try is to disable hostname
verification (it is enabled by default) to see whether the certificate is
being rejected.
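
If I recall correctly, that is controlled by the following flink-conf.yaml
setting (please double-check the security docs for your version):

security.ssl.verify-hostname: false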

On Wed, Oct 4, 2017 at 5:15 AM, Chesnay Schepler  wrote:

> something that would also help us narrow down the problematic area is to
> enable SSL for one component at a time and see
> which one causesd the job to fail.
>
>
> On 04.10.2017 14:11, Chesnay Schepler wrote:
>
> The configuration looks reasonable. Just to be sure, are the paths
> accessible by all nodes?
>
> As a first step, could you set the logging level to DEBUG (by modifying
> the 'conf/log4j.properties' file), resubmit the job (after a cluster
> restart) and check the Job- and TaskManager logs for any exception?
>
> On 04.10.2017 03:15, Aniket Deshpande wrote:
>
> Background: We have a setup of Flink 1.3.1 along with a secure MAPR
> cluster (Flink is running on mapr client nodes). We run this flink cluster
> via flink-jobmanager.sh foreground and flink-taskmanager.sh foreground command
> via Marathon.  In order for us to make this work, we had to add
> -Djavax.net.ssl.trustStore="$JAVA_HOME/jre/lib/security/cacerts" in
> flink-console.sh as extra JVM arg (otherwise, flink was taking MAPR's
> ssl_truststore as default truststore and then we were facing issues for any
> 3rd party jars like aws_sdk etc.). This entire setup was working fine as it
> is and we could submit our jars and the pipelines ran without any problem
>
>
> Problem: We started experimenting with enabling ssl for all communication
> for Flink. For this, we followed https://ci.apache.
> org/projects/flink/flink-docs-release-1.3/setup/security-ssl.html for
> generating CA and keystore. I added the following properties to
> flink-conf.yaml:
>
>
> security.ssl.enabled: true
> security.ssl.keystore: /opt/flink/certs/node1.keystore
> security.ssl.keystore-password: 
> security.ssl.key-password: 
> security.ssl.truststore: /opt/flink/certs/ca.truststore
> security.ssl.truststore-password: 
> jobmanager.web.ssl.enabled: true
> taskmanager.data.ssl.enabled: true
> blob.service.ssl.enabled: true
> akka.ssl.enabled: true
>
>
> We then spin up a cluster and tried submitting the same job which was
> working before. We get the following erros:
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load
> user class: org.apache.flink.streaming.connectors.kafka.
> FlinkKafkaConsumer09
> ClassLoader info: URL ClassLoader:
> Class not resolvable through given classloader.
> at org.apache.flink.streaming.api.graph.StreamConfig.
> getStreamOperator(StreamConfig.java:229)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.<
> init>(OperatorChain.java:95)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:230)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:748)
>
>
> This error disappears when we remove the SSL config properties, i.e. run the
> Flink cluster without SSL enabled.
>
>
> So, did we miss any steps for enabling ssl?
>
>
> P.S.: We tried removing the extra JVM arg mentioned above, but still get
> the same error.
>
> --
>
> Aniket
>
>
>
>


Re: Flink Application Jar file on Docker container

2017-09-27 Thread Eron Wright
There was also a talk about containerization at Flink Forward that touches
on some of your questions.
https://www.youtube.com/watch?v=w721NI-mtAA=2s=PLDX4T_cnKjD0JeULl1X6iTn7VIkDeYX_X=33

Eron

On Wed, Sep 27, 2017 at 9:12 AM, Stefan Richter  wrote:

> Hi,
>
> from the top of my head, I cannot see why this should not be possible,
> task managers just need to be able to connect to their job manager.
> Unfortunately, I cannot give a real guarantee here because I am not that
> deeply involved in this aspect of Flink.
>
> Best,
> Stefan
>
> Am 27.09.2017 um 12:39 schrieb Rahul Raj :
>
> Hi Stefan,
>
> I have a question in my mind, out of curiosity: is it possible to run a
> Flink application within a Docker container by using a Flink cluster set up
> on the host?
>
> Rahul Raj
>
> On 26 September 2017 at 17:29, Stefan Richter  > wrote:
>
>> Hi,
>>
>> if I correctly understood the approach outlined on github, you can start
>> a standalone job manager and the task manager get the JM information either
>> through the provided configuration or through Zookeeper. Take a look at the
>> „running section“, e.g.:
>>
>> 1) „Via Mesos/Marathon: Start a standalone JobManager (you need to
>> replace the flink_recovery_zookeeper_quorum variable with a valid
>> setting for your cluster) [...]“
>> 2) „Via standalone Docker: Start a standalone JobManager (with host
>> networking, binding on 127.0.0.1) […]“
>>
>> Best,
>> Stefan
>>
>>
>> Am 26.09.2017 um 12:43 schrieb Rahul Raj :
>>
>> Hi Stefan,
>>
>> Thanks a lot for your answer and sharing the link
>> https://github.com/mesoshq/flink. I went through this and saw it
>> spawning a JobManager and TaskManagers. Now I think this is what should
>> happen: first the JobManager will be started on the Flink cluster on one
>> node, then the TaskManagers will be started on other nodes, and all should
>> be running in Docker containers on different nodes. Now, my question is:
>> how will Flink's JobManager get to know about the TaskManagers, as they are
>> in different Docker containers on different nodes? Will it happen via Mesos?
>>
>> Can we use mesos-appmaster.sh instead which is already built in flink for
>> deployment on mesos?
>>
>> Rahul Raj
>>
>> On 26 September 2017 at 15:32, Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> as in my answer to your previous mail, I suggest to take a look at
>>> https://github.com/mesoshq/flink . Unfortunately, there is not yet a
>>> lot documentation about the internals of how this works, so I am also
>>> looping in Till who might know more about specific questions about how
>>> things work together exactly.
>>>
>>> Best,
>>> Stefan
>>>
>>>
>>> Am 26.09.2017 um 09:21 schrieb Rahul Raj :
>>>
>>> Currently I have a Flink Application Jar file running on Mesos cluster.
>>> The flink application simply reads data from Kafka and put it to HDFS.
>>>
>>> Now we are planning to create a docker image to  run this application
>>> jar file inside docker containers on Mesos cluster via Marathon.
>>>
>>> Below are the questions that I am looking answers for:
>>>
>>> 1. While building the docker image, how do I include flink-1.3.2 set up
>>> and  my mesos config in flink?
>>>
>>> 2. How shall I run my existing flink application jar?
>>>
>>> 3. Will running my Flink application jar in Docker containers run
>>> it on Mesos slaves in different Docker containers? How will Docker, Flink,
>>> Mesos, and Marathon work together in my case?
>>>
>>> Rahul Raj
>>>
>>>
>>>
>>
>>
>
>


Re: Rest API cancel-with-savepoint: 404s when passing path as target-directory

2017-09-20 Thread Eron Wright
It is not surprising to see fidelity issues with the YARN proxy.  I suggest
opening a ticket on Flink side to update the cancel-with-savepoint API to
take the target directory as a query string parameter (of course, backwards
compatibility should be maintained).

On Wed, Sep 20, 2017 at 1:55 AM, Nico Kruber <n...@data-artisans.com> wrote:

> Hi Emily,
> I'm not familiar with the details of the REST API either but if this is a
> problem with the proxy, maybe it is already interpreting the encoded URL
> and
> passes it on un-encoded - have you tried encoding the path again? That is,
> encoding the percent-signs:
>
> http://
> {ip}:20888/proxy/application_1504649135200_0001/jobs/
> 1a0fd176ec8aabb9b8464fa481f755f0/cancel-with-savepoint/target-directory/
> s3%253A%252F%252F%252Fremit-flink
>
>
> Nico
>
> On Wednesday, 20 September 2017 00:52:05 CEST Emily McMahon wrote:
> > Thanks Eron & Fabian.
> >
> > The issue was hitting a yarn proxy url vs the node itself. For example
> this
> > worked
> > http://
> > {ip}:37716/jobs/1a0fd176ec8aabb9b8464fa481f755
> f0/cancel-with-savepoint/targe
> > t-directory/s3%3A%2F%2F%2Fremit-flink
> >
> > But this did not
> > http://
> > {ip}:20888/proxy/application_1504649135200_0001/jobs/
> 1a0fd176ec8aabb9b8464fa
> > 481f755f0/cancel-with-savepoint/target-directory/s3%
> 3A%2F%2F%2Fremit-flink
> >
> > It's a bit confusing because the cancel api works with either and the
> proxy
> > url sometimes works as this was successful http://
> > {ip}:20888/proxy/application_1504649135200_0001/jobs/
> cca2dd609c716a7b0a19570
> > 0777e5b1f/cancel-with-savepoint/target-directory/tmp/
> >
> > Cheers,
> > Emily
> >
> > On Tue, Sep 19, 2017 at 2:37 PM, Eron Wright <eronwri...@gmail.com>
> wrote:
> > > Good news, it can be done if you carefully encode the target directory
> > > with percent-encoding, as per:
> > > https://tools.ietf.org/html/rfc3986#section-2.1
> > >
> > > For example, given the directory `s3:///savepoint-bucket/my-
> awesome-job`,
> > > which encodes to `s3%3A%2F%2F%2Fsavepoint-bucket%2Fmy-awesome-job`, I
> was
> > > able to submit the following URL:
> > > http://localhost:8081/jobs/5c360ded6e4b7d8db103e71d68b7c8
> > > 3d/cancel-with-savepoint/target-directory/s3%3A%2F%2F%
> > > 2Fsavepoint-bucket%2Fmy-awesome-job
> > >
> > > And see the following in the log:
> > > 2017-09-19 14:27:45,939 INFO
> > > org.apache.flink.runtime.jobmanager.JobManager>
> > >- Trying to cancel job 5c360ded6e4b7d8db103e71d68b7c83d
> > >
> > > with savepoint to s3:///savepoint-bucket/my-awesome-job
> > >
> > > -Eron
> > >
> > > On Tue, Sep 19, 2017 at 1:54 PM, Fabian Hueske <fhue...@gmail.com>
> wrote:
> > >> Hi Emily,
> > >>
> > >> thanks for reaching out.
> > >> I'm not familiar with the details of the Rest API but Ufuk (in CC)
> might
> > >> be able to help you.
> > >>
> > >> Best, Fabian
> > >>
> > >> 2017-09-19 10:23 GMT+02:00 Emily McMahon <emil...@remitly.com>:
> > >>> I've tried every combination I can think of to pass an s3 path as the
> > >>> target directory (url encode, include trailing slash, etc)
> > >>>
> > >>> I can successfully pass a local path as the target directory (ie
> > >>> /jobs/$jobID/cancel-with-savepoint/target-directory/tmp) so I don't
> > >>> think there's a problem with the jobId or rest of the url. I also
> > >>> verified
> > >>> I can create the savepoint on s3 from the command line so it's not a
> > >>> permission issue.
> > >>>
> > >>> Here's the same question on stack overflow
> > >>> <https://stackoverflow.com/questions/45844298/send-flink-
> job-manager-a-f
> > >>> ully-qualified-path-to-a-savepoint-directory-using-the> (with the
> > >>> exception that they are getting a 502 whereas I'm getting a 404)
> > >>>
> > >>> using Flink 1.3.1
> > >>>
> > >>> Anyone have a working example?
> > >>>
> > >>> Thanks,
> > >>> Emily
>
>


Re: Rest API cancel-with-savepoint: 404s when passing path as target-directory

2017-09-19 Thread Eron Wright
Good news, it can be done if you carefully encode the target directory with
percent-encoding, as per:
https://tools.ietf.org/html/rfc3986#section-2.1

For example, given the directory `s3:///savepoint-bucket/my-awesome-job`,
which encodes to `s3%3A%2F%2F%2Fsavepoint-bucket%2Fmy-awesome-job`, I was
able to submit the following URL:
http://localhost:8081/jobs/5c360ded6e4b7d8db103e71d68b7c83d/cancel-with-savepoint/target-directory/s3%3A%2F%2F%2Fsavepoint-bucket%2Fmy-awesome-job

And see the following in the log:
2017-09-19 14:27:45,939 INFO
 org.apache.flink.runtime.jobmanager.JobManager- Trying to
cancel job 5c360ded6e4b7d8db103e71d68b7c83d with savepoint to
s3:///savepoint-bucket/my-awesome-job

-Eron

On Tue, Sep 19, 2017 at 1:54 PM, Fabian Hueske  wrote:

> Hi Emily,
>
> thanks for reaching out.
> I'm not familiar with the details of the Rest API but Ufuk (in CC) might
> be able to help you.
>
> Best, Fabian
>
> 2017-09-19 10:23 GMT+02:00 Emily McMahon :
>
>> I've tried every combination I can think of to pass an s3 path as the
>> target directory (url encode, include trailing slash, etc)
>>
>> I can successfully pass a local path as the target directory (ie
>> /jobs/$jobID/cancel-with-savepoint/target-directory/tmp) so I don't
>> think there's a problem with the jobId or rest of the url. I also verified
>> I can create the savepoint on s3 from the command line so it's not a
>> permission issue.
>>
>> Here's the same question on stack overflow
>> 
>>  (with
>> the exception that they are getting a 502 whereas I'm getting a 404)
>>
>> using Flink 1.3.1
>>
>> Anyone have a working example?
>>
>> Thanks,
>> Emily
>>
>
>


Re: Problem in Flink 1.3.2 with Mesos task managers offers

2017-09-19 Thread Eron Wright
Hello, the current behavior is that Flink holds onto received offers for up
to two minutes while it attempts to provision the TMs.   Flink can combine
small offers to form a single TM, to combat fragmentation that develops
over time in a Mesos cluster.   Are you saying that unused offers aren't
being released after two minutes?

There's a log entry you should see in the JM log whenever an offer is
released:
LOG.info(s"Declined offer ${lease.getId} from ${lease.hostname()} "
  + s"of ${lease.memoryMB()} MB, ${lease.cpuCores()} cpus.")

The timeout value isn't configurable at the moment, but if you're willing
to experiment by building Flink from source, you may adjust the two minute
timeout to something lower as follows.   In the `MesosFlinkResourceManager`
class, edit the `createOptimizer` method to call `withLeaseOfferExpirySecs`
on the `TaskScheduler.Builder` object.
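
For illustration, a minimal standalone sketch of the Fenzo setting in question
(the 30-second value and the empty lease-reject action are placeholders; in
Flink itself the call would simply be added to the builder chain that
`createOptimizer` already sets up):

import com.netflix.fenzo.TaskScheduler;
import com.netflix.fenzo.VirtualMachineLease;

public class FenzoLeaseExpiryExample {
    public static TaskScheduler buildScheduler() {
        return new TaskScheduler.Builder()
            // release unused offers after 30s instead of the two minutes described above
            .withLeaseOfferExpirySecs(30)
            // Fenzo requires a lease-reject action to be configured; no-op in this sketch
            .withLeaseRejectAction((VirtualMachineLease lease) -> {
                // intentionally empty
            })
            .build();
    }
}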

Let us know if that helps and we'll make the timeout configurable.
-Eron

On Tue, Sep 19, 2017 at 8:58 AM, Francisco Gonzalez Barea <
francisco.gonza...@piksel.com> wrote:

> Hello guys,
>
> We have a flink 1.3.2 session deployed from Marathon json to Mesos with
> some of the following parameters as environment variables:
>
>
> *"flink_mesos.initial-tasks": "8",*
> *"flink_mesos.resourcemanager.tasks.mem": "4096",*
>
>
> And other environment variables including zookeeper, etc.
>
> The mesos cluster is used for diferents applications (kafka, ad-hoc...),
> and have fragmentation into the agents. Our problem is that the flink
> session is getting all offers, even small ones. In case there are not
> enough offers to suit that configuration, it gets all of them, so there are
> no resources and offers free for other applications.
>
> So the question would be what is the right configuration in these cases to
> avoid using all resources for the same flink session.
>
> Thanks in advance.
> Regards
>
> This message is private and confidential. If you have received this
> message in error, please notify the sender or serviced...@piksel.com and
> remove it from your system.
>
> Piksel Inc is a company registered in the United States, 2100 Powers
> Ferry Road SE, Suite 400, Atlanta, GA 30339
> 
>


Re: Securing Flink Monitoring REST API

2017-09-18 Thread Eron Wright
Unfortunately Flink does not yet support SSL mutual authentication nor any
form of client authentication.   There is an ongoing discussion about it:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Service-Authorization-redux-td18890.html

A workaround that I've seen is to use nginx as a frontend proxy.  Be sure
to lock down the underlying endpoints somehow.  If you choose to go this
route, Patrick Lucas gave a related talk recently (Flink in Containerland):
https://youtu.be/w721NI-mtAA

-Eron


On Mon, Sep 18, 2017 at 1:30 AM, Fabian Hueske  wrote:

> Hi,
>
> sorry for the late response.
> Flink uses Netty for network communication which supports SSL client
> authentication.
> I haven't tried it myself, but would think that this should work in Flink
> as well if you configure the certificates correctly.
>
> We should update the docs to cover this aspect.
> Feedback on this would be very welcome
>
> Thanks, Fabian
>
> 2017-09-06 14:23 GMT+02:00 avivros :
>
>> Does  jobmanager.web.ssl.enabled supports Client SSL Authentication?
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>


Re: User-based authentication in Flink

2017-09-15 Thread Eron Wright
I think the short-term approach is to place an nginx proxy in front, in
combination with some form of isolation of the underlying endpoint.   That
addresses the authentication piece but not fine-grained authorization.  Be
aware that the Flink JM is not multi-user due to lack of isolation among
jobs.   The trend is towards running each job in a separate Flink cluster,
on top of YARN, Kubernetes, or Mesos.

Related to FLIP-6, the WebUI is gradually becoming decoupled from the JM
and hopefully will be more multi-user friendly and capable of communicating
with numerous JMs.   Some of the precursors are being tracked in FLINK-7530
and FLINK-7083.

Please hop over to the dev list if you'd like to discuss the FLIP-6 stuff
and how it relates to multi-user scenarios.

Thanks
Eron

On Thu, Sep 14, 2017 at 10:36 PM, Kazuki Ono  wrote:

> Hello Folks,
>
> I need to use user-based authentication in Flink Dashboard for
> multi-tenancy.
> I read the document, however, I could not found how to enable the
> authentication.
>
> If authentication is not supported currently, I would like to contribute
> to the community.
> How can I proceed it?
>
> Regards,
> Kaz
>
>


Re: Flink flick cancel vs stop

2017-09-15 Thread Eron Wright
Aljoscha, would it be correct to characterize your idea as a 'pull' source
rather than the current 'push'?  It would be interesting to look at the
existing connectors to see how hard it would be to reverse their
orientation.   e.g. the source might require a buffer pool.

On Fri, Sep 15, 2017 at 9:05 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Also relevant for this discussion: Several people (including me) by now
> were floating the idea of reworking the source interface to take away the
> responsibility of stopping/canceling/continuing from a specific source
> implementation and to instead give that power to the system. Currently each
> source does basically this:
>
> class Source {
>   public void run(Context ctx, Lock lock) {
> while ("forever long I want and I don't care") {
>   synchronized (lock) {
> T output = ReadFrom.externalSystem();
>  updateReadPositionState();
> ctx.collect(output);
>   }
> }
>   }
> }
>
> Meaning that any stopping/canceling behaviour requires cooperation from
> the source implementation.
>
> This would be a different idea for a source interface:
>
> abstract class NewSource {
>   public abstract boolean start();
>   public abstract boolean advance();
>   public abstract void close();
>
>   public abstract T getCurrent();
>   public abstract Instant getCurrentTimestamp();
>   public abstract Instant getWatermark();
>
>   public abstract CheckpointMark getCheckpointMark();
> }
>
> Here the driver would sit outside and call the source whenever data should
> be provided. Stop/cancel would not be a feature of the source function but
> of the code that calls it.
>
> Best,
> Aljoscha
>
> On 14. Sep 2017, at 20:03, Eron Wright <eronwri...@gmail.com> wrote:
>
> I too am curious about stop vs cancel.  I'm trying to understand the
> motivations a bit more.
>
> The current behavior of stop is basically that the sources become bounded,
> leading to the job winding down.
>
> The interesting question is how best to support 'planned' maintenance
> procedures such as app upgrade and scale changes.   I think a good
> enhancement could be to stop precisely at checkpoint time to prevent
> emission of spurious records.  Today the behavior of 'cancel w/ savepoint'
> is at-least-once because the two operations aren't atomic.  Earlier I had
> assumed that 'stop' would evolve in this direction but I suppose we could
> improve the atomicity of 'cancel w/ savepoint' rather than implicating
> 'stop'.
>
> A different direction for 'stop' might be to improve the determinism of
> bounding a streaming job such that the stop point is well-understood in
> terms of the source.  For example, stopping at an offset provided as a stop
> parameter.   Today I suppose one would rely on external state to remember
> the stop point, e.g. FlinkKafkaConsumer010::setStartFromGroupOffsets.
>
> On Thu, Sep 14, 2017 at 1:03 AM, Ufuk Celebi <u...@apache.org> wrote:
>
>> Hey Elias,
>>
>> sorry for the delay here. No, stop is not deprecated but not fully
>> implemented yet. One missing part is migration of the existing source
>> functions as you say.
>>
>> Let me pull in Till for more details on this. @Till: Is there more
>> missing than migrating the sources?
>>
>> Here is the PR and discussion for reference:
>> https://github.com/apache/flink/pull/750
>>
>> I would also really love to see this fully implemented in Flink. I
>> don't expect this to happen for the upcoming 1.4 release though.
>>
>> – Ufuk
>>
>>
>> On Wed, Sep 13, 2017 at 7:07 PM, Elias Levy <fearsome.lucid...@gmail.com>
>> wrote:
>> > Anyone?
>> >
>> > On Mon, Sep 11, 2017 at 6:17 PM, Elias Levy <
>> fearsome.lucid...@gmail.com>
>> > wrote:
>> >>
>> >> I was wondering about the status of the flink stop command.  At first
>> >> blush it would seem as the preferable way to shutdown a Flink job, but
>> it
>> >> depends on StoppableFunction being implemented by sources and I notice
>> that
>> >> the Kafka source does not seem to implement it.  In addition, the
>> command
>> >> does not -s  --withSavepoint like cancel does.
>> >>
>> >> Is stop deprecated?
>> >
>> >
>>
>
>
>


Re: Flink flick cancel vs stop

2017-09-14 Thread Eron Wright
I too am curious about stop vs cancel.  I'm trying to understand the
motivations a bit more.

The current behavior of stop is basically that the sources become bounded,
leading to the job winding down.

The interesting question is how best to support 'planned' maintenance
procedures such as app upgrade and scale changes.   I think a good
enhancement could be to stop precisely at checkpoint time to prevent
emission of spurious records.  Today the behavior of 'cancel w/ savepoint'
is at-least-once because the two operations aren't atomic.  Earlier I had
assumed that 'stop' would evolve in this direction but I suppose we could
improve the atomicity of 'cancel w/ savepoint' rather than implicating
'stop'.

A different direction for 'stop' might be to improve the determinism of
bounding a streaming job such that the stop point is well-understood in
terms of the source.  For example, stopping at an offset provided as a stop
parameter.   Today I suppose one would rely on external state to remember
the stop point, e.g. FlinkKafkaConsumer010::setStartFromGroupOffsets.

On Thu, Sep 14, 2017 at 1:03 AM, Ufuk Celebi  wrote:

> Hey Elias,
>
> sorry for the delay here. No, stop is not deprecated but not fully
> implemented yet. One missing part is migration of the existing source
> functions as you say.
>
> Let me pull in Till for more details on this. @Till: Is there more
> missing than migrating the sources?
>
> Here is the PR and discussion for reference:
> https://github.com/apache/flink/pull/750
>
> I would also really love to see this fully implemented in Flink. I
> don't expect this to happen for the upcoming 1.4 release though.
>
> – Ufuk
>
>
> On Wed, Sep 13, 2017 at 7:07 PM, Elias Levy 
> wrote:
> > Anyone?
> >
> > On Mon, Sep 11, 2017 at 6:17 PM, Elias Levy  >
> > wrote:
> >>
> >> I was wondering about the status of the flink stop command.  At first
> >> blush it would seem as the preferable way to shutdown a Flink job, but
> it
> >> depends on StoppableFunction being implemented by sources and I notice
> that
> >> the Kafka source does not seem to implement it.  In addition, the
> command
> >> does not -s  --withSavepoint like cancel does.
> >>
> >> Is stop deprecated?
> >
> >
>


Re: Can't start cluster

2017-09-13 Thread Eron Wright
It appears that you're building Flink from source, and attempting to start
the cluster from the 'flink-dist/src/...' directory.   Please use
'flink-dist/target/...' instead since that's where the built distribution
is located.   For example, check:
flink-dist/target/flink-1.4-SNAPSHOT-bin/flink-1.4-SNAPSHOT/

Please refer to the updated instructions in Flink 1.3 documentation for
more information:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/cluster_setup.html
https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/setup_quickstart.html


On Wed, Sep 13, 2017 at 6:57 AM, AndreaKinn  wrote:

> I printed also /flink-bin/bin folder:
>
> > root@giordano-2-2-100-1:~/flink-1.3.2/flink-dist/src/main/flink-bin/bin#
> > ls
> > config.sh  flink-console.sh  jobmanager.sh  start-cluster.sh
> > start-zookeeper-quorum.sh  stop-zookeeper-quorum.sh
> flink  flink-daemon.sh   pyflink.bat  start-local.bat
> stop-cluster.sh  taskmanager.sh
> flink.bat  historyserver.sh  pyflink.sh  start-local.sh  stop-local.sh
> zookeeper.sh
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Handle event time

2017-09-11 Thread Eron Wright
As mentioned earlier, the watermark is the basis for reasoning about the
overall progression of time.   Many operators use the watermark to
correctly organize records, e.g. into the correct time-based window.
Within that window the records may still be unordered.   That said, some
operators do take pains to reorder the records, notably the Flink CEP
operator to correctly detect temporal patterns.  Basically, the operator
buffers records until a watermark arrives; all buffered records older than
the watermark may then be sorted and processed.

It is tempting to write a standalone operator that simply reorders records
as described, but subsequent repartitioning to downstream operators would
reintroduce disorder.  Therefore one must ensure that subsequent processing
is done with a 'forward' partitioning strategy.
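
To make the buffering idea concrete, here is a rough, hypothetical sketch (not
code from Flink's CEP operator) of a ProcessFunction applied after keyBy() that
holds elements in keyed state and releases them, sorted, once the watermark has
passed their timestamps. `Event` and its `getTimestamp()` accessor are
placeholders for your own type, and the caveat above still applies: the output
stays ordered per key only until the next repartitioning.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class EventTimeSorter extends ProcessFunction<Event, Event> {

    private transient ListState<Event> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
            new ListStateDescriptor<>("sort-buffer", Event.class));
    }

    @Override
    public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
        long ts = ctx.timestamp();
        if (ts <= ctx.timerService().currentWatermark()) {
            // Late element: the watermark has already passed it; emit (or drop) as-is.
            out.collect(value);
        } else {
            // Hold the element and ask to be called back once the watermark reaches it.
            buffer.add(value);
            ctx.timerService().registerEventTimeTimer(ts);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        List<Event> ready = new ArrayList<>();
        List<Event> pending = new ArrayList<>();
        Iterable<Event> buffered = buffer.get();
        if (buffered != null) {
            for (Event e : buffered) {
                if (e.getTimestamp() <= timestamp) {
                    ready.add(e);
                } else {
                    pending.add(e);
                }
            }
        }
        // Everything at or below the current watermark can be emitted in order.
        ready.sort(Comparator.comparingLong(Event::getTimestamp));
        for (Event e : ready) {
            out.collect(e);
        }
        // Keep the not-yet-released elements in state.
        buffer.clear();
        for (Event e : pending) {
            buffer.add(e);
        }
    }
}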

Hope this helps!
Eron

On Fri, Sep 8, 2017 at 3:50 AM, AndreaKinn  wrote:

> Thank you. I actually also developed a simple custom watermark solution by
> looking at the Flink docs, but I still see unordered printed streams.
> I have a doubt about Flink's behaviour: if I understand correctly, Flink
> doesn't automatically reorder the records in a stream, so if, for instance,
> a record arrives late, what is Flink's behaviour? The docs describe that
> elements which arrive late are dropped (the allowed-lateness default value
> is 0), but even when using this watermark emitter:
>
> *public class CustomTimestampExtractor implements
> AssignerWithPeriodicWatermarks String, Double>>{
>
> private static final long serialVersionUID = 5448621759931440489L;
> private final long maxOutOfOrderness = 0;
> private long currentMaxTimestamp;
>
> @Override
> public long extractTimestamp(Tuple6 String,
> Double> element, long previousElementTimestamp) {
> long timestamp = element.f2.getTime();
> currentMaxTimestamp = Math.max(timestamp,
> currentMaxTimestamp);
> return timestamp;
> }
>
> @Override
> public Watermark getCurrentWatermark() {
> return new Watermark(currentMaxTimestamp -
> maxOutOfOrderness);
> }
> }*
>
> with maxOutOfOrderness = 0 I see unordered record in the stream.
>
> What I want to obtain is a fully ordered stream, is there a way to
> implement
> it?
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Installing Apache Flink on Mesos Cluster without DC/OS

2017-09-11 Thread Eron Wright
Hi,
You do not need to install Flink onto the nodes.   The approach used by
Flink is that the task managers download a copy of Flink from the
appmaster.  The entire installation tree of Flink is downloaded (i.e. the
bin/lib/conf directories).   The only assumed dependency is Java, which may
be provided via a Docker image or via the JAVA_HOME environment variable
(see Mesos docs for --executor_environment_variables).

Let us know if you have further questions.

Hope this helps,
Eron

On Fri, Sep 8, 2017 at 10:22 AM, Rahul Raj  wrote:

> Hi,
>
> I am a newbie to Apache Flink and our team is trying to set up an Apache
> Flink cluster on Apache Mesos. We have already installed Apache Mesos &
> Marathon with 3 Master nodes and 3 Slaves and now we are trying to install
> Apache Flink without DC/OS as mentioned here https://ci.apache.org/
> projects/flink/flink-docs-release-1.3/setup/mesos.html#mesos-without-dcos.
>
>
> I have couple of questions over here :
>
>1.
>
>Do we need to download Flink on all the nodes(master and slaves) and
>configure mesos.master in all nodes?
>2.
>
>Or Shall we download flink on only one master node and configure
>mesos.master over there?
>3.
>
>If flink needs to be downloaded on all the nodes then what should be
>the location of flink directory or if there is any script where I can
>specify that?
>4.
>
>Is running "mesos-appmaster.sh" on master node also responsible for
>running flink libraries and classes on slaves?
>
> Thanks
>
> Rahul Raj
>


Re: Jobmanager and Taskmanager

2017-09-11 Thread Eron Wright
Yes, a given host may be used as both a job manager and as a task manager.

On Mon, Sep 11, 2017 at 8:27 AM, AndreaKinn  wrote:

> Hi,
> I'm configuring a cluster composed by just three nodes.
> Looking at  cluster setup guide
>  release-0.8/cluster_setup.html>
> I'm setting in the jobmanager the addresses of the workers.
>
> Can a jobmanager also be used as an additional taskmanager? I.e. can I also
> put its own address in the workers list (in the slaves file)?
>
> Regards,
> Andrea
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Flink Mesos Outstanding Offers - trouble launching task managers

2017-08-31 Thread Eron Wright
Please mail me more information, in particular the JM log and the
information on the 'offers' tab on the Mesos UI.   Also, are you using any
Mesos roles?

Thanks

On Thu, Aug 31, 2017 at 9:02 AM, prashantnayak <
prash...@intellifylearning.com> wrote:

> Hi Eron
>
> No, unfortunately we did not directly resolve it... we work around it for
> now by ensuring that our Mesos slaves are set up to correctly support the
> JobManager with offers.
>
> Prashant
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Flink Mesos Outstanding Offers - trouble launching task managers

2017-08-29 Thread Eron Wright
Hello, did you resolve this issue?

Thanks,
Eron Wright

On Wed, Jul 12, 2017 at 11:09 AM, Prashant Nayak <
prash...@intellifylearning.com> wrote:

>
> Hi
>
> We’re running Flink 1.3.1 on Mesos.
>
> From time-to-time, the Flink app master seems to have trouble with Mesos
> offers… At such time, it obviously ends up not launching the requested task
> managers (mesos.initial-tasks) and we’ve noticed situations where it
> launches zero tasks.  During such
> times we see a long list of “Outstanding Offers” in the Mesos UI.  At the
> same time, the app master logs have the following
>
>
> 2017-07-12 18:06:23.939 [flink-akka.actor.default-dispatcher-20] INFO
>  org.apache.flink.mesos.scheduler.LaunchCoordinator  - Processing 12
> task(s) against 0 new offer(s) plus outstanding offers.
> 2017-07-12 18:06:23.939 [flink-akka.actor.default-dispatcher-20] INFO
>  org.apache.flink.mesos.scheduler.LaunchCoordinator  - Resources
> considered: (note: expired offers not deducted from below)
> 2017-07-12 18:06:23.939 [flink-akka.actor.default-dispatcher-20] INFO
>  org.apache.flink.mesos.scheduler.LaunchCoordinator  -   10.80.xx.6 has
> 0.0 MB, 0.0 cpus
> 2017-07-12 18:06:23.939 [flink-akka.actor.default-dispatcher-20] INFO
>  org.apache.flink.mesos.scheduler.LaunchCoordinator  -   10.80.xx.233 has
> 0.0 MB, 0.0 cpus
> 2017-07-12 18:06:23.939 [flink-akka.actor.default-dispatcher-20] INFO
>  org.apache.flink.mesos.scheduler.LaunchCoordinator  - Waiting for more
> offers; 12 task(s) are not yet launched.
>
> The two Mesos agents above (10.80.xx.6, 10.80.xx.233) are listed as having
> offers outstanding to the Flink framework in the Mesos UI
>
> Appreciate any input on how to go about resolving such an issue.
>
> Thanks
> Prashant
>


Re: How to run a flink wordcount program

2017-08-17 Thread Eron Wright
Hello,
You can run the WordCount program directly within your IDE.  An embedded
Flink environment is automatically created.
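
For example, a minimal self-contained variant (not the bundled example) that
can be launched or debugged straight from its main() method; when run from the
IDE, getExecutionEnvironment() returns a local embedded environment:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountLocal {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("to be or not to be", "that is the question")
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                    for (String word : line.toLowerCase().split("\\W+")) {
                        out.collect(new Tuple2<>(word, 1));
                    }
                }
            })
            .groupBy(0)
            .sum(1)
            .print(); // print() triggers execution in the DataSet API
    }
}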

-Eron

On Thu, Aug 17, 2017 at 8:03 AM, P. Ramanjaneya Reddy 
wrote:

> Thanks.
> But I want to run it in debug mode... how do I run it without a jar, using
> the WordCount class?
>
>
>
> On 17 Aug 2017 8:30 p.m., "Chao Wang"  wrote:
>
>> The following quickstart offers an end-to-end instruction I think:
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>> quickstart/setup_quickstart.html
>>
>> Chao
>>
>> On 08/17/2017 08:25 AM, P. Ramanjaneya Reddy wrote:
>>
>>
>>
>> On Thu, Aug 17, 2017 at 6:42 PM, P. Ramanjaneya Reddy <
>> ramanji...@gmail.com> wrote:
>>
>>> Hi ALL,
>>>
>>> I'm new to Flink and am trying to understand the Flink WordCount program,
>>>
>>> so I downloaded the GitHub repo:
>>> https://github.com/apache/flink.git
>>>
>>> Can somebody help how to run wordcount example?
>>>
>>> Thanks
>>> Ramanjaneya
>>>
>>>
>>
>>


Re: [EXTERNAL] Re: Fink application failing with kerberos issue after running successfully without any issues for few days

2017-08-17 Thread Eron Wright
Raja,
According to those configuration values, the delegation token would be
automatically renewed every 24 hours, then expire entirely after 7 days.
You say that the job ran without issue for 'a few days'.  Can we conclude
that the job hit the 7-day DT expiration?

Flink supports the use of Kerberos keytabs as an alternative to delegation
tokens for exactly this reason, that delegation tokens eventually expire
and so aren't useful to a long-running program.   Consider making use of
keytabs here.
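
For reference, a minimal sketch of the relevant flink-conf.yaml entries (the
keytab path and principal below are placeholders for your environment); with
these set, Flink logs in from the keytab instead of relying on the delegation
token obtained at submission time:

security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink-user@EXAMPLE.COM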

Hope this helps!
-Eron


On Thu, Aug 17, 2017 at 9:58 AM, Ted Yu  wrote:

> I think this needs to be done by the admin.
>
> On Thu, Aug 17, 2017 at 9:37 AM, Raja.Aravapalli <
> raja.aravapa...@target.com> wrote:
>
>>
>>
>> I don’t have access to the site.xml files, it is controlled by a support
>> team.
>>
>>
>>
>> Does flink has any configuration settings or api’s thru which we can
>> control this ?
>>
>>
>>
>>
>>
>> Regards,
>>
>> Raja.
>>
>>
>>
>> *From: *Ted Yu 
>> *Date: *Thursday, August 17, 2017 at 11:07 AM
>> *To: *Raja Aravapalli 
>> *Cc: *"user@flink.apache.org" 
>> *Subject: *Re: [EXTERNAL] Re: Fink application failing with kerberos
>> issue after running successfully without any issues for few days
>>
>>
>>
>> Can you try shortening renewal interval to something like 2880 ?
>>
>>
>>
>> Cheers
>>
>>
>>
>> On Thu, Aug 17, 2017 at 8:58 AM, Raja.Aravapalli <
>> raja.aravapa...@target.com> wrote:
>>
>> Hi Ted,
>>
>>
>>
>> Below is what I see in the environment:
>>
>>
>>
>> dfs.namenode.delegation.token.max-lifetime:  *60480*
>>
>> dfs.namenode.delegation.token.renew-interval:  *8640*
>>
>>
>>
>>
>>
>> Thanks.
>>
>>
>>
>>
>>
>> Regards,
>>
>> Raja.
>>
>>
>>
>> *From: *Ted Yu 
>> *Date: *Thursday, August 17, 2017 at 10:46 AM
>> *To: *Raja Aravapalli 
>> *Cc: *"user@flink.apache.org" 
>> *Subject: *[EXTERNAL] Re: Fink application failing with kerberos issue
>> after running successfully without any issues for few days
>>
>>
>>
>> What are the values for the following parameters ?
>>
>>
>>
>> dfs.namenode.delegation.token.max-lifetime
>>
>>
>>
>> dfs.namenode.delegation.token.renew-interval
>>
>>
>>
>> Cheers
>>
>>
>>
>> On Thu, Aug 17, 2017 at 8:24 AM, Raja.Aravapalli <
>> raja.aravapa...@target.com> wrote:
>>
>> Hi Ted,
>>
>>
>>
>> Find below the configuration I see in yarn-site.xml
>>
>>
>>
>> 
>>
>>   yarn.resourcemanager.proxy-user-privileges.enabled
>>
>>   true
>>
>> 
>>
>>
>>
>>
>>
>> Regards,
>>
>> Raja.
>>
>>
>>
>>
>>
>> *From: *Ted Yu 
>> *Date: *Wednesday, August 16, 2017 at 9:05 PM
>> *To: *Raja Aravapalli 
>> *Cc: *"user@flink.apache.org" 
>> *Subject: *[EXTERNAL] Re: hadoop
>>
>>
>>
>> Can you check the following config in yarn-site.xml ?
>>
>>
>>
>> yarn.resourcemanager.proxy-user-privileges.enabled (true)
>>
>>
>>
>> Cheers
>>
>>
>>
>> On Wed, Aug 16, 2017 at 4:48 PM, Raja.Aravapalli <
>> raja.aravapa...@target.com> wrote:
>>
>>
>>
>> Hi,
>>
>>
>>
>> I triggered an flink yarn-session on a running Hadoop cluster… and
>> triggering streaming application on that.
>>
>>
>>
>> But, I see after few days of running without any issues, the flink
>> application which is writing data to hdfs failing with below exception.
>>
>>
>>
>> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.
>> security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN
>> token xx for xx) can't be found in cache
>>
>>
>>
>>
>>
>> Can someone please help me how I can fix this. Thanks a lot.
>>
>>
>>
>>
>>
>>
>>
>> Regards,
>>
>> Raja.
>>
>>
>>
>>
>>
>>
>>
>
>


Re: Automate Job submission with container

2017-08-15 Thread Eron Wright
Flink 1.3 relies on a two-step approach of deploying the cluster on Mesos
and then deploying the job.   The cron task seems like a good approach;
maybe retry until the job is successfully deployed (using the 'detached'
option, presumably).   One complication is that the slots take some time to
come online, and the JM will reject the job submission until sufficient
slots are available.

One of the goals of the FLIP-6 improvement project is to allow for a
one-step deployment of cluster plus job. In the meantime please use a
workaround like you described.

Thanks

On Mon, Aug 14, 2017 at 6:04 PM, Biswajit Das  wrote:

> Hi There,
>
> A few weeks back I have posted here regarding flink docker container
> running on mesos. I'm able to run the same successfully; I'm heading
> towards full CI/CD deployment with the marathon, most of our deployment is
> automated via the marathon, and sometimes even I trigger metrics based auto
> scale from the marathon or another scaling service or move service with AWS
> spot instance randomly.
> Now, how do I trigger the job automatically inside the container once the JM &
> TM are up? I have tested a few options and just wanted to validate with the
> group here. The uber app jar is already bundled inside the Docker container.
>
> Options:
> 1. Schedule a one-time cron-like task inside the container with flink
> run -d and some delay.
> 2. Submit via REST, but it complicates the process as I have to find the app
> DNS from my CI. Although I have this set up with Bamboo and HAProxy, I just
> want to avoid this complicated process (given I have many apps running).
>
> I'm sure someone has already come across a similar problem; please let me
> know if anyone has any inputs or suggestions.
>
> ~ Biswajit
>


Re: kerberos yarn - failure in long running streaming application

2017-08-14 Thread Eron Wright
It sounds to me that the TGT is expiring (usually after 12 hours).   This
shouldn't happen in the keytab scenario because of a background thread
provided by Hadoop that periodically performs a re-login using the keytab.
  More details on the Hadoop internals here:
https://stackoverflow.com/a/34691071/3026310

To help narrow down the issue:
1. please share the stack trace (and, does the error occur on Job Manager
or on Task Manager?)
2. is kinit being called on the client prior to calling `flink run`?  (just
curious)
3. are you willing to share the Flink logs?

I'm happy to help if you prefer to share the the logs privately.

-Eron

On Mon, Aug 14, 2017 at 12:32 AM, Ted Yu  wrote:

> bq. security.kerberos.login.contexts: Client,KafkaClien
>
> Just curious: there is missing 't' at the end of the above line.
>
> Maybe a typo when composing the email ?
>
> On Sun, Aug 13, 2017 at 11:15 PM, Prabhu V  wrote:
>
>> Hi,
>>
>> I am running Flink-1.3.2 on yarn (Cloudera 2.6.0-cdh5.7.6). The
>> application streams data from Kafka, groups by key, creates a session window
>> and writes to HDFS using a rich window function in the "window.apply"
>> method.
>>
>> The rich window function creates the sequence file thus
>>
>> SequenceFile.createWriter(
>> conf,
>> new Option[] {
>> Writer.file(new Path("flink-output/" + filePath)),
>> Writer.compression(CompressionType.BLOCK,
>> new DefaultCodec()),
>> Writer.keyClass(BytesWritable.class),
>> Writer.valueClass(BytesWritable.class) })
>>
>> The "conf" is created in the "open" method thus
>>
>> conf = HadoopFileSystem.getHadoopConfiguration();
>> for (Map.Entry entry :
>> parameters.toMap().entrySet()) {
>> conf.set(entry.getKey(), entry.getValue());
>> }
>>
>> where parameters is the flink.configuration.Configuration object that is
>> an argument to the open method
>>
>> The application runs for about 10 hours before it fails with a kerberos
>> error "Caused by: javax.security.sasl.SaslException: GSS initiate failed
>> [Caused by GSSException: No valid credentials provided (Mechanism level:
>> Failed to find any Kerberos tgt)]"
>>
>> The flink-conf.yaml has the following properties set.
>> security.kerberos.login.keytab: 
>> security.kerberos.login.principal:
>> security.kerberos.login.contexts: Client,KafkaClien
>>
>> Any help would be appreciated.
>>
>>
>> Thanks,
>> Prabhu
>>
>
>


Re: Using Hadoop 2.8.0 in Flink Project for S3A Path Style Access

2017-08-09 Thread Eron Wright
For reference: [FLINK-6466] Build Hadoop 2.8.0 convenience binaries

On Wed, Aug 9, 2017 at 6:41 AM, Aljoscha Krettek 
wrote:

> So you're saying that this works if you manually compile Flink for Hadoop
> 2.8.0? If yes, I think the solution is that we have to provide binaries for
> Hadoop 2.8.0. If we did that with a possible Flink 1.3.3 release and
> starting from Flink 1.4.0, would this be an option for you?
>
> Best,
> Aljoscha
>
> On 11. Jul 2017, at 10:47, Mustafa AKIN  wrote:
>
> Hi all,
>
> I am trying to use S3 backend with custom endpoint. However, it is not
> supported in hadoop-aws@2.7.3, I need to use at least 2.8.0 version. The
> underyling reason is that the requests are being sent as following
>
> DEBUG [main] (AmazonHttpClient.java:337) - Sending Request: HEAD
> http://mustafa.localhost:9000 / Headers:
>
> Because "fs.s3a.path.style.access" is not recognized in old version.I want
> the domain to remain same, the bucket name to be appended in the path (
> http://localhost:9000/mustafa/.. .)
>
> I cannot blindly increase aws-java-sdk version to latest, it causes:
>
> Caused by: java.lang.NoClassDefFoundError: Could not initialize class
> com.amazonaws.ClientConfiguration
> at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(
> S3AFileSystem.java:182)
>
> So, If I increase the hadoop-aws to 2.8.0 with latest client, it causes
> the following error:
>
>
> According to, I need hadoop-aws@2.7.2 and
> https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/setup/aws.html#provide-s3-filesystem-dependency
>
> Caused by: java.lang.IllegalAccessError: tried to access method
> org.apache.hadoop.metrics2.lib.MutableCounterLong.(
> Lorg/apache/hadoop/metrics2/MetricsInfo;J)V from class
> org.apache.hadoop.fs.s3a.S3AInstrumentation
> at org.apache.hadoop.fs.s3a.S3AInstrumentation.streamCounter(
> S3AInstrumentation.java:194)
>
>
> Should I be excluding hadoop-common from Flink somehow? Building flink
> from source with mvn clean install -DskipTests -Dhadoop.version=2.8.0 works
> but I want to manage it via maven as much as possible.
>
>
>


Re: Flink REST API async?

2017-08-07 Thread Eron Wright
When you submit a program via the REST API, the main method executes inside
the JobManager process.Unfortunately a static variable is used to
establish the execution environment that the program obtains from
`ExecutionEnvironment.getExecutionEnvironment()`.  From the stack trace it
appears that two main methods are executing simultaneously and one is
corrupting the other.
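
One pragmatic client-side workaround (not from this thread) is to chain the
submissions so that only one program's main() runs in the JobManager at a time.
A rough sketch, where submitJob() is your own method that issues the REST
request and returns a CompletableFuture, and jobSpecs is your own list of jobs
to submit (both hypothetical):

CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
for (JobSpec spec : jobSpecs) {
    // thenCompose serializes the calls: the next submission starts only
    // after the previous one has completed.
    chain = chain.thenCompose(ignored -> submitJob(spec));
}
chain.join(); // wait for the whole sequence to finish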

On Mon, Aug 7, 2017 at 8:21 AM, Francisco Gonzalez Barea <
francisco.gonza...@piksel.com> wrote:

> Hi there!
>
> We are doing some POCs submitting jobs remotely to Flink. We tried with
> Flink CLI and now we´re testing the Rest API.
>
> So the point is that when we try to execute a set of requests in an async
> way (using CompletableFutures) only a couple of them run successfully. For
> the rest we get the exception copied at the end of the email.
>
> Do you know the reason for this?
>
> Thanks in advance!!
> Regards,
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error.
>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(
> PackagedProgram.java:545)
>   at org.apache.flink.client.program.PackagedProgram.
> invokeInteractiveModeForExecution(PackagedProgram.java:419)
>   at org.apache.flink.client.program.OptimizerPlanEnvironment.
> getOptimizedPlan(OptimizerPlanEnvironment.java:80)
>  at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(
> ClusterClient.java:318)
>  at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.
> getJobGraphAndClassLoader(JarActionHandler.java:72)
>  at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.
> handleJsonRequest(JarRunHandler.java:61)
>  at org.apache.flink.runtime.webmonitor.handlers.
> AbstractJsonRequestHandler.handleRequest(AbstractJsonRequestHandler.
> java:41)
>  at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.
> respondAsLeader(RuntimeMonitorHandler.java:109)
>  at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.
> channelRead0(RuntimeMonitorHandlerBase.java:97)
>  at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.
> channelRead0(RuntimeMonitorHandlerBase.java:44)
>  at io.netty.channel.SimpleChannelInboundHandler.channelRead(
> SimpleChannelInboundHandler.java:105)
>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(
> AbstractChannelHandlerContext.java:339)
>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(
> AbstractChannelHandlerContext.java:324)
>  at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
>  at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(
> DualAbstractHandler.java:57)
>  at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(
> DualAbstractHandler.java:20)
>  at io.netty.channel.SimpleChannelInboundHandler.channelRead(
> SimpleChannelInboundHandler.java:105)
>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(
> AbstractChannelHandlerContext.java:339)
>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(
> AbstractChannelHandlerContext.java:324)
>  at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(
> HttpRequestHandler.java:159)
>  at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(
> HttpRequestHandler.java:65)
>  at io.netty.channel.SimpleChannelInboundHandler.channelRead(
> SimpleChannelInboundHandler.java:105)
>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(
> AbstractChannelHandlerContext.java:339)
>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(
> AbstractChannelHandlerContext.java:324)
>  at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(
> ChannelInboundHandlerAdapter.java:86)
>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(
> AbstractChannelHandlerContext.java:339)
>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(
> AbstractChannelHandlerContext.java:324)
>  at io.netty.handler.codec.ByteToMessageDecoder.channelRead(
> ByteToMessageDecoder.java:242)
>  at io.netty.channel.CombinedChannelDuplexHandler.channelRead(
> CombinedChannelDuplexHandler.java:147)
>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(
> AbstractChannelHandlerContext.java:339)
>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(
> AbstractChannelHandlerContext.java:324)
>  at io.netty.channel.DefaultChannelPipeline.fireChannelRead(
> DefaultChannelPipeline.java:847)
>  at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(
> AbstractNioByteChannel.java:131)
>  at io.netty.channel.nio.NioEventLoop.processSelectedKey(
> NioEventLoop.java:511)
>  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(
> NioEventLoop.java:468)
>  at io.netty.channel.nio.NioEventLoop.processSelectedKeys(
> NioEventLoop.java:382)
>  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>  at io.netty.util.concurrent.SingleThreadEventExecutor$2.
> 

Re: blob store defaults to /tmp and files get deleted

2017-08-04 Thread Eron Wright
The directory referred to by `blob.storage.directory` is best described as
a local cache.  For recovery purposes the JARs are also stored in `
high-availability.storageDir`.At least that's my reading of the code in
1.2.   Maybe there's some YARN specific behavior too, sorry if this
information is incomplete.

https://github.com/apache/flink/blob/release-1.2.1/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java#L106
https://github.com/apache/flink/blob/release-1.2.1/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java#L362
https://github.com/apache/flink/blob/release-1.2.1/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java#L58
https://github.com/apache/flink/blob/release-1.2.1/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java#L57
https://github.com/apache/flink/blob/release-1.2.1/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java#L135
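
For example, pointing the local cache away from /tmp and the recovery copy at
durable storage in flink-conf.yaml might look like this (the paths below are
placeholders):

blob.storage.directory: /var/lib/flink/blob-cache
high-availability.storageDir: hdfs:///flink/recovery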


On Fri, Aug 4, 2017 at 11:56 AM, Shannon Carey  wrote:

> Stephan,
>
> Regarding your last reply to http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/blob-store-
> defaults-to-tmp-and-files-get-deleted-td11720.html
>
> You mention "Flink (via the user code class loader) actually holds a
> reference to the JAR files in "/tmp", so even if "/tmp" get wiped, the JAR
> file remains usable by the class loader". In my understanding, even if
> that's true, it doesn't work over a failure of the JobManager/TaskManager
> process, because the handle would be lost and the file would be gone.
>
> We're still running Flink 1.2.1, so maybe we're missing out on some of the
> improvements that have been made. However, we recently had a problem with a
> batch (DataSet) job not restarting successfully, apparently after a
> JobManager failure. This particular job runs in AWS EMR (on Yarn) which
> means that only one JobManager is run at a time, and when it fails it gets
> restarted.
>
> Here's what I can see from the logs. When the job restarts, it goes from
> CREATED -> RUNNING state, and then logs:
>
> 23:23:56,798 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>(flink-akka.actor.default-dispatcher-55): Job com.expedia…MyJob (
> c58185a78dd64cfc9f12374bd1f9a679) switched from state RUNNING to
> SUSPENDED.
> java.lang.Exception: JobManager is no longer the leader.
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1.applyOrElse(JobManager.scala:319)
>
> I assume that's normal/expected, because the JobManager was restarted but
> some portion of the job state is still referring to the old one. Next,
> YarnJobManager logs: "Attempting to recover job
> c58185a78dd64cfc9f12374bd1f9a679." However, it subsequently fails:
>
> 2017-08-03 00:09:18,991 WARN  org.apache.flink.yarn.YarnJobManager
>(flink-akka.actor.default-dispatcher-96): Failed to
> recover job c58185a78dd64cfc9f12374bd1f9a679.
> java.lang.Exception: Failed to retrieve the submitted job graph from state
> handle.
> at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStor
> e.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:180)
> …
> Caused by: java.lang.RuntimeException: Unable to instantiate the hadoop
> input format
> at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.
> readObject(HadoopInputFormatBase.java:319)
> …
> at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:305)
> at org.apache.flink.runtime.state.RetrievableStreamStateHandle.
> retrieveState(RetrievableStreamStateHandle.java:58)
> at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStor
> e.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:178)
> ... 15 more
> Caused by: java.lang.ClassNotFoundException: org.apache.parquet.avro.
> AvroParquetInputFormat
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.
> readObject(HadoopInputFormatBase.java:317)
> ... 69 more
>
> The missing class comes from our job, so it seems like the job jar isn't
> present on the classpath of the JobManager. When I look at the contents of
> our configured blob storage directory (we're not using /tmp), I see
> subfolders like:
>
> blobStore-7d40f1b9-7b06-400f-8c05-b5456adcd7f1
> blobStore-f2d7974c-7d86-4b11-a7fb-d1936a4593ed
>
> Only one of the two has a JAR in it, so it looks like there's a new
> directory created for each new JobManager. When I look in Zookeeper at
> nodes such as "/flink/main/jobgraphs/c58185a78dd64cfc9f12374bd1f9a679", I
> don't see those directories mentioned. I am wondering if someone can
> 

Re: json mapper

2017-08-03 Thread Eron Wright
I think your snippet looks good.  The Jackson ObjectMapper is designed to
be reused by numerous threads, and routinely stored as a static field.  It
is somewhat expensive to create.
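
For reference, an equivalent Java sketch of that pattern (ObjectNode here is
Jackson's com.fasterxml.jackson.databind.node.ObjectNode):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.functions.MapFunction;

public class JsonMapFunction implements MapFunction<String, ObjectNode> {

    // Thread-safe once configured and expensive to create, so one shared
    // instance per JVM is fine for all parallel instances of this function.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public ObjectNode map(String value) throws Exception {
        return MAPPER.readValue(value, ObjectNode.class);
    }
}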

Hope this helps,
-Eron

On Thu, Aug 3, 2017 at 7:46 AM, Nico Kruber  wrote:

> Hi Peter,
> I'm no Scala developer but I may be able to help with some concepts:
>
> * a static reference used inside a [Map]Function will certainly cause
> problems
> when executed in parallel in the same JVM, e.g. a TaskManager with multiple
> slots, depending on whether this static object is stateful and/or
> thread-safe
> * additionally, not all parallel instances of your map may be executed in
> the
> same JVM, e.g. on multiple TaskManagers, so you cannot assume that the
> state
> of the JsonMapper is consistent among them
> * if the ObjectMapper does not store any state that is worth recovering
> during
> a failure (none that I see from https://fasterxml.github.io/
> jackson-databind/
> javadoc/2.3.0/com/fasterxml/jackson/databind/ObjectMapper.html if that is
> the
> one you are using), then you don't need to put it into flink state but can
> either initialise it as a (non-static) member of your MapFunction class or
> even in your map function itself
> * for the correct use of keyed/non-keyed state, please refer to my other
> email
> or [1]
> * for 'class' vs. 'object': if you're using
> com.fasterxml.jackson.databind.ObjectMapper as described above, you'll
> have
> state again ("It will use instances of JsonParser and JsonGenerator for
> implementing actual reading/writing of JSON. " from the docs) but in
> general,
> it is a good question whether the singleton would work for stateless
> operators
> and whether it actually improves performance.
>
>
> Nico
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/dev/stream/
> state.html
>
> On Thursday, 3 August 2017 12:41:36 CEST Peter Ertl wrote:
> > Hi flink users,
> >
> > I just wanted to ask if this kind of scala map function is correct?
> >
> > object JsonMapper {
> >   private val mapper: ObjectMapper = new ObjectMapper()
> > }
> >
> > class JsonMapper extends MapFunction[String, ObjectNode] {
> >   override def map(value: String): ObjectNode =
> > JsonMapper.mapper.readValue(value, classOf[ObjectNode]) }
> >
> > Is using a static reference to ObjectMapper fine or will this cause
> issues
> > on a distributed cluster / with checkpoint / serializing state /
> whatever ?
> >
> > Or should I instead use a non-transient property initialized in ctor
> > (ObjectMapper is java.io.Serializable) ?
> >
> > Or should I initialize it with RichMapFunction.open into a transient
> > property?
> >
> > Also I am wondering if replacing 'class' with 'object' (=> singleton)
> >
> > object JsonMapper extends MapFunction[String, ObjectNode] { /* ..*/ }
> >
> > is ok (actually the mapper is stateless so no obvious need to
> re-instantiate
> > it again and again ? )
> >
> > Thanks and best regards
> > Peter
>
>


Re: multiple users per flink deployment

2017-08-02 Thread Eron Wright
One of the key challenges is isolation, eg. ensuring that one job cannot
access the credentials of another.  The easiest solution today is to use
the YARN deployment mode, with a separate app per job.  Meanwhile,
improvements being made under the FLIP-6 banner for 1.4+ are laying the
groundwork for a multi-user experience.

Hope this helps!

On Aug 2, 2017 8:29 AM, "Georg Heiler"  wrote:

> Thanks for the overview.
> Currently a single flink cluster seems to run all tasks with the same
> user. I would want to be able to run each flink job as a separate user
> instead.
>
> The update for separate read/write users is nice though.
> Tzu-Li (Gordon) Tai  schrieb am Mi. 2. Aug. 2017 um
> 10:59:
>
>> Hi,
>>
>> There’s been quite a few requests on this recently on the mailing lists
>> and also mentioned by some users offline, so I think we may need to start
>> with plans to probably support this.
>> I’m CC’ing Eron to this thread to see if he has any thoughts on this, as
>> he was among the first authors driving the Kerberos support in Flink.
>> I’m not really sure if such a feature support makes sense, given that all
>> jobs of a single Flink deployment have full privileges and therefore no
>> isolation in between.
>>
>> Related question: what external service are you trying to authenticate to
>> with different users?
>> If it is Kafka and perhaps you have different users for the consumer /
>> producer, that will be very soon available in 1.3.2, which includes a
>> version bump to Kafka 0.10 that allows multiple independent users within
>> the same JVM through dynamic JAAS configuration.
>> See this mail thread [1] for more detail on that.
>>
>> Cheers,
>> Gordon
>>
>> [1] http://apache-flink-user-mailing-list-archive.2336050.
>> n4.nabble.com/Kafka-0-10-jaas-multiple-clients-td12831.html#a13317
>>
>> On 1 August 2017 at 6:16:08 PM, Georg Heiler (georg.kf.hei...@gmail.com)
>> wrote:
>>
>> Hi,
>>
>> flink currently only seems to support a single kerberos ticket for
>> deployment. Are there plans to support different users per each job?
>>
>> regards,
>> Georg
>>
>>


Re: Eventime window

2017-08-02 Thread Eron Wright
Note that the built-in `BoundedOutOfOrdernessTimestampExtractor` generates
watermarks based only on the timestamp of incoming events.   Without new
events, `BoundedOutOfOrdernessTimestampExtractor` will not advance the
event-time clock. That explains why the window doesn't trigger immediately
after 10s; a new event must arrive to advance the watermark.

As Timo said, a different implementation of `AssignerWithPeriodicWatermarks`
could behave differently.   Have a look at the implementation of
`TimestampsAndPeriodicWatermarksOperator` within Flink to better understand
how `AssignerWithPeriodicWatermarks` is invoked.
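
Not from this thread, but as a rough sketch of the kind of alternative assigner
Timo alludes to below: one that advances the watermark from processing time when
no events arrive, so that pending event-time windows can still fire. This is
only sound if event timestamps roughly track wall-clock time; `MyEvent` and its
`getEventTime()` accessor are placeholders, and the 100 ms / 10 s constants are
arbitrary.

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class IdleAwareWatermarkAssigner implements AssignerWithPeriodicWatermarks<MyEvent> {

    private static final long MAX_OUT_OF_ORDERNESS = 100;  // ms
    private static final long IDLE_TIMEOUT = 10_000;       // ms; assumption

    private long currentMaxTimestamp = Long.MIN_VALUE;
    private long lastEventSeenAt = System.currentTimeMillis();

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long ts = element.getEventTime();  // placeholder accessor
        currentMaxTimestamp = Math.max(ts, currentMaxTimestamp);
        lastEventSeenAt = System.currentTimeMillis();
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return new Watermark(Long.MIN_VALUE);  // no events seen yet
        }
        long idleFor = System.currentTimeMillis() - lastEventSeenAt;
        if (idleFor > IDLE_TIMEOUT) {
            // No new events for a while: advance the watermark by the idle time
            // so that pending event-time windows can still fire. Only valid if
            // event timestamps roughly follow wall-clock time.
            return new Watermark(currentMaxTimestamp + idleFor - MAX_OUT_OF_ORDERNESS);
        }
        return new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDERNESS);
    }
}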

On Wed, Aug 2, 2017 at 8:13 AM, Govindarajan Srinivasaraghavan <
govindragh...@gmail.com> wrote:

> Thanks Timo. Basically my requirement is based on event time the window
> has to be created but the trigger has to happen either when it has received
> the next element >10s or 10s has passed. Exactly the same way as you
> described. Let me try the AssignerWithPeriodicWatermarks approach.
>
> Thanks,
> Govind
>
> On Aug 2, 2017, at 7:46 AM, Timo Walther  wrote:
>
> I forgot about the AssignerWithPeriodicWatermarks [1]. I think it could
> solve your problem easily.
>
> Timo
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_
> timestamps_watermarks.html#with-periodic-watermarks
>
> Am 02.08.17 um 16:30 schrieb Timo Walther:
>
> The question is what defines your `10 seconds`. In event-time the incoming
> events determine when 10 seconds have passed. Your description sounds like
> you want to have results after 10 seconds wall-clock/processing-time. So
> either you use a processing-time window or you implement a custom trigger
> that triggers both on event-time or on a timer that you have set after 10 s
> processing-time.
>
> Timo
>
>
> Am 02.08.17 um 16:20 schrieb Govindarajan Srinivasaraghavan:
>
> Thanks Timo. The next message will arrive only after a minute or so. Is
> there a way to evict whatever is there in window buffer just after 10
> seconds irrespective of whether a new message arrives or not.
>
> Thanks,
> Govind
>
> On Aug 2, 2017, at 6:56 AM, Timo Walther  wrote:
>
> Hi Govind,
>
> if the window is not triggered, this usually indicates that your timestamp
> and watermark assignment is not correct. According to your description, I
> don't think that you need a custom trigger/evictor. How often do events
> arrive from one device? There must be another event from the same device
> that has a timestamp >10s in order to trigger the window evaluation.
>
> Instead of using the Kafka timestamp, maybe you could also convert your
> timestamps to UTC in the TimestampExtractor.
>
> There are no official limitation. However, each window comes with some
> overhead. So you should choose your memory/state backends and parallelism
> accordingly.
>
> Hope that helps.
>
> Timo
>
>
> Am 02.08.17 um 06:54 schrieb Govindarajan Srinivasaraghavan:
>
> Hi,
>
> I have few questions regarding event time windowing. My scenario is
> devices from various timezones will send messages with timestamp and I need
> to create a window per device for 10 seconds. The messages will mostly
> arrive in order.
>
> Here is my sample code to perform windowing and aggregating the messages
> after the window to further process it.
>
> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> FlinkKafkaConsumer010 consumer = new FlinkKafkaConsumer010("STREAM1",
> new DeserializationSchema(),
> kafkaConsumerProperties);
>
> DataStream msgStream = streamEnv
> .addSource(consumer)
> .assignTimestampsAndWatermarks(new TimestampExtractor(Time.of(100,
> TimeUnit.MILLISECONDS))); // TimestampExtractor implements
> BoundedOutOfOrdernessTimestampExtractor
>
> KeyedStream keyByStream = msgStream.keyBy(new
> CustomKeySelector());
>
> WindowedStream windowedStream =
> keyByStream.window(TumblingEventTimeWindows.of(
> org.apache.flink.streaming.api.windowing.time.Time.seconds(10)));
>
> SingleOutputStreamOperator aggregatedStream =
> windowedStream.apply(new AggregateEntries());
>
> My questions are
>
> - In the above code, data gets passed till the window function but even
> after window time the data is not received in the apply function. Do I have
> to supply a custom evictor or trigger?
>
> - Since the data is being received from multiple timezones and each device
> will have some time difference, would it be ok to assign the timestamp as
> that of received timestamp in the message at source (kafka). Will there be
> any issues with this?
>
> - Are there any limitations on the number of time windows that can be
> created at any given time? In my scenario if there are 1 million devices
> there will be 1 million tumbling windows.
>
> Thanks,
> Govind
>
>
>
>
>


Re: Are timers in ProcessFunction fault tolerant?

2017-05-25 Thread Eron Wright
Yes, registered timers are stored in managed keyed state and should be
fault-tolerant.

-Eron

On Thu, May 25, 2017 at 9:28 AM, Moiz S Jinia  wrote:

> With a checkpointed RocksDB based state backend, can I expect the
> registered processing timers to be fault tolerant? (along with the managed
> keyed state).
>
> Example -
> A task manager instance owns the key k1 (from a keyed stream) that has
> registered a processing timer with a timestamp thats a day ahead in the
> future. If this instance is killed, and the key is moved to another
> instance, will the onTimer trigger correctly on the other machine at the
> expected time with the same keyed state (for k1)?
>
> Thanks,
> Moiz
>


Re: Kafka 0.10 jaas multiple clients

2017-05-25 Thread Eron Wright
Gordon's suggestion seems like a good way to provide per-job credentials
based on application-specific properties.   In contrast, Flink's built-in
JAAS features are aimed at making the Flink cluster's Kerberos credentials
available to jobs.

I want to reiterate that all jobs (for a given Flink cluster) run with full
privilege in the JVM, and that Flink does not guarantee isolation (of
security information) between jobs.   My suggestion is to not run untrusted
job code in a shared Flink cluster.
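
For reference, a rough sketch of the per-client dynamic JAAS approach Gordon
describes below, on the consumer side (broker address, topic, group id and
credentials are placeholders; `env` is assumed to be an existing
StreamExecutionEnvironment, and the classes used are java.util.Properties plus
FlinkKafkaConsumer010 and SimpleStringSchema from the Kafka 0.10 connector). A
producer would get its own Properties with different credentials:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker1:9092");
props.setProperty("group.id", "my-consumer-group");
props.setProperty("security.protocol", "SASL_PLAINTEXT");
props.setProperty("sasl.mechanism", "PLAIN");
// Overrides any static JAAS file / "KafkaClient" section for this client only.
props.setProperty("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required "
        + "username=\"consumer-user\" password=\"consumer-secret\";");

FlinkKafkaConsumer010<String> consumer =
    new FlinkKafkaConsumer010<>("topicA", new SimpleStringSchema(), props);
DataStream<String> stream = env.addSource(consumer);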

On Wed, May 24, 2017 at 8:46 PM, Tzu-Li (Gordon) Tai 
wrote:

> Hi Gwenhael,
>
> Follow-up for this:
>
> Turns out what you require is already available with Kafka 0.10, using
> dynamic JAAS configurations [1] instead of a static JAAS file like what
> you’re currently doing.
>
> The main thing to do is to set a “sasl.jaas.config” in the config
> properties for your individual Kafka consumer / producer.
> This will override any static JAAS configuration used.
> Note 2 things here: 1) static JAAS configurations are a JVM process-wide
> installation, meaning using that any separate Kafka client within the same
> process can always only share the same credentials and 2) the “KafkaClient”
> is a fixed JAAS lookup section key that the Kafka clients use, which I
> don’t think is modifiable. So using the static config approach would never
> work.
>
> An example “sasl.jaas.config” for plain logins:
> "org.apache.kafka.common.security.plain.PlainLoginModule required
> username= password=
>
> Simply have different values for each of the Kafka consumer / producers
> you’re using.
>
> Cheers,
> Gordon
>
>
> On 8 May 2017 at 4:42:07 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org)
> wrote:
>
> Hi Gwenhael,
>
> Sorry for the very long delayed response on this.
>
> As you noticed, the “KafkaClient” entry name seems to be a hardcoded thing
> on the Kafka side, so currently I don’t think what you’re asking for is
> possible.
>
> It seems like this could be made possible with some of the new
> authentication features in Kafka 0.10 that seems related: [1] [2].
>
> I’m not that deep into the authentication modules, but I’ll take a look
> and can keep you posted on this.
> Also looping in Eron (in CC) who could perhaps provide more insight on
> this at the same time.
>
> Cheers,
> Gordon
>
> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 83+-+Allow+multiple+SASL+authenticated+Java+clients+in+
> a+single+JVM+process
> [2] https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 85%3A+Dynamic+JAAS+configuration+for+Kafka+clients
>
> On 26 April 2017 at 8:48:20 PM, Gwenhael Pasquiers (
> gwenhael.pasqui...@ericsson.com) wrote:
>
> Hello,
>
> Up to now we’ve been using kafka with jaas (plain login/password) the
> following way:
>
> -  yarnship the jaas file
>
> -  add the jaas file name into “flink-conf.yaml” using property
> “env.java.opts”
>
>
>
> How to support multiple secured kafka 0.10 consumers and producers (with
> different logins and password of course) ?
>
> From what I saw in the kafka sources, the entry name “KafkaClient” is
> hardcoded…
>
> Best Regards,
>
>
>
> Gwenhaël PASQUIERS
>
>


Re: How can I handle backpressure with event time.

2017-05-25 Thread Eron Wright
Try setting the assigner on the Kafka consumer, rather than on the
DataStream:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/
kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission

I believe this will produce a per-partition assigner and forward only the
minimum watermark across all partitions.
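
A rough sketch of what that looks like (the event type, its timestamp accessor,
the deserialization schema and the 10-second bound are placeholders; `props` and
`env` are assumed to exist already):

FlinkKafkaConsumer010<MyEvent> consumer =
    new FlinkKafkaConsumer010<>("topicA", new MyEventDeserializationSchema(), props);

// Attaching the assigner to the consumer itself makes Flink track a watermark
// per Kafka partition and forward only the minimum downstream.
consumer.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
        @Override
        public long extractTimestamp(MyEvent element) {
            return element.getEventTime();
        }
    });

DataStream<MyEvent> stream = env.addSource(consumer);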

Hope this helps,
-Eron

On Thu, May 25, 2017 at 3:21 AM, yunfan123 
wrote:

> For example, I want to merge two kafka topics (named topicA and topicB) by
> the specific key with a max timeout.
> I use event time and class BoundedOutOfOrdernessTimestampExtractor to
> generate water mark.
> When some partitions of topicA are delayed by backpressure and the delay
> exceeds my max timeout, none of the delayed partitions in topicA (nor the
> corresponding data in topicB) can be merged.
> What I want is that when backpressure happens, the consumers only consume
> according to my event time.
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/How-can-I-
> handle-backpressure-with-event-time-tp13313.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>