high-availability.jobmanager.port vs jobmanager.rpc.port

2017-09-23 Thread Elias Levy
I am wondering why, in HA mode, there is a need for a separate config parameter
to set the JM RPC port (high-availability.jobmanager.port), and why this
parameter accepts a range, unlike jobmanager.rpc.port.


History Server

2017-09-23 Thread Elias Levy
I am curious, why is the History Server a separate process and Web UI
instead of being part of the Web Dashboard within the Job Manager?


Re: LocalStreamEnvironment - configuration doesn't seem to be used in RichFunction operators

2017-09-23 Thread Tzu-Li (Gordon) Tai
Yes, I agree that the behavior can be quite surprising; if it is not already
stated somewhere in the docs, we should update them.

1. Pass in a Serializable "injector"/proxy object in the constructor
2. In "open" (or the body of the function), initialize the things I want that
   may or may not be Serializable, e.g. an HTTP client or database connection,
   from that object
3. Don't use the Configuration instance, since it doesn't do anything anyway
Yes, I think you’re on the right track with this :)

Cheers,
Gordon
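
The plan above can be sketched without any Flink dependencies. The names below (ClientFactory, EnrichFunction, HttpClient) are hypothetical stand-ins, not Flink API; the serialization round trip in the usage note mimics what happens when Flink ships an operator to a task.

```java
import java.io.Serializable;

// Stand-in for a non-serializable dependency such as a real HTTP client.
class HttpClient {
    String get(String url) { return "response:" + url; }
}

// The serializable "injector" passed in via the constructor.
interface ClientFactory extends Serializable {
    HttpClient create();
}

// Plays the role of a RichFunction: only the factory travels with the
// serialized operator; the client itself is built in open().
class EnrichFunction implements Serializable {
    private final ClientFactory factory;
    private transient HttpClient client; // never serialized

    EnrichFunction(ClientFactory factory) { this.factory = factory; }

    void open() { client = factory.create(); }        // like RichFunction#open
    String map(String value) { return client.get(value); }
}
```

In a unit test you would pass a factory that returns a mock client; in production, one that builds the real client. A Java serialization round trip keeps the factory and drops the transient client, which open() then rebuilds.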


On 22 September 2017 at 11:08:21 PM, Michael Kobit (mko...@gmail.com) wrote:

Thanks for the response.

It is a bit surprising that it is always a new instance, given the various API
signatures that take in a Configuration instance. The best practices docs
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html#using-the-parameters-in-your-flink-program)
also sort of mention it, but I just noticed that most of those examples seem to
be for the DataSet API rather than the DataStream API (I don't know if there is
a big difference between the programming APIs there). I'm still new to these
things, so I could be making invalid assumptions, too.

I think I have a simple idea for how to get dependency-injection-style wiring
working anyway:
1. Pass in a Serializable "injector"/proxy object in the constructor
2. In "open" (or the body of the function), initialize the things I want that
   may or may not be Serializable, e.g. an HTTP client or database connection,
   from that object
3. Don't use the Configuration instance, since it doesn't do anything anyway
I haven't thought through any possible security holes or considerations with 
this approach yet.

Thanks for the response, that clears up my confusion - now just to explore and 
find some better ways to test this stuff!

On Fri, Sep 22, 2017 at 11:51 AM Tzu-Li (Gordon) Tai  
wrote:
Hi,

The passing of a Configuration instance into the open() method is actually a
leftover artifact in the DataStream API that remains only for API backwards
compatibility reasons.
There's actually no way to modify what configuration is retrieved there (it is
actually always a new, empty Configuration).

Normally, to inject dependencies into your operators, you would simply do that
by supplying them through the constructor of the operator and storing them as
class fields for later use in the operator's work methods.
Make sure that they are serializable, as the operator will need to be
serialized when deploying the job. I'm assuming that this should be possible
for you anyway, since you were trying to write that information into the
Configuration.

Hope this helps!

Cheers,
Gordon


On 20 September 2017 at 11:25:41 PM, Michael Kobit (mko...@gmail.com) wrote:

I'm new to Flink and in the process of trying to write a few operators and
tests for them. One of the issues I've run into is "how do I properly set up
the dependencies for an operator". I've discovered the serialization
constraints and learned about the execution model as I've started to progress,
but I'm still struggling to find an analog for dependency injection in Flink.

I was experimenting with different ways to supply configuration to the *Rich*
functions, so they can basically set themselves up and tear themselves down
with their dependencies on open/close. I wanted to basically "inject" a
dependency, say an HTTP client that caches, and then mock that dependency in a
local test instead of actually making HTTP calls. It seemed like it could be
done by getting the correct implementation types from the config using some
custom injector type (analogous to Spring or Guice dependency injection). I
know I have to deal with serialization of the operators, which is why I was
thinking I could do this in open/close and have the magical injector be
serializable (and possibly be part of the config). This may or may not be a bad
idea already, but bear with me (any feedback is very appreciated).

I was doing some local testing using StreamExecutionEnvironment, but wasn't 
able to actually pass in configuration options to the local stream execution.

I tried it these ways:
1. Create with a config: StreamExecutionEnvironment.createLocalEnvironment(1, configuration)
2. Configure the created LocalStreamEnvironment via env.getConfig().setGlobalJobParameters(configuration)
3. Configure the DataStreamSource via source.getExecutionConfig().setGlobalJobParameters(configuration)
4. Configure the SingleOutputStreamOperator via mapped.getExecutionConfig().setGlobalJobParameters(configuration)

All four of those failed, so I felt like I was doing something wrong here and
wanted to reach out.

Here is the example code where all of those tests fail:

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import 

Re: question on sideoutput from ProcessWindow function

2017-09-23 Thread Chen Qin
Belated update.

Actually, @phoenixjiangnan is already working on this:
https://issues.apache.org/jira/browse/FLINK-7635



On Sat, Sep 23, 2017 at 8:26 AM, Ufuk Celebi  wrote:

> +1
>
> Created an issue here: https://issues.apache.org/jira/browse/FLINK-7677
>
>
> On Thu, Sep 14, 2017 at 11:51 AM, Aljoscha Krettek 
> wrote:
> > Hi,
> >
> > Chen is correct! I think it would be nice, though, to also add that
> > functionality for ProcessWindowFunction and I think this should be easy
> to
> > do since the interface is very similar to ProcessFunction and we could
> also
> > add that to the Context.
> >
> > Best,
> > Aljoscha
> >
> > On 9. Sep 2017, at 06:22, Chen Qin  wrote:
> >
> > Hi Prabhu,
> >
> > That is a good question; the short answer is: not yet. Only
> > ProcessFunction was given the flexibility of doing customized side
> > output at the moment. WindowFunction wasn't given such flexibility,
> > partially because side output initially targeted late-arriving events
> > for window use cases.
> >
> > +@Aljoscha might have better picture on this question.
> >
> > Thanks,
> > Chen
> >
> > On Fri, Sep 8, 2017 at 7:19 PM, Prabhu V  wrote:
> >>
> >> Hi,
> >>
> >> Can we have a side output from a process window function ?
> >>
> >> I am currently generating a side output from a process function. The main
> >> output of the process function is then Windowed and a
> ProcessWindowFunction
> >> is applied on the windows.
> >>
> >> Can I add to the SideOutputStream from the ProcessWindowFunction? I am
> >> unable to find any API that enables this functionality.
> >>
> >> Thanks,
> >> Prabhu
> >
> >
> >
>
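
For readers of the archive: the workaround described above, emitting the side output from a ProcessFunction before the window rather than from the ProcessWindowFunction, looks roughly like the sketch below against the 1.3-era API. `input` is assumed to be a DataStream<String>; this is an illustrative sketch, not a tested program, and side-output support inside ProcessWindowFunction itself is tracked in FLINK-7635/FLINK-7677.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Anonymous subclass so the element type survives erasure.
final OutputTag<String> sideTag = new OutputTag<String>("side") {};

SingleOutputStreamOperator<String> main = input
    .process(new ProcessFunction<String, String>() {
        @Override
        public void processElement(String value, Context ctx, Collector<String> out) {
            out.collect(value);          // main output, flows on into the window
            ctx.output(sideTag, value);  // side output, consumed separately
        }
    });

DataStream<String> side = main.getSideOutput(sideTag);
```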


Unable to run flink job after adding a new dependency (cannot find Main-class)

2017-09-23 Thread Federico D'Ambrosio
Hello everyone,

I'd like to submit to you this weird issue I'm having, hoping you could
help me.
Premise: I'm using sbt 0.13.6 for building, Scala 2.11.8, and flink 1.3.2
compiled from sources against hadoop 2.7.3.2.6.1.0-129 (HDP 2.6).
So, I'm trying to implement a sink for Hive, and I added the following
dependency in my build.sbt:

"org.apache.hive.hcatalog" % "hive-hcatalog-streaming" %
"1.2.1000.2.6.1.0-129"

in order to use hive streaming capabilities.

After importing this dependency, without even using it, if I try to flink run
the job I get:

org.apache.flink.client.program.ProgramInvocationException: The program's
entry point class 'package.MainObj' was not found in the jar file.

If I remove the dependency, everything goes back to normal.
What is weird is that if I try to use sbt run in order to run the job, *it does
find the Main class*, and it obviously crashes because of the missing flink
core dependencies (AbstractStateBackend missing and whatnot).

Here are the complete dependencies of the project:

"org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
"org.apache.flink" %% "flink-cep-scala" % flinkVersion,
"org.apache.hive.hcatalog" % "hive-hcatalog-streaming" %
"1.2.1000.2.6.1.0-129",
"org.joda" % "joda-convert" % "1.8.3",
"com.typesafe.play" %% "play-json" % "2.6.2",
"org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "2.0.2",
"org.scalactic" %% "scalactic" % "3.0.1",
"org.scalatest" %% "scalatest" % "3.0.1" % "test",
"de.javakaffee" % "kryo-serializers" % "0.42"

Could it be an issue of dependency conflicts between the mongo-hadoop and
hive hadoop versions (2.7.1 and 2.7.3.2.6.1.0-129, respectively, even though
there is no issue between mongo-hadoop and flink)? I'm even starting to
think that Flink cannot handle big jars that well (before the new dependency
it was 44M; afterwards it became 115M) when it comes to classpath loading.

Any help would be really appreciated,
Kind regards,
Federico
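
One cheap experiment (a guess, not a confirmed diagnosis): with large fat jars, sbt's manifest auto-detection can misbehave once a dependency such as hive-hcatalog-streaming brings in classes with their own main methods, so pinning the entry point explicitly in build.sbt removes one variable. The class name below is taken from the error message; the `assembly` key only applies if the sbt-assembly plugin is in use.

```scala
// build.sbt (sbt 0.13 syntax) — set the entry point explicitly so the
// manifest's Main-Class does not depend on sbt's auto-detection.
mainClass in (Compile, packageBin) := Some("package.MainObj")

// Only if sbt-assembly builds the fat jar:
mainClass in assembly := Some("package.MainObj")
```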



Re: Get EOF from PrometheusReporter in JM

2017-09-23 Thread Tony Wei
Hi Chesnay,

I built another flink cluster using version 1.4, set the log level to
DEBUG, and I found that the root cause might be this exception:
*java.lang.NullPointerException:
Value returned by gauge lastCheckpointExternalPath was null*.

I updated `CheckpointStatsTracker` to ignore the external path when it is null,
and the exception didn't happen again. The Prometheus reporter works as well.

I have created a Jira issue for it:
https://issues.apache.org/jira/browse/FLINK-7675, and I will submit the PR
after it passes Travis CI on my repository.

Best Regards,
Tony Wei



2017-09-22 22:20 GMT+08:00 Tony Wei :

> Hi Chesnay,
>
> I didn't try it on 1.4, so I have no idea if this also occurs there.
> As for my logging settings, the level is already set to INFO, but there
> wasn't any error or warning in the log file either.
>
> Best Regards,
> Tony Wei
>
> 2017-09-22 22:07 GMT+08:00 Chesnay Schepler :
>
>> The Prometheus reporter should work with 1.3.2.
>>
>> Does this also occur with the reporter that currently exists in 1.4? (to
>> rule out new bugs from the PR).
>>
>> To investigate this further, please set the logging level to WARN and try
>> again, as all errors in the metric system are logged on that level.
>>
>>
>> On 22.09.2017 10:33, Tony Wei wrote:
>>
>> Hi,
>>
>> I have built the Prometheus reporter package from this PR
>> https://github.com/apache/flink/pull/4586, and used it on Flink 1.3.2 to
>> record every default metrics and those from `FlinkKafkaConsumer`.
>>
>> Originally, everything was fine. I could get the metrics of the TM from
>> Prometheus, just like I saw on the Flink Web UI.
>> However, when I turned to the JM, I found Prometheus gave me this error: Get
>> http://localhost:9249/metrics: EOF.
>> I checked the log on the JM and saw nothing in it. There was no error
>> message, and the 9249 port was still alive.
>>
>> To figure out what happened, I created another cluster, and I found that
>> Prometheus could connect to the Flink cluster if there was no running job.
>> After the JM triggered or completed the first checkpoint, Prometheus started
>> getting ERR_EMPTY_RESPONSE from the JM, but not from the TM. There was still
>> no error in the log file and the 9249 port was still alive.
>>
>> I was wondering where the error occurred: in Flink or in the Prometheus
>> reporter? Or is it incorrect to use the Prometheus reporter on Flink 1.3.2?
>> Thank you.
>>
>> Best Regards,
>> Tony Wei
>>
>>
>>
>


Re: akka timeout

2017-09-23 Thread Steven Wu
Just to close the thread: the akka death watch was triggered by high GC pauses,
which were caused by a memory leak in our code during Flink job restarts.

Note that akka.ask.timeout isn't related to the akka death watch, which Flink
has documented and linked.
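
For reference, the ask timeout and the death-watch settings discussed in this thread are separate keys in flink-conf.yaml (values below are illustrative, not recommendations):

```yaml
akka.ask.timeout: 60 s               # request/response timeout (the one set above)
akka.watch.heartbeat.interval: 10 s  # death-watch heartbeat interval
akka.watch.heartbeat.pause: 60 s     # tolerated pause before a peer is declared dead
```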

On Sat, Aug 26, 2017 at 10:58 AM, Steven Wu  wrote:

> this is a stateless job, so we don't use RocksDB.
>
> yeah, the network can also be a possibility; will keep it on the radar.
> Unfortunately, our metrics system doesn't have the TCP metrics when running
> inside containers.
>
> On Fri, Aug 25, 2017 at 2:09 PM, Robert Metzger 
> wrote:
>
>> Hi,
>> are you using the RocksDB state backend already?
>> Maybe writing the state to disk would actually reduce the pressure on the
>> GC (but of course it'll also reduce throughput a bit).
>>
>> Are there any known issues with the network? Maybe the network bursts on
>> restart cause the timeouts?
>>
>>
>> On Fri, Aug 25, 2017 at 6:17 PM, Steven Wu  wrote:
>>
>>> Bowen,
>>>
>>> Heap size is ~50G. CPU was actually pretty low (under 20%) when the high GC
>>> pauses and akka timeouts were happening, so maybe memory allocation and GC
>>> weren't really the issue. I also recently learned that the JVM can pause
>>> while writing to the GC log because of disk I/O; that is another lead I am
>>> pursuing.
>>>
>>> Thanks,
>>> Steven
>>>
>>> On Wed, Aug 23, 2017 at 10:58 AM, Bowen Li 
>>> wrote:
>>>
 Hi Steven,
 Yes, GC is a big overhead; it may cause your CPU utilization to
 reach 100%, with every process stopping work. We ran into this a while ago,
 too.

 How much memory did you assign to the TaskManager? How high was your
 CPU utilization when your taskmanager was considered 'killed'?

 Bowen



 On Wed, Aug 23, 2017 at 10:01 AM, Steven Wu 
 wrote:

> Till,
>
> Once our job was restarted for some reason (e.g. a taskmanager container
> got killed), it could get stuck in a continuous restart loop for hours. Right
> now, I suspect it is caused by GC pauses during restart; our job has very high
> memory allocation in steady state. High GC pauses then caused akka timeouts,
> which then caused the jobmanager to think the taskmanager containers were
> unhealthy/dead and kill them. And the cycle repeats...
>
> But I haven't been able to prove or disprove it yet. When I was asking
> the question, I was still sifting through metrics and error logs.
>
> Thanks,
> Steven
>
>
> On Tue, Aug 22, 2017 at 1:21 AM, Till Rohrmann <
> till.rohrm...@gmail.com> wrote:
>
>> Hi Steven,
>>
>> quick correction for Flink 1.2: indeed, the MetricFetcher does not pick up
>> the right timeout value from the configuration; instead it uses a hardcoded
>> 10 s timeout. This has only been changed recently and is already committed
>> in master, so with the next release (1.4) it will properly pick up the
>> right timeout settings.
>>
>> Just out of curiosity, what's the instability issue you're observing?
>>
>> Cheers,
>> Till
>>
>> On Fri, Aug 18, 2017 at 7:07 PM, Steven Wu 
>> wrote:
>>
>>> Till/Chesnay, thanks for the answers. Looks like this is a
>>> result/symptom of an underlying stability issue that I am trying to track
>>> down.
>>>
>>> It is Flink 1.2.
>>>
>>> On Fri, Aug 18, 2017 at 12:24 AM, Chesnay Schepler <
>>> ches...@apache.org> wrote:
>>>
 The MetricFetcher always use the default akka timeout value.


 On 18.08.2017 09:07, Till Rohrmann wrote:

 Hi Steven,

 I thought that the MetricFetcher picks up the right timeout from
 the configuration. Which version of Flink are you using?

 The timeout is not a critical problem for the job health.

 Cheers,
 Till

 On Fri, Aug 18, 2017 at 7:22 AM, Steven Wu 
 wrote:

>
> We have set akka.ask.timeout to 60 s in the yaml file. I also
> confirmed the setting in the Flink UI. But I saw an akka timeout of 10 s for
> the metric query service. Two questions:
> 1) Why doesn't the metric query use the 60 s value configured in the yaml
> file? Does it always use the default 10 s value?
> 2) Could this cause heartbeat failures between the task manager and job
> manager? Or is this just a non-critical failure that won't affect job
> health?
>
> Thanks,
> Steven
>
> 2017-08-17 23:34:33,421 WARN 
> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher
> - Fetching metrics failed. akka.pattern.AskTimeoutException: Ask
> timed out on [Actor[akka.tcp://flink@1.2.3.4
> :39139/user/MetricQueryService_23cd9db754bb7d123d80e6b1c0be21d6]]

Can I use the Avro serializer as default instead of the Kryo serializer in Flink's Scala API?

2017-09-23 Thread shashank agarwal
Hello Team,

As our schema evolves with the business logic, we want to use an evolvable
schema format like Avro as the default serializer and deserializer for our
flink program and its state.

My doubt is: we are using the Scala API in our flink program, but Avro by
default supports Java POJOs. So how can we use this with the Scala API? Should
we use a serializer like Avro4s? Or, if we can use default Avro in our Scala
flink app, what would the steps be?

Please guide.
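
As a hedged starting point (a sketch, not an official recipe): Flink's ExecutionConfig has a flag that forces generic types through its Avro serializer instead of falling back to Kryo. Note that Scala case classes are handled by Flink's own case-class serializer and are not affected by this flag, so a library like avro4s (an external project, not something Flink ships) may still be needed for Avro-friendly Scala types.

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Route generic (POJO-style) types through Avro instead of Kryo.
// Scala case classes keep Flink's built-in case-class serializer.
env.getConfig.enableForceAvro()
```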

-- 
Thanks Regards

SHASHANK AGARWAL
 ---  Trying to mobilize the things



