Re: Setting number of TaskManagers

2016-08-25 Thread Robert Metzger
Hi Craig,

For the YARN session, you have to pass the number of TaskManagers using
the -n argument.
If you need to use an environment variable, you can create a custom script
that calls yarn-session.sh and passes the value of the env variable to it.
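
A minimal wrapper sketch (the variable name NUM_TASKMANAGERS and its default
are just examples, not something Flink defines):

#!/usr/bin/env bash
# forward the env variable to yarn-session.sh's -n flag and pass any
# remaining arguments through unchanged
NUM_TASKMANAGERS="${NUM_TASKMANAGERS:-4}"
exec ./bin/yarn-session.sh -n "$NUM_TASKMANAGERS" "$@"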

Regards,
Robert




On Wed, Aug 24, 2016 at 9:11 PM, Foster, Craig  wrote:

> Oh, sorry, I didn't specify I was using YARN and don't necessarily want to
> specify with the command line option.
>
>
>
> *From: *Greg Hogan 
> *Reply-To: *"user@flink.apache.org" 
> *Date: *Wednesday, August 24, 2016 at 12:07 PM
> *To: *"user@flink.apache.org" 
> *Subject: *Re: Setting number of TaskManagers
>
>
>
> The number of TaskManagers will be equal to the number of entries in the
> conf/slaves file.
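>
> For example (hypothetical hostnames), a conf/slaves file with three entries
> starts one TaskManager on each of the listed hosts:
>
>   worker-node-1
>   worker-node-2
>   worker-node-3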
>
>
>
> On Wed, Aug 24, 2016 at 3:04 PM, Foster, Craig 
> wrote:
>
> Is there a way to set the number of TaskManagers using a configuration
> file or environment variable? I'm looking at the docs for it and it says
> you can set slots but not the number of TMs.
>
>
>
> Thanks,
>
> Craig
>
>
>


Re: Flink long-running YARN configuration

2016-08-25 Thread Stephan Ewen
Hi Craig!

For YARN sessions, Flink will
  - (a) register the app master hostname/port/etc. at YARN, so you can get
them, for example, from the YARN UI and tools
  - (b) create a .yarn-properties file that contains the hostname/port info.
Future calls to the command line pick up the info from there.
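
If you want to read that file programmatically, a small standalone sketch
(plain java.util.Properties, not Flink API; the default file location below is
an assumption about the 1.1 behaviour and is configurable, so verify it on
your setup):

import java.io.FileInputStream;
import java.util.Properties;

public class PrintYarnProperties {
    public static void main(String[] args) throws Exception {
        // assumed default location of the file written by the YARN session CLI
        String location = System.getProperty("java.io.tmpdir")
                + "/.yarn-properties-" + System.getProperty("user.name");
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(location)) {
            props.load(in);
        }
        for (String key : props.stringPropertyNames()) {
            System.out.println(key + " = " + props.getProperty(key));
        }
    }
}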

/cc Robert

Greetings,
Stephan


On Thu, Aug 25, 2016 at 5:02 PM, Foster, Craig  wrote:

> I'm trying to understand Flink YARN configuration. The flink-conf.yaml
> file is supposedly the way to configure Flink, except when you launch Flink
> using YARN since that's determined for the AM. The following is
> contradictory or not completely clear:
>
>
>
> "The system will use the configuration in conf/flink-config.yaml. Please
> follow our configuration guide
> 
>  if you want to change something.
>
> Flink on YARN will overwrite the following configuration parameters
> jobmanager.rpc.address (because the JobManager is always allocated at
> different machines), taskmanager.tmp.dirs (we are using the tmp
> directories given by YARN) and parallelism.default if the number of slots
> has been specified."
>
>
>
> OK, so it will use conf/flink-config.yaml, except for
> jobmanager.rpc.address/port which will be decided by YARN and not
> necessarily reported to the user since those are dynamically allocated by
> YARN. That's fine with me, but if I want to make a "long-running" Flink
> cluster available for more than one user, where do I check in Flink for the
> Application Master hostname--or do I just have to scrape output of logs
> (which would definitely be undesirable)? First, I thought this would be
> written by Flink to conf/flink-config.yaml. It is not. Then I thought it
> must surely be written to the HDFS configuration directory (under something
> like hdfs://$USER/.flink/) for that application but that is merely copied
> from the original conf/flink-config.yaml and doesn't have an accurate
> configuration for the specified application. So is there an accurate config
> somewhere in HDFS or on the ResourceManager--i.e. where could I
> programmatically find that (outside of manipulating YARN app names or
> scraping)?
>
>
>
> Thanks,
>
> Craig
>
>
>
>
>
>
>


Re: Setting up zeppelin with flink

2016-08-25 Thread Trevor Grant
I'm glad you were able to work it out!

Your setup is somewhat unique, and as Zeppelin is the result of multiple
drive-by commits, interesting and unexpected things happen in the tail
cases.

Could you please report your problem and solution on the Zeppelin user
list?  What you've discovered may in fact be a bug or a regression caused
by some of the recent Spark 2.0/scala 2.11 mess (I see you installed
Zeppelin 0.6.1).  Suffice to say, I don't think this is a Flink issue.


Finally, out of curiosity- what jars did you copy to the interpreter/flink
directory to get this to work?  I'd like to check the Zeppelin/flink/pom.xml

Happy to be a sounding board if nothing else ;)

tg


Trevor Grant
Data Scientist
https://github.com/rawkintrevo
http://stackexchange.com/users/3002022/rawkintrevo
http://trevorgrant.org

*"Fortunate is he, who is able to know the causes of things."  -Virgil*


On Thu, Aug 25, 2016 at 8:57 AM, Frank Dekervel  wrote:

> Hello,
>
> Sorry for the spam, but i got it working after copying all scala libraries
> from another interpreter to the interpreter/flink directory. so i think the
> error is the scala libraries are missing from the binary release in the
> zeppelin/interpreters/flink/ directory. For now i'm adding the copy
> commands to the dockerfile, but I'm sure this is not the proper way to fix
> it, but i don't know maven enough to understand why the scala libs are
> missing for the flink interpreter but not for the ignite interpreter.
>
> I'm also unable to figure out why a local interpreter worked fine given
> the missing libraries ...
>
> greetings,
> Frank
>
>
> On Thu, Aug 25, 2016 at 3:08 PM, Frank Dekervel  wrote:
>
>> Hello,
>>
>> For reference, below is the dockerfile i used to build the zeppelin image
>> (basically just openjdk 8 with the latest binary release of zeppelin)
>> the "docker-entrypoint.sh" script is just starting zeppelin.sh (oneliner)
>>
>> FROM openjdk:alpine
>>
>> RUN apk add --no-cache bash snappy
>>
>> ARG ZEPPELIN_VERSION=0.6.1
>>
>> ARG INSTALL_PATH=/opt
>> ENV APP_HOME $INSTALL_PATH/zeppelin
>> ENV PATH $PATH:$APP_HOME/bin
>>
>> RUN set -x && \
>>   mkdir -p $INSTALL_PATH && \
>>   apk --update add --virtual build-dependencies curl && \
> >>   curl -s $(curl -s https://www.apache.org/dyn/closer.cgi\?preferred\=true)/zeppelin/zeppelin-0.6.1/zeppelin-$ZEPPELIN_VERSION-bin-all.tgz | \
>>   tar xvz -C $INSTALL_PATH && \
>>   ln -s $INSTALL_PATH/zeppelin-$ZEPPELIN_VERSION-bin-all $APP_HOME && \
>>   addgroup -S zeppelin && adduser -D -S -H -G zeppelin -h $APP_HOME
>> zeppelin && \
>>   chown -R zeppelin:zeppelin $INSTALL_PATH && \
>>   chown -h zeppelin:zeppelin $APP_HOME && \
>>   apk del build-dependencies && \
>>   rm -rf /var/cache/apk/*
>>
>> # Configure container
>> USER zeppelin
>> ADD docker-entrypoint.sh $APP_HOME/bin/
>> ENTRYPOINT ["docker-entrypoint.sh"]
>> CMD ["sh", "-c"]
>>
>> greetings,
>> Frank
>>
>>
>> On Thu, Aug 25, 2016 at 2:58 PM, Frank Dekervel  wrote:
>>
>>> Hello Trevor,
>>>
>>> Thanks for your suggestion. The log does not explain a lot: on the flink
>>> side i don't see anything at all, on the zeppelin side i see this:
>>> Your suggestion sounds plausible, as i always start zeppelin, and then
>>> change the configuration from local to remote.. however, port 6123 locally
>>> doesn't seem to be open
>>>
>>> ==> zeppelin--94490c51d71e.log <==
>>>  INFO [2016-08-25 12:53:24,168] ({qtp846063400-48}
>>> InterpreterFactory.java[createInterpretersForNote]:576) - Create
>>> interpreter instance flink for note 2BW8NMCKW
>>>  INFO [2016-08-25 12:53:24,168] ({qtp846063400-48}
>>> InterpreterFactory.java[createInterpretersForNote]:606) - Interpreter
>>> org.apache.zeppelin.flink.FlinkInterpreter 795344042 created
>>>  INFO [2016-08-25 12:53:24,169] ({pool-1-thread-3}
>>> SchedulerFactory.java[jobStarted]:131) - Job
>>> paragraph_1471964818018_1833520437 started by scheduler
>>> org.apache.zeppelin.interpreter.remote.RemoteInterpretershar
>>> ed_session513606587
>>>  INFO [2016-08-25 12:53:24,170] ({pool-1-thread-3}
>>> Paragraph.java[jobRun]:252) - run paragraph 20160823-150658_99117457 using
>>> null org.apache.zeppelin.interpreter.LazyOpenInterpreter@2f67fcaa
>>>  INFO [2016-08-25 12:53:24,170] ({pool-1-thread-3}
>>> RemoteInterpreterProcess.java[reference]:148) - Run interpreter process
>>> [/opt/zeppelin/bin/interpreter.sh, -d, /opt/zeppelin/interpreter/flink,
>>> -p, 45769, -l, /opt/zeppelin/local-repo/2BVEQGGEN]
>>>  INFO [2016-08-25 12:53:24,672] ({pool-1-thread-3}
>>> RemoteInterpreter.java[init]:170) - Create remote interpreter
>>> org.apache.zeppelin.flink.FlinkInterpreter
>>>
>>> after doing %flink, i see this in ps auxw:
>>>
>>>  /usr/lib/jvm/java-1.8-openjdk/bin/java -Dfile.encoding=UTF-8
>>> -Dlog4j.configuration=file:///opt/zeppelin/conf/log4j.properties
>>> -Dzeppelin.log.file=/opt/zeppelin/logs/zeppelin-interpreter-flink--94490c51d71e.log
>>> -Xms1024m -Xmx1024m 

Re: Running multiple jobs on yarn (without yarn-session)

2016-08-25 Thread Maximilian Michels
Thanks Niels, actually I also created one :) We will fix this on the
master and for the 1.1.2 release.

On Thu, Aug 25, 2016 at 5:14 PM, Niels Basjes  wrote:
> I have this with a pretty recent build from source (not a release).
>
> Would be great if you see a way to fix this.
> I consider it fine if this requires an extra call to the system indicating
> that this is a 'multiple job' situation.
>
> I created https://issues.apache.org/jira/browse/FLINK-4495 for you
>
> Niels Basjes
>
> On Thu, Aug 25, 2016 at 3:34 PM, Maximilian Michels  wrote:
>>
>> Hi Niels,
>>
>> This is with 1.1.1? We could fix this in the upcoming 1.1.2 release by
>> only using automatic shut down for detached jobs. In all other cases
>> we should be able to shutdown from the client side after running all
>> jobs. The only downside I see is that Flink clusters may actually
>> never be shutdown if the CLI somehow crashes or gets a network
>> partition.
>>
>> Best,
>> Max
>>
>> On Thu, Aug 25, 2016 at 12:04 PM, Niels Basjes  wrote:
>> > Hi,
>> >
>> > I created a small application that needs to run multiple (batch) jobs on
>> > Yarn and then terminate.
>> > In this case I'm exporting data from a list of HBase tables
>> >
>> > I essentially do right now the following:
>> >
>> > flink run -m yarn-cluster -yn 10  bla.jar ...
>> >
>> > And in my main I do
>> >
>> > foreach thing I need to do {
>> >ExecutionEnvironment env =
>> > ExecutionEnvironment.getExecutionEnvironment();
>> >env. ... define the batch job.
>> >env.execute
>> > }
>> >
>> > In the second job I submit I get an exception:
>> > java.lang.RuntimeException: Unable to tell application master to stop
>> > once
>> > the specified job has been finised
>> > at
>> >
>> > org.apache.flink.yarn.YarnClusterClient.stopAfterJob(YarnClusterClient.java:184)
>> > at
>> >
>> > org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:202)
>> > at
>> > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
>> > at
>> > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376)
>> > at
>> >
>> > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61)
>> > at com.bol.tools.hbase.export.Main.runSingleTable(Main.java:220)
>> > at com.bol.tools.hbase.export.Main.run(Main.java:81)
>> > at com.bol.tools.hbase.export.Main.main(Main.java:42)
>> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> > at
>> >
>> > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> > at
>> >
>> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> > at java.lang.reflect.Method.invoke(Method.java:497)
>> > at
>> >
>> > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
>> > at
>> >
>> > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> > at
>> > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
>> > at
>> > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:775)
>> > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
>> > at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:995)
>> > at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:992)
>> > at
>> >
>> > org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:56)
>> > at java.security.AccessController.doPrivileged(Native Method)
>> > at javax.security.auth.Subject.doAs(Subject.java:422)
>> > at
>> >
>> > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
>> > at
>> >
>> > org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:53)
>> > at
>> > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:992)
>> > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1046)
>> > Caused by: java.util.concurrent.TimeoutException: Futures timed out
>> > after
>> > [1 milliseconds]
>> > at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>> > at
>> > scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>> > at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>> > at
>> >
>> > scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>> > at scala.concurrent.Await$.result(package.scala:107)
>> > at scala.concurrent.Await.result(package.scala)
>> > at
>> >
>> > org.apache.flink.yarn.YarnClusterClient.stopAfterJob(YarnClusterClient.java:182)
>> > ... 25 more
>> >
>> >
>> > How do I (without using yarn-session) tell the YarnClusterClient to
>> > simply
>> > 'keep running because there will be more jobs'?
>> >
>> > If I run this same code in a yarn-session it works but then I have the
>> > troubles of starting a (detached yarn-session) AND to terminate that
>> > thing
>> > again after my jobs have run.
>> >
>> 

Re: Elasticsearch connector and number of shards

2016-08-25 Thread Flavio Pompermaier
I've just added a JIRA improvement ticket for this (
https://issues.apache.org/jira/browse/FLINK-4491).

Best,
Flavio

On Wed, Jul 20, 2016 at 4:21 PM, Maximilian Michels  wrote:

> The connector doesn't cover this use case. Through the API you need to
> use the IndicesAdminClient:
> https://www.elastic.co/guide/en/elasticsearch/client/java-
> api/current/java-admin-indices.html
>
> Otherwise Elasticsearch creates an index with shards automatically. We
> could add support for configuring shards in the future.
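>
> For illustration, a rough sketch of creating the index up front with a fixed
> shard count via the IndicesAdminClient (written against the Elasticsearch 2.x
> Java client that the 1.1 connector uses; builder method names differ between
> ES versions, so treat it as an approximation):
>
> import org.elasticsearch.client.Client;
> import org.elasticsearch.common.settings.Settings;
>
> public class CreateIndexExample {
>     public static void createIndexWithShards(Client client) {
>         client.admin().indices().prepareCreate("my-index") // index name is an example
>             .setSettings(Settings.settingsBuilder()
>                 .put("index.number_of_shards", 5)
>                 .put("index.number_of_replicas", 1))
>             .get();
>     }
> }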
>
> On Mon, Jul 18, 2016 at 11:42 AM, Flavio Pompermaier
>  wrote:
> > Indeed, we've tried with the parameter index.number_of_shards but it
> didn't
> > work so I fear that this parameter is not handled by the current
> > implementation..am I wrong?
> >
> > On Mon, Jul 18, 2016 at 11:37 AM, Ufuk Celebi  wrote:
> >>
> >> I've never used the Elasticsearch sink, but the docs say:
> >>
> >> "Note how a Map of Strings is used to configure the Sink. The
> >> configuration keys are documented in the Elasticsearch
> >> documentation here. Especially important is the cluster.name parameter
> >> that must correspond to the name of your cluster."
> >>
> >> The config keys for index creation are found here:
> >>
> >> https://www.elastic.co/guide/en/elasticsearch/reference/
> current/indices-create-index.html
> >>
> >> Does this help? If not, maybe @Aljoscha can chime in here.
> >>
> >> – Ufuk
> >>
> >>
> >> On Mon, Jul 18, 2016 at 11:23 AM, Flavio Pompermaier
> >>  wrote:
> >> > Hi to all,
> >> >
> >> > we tried to use the streaming ES connector of Flink 1.1-SNAPSHOT and
> we
> >> > wanted to set the number of shards when creating a new index. Is that
> >> > possible at the moment?
> >> >
> >> > Best,
> >> > Flavio
> >
> >
>


Re: Dealing with Multiple sinks in Flink

2016-08-25 Thread vinay patil
Hi Max,

Here is the code for Timestamp assigner and watermark generation.
PFA

Regards,
Vinay Patil

On Thu, Aug 25, 2016 at 7:39 AM, Maximilian Michels [via Apache Flink User
Mailing List archive.]  wrote:

> I'm assuming there is something wrong with your Watermark/Timestamp
> assigner. Could you share some of the code?
>
> On Wed, Aug 24, 2016 at 9:54 PM, vinay patil <[hidden email]
> > wrote:
>
> > Hi,
> >
> > Just an update, the window is not getting triggered when I change the
> > parallelism to more than 1.
> >
> > Can you please explain why this is happening ?
> >
> > Regards,
> > Vinay Patil
> >
> > On Wed, Aug 24, 2016 at 9:55 AM, vinay patil [via Apache Flink User
> Mailing
> > List archive.] <[hidden email]> wrote:
> >>
> >> Hi Max,
> >>
> >> I tried writing to a local file as well, it's giving me the same issue. I
> >> have attached the logs and the dummy pipeline code (logs.txt, dummy_pipeline.txt).
>


MyTimestampExtractor.java (4K) 






Re: Setting up zeppelin with flink

2016-08-25 Thread Frank Dekervel
Hello,

Sorry for the spam, but I got it working after copying all Scala libraries
from another interpreter to the interpreter/flink directory. So I think the
problem is that the Scala libraries are missing from the binary release in the
zeppelin/interpreters/flink/ directory. For now I'm adding the copy commands
to the Dockerfile, but I'm sure this is not the proper way to fix it; I don't
know Maven well enough to understand why the Scala libs are missing for the
Flink interpreter but not for the Ignite interpreter.
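
A sketch of such copy commands (the concrete jar names are assumptions; adjust
the globs to whatever actually sits in the Ignite interpreter directory):

RUN cp $APP_HOME/interpreter/ignite/scala-library-*.jar \
       $APP_HOME/interpreter/ignite/scala-compiler-*.jar \
       $APP_HOME/interpreter/ignite/scala-reflect-*.jar \
       $APP_HOME/interpreter/flink/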

I'm also unable to figure out why a local interpreter worked fine given the
missing libraries ...

greetings,
Frank


On Thu, Aug 25, 2016 at 3:08 PM, Frank Dekervel  wrote:

> Hello,
>
> For reference, below is the dockerfile i used to build the zeppelin image
> (basically just openjdk 8 with the latest binary release of zeppelin)
> the "docker-entrypoint.sh" script is just starting zeppelin.sh (oneliner)
>
> FROM openjdk:alpine
>
> RUN apk add --no-cache bash snappy
>
> ARG ZEPPELIN_VERSION=0.6.1
>
> ARG INSTALL_PATH=/opt
> ENV APP_HOME $INSTALL_PATH/zeppelin
> ENV PATH $PATH:$APP_HOME/bin
>
> RUN set -x && \
>   mkdir -p $INSTALL_PATH && \
>   apk --update add --virtual build-dependencies curl && \
>   curl -s $(curl -s https://www.apache.org/dyn/closer.cgi\?preferred\=true)/zeppelin/zeppelin-0.6.1/zeppelin-$ZEPPELIN_VERSION-bin-all.tgz | \
>   tar xvz -C $INSTALL_PATH && \
>   ln -s $INSTALL_PATH/zeppelin-$ZEPPELIN_VERSION-bin-all $APP_HOME && \
>   addgroup -S zeppelin && adduser -D -S -H -G zeppelin -h $APP_HOME
> zeppelin && \
>   chown -R zeppelin:zeppelin $INSTALL_PATH && \
>   chown -h zeppelin:zeppelin $APP_HOME && \
>   apk del build-dependencies && \
>   rm -rf /var/cache/apk/*
>
> # Configure container
> USER zeppelin
> ADD docker-entrypoint.sh $APP_HOME/bin/
> ENTRYPOINT ["docker-entrypoint.sh"]
> CMD ["sh", "-c"]
>
> greetings,
> Frank
>
>
> On Thu, Aug 25, 2016 at 2:58 PM, Frank Dekervel  wrote:
>
>> Hello Trevor,
>>
>> Thanks for your suggestion. The log does not explain a lot: on the flink
>> side i don't see anything at all, on the zeppelin side i see this:
>> Your suggestion sounds plausible, as i always start zeppelin, and then
>> change the configuration from local to remote.. however, port 6123 locally
>> doesn't seem to be open
>>
>> ==> zeppelin--94490c51d71e.log <==
>>  INFO [2016-08-25 12:53:24,168] ({qtp846063400-48}
>> InterpreterFactory.java[createInterpretersForNote]:576) - Create
>> interpreter instance flink for note 2BW8NMCKW
>>  INFO [2016-08-25 12:53:24,168] ({qtp846063400-48}
>> InterpreterFactory.java[createInterpretersForNote]:606) - Interpreter
>> org.apache.zeppelin.flink.FlinkInterpreter 795344042 created
>>  INFO [2016-08-25 12:53:24,169] ({pool-1-thread-3}
>> SchedulerFactory.java[jobStarted]:131) - Job
>> paragraph_1471964818018_1833520437 started by scheduler
>> org.apache.zeppelin.interpreter.remote.RemoteInterpretershar
>> ed_session513606587
>>  INFO [2016-08-25 12:53:24,170] ({pool-1-thread-3}
>> Paragraph.java[jobRun]:252) - run paragraph 20160823-150658_99117457 using
>> null org.apache.zeppelin.interpreter.LazyOpenInterpreter@2f67fcaa
>>  INFO [2016-08-25 12:53:24,170] ({pool-1-thread-3}
>> RemoteInterpreterProcess.java[reference]:148) - Run interpreter process
>> [/opt/zeppelin/bin/interpreter.sh, -d, /opt/zeppelin/interpreter/flink,
>> -p, 45769, -l, /opt/zeppelin/local-repo/2BVEQGGEN]
>>  INFO [2016-08-25 12:53:24,672] ({pool-1-thread-3}
>> RemoteInterpreter.java[init]:170) - Create remote interpreter
>> org.apache.zeppelin.flink.FlinkInterpreter
>>
>> after doing %flink, i see this in ps auxw:
>>
>>  /usr/lib/jvm/java-1.8-openjdk/bin/java -Dfile.encoding=UTF-8
>> -Dlog4j.configuration=file:///opt/zeppelin/conf/log4j.properties
>> -Dzeppelin.log.file=/opt/zeppelin/logs/zeppelin-interpreter-flink--94490c51d71e.log
>> -Xms1024m -Xmx1024m -XX:MaxPermSize=512m -cp ::/opt/zeppelin/interpreter/fl
>> ink/*::/opt/zeppelin/lib/zeppelin-interpreter-0.6.1.jar
>> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer 45769
>>
>> /usr/lib/jvm/java-1.8-openjdk/bin/java -Dfile.encoding=UTF-8 -Xms1024m
>> -Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=file:///
>> opt/zeppelin/conf/log4j.properties -Dzeppelin.log.file=/opt/zeppe
>> lin/logs/zeppelin--94490c51d71e.log -cp 
>> ::/opt/zeppelin/lib/*:/opt/zeppelin/*::/opt/zeppelin/conf
>> org.apache.zeppelin.server.ZeppelinServer
>>
>> the stdout of zeppelin flink process after doing a stackdump looks like
>> this (note the exception, i didn't notice it before)
>>
>> zeppelin_1 | Exception in thread "pool-1-thread-2"
>> java.lang.NoClassDefFoundError: scala/collection/Seq
>> zeppelin_1 |at java.lang.Class.forName0(Native Method)
>> zeppelin_1 |at java.lang.Class.forName(Class.java:264)
>> zeppelin_1 |at org.apache.zeppelin.interprete
>> r.remote.RemoteInterpreterServer.createInterpreter(RemoteInt
>> 

Re: Running multiple jobs on yarn (without yarn-session)

2016-08-25 Thread Maximilian Michels
Hi Niels,

This is with 1.1.1? We could fix this in the upcoming 1.1.2 release by
only using automatic shut down for detached jobs. In all other cases
we should be able to shutdown from the client side after running all
jobs. The only downside I see is that Flink clusters may actually
never be shutdown if the CLI somehow crashes or gets a network
partition.

Best,
Max

On Thu, Aug 25, 2016 at 12:04 PM, Niels Basjes  wrote:
> Hi,
>
> I created a small application that needs to run multiple (batch) jobs on
> Yarn and then terminate.
> In this case I'm exporting data from a list of HBase tables
>
> I essentially do right now the following:
>
> flink run -m yarn-cluster -yn 10  bla.jar ...
>
> And in my main I do
>
> foreach thing I need to do {
>ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
>env. ... define the batch job.
>env.execute
> }
>
> In the second job I submit I get an exception:
> java.lang.RuntimeException: Unable to tell application master to stop once
> the specified job has been finised
> at
> org.apache.flink.yarn.YarnClusterClient.stopAfterJob(YarnClusterClient.java:184)
> at
> org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:202)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376)
> at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61)
> at com.bol.tools.hbase.export.Main.runSingleTable(Main.java:220)
> at com.bol.tools.hbase.export.Main.run(Main.java:81)
> at com.bol.tools.hbase.export.Main.main(Main.java:42)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:775)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
> at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:995)
> at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:992)
> at
> org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:56)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
> at
> org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:53)
> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:992)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1046)
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [1 milliseconds]
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:107)
> at scala.concurrent.Await.result(package.scala)
> at
> org.apache.flink.yarn.YarnClusterClient.stopAfterJob(YarnClusterClient.java:182)
> ... 25 more
>
>
> How do I (without using yarn-session) tell the YarnClusterClient to simply
> 'keep running because there will be more jobs'?
>
> If I run this same code in a yarn-session it works but then I have the
> troubles of starting a (detached yarn-session) AND to terminate that thing
> again after my jobs have run.
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes


Re: Setting up zeppelin with flink

2016-08-25 Thread Frank Dekervel
Hello,

For reference, below is the dockerfile i used to build the zeppelin image
(basically just openjdk 8 with the latest binary release of zeppelin)
the "docker-entrypoint.sh" script is just starting zeppelin.sh (oneliner)

FROM openjdk:alpine

RUN apk add --no-cache bash snappy

ARG ZEPPELIN_VERSION=0.6.1

ARG INSTALL_PATH=/opt
ENV APP_HOME $INSTALL_PATH/zeppelin
ENV PATH $PATH:$APP_HOME/bin

RUN set -x && \
  mkdir -p $INSTALL_PATH && \
  apk --update add --virtual build-dependencies curl && \
  curl -s $(curl -s https://www.apache.org/dyn/closer.cgi\?preferred\=true)/zeppelin/zeppelin-0.6.1/zeppelin-$ZEPPELIN_VERSION-bin-all.tgz | \
  tar xvz -C $INSTALL_PATH && \
  ln -s $INSTALL_PATH/zeppelin-$ZEPPELIN_VERSION-bin-all $APP_HOME && \
  addgroup -S zeppelin && adduser -D -S -H -G zeppelin -h $APP_HOME
zeppelin && \
  chown -R zeppelin:zeppelin $INSTALL_PATH && \
  chown -h zeppelin:zeppelin $APP_HOME && \
  apk del build-dependencies && \
  rm -rf /var/cache/apk/*

# Configure container
USER zeppelin
ADD docker-entrypoint.sh $APP_HOME/bin/
ENTRYPOINT ["docker-entrypoint.sh"]
CMD ["sh", "-c"]

greetings,
Frank


On Thu, Aug 25, 2016 at 2:58 PM, Frank Dekervel  wrote:

> Hello Trevor,
>
> Thanks for your suggestion. The log does not explain a lot: on the flink
> side i don't see anything at all, on the zeppelin side i see this:
> Your suggestion sounds plausible, as i always start zeppelin, and then
> change the configuration from local to remote.. however, port 6123 locally
> doesn't seem to be open
>
> ==> zeppelin--94490c51d71e.log <==
>  INFO [2016-08-25 12:53:24,168] ({qtp846063400-48} InterpreterFactory.java[
> createInterpretersForNote]:576) - Create interpreter instance flink for
> note 2BW8NMCKW
>  INFO [2016-08-25 12:53:24,168] ({qtp846063400-48} InterpreterFactory.java[
> createInterpretersForNote]:606) - Interpreter 
> org.apache.zeppelin.flink.FlinkInterpreter
> 795344042 created
>  INFO [2016-08-25 12:53:24,169] ({pool-1-thread-3} 
> SchedulerFactory.java[jobStarted]:131)
> - Job paragraph_1471964818018_1833520437 started by scheduler
> org.apache.zeppelin.interpreter.remote.RemoteInterpretershared_
> session513606587
>  INFO [2016-08-25 12:53:24,170] ({pool-1-thread-3}
> Paragraph.java[jobRun]:252) - run paragraph 20160823-150658_99117457 using
> null org.apache.zeppelin.interpreter.LazyOpenInterpreter@2f67fcaa
>  INFO [2016-08-25 12:53:24,170] ({pool-1-thread-3}
> RemoteInterpreterProcess.java[reference]:148) - Run interpreter process
> [/opt/zeppelin/bin/interpreter.sh, -d, /opt/zeppelin/interpreter/flink,
> -p, 45769, -l, /opt/zeppelin/local-repo/2BVEQGGEN]
>  INFO [2016-08-25 12:53:24,672] ({pool-1-thread-3}
> RemoteInterpreter.java[init]:170) - Create remote interpreter
> org.apache.zeppelin.flink.FlinkInterpreter
>
> after doing %flink, i see this in ps auxw:
>
>  /usr/lib/jvm/java-1.8-openjdk/bin/java -Dfile.encoding=UTF-8
> -Dlog4j.configuration=file:///opt/zeppelin/conf/log4j.properties
> -Dzeppelin.log.file=/opt/zeppelin/logs/zeppelin-interpreter-flink--94490c51d71e.log
> -Xms1024m -Xmx1024m -XX:MaxPermSize=512m -cp ::/opt/zeppelin/interpreter/
> flink/*::/opt/zeppelin/lib/zeppelin-interpreter-0.6.1.jar
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer 45769
>
> /usr/lib/jvm/java-1.8-openjdk/bin/java -Dfile.encoding=UTF-8 -Xms1024m
> -Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=file:///
> opt/zeppelin/conf/log4j.properties -Dzeppelin.log.file=/opt/
> zeppelin/logs/zeppelin--94490c51d71e.log -cp 
> ::/opt/zeppelin/lib/*:/opt/zeppelin/*::/opt/zeppelin/conf
> org.apache.zeppelin.server.ZeppelinServer
>
> the stdout of zeppelin flink process after doing a stackdump looks like
> this (note the exception, i didn't notice it before)
>
> zeppelin_1 | Exception in thread "pool-1-thread-2" 
> java.lang.NoClassDefFoundError:
> scala/collection/Seq
> zeppelin_1 |at java.lang.Class.forName0(Native Method)
> zeppelin_1 |at java.lang.Class.forName(Class.java:264)
> zeppelin_1 |at org.apache.zeppelin.interpreter.remote.
> RemoteInterpreterServer.createInterpreter(RemoteInterpreterServer.java:
> 148)
> zeppelin_1 |at org.apache.zeppelin.interpreter.thrift.
> RemoteInterpreterService$Processor$createInterpreter.getResult(
> RemoteInterpreterService.java:1409)
> zeppelin_1 |at org.apache.zeppelin.interpreter.thrift.
> RemoteInterpreterService$Processor$createInterpreter.getResult(
> RemoteInterpreterService.java:1394)
> zeppelin_1 |at org.apache.thrift.ProcessFunction.process(
> ProcessFunction.java:39)
> zeppelin_1 |at org.apache.thrift.TBaseProcessor.process(
> TBaseProcessor.java:39)
> zeppelin_1 |at org.apache.thrift.server.TThreadPoolServer$
> WorkerProcess.run(TThreadPoolServer.java:285)
> zeppelin_1 |at java.util.concurrent.
> ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> zeppelin_1 |at java.util.concurrent.
> 

Re: Setting up zeppelin with flink

2016-08-25 Thread Frank Dekervel
Hello Trevor,

Thanks for your suggestion. The log does not explain a lot: on the Flink
side I don't see anything at all; on the Zeppelin side I see the output below.
Your suggestion sounds plausible, as I always start Zeppelin and then change
the configuration from local to remote... however, port 6123 does not seem to
be open locally.

==> zeppelin--94490c51d71e.log <==
 INFO [2016-08-25 12:53:24,168] ({qtp846063400-48}
InterpreterFactory.java[createInterpretersForNote]:576) - Create
interpreter instance flink for note 2BW8NMCKW
 INFO [2016-08-25 12:53:24,168] ({qtp846063400-48}
InterpreterFactory.java[createInterpretersForNote]:606) - Interpreter
org.apache.zeppelin.flink.FlinkInterpreter 795344042 created
 INFO [2016-08-25 12:53:24,169] ({pool-1-thread-3}
SchedulerFactory.java[jobStarted]:131) - Job
paragraph_1471964818018_1833520437 started by scheduler
org.apache.zeppelin.interpreter.remote.RemoteInterpretershared_session513606587
 INFO [2016-08-25 12:53:24,170] ({pool-1-thread-3}
Paragraph.java[jobRun]:252) - run paragraph 20160823-150658_99117457 using
null org.apache.zeppelin.interpreter.LazyOpenInterpreter@2f67fcaa
 INFO [2016-08-25 12:53:24,170] ({pool-1-thread-3}
RemoteInterpreterProcess.java[reference]:148) - Run interpreter process
[/opt/zeppelin/bin/interpreter.sh, -d, /opt/zeppelin/interpreter/flink, -p,
45769, -l, /opt/zeppelin/local-repo/2BVEQGGEN]
 INFO [2016-08-25 12:53:24,672] ({pool-1-thread-3}
RemoteInterpreter.java[init]:170) - Create remote interpreter
org.apache.zeppelin.flink.FlinkInterpreter

after doing %flink, i see this in ps auxw:

 /usr/lib/jvm/java-1.8-openjdk/bin/java -Dfile.encoding=UTF-8
-Dlog4j.configuration=file:///opt/zeppelin/conf/log4j.properties
-Dzeppelin.log.file=/opt/zeppelin/logs/zeppelin-interpreter-flink--94490c51d71e.log
-Xms1024m -Xmx1024m -XX:MaxPermSize=512m -cp
::/opt/zeppelin/interpreter/flink/*::/opt/zeppelin/lib/zeppelin-interpreter-0.6.1.jar
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer 45769

/usr/lib/jvm/java-1.8-openjdk/bin/java -Dfile.encoding=UTF-8 -Xms1024m
-Xmx1024m -XX:MaxPermSize=512m
-Dlog4j.configuration=file:///opt/zeppelin/conf/log4j.properties
-Dzeppelin.log.file=/opt/zeppelin/logs/zeppelin--94490c51d71e.log -cp
::/opt/zeppelin/lib/*:/opt/zeppelin/*::/opt/zeppelin/conf
org.apache.zeppelin.server.ZeppelinServer

the stdout of the Zeppelin Flink interpreter process after doing a stack dump
looks like this (note the exception, I didn't notice it before)

zeppelin_1 | Exception in thread "pool-1-thread-2"
java.lang.NoClassDefFoundError: scala/collection/Seq
zeppelin_1 |at java.lang.Class.forName0(Native Method)
zeppelin_1 |at java.lang.Class.forName(Class.java:264)
zeppelin_1 |at
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer.createInterpreter(RemoteInterpreterServer.java:148)
zeppelin_1 |at
org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Processor$createInterpreter.getResult(RemoteInterpreterService.java:1409)
zeppelin_1 |at
org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Processor$createInterpreter.getResult(RemoteInterpreterService.java:1394)
zeppelin_1 |at
org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
zeppelin_1 |at
org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
zeppelin_1 |at
org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:285)
zeppelin_1 |at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
zeppelin_1 |at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
zeppelin_1 |at java.lang.Thread.run(Thread.java:745)
zeppelin_1 | Caused by: java.lang.ClassNotFoundException:
scala.collection.Seq
zeppelin_1 |at
java.net.URLClassLoader.findClass(URLClassLoader.java:381)
zeppelin_1 |at
java.lang.ClassLoader.loadClass(ClassLoader.java:424)
zeppelin_1 |at
sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
zeppelin_1 |at
java.lang.ClassLoader.loadClass(ClassLoader.java:357)
zeppelin_1 |... 11 more
zeppelin_1 | 2016-08-25 12:56:03
zeppelin_1 | Full thread dump OpenJDK 64-Bit Server VM (25.92-b14 mixed
mode):
zeppelin_1 |
zeppelin_1 | "pool-1-thread-5" #15 prio=5 os_prio=0
tid=0x5567976e8000 nid=0x108 waiting on condition [0x7fa83ca8d000]
zeppelin_1 |java.lang.Thread.State: WAITING (parking)
zeppelin_1 |at sun.misc.Unsafe.park(Native Method)
zeppelin_1 |- parking to wait for  <0xebc3dae0> (a
java.util.concurrent.SynchronousQueue$TransferStack)
zeppelin_1 |at
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
zeppelin_1 |at
java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:458)
zeppelin_1 |at

Re: Delaying starting the jobmanager in yarn?

2016-08-25 Thread Maximilian Michels
Hi Niels,

If you're using 1.1.1, then you can instantiate the
YarnClusterDescriptor and supply it with the Flink jar and
configuration, then call `deploy()` on it to receive a
ClusterClient for YARN to which you can submit programs using the
`run(PackagedProgram program, String args)` method. You can also
cancel jobs or shut down the cluster from the ClusterClient.
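
Roughly, and with the caveat that the method names below are from memory of
the 1.1 code base and may not match exactly (a sketch to check against the
actual YarnClusterDescriptor/ClusterClient classes, not verified code):

YarnClusterDescriptor descriptor = new YarnClusterDescriptor();
descriptor.setLocalJarPath(new Path("file:///opt/flink/lib/flink-dist.jar")); // example path
descriptor.setFlinkConfiguration(flinkConfig); // a loaded org.apache.flink.configuration.Configuration
descriptor.setTaskManagerCount(10);

YarnClusterClient cluster = descriptor.deploy(); // only now is the JobManager started on YARN
try {
    PackagedProgram program = new PackagedProgram(new File("bla.jar"), programArgs);
    cluster.run(program, parallelism);           // submit a job to the running cluster
} finally {
    cluster.shutdown();                          // release the YARN containers again
}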

Cheers,
Max

On Thu, Aug 25, 2016 at 10:24 AM, Niels Basjes  wrote:
> Hi,
>
> We have a situation where we need to start a flink batch job on a yarn
> cluster the moment an event arrives over a queue.
> These events occur at a very low rate (like once or twice a week).
>
> The idea we have is to run an application that listens to the queue and
> executes the batch when it receives a message.
>
> We found that if we start this using 'flink run -m yarn-cluster ..." the
> moment we run this the jobmanager in yarn is started and the resources for
> these batches is claimed immediately.
>
> What is the recommended way to only claim these resources when we actually
> have a job to run?
> Can we 'manually' start and stop the jobmanager in yarn in some way from our
> java code?
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes


Re: "Failed to retrieve JobManager address" in Flink 1.1.1 with JM HA

2016-08-25 Thread Hironori Ogibayashi
Max,

Thank you for the fix!

Regards,
Hironori

2016-08-24 18:37 GMT+09:00 Maximilian Michels :
> Hi Hironori,
>
> That's what I thought. So it won't be an issue for most users who do
> not comment out the JobManager url from the config. Still, the
> information printed is not correct. The issue has just been fixed.
>
> You will have to wait for the next minor release 1.1.2 or build the
> 'release-1.1' Git branch.
>
> Best,
> Max
>
> On Wed, Aug 24, 2016 at 11:14 AM, Hironori Ogibayashi
>  wrote:
>> Ufuk, Max,
>>
>> Thank you for your answer and opening JIRA.
>> I will wait for the fix.
>>
>> As Max mentioned, I first commented out jobmanager.rpc.address,
>> jobmanager.rpc.port. When I tried setting localhost and 6123
>> respectively, it worked.
>>
>> Regards,
>> Hironori
>>
>> 2016-08-24 0:54 GMT+09:00 Maximilian Michels :
>>> Created an issue and fix should be there soon:
>>> https://issues.apache.org/jira/browse/FLINK-4454
>>>
>>> Thanks,
>>> Max
>>>
>>> On Tue, Aug 23, 2016 at 4:38 PM, Maximilian Michels  wrote:
 Hi!

 Yes, this is a bug. However, there seems to be something wrong with
 the config directory because Flink fails to load the default value
 ("localhost") from the config. If you had a default value for the job
 manager in flink-conf.yaml, it wouldn't fail but only display a wrong
 job manager url. Note that it still connects to the right job manager
 afterwards.

 Sorry for the trouble.

 Thanks,
 Max

 On Tue, Aug 23, 2016 at 11:02 AM, Ufuk Celebi  wrote:
> You are right that this config key is not needed in this case.
>
> The ClusterClient has been refactored between Flink 1.0 and 1.1 and
> the config parsing might be too strict in this case. It expects the
> IPC address to be set, which is not necessary as you say. It should be
> very easy to fix for 1.1.2. Let's confirm that it is actually a bug
> with Max and file an issue afterwards.
>
> @Max: can you confirm whether this is correct?
>
>
> On Tue, Aug 23, 2016 at 7:24 AM, Hironori Ogibayashi
>  wrote:
>> Hello,
>>
>> After I upgraded to 1.1.1, I am getting error when submitting job with
>> "flink run"
>>
>> The command and result is like this. It has been working with Flink 
>> 1.0.3.
>>
>> ---
>>  % FLINK_CONF_DIR=~/opt/flink/conf ~/opt/flink/flink-1.1.1/bin/flink
>> run -c MyJob target/my-flink-job.jar
>>
>> 
>>  The program finished with the following exception:
>>
>> java.lang.RuntimeException: Failed to retrieve JobManager address
>> at 
>> org.apache.flink.client.program.ClusterClient.getJobManagerAddressFromConfig(ClusterClient.java:244)
>> at 
>> org.apache.flink.client.program.StandaloneClusterClient.getClusterIdentifier(StandaloneClusterClient.java:78)
>> at 
>> org.apache.flink.client.CliFrontend.createClient(CliFrontend.java:887)
>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:237)
>> at 
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
>> at 
>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
>> Caused by: java.lang.IllegalArgumentException: hostname can't be null
>> at 
>> java.net.InetSocketAddress.checkHost(InetSocketAddress.java:149)
>> at java.net.InetSocketAddress.<init>(InetSocketAddress.java:216)
>> at 
>> org.apache.flink.client.program.ClusterClient.getJobManagerAddressFromConfig(ClusterClient.java:242)
>> ... 5 more
>> ---
>>
>> I am using JobManager HA and I set "recovery.mode: zookeeper",
>> recovery.zookeeper.quorum, recovery.zookeeper.path.root is my
>> flink-conf.yaml.
>> So, the client should be able to get JobManager address from zookeeper.
>> If I explicitly specify JobManager address with -m option, it works.
>>
>> Am I missing something?
>>
>> Regards,
>> Hironori Ogibayashi


Re: Dealing with Multiple sinks in Flink

2016-08-25 Thread Maximilian Michels
I'm assuming there is something wrong with your Watermark/Timestamp
assigner. Could you share some of the code?
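
For reference, a minimal periodic assigner sketch (field and type choices are
made up for illustration, not taken from your code). One thing to keep in mind
with parallelism > 1: every parallel source instance (e.g. every Kafka
partition) must receive data, otherwise its watermark never advances and the
window never fires.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// assumes events of type Tuple2<Long, Double> where f0 is the event timestamp in ms
public class MyAssigner implements AssignerWithPeriodicWatermarks<Tuple2<Long, Double>> {

    private long maxSeenTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(Tuple2<Long, Double> element, long previousElementTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, element.f0);
        return element.f0;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // never emit a watermark before the first element; afterwards allow 10s of out-of-orderness
        return new Watermark(maxSeenTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE : maxSeenTimestamp - 10000);
    }
}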

On Wed, Aug 24, 2016 at 9:54 PM, vinay patil  wrote:
> Hi,
>
> Just an update, the window is not getting triggered when I change the
> parallelism to more than 1.
>
> Can you please explain why this is happening ?
>
> Regards,
> Vinay Patil
>
> On Wed, Aug 24, 2016 at 9:55 AM, vinay patil [via Apache Flink User Mailing
> List archive.] <[hidden email]> wrote:
>>
>> Hi Max,
>>
> >> I tried writing to a local file as well, it's giving me the same issue. I
> >> have attached the logs and the dummy pipeline code (logs.txt, dummy_pipeline.txt).
>>
>> 
>> If you reply to this email, your message will be added to the discussion
>> below:
>>
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dealing-with-Multiple-sinks-in-Flink-tp8643p8664.html
>> To start a new topic under Apache Flink User Mailing List archive., email
>> [hidden email]
>> To unsubscribe from Apache Flink User Mailing List archive., click here.
>> NAML
>
>
>
> 
> View this message in context: Re: Dealing with Multiple sinks in Flink
>
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.


Re: Regarding Global Configuration in Flink

2016-08-25 Thread Maximilian Michels
Hi!

Are you referring to the GlobalConfiguration class? That used to be a
singleton class in Flink version < 1.2.x which would load the
configuration only once per VM, if it found a config file. It allowed
operations that could change that config after it had been loaded. It
has since then been refactored to simply load the local configuration
if requested and not store it or manipulate it in any way.

Regarding your question: No, config changes won't be propagated to
jobs. The resulting Configuration object resides on the JVM heap.
Thus, the same memory limits apply as to any regular Java object.

Cheers,
Max

On Thu, Aug 25, 2016 at 6:33 AM, Janardhan Reddy
 wrote:
> Hi,
>
> Is the global configuration the same for all jobs in a Flink cluster?
>
> Is it a good idea to write a custom source which polls some external source
> every x minutes and updates the global config. Will the config change be
> propagated across all jobs?
>
> What happens when the size of the global config grows too big? Where is the
> global config stored?


Running multiple jobs on yarn (without yarn-session)

2016-08-25 Thread Niels Basjes
Hi,

I created a small application that needs to run multiple (batch) jobs on
Yarn and then terminate.
In this case I'm exporting data from a list of HBase tables

I essentially do right now the following:

flink run -m yarn-cluster -yn 10  bla.jar ...

And in my main I do

for (String table : tablesToExport) {   // one batch job per HBase table; names are illustrative
   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
   // ... define the batch job for this table ...
   env.execute("export " + table);
}

In the second job I submit I get an exception:
java.lang.RuntimeException: Unable to tell application master to stop once
the specified job has been finised
at
org.apache.flink.yarn.YarnClusterClient.stopAfterJob(YarnClusterClient.java:184)
at
org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:202)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376)
at
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61)
at com.bol.tools.hbase.export.Main.runSingleTable(Main.java:220)
at com.bol.tools.hbase.export.Main.run(Main.java:81)
at com.bol.tools.hbase.export.Main.main(Main.java:42)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:775)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:995)
at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:992)
at
org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:56)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at
org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:53)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:992)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1046)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[1 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at scala.concurrent.Await.result(package.scala)
at
org.apache.flink.yarn.YarnClusterClient.stopAfterJob(YarnClusterClient.java:182)
... 25 more


How do I (without using yarn-session) tell the YarnClusterClient to simply
'keep running because there will be more jobs'?

If I run this same code in a yarn-session it works, but then I have the
trouble of starting a (detached) yarn-session AND terminating that thing
again after my jobs have run.

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Re: How to set a custom JAVA_HOME when run flink on YARN?

2016-08-25 Thread Maximilian Michels
Preferably, you set that directly in the config using

env.java.home: /path/to/java/home

If unset, Flink will use the $JAVA_HOME environment variable.
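
For example (the Java paths below are placeholders):

# in conf/flink-conf.yaml
env.java.home: /usr/lib/jvm/java-8-openjdk

# or per job on YARN, as in the quoted mail below
bin/flink run -m yarn-cluster -yD yarn.taskmanager.env.JAVA_HOME=/usr/lib/jvm/java-8-openjdk ... myJob.jar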

Cheers,
Max

On Thu, Aug 25, 2016 at 10:39 AM, Renkai  wrote:
> I think I solved myself,just add  -yD yarn.taskmanager.env.JAVA_HOME=xx in
> the command line, a little hard to find the solution though.
>
>
>


Re: How to set a custom JAVA_HOME when run flink on YARN?

2016-08-25 Thread Renkai
I think I solved it myself: just add -yD yarn.taskmanager.env.JAVA_HOME=xx on
the command line. It was a little hard to find the solution, though.





Delaying starting the jobmanager in yarn?

2016-08-25 Thread Niels Basjes
Hi,

We have a situation where we need to start a flink batch job on a yarn
cluster the moment an event arrives over a queue.
These events occur at a very low rate (like once or twice a week).

The idea we have is to run an application that listens to the queue and
executes the batch when it receives a message.

We found that if we start this using "flink run -m yarn-cluster ...", the
jobmanager in YARN is started and the resources for these batches are
claimed immediately.

What is the recommended way to only claim these resources when we actually
have a job to run?
Can we 'manually' start and stop the jobmanager in yarn in some way from
our java code?

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Re: flink - Working with State example

2016-08-25 Thread Aljoscha Krettek
Hi,
you mean the directory is completely empty? Can you check in the JobManager
dashboard whether it reports any successful checkpoints for the job? One
possible explanation is an optimization that the FsStateBackend performs:
when the state is very small it will not actually be written to files but
stored in the meta data of the checkpoint that is sent to the JobManager.
This would explain why there are no files. You can set the threshold size
for this optimization with an additional FsStateBackend constructor
parameter, e.g. new FsStateBackend("file:///home/buvana/flink/checkpoints",
0) to disable this optimization.
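
In context, a minimal sketch based on the snippet from your earlier mail
(imports as in that snippet; the checkpoint interval is just an example value):

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // threshold 0: even tiny state is written to the checkpoint directory as files
    // instead of being inlined into the checkpoint metadata
    env.setStateBackend(new FsStateBackend("file:///home/buvana/flink/checkpoints", 0));
    env.enableCheckpointing(10000); // checkpoint every 10 seconds
    // ... rest of the pipeline as before ...
    env.execute();
}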

Cheers,
Aljoscha

On Fri, 12 Aug 2016 at 21:12 Ramanan, Buvana (Nokia - US) <
buvana.rama...@nokia-bell-labs.com> wrote:

> Hi Kostas,
>
> I am trying to use FsStateBackend as the backend for storing state. And
> configure it as follows in the code:
>
>StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStateBackend(new
> FsStateBackend("file:///home/buvana/flink/checkpoints"));
> env.enableCheckpointing(1);
>
> everything else is same as the code I shared with you previously.
>
> When I execute, I see that a directory is created under
> /home/buvana/flink/checkpoints, but there is nothing under that directory.
> I was expecting to find some file / sub dir there.
>
> Please explain.
>
> Thanks,
> Buvana
>
> -Original Message-
> From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
> Sent: Friday, August 12, 2016 1:37 AM
> To: user@flink.apache.org
> Subject: Re: flink - Working with State example
>
> No problem!
>
> Regards,
> Kostas
>
> > On Aug 12, 2016, at 3:00 AM, Ramanan, Buvana (Nokia - US) <
> buvana.rama...@nokia-bell-labs.com> wrote:
> >
> > Kostas,
> > Good catch! That makes it working! Thank you so much for the help.
> > Regards,
> > Buvana
> >
> > -Original Message-
> > From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
> > Sent: Thursday, August 11, 2016 11:22 AM
> > To: user@flink.apache.org
> > Subject: Re: flink - Working with State example
> >
> > Hi Buvana,
> >
> > At a first glance, your snapshotState() should return a Double.
> >
> > Kostas
> >
> >> On Aug 11, 2016, at 4:57 PM, Ramanan, Buvana (Nokia - US) <
> buvana.rama...@nokia-bell-labs.com> wrote:
> >>
> >> Thank you Kostas & Ufuk. I get into the following compilation error
> when I use checkpointed interface. Pasting the code & message as follows:
> >>
> >> Is the Serializable definition supposed to be from java.io.Serializable
> or somewhere else?
> >>
> >> Thanks again,
> >> Buvana
> >>
> >> ==
> >> ==
> >> Code:
> >>
> >> import org.apache.flink.api.common.functions.FlatMapFunction;
> >> import org.apache.flink.api.common.functions.MapFunction;
> >> import org.apache.flink.streaming.api.checkpoint.Checkpointed;
> >> import org.apache.flink.configuration.Configuration;
> >> import org.apache.flink.api.common.typeinfo.TypeInformation;
> >> import org.apache.flink.api.common.typeinfo.TypeHint;
> >> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> >>
> >> import java.io.Serializable;
> >> import org.apache.flink.api.java.tuple.Tuple2;
> >> import org.apache.flink.streaming.api.datastream.DataStream;
> >> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> >> import
> >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> >> import
> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
> >> import
> >> org.apache.flink.streaming.util.serialization.SimpleStringSchema;
> >> import org.apache.flink.util.Collector;
> >>
> >> import java.util.Properties;
> >>
> >> /**
> >> * Created by buvana on 8/9/16.
> >> */
> >> public class stateful {
> >>   private static String INPUT_KAFKA_TOPIC = null;
> >> ---
> >> --- skipping the main as it’s the same as before except for class name
> >> change -
> >> ---
> >>  public static class MapStateful extends RichFlatMapFunction<String, Tuple2<String, Double>>
> >>   implements Checkpointed<Double> {
> >>
> >>   private Double prev_tuple = null;
> >>
> >>   @Override
> >>   public void flatMap(String incString, Collector<Tuple2<String, Double>> out) {
> >>   try {
> >>   Double value = Double.parseDouble(incString);
> >>   System.out.println("value = " + value);
> >>   System.out.println(prev_tuple);
> >>
> >>   Double value2 = value - prev_tuple;
> >>   prev_tuple = value;
> >>
> >>   Tuple2<String, Double> tp = new Tuple2<String, Double>();
> >>   tp.setField(INPUT_KAFKA_TOPIC, 0);
> >>   tp.setField(value2, 1);
> >>   out.collect(tp);
> >>   } catch (NumberFormatException e) {
> >>   System.out.println("Could not convert to Float" +
>