[jira] [Created] (FLINK-9641) Pulsar Source Connector

2018-06-21 Thread Chris Kellogg (JIRA)
Chris Kellogg created FLINK-9641:


 Summary: Pulsar Source Connector
 Key: FLINK-9641
 URL: https://issues.apache.org/jira/browse/FLINK-9641
 Project: Flink
  Issue Type: New Feature
  Components: Streaming Connectors
Reporter: Chris Kellogg


Pulsar (https://github.com/apache/incubator-pulsar) is a distributed pub-sub 
messaging system currently in Apache incubation. It is a very active project 
with committers from various companies and good adoption. This PR will add a 
source function to allow Flink jobs to process messages from Pulsar topics.
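
For illustration only, a rough sketch of what such a source function might look 
like when built on the Pulsar 2.x Java client. The topic, subscription name, and 
service URL are placeholder values, and the sketch is not the proposed 
implementation:

{code:java}
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;

// Sketch: a blocking Pulsar consumer wrapped in a Flink source function.
public class PulsarSourceSketch extends RichSourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("my-topic")
                .subscriptionName("flink-subscription")
                .subscribe();
        try {
            while (running) {
                Message<byte[]> msg = consumer.receive(); // blocks until a message arrives
                ctx.collect(new String(msg.getData()));
                consumer.acknowledge(msg);
            }
        } finally {
            consumer.close();
            client.close();
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
{code}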

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9640) Checkpointing is always aborted if any task has been finished

2018-06-21 Thread Hai Zhou (JIRA)
Hai Zhou created FLINK-9640:
---

 Summary: Checkpointing is always aborted if any task has been 
finished
 Key: FLINK-9640
 URL: https://issues.apache.org/jira/browse/FLINK-9640
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0, 1.3.2, 1.4.0, 1.3.0, 1.6.0
Reporter: Hai Zhou
Assignee: Hai Zhou
 Fix For: 1.6.0


Steps to reproduce:
1. Build a standalone Flink cluster.
2. Submit a test job like the one below:
{code:java}
import java.util.UUID;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DemoJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.disableOperatorChaining();
        env.setParallelism(4);
        env.enableCheckpointing(3000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        DataStream<String> inputStream = env.addSource(new StringGeneratorParallelSourceFunction());

        inputStream.map(String::hashCode).print();

        env.execute();
    }

    public static class StringGeneratorParallelSourceFunction extends RichParallelSourceFunction<String> {

        private static final Logger LOG = LoggerFactory.getLogger(StringGeneratorParallelSourceFunction.class);

        private volatile boolean running = true;
        private int index;
        private int subtask_nums;

        @Override
        public void open(Configuration parameters) throws Exception {
            index = getRuntimeContext().getIndexOfThisSubtask();
            subtask_nums = getRuntimeContext().getNumberOfParallelSubtasks();
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                String data = UUID.randomUUID().toString();
                ctx.collect(data);
                LOG.info("subtask_index = {}, emit string = {}", index, data);
                Thread.sleep(50);
                if (index == subtask_nums / 2) {
                    running = false;
                    LOG.info("subtask_index = {}, finished.", index);
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
{code}

3. Observe the JobManager and TaskManager logs; entries like the following can be found.
*taskmanager.log*
{code:java}
2018-06-21 17:05:54,144 INFO  com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - subtask_index = 2, emit string = 5b0c2467-ad2e-4b53-b1a4-7a0f64560570
2018-06-21 17:05:54,151 INFO  com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - subtask_index = 0, emit string = 11af78b3-59ea-467c-a267-7c2238e44ffe
2018-06-21 17:05:54,195 INFO  com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - subtask_index = 2, finished.
2018-06-21 17:05:54,200 INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) switched from RUNNING to FINISHED.
2018-06-21 17:05:54,201 INFO  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9).
2018-06-21 17:05:54,201 INFO  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem streams are closed for task Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) [FINISHED]
2018-06-21 17:05:54,202 INFO  org.apache.flink.yarn.YarnTaskManager  - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source (6b2a374bec5f31112811613537dd4fd9)
2018-06-21 17:05:54,211 INFO  com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - subtask_index = 0, emit string = f29f48fd-ca53-4a96-b596-93948c09581d
{code}

*jobmanager.log*
{code:java}
2018-06-21 17:05:52,682 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Sink: Unnamed (2/4) (3aee8bf5103065e8b57ebd0e214141ae) switched from DEPLOYING to RUNNING.
2018-06-21 17:05:52,683 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Map (2/4) (de90106d04f63cb9ea531308202e233f) switched from DEPLOYING to RUNNING.
2018-06-21 17:05:54,219 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) switched from RUNNING to FINISHED.
2018-06-21 17:05:54,224 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Map (3/4) (8f523afb97dc848a9578f9cae5870421) switched from RUNNING to FINISHED.
2018-06-21 17:05:54,224 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Sink: Unnamed (3/4) (39f76a87d20cfc491e11f0b5b08ec5c2) switched from RUNNING to FINISHED.
2018-06-21 17:05:55,069 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint
{code}

Re: Am I forgetting to do something to get pull request accepted?

2018-06-21 Thread Jeff Carter
Ok, thanks!

On Thu, Jun 21, 2018, 1:18 AM Chesnay Schepler  wrote:

> The Flink project currently receives too many pull requests / has too few
> reviewers to ensure timely reviews.
>
> I assume you are talking about https://github.com/apache/flink/pull/5876
> If this is correct, something you could already do is squash all commits
> and rebase against master to get rid of the merge commits.
> The new InputFormat also shares a lot of code with the existing
> CassandraOutputFormat. Shared portions could be moved into an abstract
> CassandraInputFormatBase class.
>
> On 20.06.2018 20:50, jpcarter...@gmail.com wrote:
> > I just want to make sure I am not missing anything to get my pull
> > request accepted. All tests are passing, so I figured it would be a short
> > time after that. Is there anything else that I need to do, or is there
> > perhaps a freeze on accepting pull requests at the moment? Here is the
> > link to it on Travis CI:
> > https://travis-ci.org/Jicaar/flink/builds/386283019
> >
>
>


Re: [WEBSITE] Proposal to rework the Flink website

2018-06-21 Thread Fabian Hueske
Hi,

I've merged the proposed changes for the website.
As usual, we can incrementally refine and improve it.

Best,
Fabian

2018-06-15 16:16 GMT+02:00 Fabian Hueske :

> Hi,
>
> I'm planning to put the reworked website online next week.
> You can have a look at PR #109 [1] to check the changes and give feedback.
>
> Btw. I've created FLINK-9541 [2] to track the effort of adding a sitemap.xml.
>
> Cheers,
> Fabian
>
> [1] https://github.com/apache/flink-web/pull/109
> [2] https://issues.apache.org/jira/browse/FLINK-9541
>
>
> 2018-06-06 12:57 GMT+02:00 Fabian Hueske :
>
>> Thanks for the feedback so far.
>>
>> +1 for adding a sitemap.xml and robots.txt to the website.
>>
>> I think we can make this a separate issue. I'll create a JIRA for that.
>>
>> Any other thoughts or feedback?
>>
>> Thanks, Fabian
>>
>> 2018-06-06 9:34 GMT+02:00 Aljoscha Krettek :
>>
>>> Yes, making sure that google search results point to the most recent doc
>>> would be very good. :+1
>>>
>>> Also +1 to the general effort, of course.
>>>
>>> > On 5. Jun 2018, at 20:19, Ken Krugler 
>>> wrote:
>>> >
>>> > Along these lines, it would help to add a sitemap (and the robots.txt
>>> required to reference it) for flink.apache.org and ci.apache.org (for
>>> /projects/flink)
>>> >
>>> > You can see what Tomcat did along these lines -
>>> http://tomcat.apache.org/robots.txt references
>>> http://tomcat.apache.org/sitemap.xml, which is a sitemap index file
>>> pointing to http://tomcat.apache.org/sitemap-main.xml
>>> >
>>> > By doing this, you can emphasize more recent versions of docs. There
>>> are other benefits, but reducing poor Google search results (to me) is the
>>> biggest win.
>>> >
>>> > E.g. https://www.google.com/search?q=flink+reducingstate (search on flink
>>> reducing state) shows the 1.3 Javadocs (hit #1), master (1.6-SNAPSHOT)
>>> Javadocs (hit #2), and then many pages of other results.
>>> >
>>> > Whereas the Javadocs for 1.5
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/api/common/state/ReducingState.html>
>>> and 1.4
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/api/common/state/ReducingState.html>
>>> are nowhere to be found.
>>> >
>>> > Thoughts?
>>> >
>>> > — Ken
>>> >
>>> >> On Jun 5, 2018, at 9:46 AM, Stephan Ewen  wrote:
>>> >>
>>> >> Big +1 to this!
>>> >>
>>> >> I would like to contribute to this effort and help strengthen the way
>>> Flink
>>> >> presents itself.
>>> >>
>>> >>
>>> >> On Tue, Jun 5, 2018 at 11:56 AM, Fabian Hueske 
>>> wrote:
>>> >>
>>> >>> Hi everybody,
>>> >>>
>>> >>> I've opened a PR [1] that reworks parts of the Flink website (
>>> >>> flink.apache.org).
>>> >>>
>>> >>> My goal is to improve the structure of the website and provide more
>>> >>> valuable information about the project and the community.
>>> >>>
>>> >>> A visitor (who doesn't know Flink yet) should be able to easily find
>>> >>> answers to the following questions:
>>> >>> * What is Apache Flink?
>>> >>> * Does it address my use case?
>>> >>> * Is it credible? / Who is using it?
>>> >>>
>>> >>> To achieve that, I have:
>>> >>> * Reworked the menu structure into three sections to address different
>>> audiences:
>>> >>> - Potential users (see above)
>>> >>> - Users
>>> >>> - Contributors
>>> >>> * Reworked start page: updated the figure, added a feature grid,
>>> moved
>>> >>> "Powered By" section up
>>> >>> * Replaced the Features page with more detailed "What is Flink?" pages
>>> >>> * Reworked "Use Cases" page
>>> >>>
>>> >>> The PR should also improve the page for users who have questions
>>> about
>>> >>> Flink or need help.
>>> >>> For that, I have:
>>> >>> * Added a "Getting Help" page (less content than the detailed
>>> community
>>> >>> page)
>>> >>> * Removed IRC channel info
>>> >>>
>>> >>> Please give feedback, suggest improvements, and proofread the new
>>> texts.
>>> >>>
>>> >>> Thanks, Fabian
>>> >>>
>>> >>> [1] https://github.com/apache/flink-web/pull/109
>>> >>>
>>> >
>>> > 
>>> > http://about.me/kkrugler
>>> > +1 530-210-6378
>>> >
>>>
>>>
>>
>


[jira] [Created] (FLINK-9639) Order "User Configuration" Alphabetically on Flink Dashboard

2018-06-21 Thread Guilherme Nobre (JIRA)
Guilherme Nobre created FLINK-9639:
--

 Summary: Order "User Configuration" Alphabetically on Flink 
Dashboard
 Key: FLINK-9639
 URL: https://issues.apache.org/jira/browse/FLINK-9639
 Project: Flink
  Issue Type: Improvement
  Components: Web Client, Webfrontend
Affects Versions: 1.4.0
Reporter: Guilherme Nobre


The *User Configuration* list is not ordered alphabetically. It would be nice 
to order it, especially for people who, like me, have configurations with 
prefixes. That way the configurations would be nicely ordered :)
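
For illustration only, the intended ordering sketched in Java (the dashboard 
itself is frontend code; this just shows how alphabetical keys group prefixed 
configurations, with made-up keys):

{code:java}
import java.util.Map;
import java.util.TreeMap;

public class SortedUserConfig {
    public static void main(String[] args) {
        // TreeMap orders its keys lexicographically, so entries sharing a
        // prefix (e.g. "myapp.sink.*", "myapp.source.*") end up grouped.
        Map<String, String> userConfig = new TreeMap<>();
        userConfig.put("myapp.source.topic", "events");
        userConfig.put("myapp.checkpoint.interval", "3000");
        userConfig.put("myapp.sink.path", "/tmp/out");
        userConfig.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
{code}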



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9638) Add helper script to run single e2e test

2018-06-21 Thread Florian Schmidt (JIRA)
Florian Schmidt created FLINK-9638:
--

 Summary: Add helper script to run single e2e test
 Key: FLINK-9638
 URL: https://issues.apache.org/jira/browse/FLINK-9638
 Project: Flink
  Issue Type: Improvement
Reporter: Florian Schmidt
Assignee: Florian Schmidt






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9637) Add public user documentation for TTL feature

2018-06-21 Thread Andrey Zagrebin (JIRA)
Andrey Zagrebin created FLINK-9637:
--

 Summary: Add public user documentation for TTL feature
 Key: FLINK-9637
 URL: https://issues.apache.org/jira/browse/FLINK-9637
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing
Affects Versions: 1.6.0
Reporter: Andrey Zagrebin
 Fix For: 1.6.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9636) Network buffer leaks in requesting a batch of segments during canceling

2018-06-21 Thread zhijiang (JIRA)
zhijiang created FLINK-9636:
---

 Summary: Network buffer leaks in requesting a batch of segments 
during canceling
 Key: FLINK-9636
 URL: https://issues.apache.org/jira/browse/FLINK-9636
 Project: Flink
  Issue Type: Bug
  Components: Network
Affects Versions: 1.6.0
Reporter: zhijiang


In {{NetworkBufferPool#requestMemorySegments}}, {{numTotalRequiredBuffers}} is 
first increased by {{numRequiredBuffers}}.

If an {{InterruptedException}} is thrown while polling segments from the 
available queue, the requested segments are recycled back to the 
{{NetworkBufferPool}}, and {{numTotalRequiredBuffers}} is decreased by the 
number of polled segments, which is inconsistent with {{numRequiredBuffers}}. 
So {{numTotalRequiredBuffers}} in {{NetworkBufferPool}} leaks in this case; 
decreasing it by {{numRequiredBuffers}} instead would fix this bug.
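
A simplified model of the described accounting, for illustration only. The 
names mirror {{NetworkBufferPool}}, but synchronization is elided and this is 
not the actual code:

{code:java}
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

public class BufferAccountingSketch {

    private final ArrayBlockingQueue<Object> availableMemorySegments = new ArrayBlockingQueue<>(1024);
    private int numTotalRequiredBuffers;

    public List<Object> requestMemorySegments(int numRequiredBuffers) throws IOException {
        // Increased by the full amount up front.
        numTotalRequiredBuffers += numRequiredBuffers;

        List<Object> segments = new ArrayList<>(numRequiredBuffers);
        try {
            while (segments.size() < numRequiredBuffers) {
                segments.add(availableMemorySegments.take()); // may be interrupted here
            }
        } catch (InterruptedException e) {
            // Recycling decreases the counter by segments.size() only, which
            // leaks the difference to numRequiredBuffers (the reported bug).
            recycleMemorySegments(segments);
            // Suggested fix: also undo the part that was never polled, so the
            // full up-front increment is reverted.
            numTotalRequiredBuffers -= (numRequiredBuffers - segments.size());
            throw new IOException("Buffer request was interrupted", e);
        }
        return segments;
    }

    private void recycleMemorySegments(List<Object> segments) {
        numTotalRequiredBuffers -= segments.size();
        availableMemorySegments.addAll(segments);
    }
}
{code}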

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Running a Scala Job doesn't produce print output

2018-06-21 Thread Mano Swerts
Hi guys,

I am going to answer my own question ;) I looked at a Scala example in the 
Flink Github repo, which uses ExecutionEnvironment.getExecutionEnvironment to 
obtain the environment. That apparently doesn’t work.

When I change this to StreamExecutionEnvironment.getExecutionEnvironment, as 
used in the Flink Maven archetype, it works fine.

I don’t know whether this is a bug or the example needs updating. At least now 
this has been recorded for others struggling with the same issue in the future.

— Mano

On 21 Jun 2018, at 11:27, Mano Swerts 
mailto:mano.swe...@ixxus.com>> wrote:

Hi guys,

I have a question. I have been playing around with Flink this week and created 
some basic Java jobs that work fine. Now I am trying to run one in Scala.

Running this code in the Scala REPL prints the expected output:

env.fromElements(1, 2, 3).map(i => " Integer: " + i).print()

However, having it packaged in a JAR which I then deploy through the user 
interface doesn’t give me any output at all. I can start the job and it 
finishes without exceptions, but I don’t see the result of the print() 
statement in the log. The class looks like this:


package com.ixxus.playground.fmk.flink

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._

object LearnDocumentEntityRelationship {

  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val params: ParameterTool = ParameterTool.fromArgs(args)

    env.fromElements(1, 2, 3).map(i => " Integer: " + i).print()

    env.execute("Scala example")
  }
}


I did notice that the job name isn’t what I pass to env.execute. It is named 
“Flink Java Job”:




I can’t find anything online however about this phenomenon. Does anyone have 
any idea?

Thanks.

— Mano



Re: Running a Scala Job doesn't produce print output

2018-06-21 Thread Till Rohrmann
Please verify that you have set the correct main class manifest in your
pom.xml when you build the user code jar. Alternatively you can specify the
class to execute via `bin/flink run -c CLASS_TO_EXECUTE` if your user code
jar contains multiple classes.
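
For illustration, the main class can be set in the pom.xml via the
maven-shade-plugin's ManifestResourceTransformer, as the Flink quickstart
does (the class name below is the one from this thread):

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>com.ixxus.playground.fmk.flink.LearnDocumentEntityRelationship</mainClass>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>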

Cheers,
Till

On Thu, Jun 21, 2018 at 11:28 AM Mano Swerts  wrote:

> Hi guys,
>
> I have a question. I have been playing around with Flink this week and
> created some basic Java jobs that work fine. Now I am trying to run one in
> Scala.
>
> Running this code in the Scala REPL prints the expected output:
>
> *env.fromElements(1, 2, 3).map(i => " Integer: " + i).print()*
>
> However, having it packaged in a JAR which I then deploy through the user
> interface doesn’t give me any output at all. I can start the job and it
> finishes without exceptions, but I don’t see the result of the print()
> statement in the log. The class looks like this:
>
> package com.ixxus.playground.fmk.flink
>
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.api.scala._
>
> object LearnDocumentEntityRelationship {
>
>   def main(args: Array[String]) {
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     val params: ParameterTool = ParameterTool.fromArgs(args)
>
>     env.fromElements(1, 2, 3).map(i => " Integer: " + i).print()
>
>     env.execute("Scala example")
>   }
> }
>
>
> I did notice that the job name isn’t what I pass to env.execute. It is
> named “Flink Java Job”:
>
>
>
> I can’t find anything online however about this phenomenon. Does anyone
> have any idea?
>
> Thanks.
>
> — Mano
>


Re: Breakage in Flink CLI in 1.5.0

2018-06-21 Thread Till Rohrmann
The reason why you still have to do it is that we still have to support
the legacy mode, where the client needs to know the JobManager RPC address.
Once we remove the legacy mode, we could change the
HighAvailabilityServices such that we have client-facing HA services, which
only retrieve the rest server endpoint, and cluster-internal HA services,
which need to know the cluster components' addresses at cluster startup.
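
A rough sketch of that split, with hypothetical interface names that do not
exist in Flink today:

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;

    // Hypothetical: clients only need to locate the REST endpoint.
    interface ClientFacingHighAvailabilityServices {
        LeaderRetrievalService getRestEndpointLeaderRetriever();
    }

    // Hypothetical: cluster-internal services additionally know how to
    // elect and locate the internal components.
    interface ClusterInternalHighAvailabilityServices
            extends ClientFacingHighAvailabilityServices {
        LeaderRetrievalService getResourceManagerLeaderRetriever();
        LeaderElectionService getJobManagerLeaderElectionService(JobID jobId);
    }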

Cheers,
Till

On Thu, Jun 21, 2018 at 11:38 AM Sampath Bhat 
wrote:

> Hi,
> Yes, I've specified the rest.address for the flink client to connect to,
> and the rest.address is valid and working fine, but my question is: why am
> I supposed to give jobmanager.rpc.address for the flink client to connect
> to the flink cluster if the flink client depends only on rest.address?
>
> On Thu, Jun 21, 2018 at 12:41 PM, Till Rohrmann 
> wrote:
>
>> Hi,
>>
>> if the rest.address is different from the jobmanager.rpc.address, then
>> you should specify that in the flink-conf.yaml and Flink will connect to
>> rest.address. Only if rest.address is not specified, the system will fall
>> back to use the jobmanager.rpc.address. Currently, the rest server endpoint
>> runs in the same JVM as the cluster entrypoint and all JobMasters.
>>
>> Cheers,
>> Till
>>
>> On Thu, Jun 21, 2018 at 8:46 AM Sampath Bhat 
>> wrote:
>>
>>> Hello Till
>>>
>>> Thanks for the clarification. But I have a few questions based on your reply.
>>>
>>> In non-HA setups we need the jobmanager.rpc.address to derive the
>>> hostname of the rest server.
>>> Why is there a dependency on jobmanager.rpc.address to get the hostname of
>>> the rest server? This holds good only for normal deployments such as on
>>> bare metal or virtual machines, where the flink cluster runs as another
>>> process on a machine. But if we try to deploy flink on kubernetes then
>>> there could be a possibility that jobmanager.rpc.address and rest.address
>>> differ from each other.
>>>
>>> So if rest.address is not provided in flink-conf.yaml, then looking at
>>> jobmanager.rpc.address to derive the hostname of the rest server makes
>>> sense; but when the user has already provided rest.address and flink
>>> still looks at jobmanager.rpc.address to get the hostname of the rest
>>> server, that is an unwanted dependency IMO.
>>>
>>> In an HA setup the rpc.address is obtained from ZooKeeper, so the user
>>> need not worry about unnecessary properties while submitting a job.
>>>
>>> On Wed, Jun 20, 2018 at 1:25 PM, Till Rohrmann 
>>> wrote:
>>>
>>> > It will, but it defaults to jobmanager.rpc.address if no rest.address
>>> has
>>> > been specified.
>>> >
>>> > On Wed, Jun 20, 2018 at 9:49 AM Chesnay Schepler 
>>> > wrote:
>>> >
>>> >> Shouldn't the non-HA case be covered by rest.address?
>>> >>
>>> >> On 20.06.2018 09:40, Till Rohrmann wrote:
>>> >>
>>> >> Hi Sampath,
>>> >>
>>> >> it is no longer possible to not start the rest server endpoint by
>>> setting
>>> >> rest.port to -1. If you do this, then the cluster won't start. The
>>> comment
>>> >> in the flink-conf.yaml holds only true for the legacy mode.
>>> >>
>>> >> In non-HA setups we need the jobmanager.rpc.address to derive the
>>> >> hostname of the rest server. The jobmanager.rpc.port is no longer
>>> needed
>>> >> for the client but only for the other cluster components (TMs). When
>>> using
>>> >> the HA mode, then every address will be retrieved from ZooKeeper.
>>> >>
>>> >> I hope this clarifies things.
>>> >>
>>> >> Cheers,
>>> >> Till
>>> >>
>>> >> On Wed, Jun 20, 2018 at 9:24 AM Chesnay Schepler 
>>> >> wrote:
>>> >>
>>> >>> I was worried this might be the case.
>>> >>>
>>> >>> The rest.port handling was simply copied from the legacy web-server,
>>> >>> which explicitly allowed shutting it down.
>>> >>> It may (I'm not entirely sure) also not be necessary for all
>>> deployment
>>> >>> modes; for example if the job is baked into the job/taskmanager
>>> images.
>>> >>>
>>> >>> I'm not quite sure whether the rpc address is actually required for
>>> the
>>> >>> REST job submission, or only since we still rely partly on some
>>> legacy
>>> >>> code (ClusterClient). Maybe Till (cc) knows the answer to that.
>>> >>>
>>> >>> > Adding on to this point you made - " the rpc address is still
>>> >>> *required *due
>>> >>> > to some technical implementations; it may be that you can set this
>>> to
>>> >>> some
>>> >>> > arbitrary value however."
>>> >>> >
>>> >>> > For job submission to happen successfully we should give specific
>>> rpc
>>> >>> > address and not any arbitrary value. If any arbitrary value is
>>> given
>>> >>> the
>>> >>> > job submission fails with the following error -
>>> >>> > org.apache.flink.client.deployment.ClusterRetrieveException:
>>> Couldn't
>>> >>> > retrieve standalone cluster
>>> >>> >  at
>>> >>> > org.apache.flink.client.deployment.StandaloneClusterDescriptor.
>>> >>> retrieve(StandaloneClusterDescriptor.java:51)
>>> >>> >  at
>>> >>> > 

Re: Breakage in Flink CLI in 1.5.0

2018-06-21 Thread Sampath Bhat
Hi,
Yes, I've specified the rest.address for the flink client to connect to,
and the rest.address is valid and working fine, but my question is: why am
I supposed to give jobmanager.rpc.address for the flink client to connect
to the flink cluster if the flink client depends only on rest.address?

On Thu, Jun 21, 2018 at 12:41 PM, Till Rohrmann 
wrote:

> Hi,
>
> if the rest.address is different from the jobmanager.rpc.address, then you
> should specify that in the flink-conf.yaml and Flink will connect to
> rest.address. Only if rest.address is not specified, the system will fall
> back to use the jobmanager.rpc.address. Currently, the rest server endpoint
> runs in the same JVM as the cluster entrypoint and all JobMasters.
>
> Cheers,
> Till
>
> On Thu, Jun 21, 2018 at 8:46 AM Sampath Bhat 
> wrote:
>
>> Hello Till
>>
>> Thanks for the clarification. But I have a few questions based on your reply.
>>
>> In non-HA setups we need the jobmanager.rpc.address to derive the hostname
>> of the rest server.
>> Why is there a dependency on jobmanager.rpc.address to get the hostname of
>> the rest server? This holds good only for normal deployments such as on
>> bare metal or virtual machines, where the flink cluster runs as another
>> process on a machine. But if we try to deploy flink on kubernetes then
>> there could be a possibility that jobmanager.rpc.address and rest.address
>> differ from each other.
>>
>> So if rest.address is not provided in flink-conf.yaml, then looking at
>> jobmanager.rpc.address to derive the hostname of the rest server makes
>> sense; but when the user has already provided rest.address and flink
>> still looks at jobmanager.rpc.address to get the hostname of the rest
>> server, that is an unwanted dependency IMO.
>>
>> In an HA setup the rpc.address is obtained from ZooKeeper, so the user
>> need not worry about unnecessary properties while submitting a job.
>>
>> On Wed, Jun 20, 2018 at 1:25 PM, Till Rohrmann 
>> wrote:
>>
>> > It will, but it defaults to jobmanager.rpc.address if no rest.address
>> has
>> > been specified.
>> >
>> > On Wed, Jun 20, 2018 at 9:49 AM Chesnay Schepler 
>> > wrote:
>> >
>> >> Shouldn't the non-HA case be covered by rest.address?
>> >>
>> >> On 20.06.2018 09:40, Till Rohrmann wrote:
>> >>
>> >> Hi Sampath,
>> >>
>> >> it is no longer possible to not start the rest server endpoint by
>> setting
>> >> rest.port to -1. If you do this, then the cluster won't start. The
>> comment
>> >> in the flink-conf.yaml holds only true for the legacy mode.
>> >>
>> >> In non-HA setups we need the jobmanager.rpc.address to derive the
>> >> hostname of the rest server. The jobmanager.rpc.port is no longer
>> needed
>> >> for the client but only for the other cluster components (TMs). When
>> using
>> >> the HA mode, then every address will be retrieved from ZooKeeper.
>> >>
>> >> I hope this clarifies things.
>> >>
>> >> Cheers,
>> >> Till
>> >>
>> >> On Wed, Jun 20, 2018 at 9:24 AM Chesnay Schepler 
>> >> wrote:
>> >>
>> >>> I was worried this might be the case.
>> >>>
>> >>> The rest.port handling was simply copied from the legacy web-server,
>> >>> which explicitly allowed shutting it down.
>> >>> It may (I'm not entirely sure) also not be necessary for all
>> deployment
>> >>> modes; for example if the job is baked into the job/taskmanager
>> images.
>> >>>
>> >>> I'm not quite sure whether the rpc address is actually required for
>> the
>> >>> REST job submission, or only since we still rely partly on some legacy
>> >>> code (ClusterClient). Maybe Till (cc) knows the answer to that.
>> >>>
>> >>> > Adding on to this point you made - " the rpc address is still
>> >>> *required *due
>> >>> > to some technical implementations; it may be that you can set this
>> to
>> >>> some
>> >>> > arbitrary value however."
>> >>> >
>> >>> > For job submission to happen successfully we should give specific
>> rpc
>> >>> > address and not any arbitrary value. If any arbitrary value is given
>> >>> the
>> >>> > job submission fails with the following error -
>> >>> > org.apache.flink.client.deployment.ClusterRetrieveException:
>> Couldn't
>> >>> > retrieve standalone cluster
>> >>> >  at
>> >>> > org.apache.flink.client.deployment.StandaloneClusterDescriptor.
>> >>> retrieve(StandaloneClusterDescriptor.java:51)
>> >>> >  at
>> >>> > org.apache.flink.client.deployment.StandaloneClusterDescriptor.
>> >>> retrieve(StandaloneClusterDescriptor.java:31)
>> >>> >  at
>> >>> > org.apache.flink.client.cli.CliFrontend.runProgram(
>> >>> CliFrontend.java:249)
>> >>> >  at org.apache.flink.client.cli.
>> CliFrontend.run(CliFrontend.
>> >>> java:210)
>> >>> >  at
>> >>> > org.apache.flink.client.cli.CliFrontend.parseParameters(
>> >>> CliFrontend.java:1020)
>> >>> >  at
>> >>> > org.apache.flink.client.cli.CliFrontend.lambda$main$9(
>> >>> CliFrontend.java:1096)
>> >>> >  at
>> >>> > org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(
>> >>> NoOpSecurityContext.java:30)

Re: Ordering of stream from different kafka partitions

2018-06-21 Thread Amol S - iProgrammer
Hello Andrey,

Thanks for the help.

I am trying to implement the code you gave above:

sourceStream
    .setParallelism(4)
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<>(…) {…})
    .windowAll(TumblingEventTimeWindows.of(Time...))
    .process(new OrderTheRecords())

but I am facing issues writing the *OrderTheRecords* class as I am new to this
framework. Can you please suggest the optimal way to sort the records?

I have implemented the ProcessWindowFunction code below:

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class OrderTheRecords extends ProcessWindowFunction<Oplog, Oplog, Long, TimeWindow> {

    @Override
    public void process(Long s, Context context, Iterable<Oplog> iterable,
                        Collector<Oplog> collector) throws Exception {
        for (Oplog oplog : iterable) {
            collector.collect(oplog);
        }
    }
}


public class Oplog {

    private OplogTimestamp ts;
    private String op;
    private BasicDBObject o;

}

Here *ts* represents the event timestamp.
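
A minimal sketch of the sorting could look like the following; note that with
windowAll() the window function would be a ProcessAllWindowFunction, and a
getTs() accessor returning the event time as a long is assumed on Oplog:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Sketch: buffer the window contents, sort by event time, then emit.
public class OrderTheRecords extends ProcessAllWindowFunction<Oplog, Oplog, TimeWindow> {

    @Override
    public void process(Context context, Iterable<Oplog> iterable,
                        Collector<Oplog> collector) {
        List<Oplog> buffered = new ArrayList<>();
        iterable.forEach(buffered::add);
        buffered.sort(Comparator.comparingLong(Oplog::getTs)); // getTs() is assumed
        buffered.forEach(collector::collect);
    }
}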

---
*Amol Suryawanshi*
Java Developer
am...@iprogrammer.com


*iProgrammer Solutions Pvt. Ltd.*



*Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com 


On Wed, Jun 20, 2018 at 6:51 PM, Andrey Zagrebin 
wrote:

> Hi Amol,
>
> In above code also it will sort the records in specific time window only.
>
>
> All windows will be emitted as the watermark passes the end of the window.
> The watermark only increases, so the non-overlapping windows are also sorted
> by time and, as a consequence, so are the records across windows, if the
> concern is that records are sorted only within a specific time window.
>
> 1. How should I create N consumers dynamically based on partition count?
>
>
> sourceStream.setParallelism(N), each Flink consumer parallel subtask will
> serve one Kafka partition.
>
> 2. Is number of consumers dynamically grows as number of partition
> increased in middle of execution?
>
>
> Dynamically added Kafka partitions will be eventually discovered by Flink
> consumers (flink.partition-discovery.interval-millis) and picked up by
> some consumer. The Flink job has to be rescaled separately.
>
> Currently parallelism of Flink operator cannot be changed while the job is
> running. The way to go now is to use savepoint/checkpoint, stop the job and
> start the new one with changed parallelism from the
> previous savepoint/checkpoint (see Flink docs). New job will pick up from
> partition offsets of previous job.
>
> 3. How to create partition specific kafka consumer in flink?
>
>
> The partition-consumer assignment is now implementation specific for Flink.
> There is an open issue for custom assignment
> (https://issues.apache.org/jira/browse/FLINK-8570), e.g. if you need
> specific locality of keys/consumers.
>
> I would simply suggest to assign some key to each record and let all
> records for particular key to go into the same Kafka partition. On the
> Flink side if a corresponding keyBy() is applied to the Kafka source, all
> the records for this particular key will go to the same parallel subtask of
> subsequent operator, sorted by time if they were originally sorted in its
> Kafka partition. This is more scalable approach than total global ordering.
>
> Cheers,
> Andrey
>
> On 20 Jun 2018, at 13:17, Amol S - iProgrammer 
> wrote:
>
> Hello Andrey,
>
> In the above code it will also sort the records within a specific time
> window only. Anyway, we agreed to create N partitions with N consumers
> based on some key, as order is maintained per Kafka partition.
>
> I have some questions about this.
>
> 1. How should I create N consumers dynamically based on partition count?
> 2. Is number of consumers dynamically grows as number of partition
> increased in middle of execution?
> 3. How to create partition specific kafka consumer in flink?
>
> ---
> *Amol Suryawanshi*
> Java Developer
> am...@iprogrammer.com
>
>
> *iProgrammer Solutions Pvt. Ltd.*
>
>
>
> *Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
> Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
> MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
> www.iprogrammer.com 
> 
>
> On Wed, Jun 20, 2018 at 2:38 PM, Andrey Zagrebin  >
> wrote:
>
> Hi,
>
> Good point, sorry for the confusion. BoundedOutOfOrdernessTimestampExtractor
> of course does not buffer records; you need to apply windowing
> (e.g. TumblingEventTimeWindows) for that, and then sort the window output
> by time and emit the records in sorted order.
>
> You 

Running a Scala Job doesn't produce print output

2018-06-21 Thread Mano Swerts
Hi guys,

I have a question. I have been playing around with Flink this week and created 
some basic Java jobs that work fine. Now I am trying to run one in Scala.

Running this code in the Scala REPL prints the expected output:

env.fromElements(1, 2, 3).map(i => " Integer: " + i).print()

However, having it packaged in a JAR which I then deploy through the user 
interface doesn’t give me any output at all. I can start the job and it 
finishes without exceptions, but I don’t see the result of the print() 
statement in the log. The class looks like this:


package com.ixxus.playground.fmk.flink

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._

object LearnDocumentEntityRelationship {

  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val params: ParameterTool = ParameterTool.fromArgs(args)

    env.fromElements(1, 2, 3).map(i => " Integer: " + i).print()

    env.execute("Scala example")
  }
}


I did notice that the job name isn’t what I pass to env.execute. It is named 
“Flink Java Job”:



I can’t find anything online however about this phenomenon. Does anyone have 
any idea?

Thanks.

— Mano


[jira] [Created] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-06-21 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-9635:


 Summary: Local recovery scheduling can cause spread out of tasks
 Key: FLINK-9635
 URL: https://issues.apache.org/jira/browse/FLINK-9635
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Till Rohrmann
 Fix For: 1.6.0, 1.5.1


In order to make local recovery work, Flink's scheduling was changed such that 
a task tries to be rescheduled to its previous location. In order to not occupy 
slots which have cached state of other tasks, the strategy will request a new 
slot if the old slot, identified by the previous allocation id, is no longer 
present. This also applies to newly allocated slots, because there is no 
distinction between new and already-used slots. This behaviour can cause every 
task to be deployed to its own slot if the {{SlotPool}} has released all slots 
in the meantime, for example.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9634) Deactivate previous location based scheduling if local recovery is disabled

2018-06-21 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-9634:


 Summary: Deactivate previous location based scheduling if local 
recovery is disabled
 Key: FLINK-9634
 URL: https://issues.apache.org/jira/browse/FLINK-9634
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.6.0, 1.5.1


With Flink 1.5.0 we introduced local recovery. In order to make local recovery 
work we had to change the scheduling to be aware of the previous location of 
the {{Execution}}. This scheduling strategy is also active if local recovery is 
deactivated. I suggest disabling this scheduling strategy as well when local 
recovery is not enabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9633) Flink doesn't use the Savepoint path's filesystem to create the OutputStream on Task.

2018-06-21 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9633:
-

 Summary: Flink doesn't use the Savepoint path's filesystem to 
create the OutputStream on Task.
 Key: FLINK-9633
 URL: https://issues.apache.org/jira/browse/FLINK-9633
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.6.0, 1.5.1


Currently, Flink uses the savepoint's filesystem to create the meta output 
stream in the CheckpointCoordinator (JM side), but in the StreamTask (TM side) 
it uses the checkpoint's filesystem to create the checkpoint data output 
stream. When the savepoint and checkpoint are on different filesystems, this 
leads to problems.
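
A minimal sketch of the idea behind a fix, for illustration only (not the 
actual patch): resolve the {{FileSystem}} from the target savepoint path itself 
instead of from the checkpoint configuration.

{code:java}
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class SavepointStreamSketch {

    // Creates the output stream on the filesystem that the savepoint path
    // itself points to, e.g. an hdfs:// savepoint while checkpoints go to s3://.
    public static FSDataOutputStream createSavepointStream(String savepointPath) throws Exception {
        Path path = new Path(savepointPath);
        FileSystem fs = path.getFileSystem(); // resolved from the path's scheme
        return fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
    }
}
{code}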



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Am I forgetting to do something to get pull request accepted?

2018-06-21 Thread Chesnay Schepler
The Flink project currently receives too many pull requests / has too few 
reviewers to ensure timely reviews.


I assume you are talking about https://github.com/apache/flink/pull/5876
If this is correct, something you could already do is squash all commits 
and rebase against master to get rid of the merge commits.
The new InputFormat also shares a lot of code with the existing 
CassandraOutputFormat. Shared portions could be moved into an abstract 
CassandraInputFormatBase class.


On 20.06.2018 20:50, jpcarter...@gmail.com wrote:

I just want to make sure I am not missing anything to get my pull request 
accepted. All tests are passing, so I figured it would be a short time after 
that. Is there anything else that I need to do, or is there perhaps a freeze on 
accepting pull requests at the moment? Here is the link to it on Travis CI: 
https://travis-ci.org/Jicaar/flink/builds/386283019





Re: Breakage in Flink CLI in 1.5.0

2018-06-21 Thread Till Rohrmann
Hi,

if the rest.address is different from the jobmanager.rpc.address, then you
should specify that in the flink-conf.yaml and Flink will connect to
rest.address. Only if rest.address is not specified, the system will fall
back to use the jobmanager.rpc.address. Currently, the rest server endpoint
runs in the same JVM as the cluster entrypoint and all JobMasters.
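
For illustration, the relevant flink-conf.yaml keys might look like the
following (host names are made up); the client connects to rest.address and
rest.port, and falls back to jobmanager.rpc.address only when rest.address
is unset:

    # flink-conf.yaml (sketch)
    jobmanager.rpc.address: flink-jobmanager-internal
    jobmanager.rpc.port: 6123
    rest.address: flink-rest.example.com
    rest.port: 8081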

Cheers,
Till

On Thu, Jun 21, 2018 at 8:46 AM Sampath Bhat 
wrote:

> Hello Till
>
> Thanks for the clarification. But I have a few questions based on your reply.
>
> In non-HA setups we need the jobmanager.rpc.address to derive the hostname
> of the rest server.
> Why is there a dependency on jobmanager.rpc.address to get the hostname of
> the rest server? This holds good only for normal deployments such as on
> bare metal or virtual machines, where the flink cluster runs as another
> process on a machine. But if we try to deploy flink on kubernetes then
> there could be a possibility that jobmanager.rpc.address and rest.address
> differ from each other.
>
> So if rest.address is not provided in flink-conf.yaml, then looking at
> jobmanager.rpc.address to derive the hostname of the rest server makes
> sense; but when the user has already provided rest.address and flink still
> looks at jobmanager.rpc.address to get the hostname of the rest server,
> that is an unwanted dependency IMO.
>
> In an HA setup the rpc.address is obtained from ZooKeeper, so the user
> need not worry about unnecessary properties while submitting a job.
>
> On Wed, Jun 20, 2018 at 1:25 PM, Till Rohrmann 
> wrote:
>
> > It will, but it defaults to jobmanager.rpc.address if no rest.address has
> > been specified.
> >
> > On Wed, Jun 20, 2018 at 9:49 AM Chesnay Schepler 
> > wrote:
> >
> >> Shouldn't the non-HA case be covered by rest.address?
> >>
> >> On 20.06.2018 09:40, Till Rohrmann wrote:
> >>
> >> Hi Sampath,
> >>
> >> it is no longer possible to not start the rest server endpoint by
> setting
> >> rest.port to -1. If you do this, then the cluster won't start. The
> comment
> >> in the flink-conf.yaml holds only true for the legacy mode.
> >>
> >> In non-HA setups we need the jobmanager.rpc.address to derive the
> >> hostname of the rest server. The jobmanager.rpc.port is no longer needed
> >> for the client but only for the other cluster components (TMs). When
> using
> >> the HA mode, then every address will be retrieved from ZooKeeper.
> >>
> >> I hope this clarifies things.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Wed, Jun 20, 2018 at 9:24 AM Chesnay Schepler 
> >> wrote:
> >>
> >>> I was worried this might be the case.
> >>>
> >>> The rest.port handling was simply copied from the legacy web-server,
> >>> which explicitly allowed shutting it down.
> >>> It may (I'm not entirely sure) also not be necessary for all deployment
> >>> modes; for example if the job is baked into the job/taskmanager images.
> >>>
> >>> I'm not quite sure whether the rpc address is actually required for the
> >>> REST job submission, or only since we still rely partly on some legacy
> >>> code (ClusterClient). Maybe Till (cc) knows the answer to that.
> >>>
> >>> > Adding on to this point you made - " the rpc address is still
> >>> *required *due
> >>> > to some technical implementations; it may be that you can set this to
> >>> some
> >>> > arbitrary value however."
> >>> >
> >>> > For job submission to happen successfully we should give specific rpc
> >>> > address and not any arbitrary value. If any arbitrary value is given
> >>> the
> >>> > job submission fails with the following error -
> >>> > org.apache.flink.client.deployment.ClusterRetrieveException: Couldn't
> >>> > retrieve standalone cluster
> >>> >  at
> >>> > org.apache.flink.client.deployment.StandaloneClusterDescriptor.
> >>> retrieve(StandaloneClusterDescriptor.java:51)
> >>> >  at
> >>> > org.apache.flink.client.deployment.StandaloneClusterDescriptor.
> >>> retrieve(StandaloneClusterDescriptor.java:31)
> >>> >  at
> >>> > org.apache.flink.client.cli.CliFrontend.runProgram(
> >>> CliFrontend.java:249)
> >>> >  at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.
> >>> java:210)
> >>> >  at
> >>> > org.apache.flink.client.cli.CliFrontend.parseParameters(
> >>> CliFrontend.java:1020)
> >>> >  at
> >>> > org.apache.flink.client.cli.CliFrontend.lambda$main$9(
> >>> CliFrontend.java:1096)
> >>> >  at
> >>> > org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(
> >>> NoOpSecurityContext.java:30)
> >>> >  at
> >>> > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
> >>> > Caused by: java.net.UnknownHostException: flinktest-flink-
> >>> jobmanager1233445:
> >>> > Name or service not known
> >>> >   (Random name flinktest-flink-jobmanager1233445)
> >>> >  at java.net.Inet6AddressImpl.lookupAllHostAddr(Native
> Method)
> >>> >  at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.
> >>> java:928)
> >>> >  at
> >>> > 

Re: Breakage in Flink CLI in 1.5.0

2018-06-21 Thread Sampath Bhat
Hello Till

Thanks for the clarification. But I have a few questions based on your reply.

In non-HA setups we need the jobmanager.rpc.address to derive the hostname
of the rest server.
Why is there a dependency on jobmanager.rpc.address to get the hostname of
the rest server? This holds good only for normal deployments such as on
bare metal or virtual machines, where the flink cluster runs as another
process on a machine. But if we try to deploy flink on kubernetes then
there could be a possibility that jobmanager.rpc.address and rest.address
differ from each other.

So if rest.address is not provided in flink-conf.yaml, then looking at
jobmanager.rpc.address to derive the hostname of the rest server makes
sense; but when the user has already provided rest.address and flink still
looks at jobmanager.rpc.address to get the hostname of the rest server,
that is an unwanted dependency IMO.

In an HA setup the rpc.address is obtained from ZooKeeper, so the user
need not worry about unnecessary properties while submitting a job.

On Wed, Jun 20, 2018 at 1:25 PM, Till Rohrmann  wrote:

> It will, but it defaults to jobmanager.rpc.address if no rest.address has
> been specified.
>
> On Wed, Jun 20, 2018 at 9:49 AM Chesnay Schepler 
> wrote:
>
>> Shouldn't the non-HA case be covered by rest.address?
>>
>> On 20.06.2018 09:40, Till Rohrmann wrote:
>>
>> Hi Sampath,
>>
>> it is no longer possible to not start the rest server endpoint by setting
>> rest.port to -1. If you do this, then the cluster won't start. The comment
>> in the flink-conf.yaml holds only true for the legacy mode.
>>
>> In non-HA setups we need the jobmanager.rpc.address to derive the
>> hostname of the rest server. The jobmanager.rpc.port is no longer needed
>> for the client but only for the other cluster components (TMs). When using
>> the HA mode, then every address will be retrieved from ZooKeeper.
>>
>> I hope this clarifies things.
>>
>> Cheers,
>> Till
>>
>> On Wed, Jun 20, 2018 at 9:24 AM Chesnay Schepler 
>> wrote:
>>
>>> I was worried this might be the case.
>>>
>>> The rest.port handling was simply copied from the legacy web-server,
>>> which explicitly allowed shutting it down.
>>> It may (I'm not entirely sure) also not be necessary for all deployment
>>> modes; for example if the job is baked into the job/taskmanager images.
>>>
>>> I'm not quite sure whether the rpc address is actually required for the
>>> REST job submission, or only since we still rely partly on some legacy
>>> code (ClusterClient). Maybe Till (cc) knows the answer to that.
>>>
>>> > Adding on to this point you made - " the rpc address is still
>>> *required *due
>>> > to some technical implementations; it may be that you can set this to
>>> some
>>> > arbitrary value however."
>>> >
>>> > For job submission to happen successfully we should give specific rpc
>>> > address and not any arbitrary value. If any arbitrary value is given
>>> the
>>> > job submission fails with the following error -
>>> > org.apache.flink.client.deployment.ClusterRetrieveException: Couldn't
>>> > retrieve standalone cluster
>>> >  at
>>> > org.apache.flink.client.deployment.StandaloneClusterDescriptor.
>>> retrieve(StandaloneClusterDescriptor.java:51)
>>> >  at
>>> > org.apache.flink.client.deployment.StandaloneClusterDescriptor.
>>> retrieve(StandaloneClusterDescriptor.java:31)
>>> >  at
>>> > org.apache.flink.client.cli.CliFrontend.runProgram(
>>> CliFrontend.java:249)
>>> >  at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.
>>> java:210)
>>> >  at
>>> > org.apache.flink.client.cli.CliFrontend.parseParameters(
>>> CliFrontend.java:1020)
>>> >  at
>>> > org.apache.flink.client.cli.CliFrontend.lambda$main$9(
>>> CliFrontend.java:1096)
>>> >  at
>>> > org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(
>>> NoOpSecurityContext.java:30)
>>> >  at
>>> > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
>>> > Caused by: java.net.UnknownHostException: flinktest-flink-
>>> jobmanager1233445:
>>> > Name or service not known
>>> >   (Random name flinktest-flink-jobmanager1233445)
>>> >  at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
>>> >  at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.
>>> java:928)
>>> >  at
>>> > java.net.InetAddress.getAddressesFromNameService(
>>> InetAddress.java:1323)
>>> >  at java.net.InetAddress.getAllByName0(InetAddress.java:1276)
>>> >  at java.net.InetAddress.getAllByName(InetAddress.java:1192)
>>> >  at java.net.InetAddress.getAllByName(InetAddress.java:1126)
>>> >  at java.net.InetAddress.getByName(InetAddress.java:1076)
>>> >  at
>>> > org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.
>>> getRpcUrl(AkkaRpcServiceUtils.java:171)
>>> >  at
>>> > org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.
>>> getRpcUrl(AkkaRpcServiceUtils.java:136)
>>> >  at
>>> >