Re: [DISCUSS] FLIP-6 - Flink Deployment and Process Model - Standalone, Yarn, Mesos, Kubernetes, etc.

2016-07-29 Thread Wright, Eron
The design looks great - it solves for very diverse deployment modes, allows 
for heterogeneous TMs, and promotes job isolation.

Some feedback:

*Dispatcher*
The dispatcher concept here expands nicely on what was introduced in the Mesos
design doc (FLINK-1984). The most significant difference is the job-centric
orientation of the dispatcher API. FLIP-6 seems to eliminate the concept of a
session (or defines it simply as the lifecycle of a JM); is that correct?
Do you agree that I should revise the Mesos dispatcher design to be job-centric?

I'll be taking the first crack at implementing the dispatcher (for Mesos only)
in FLINK-1984 (T2). I’ll keep FLIP-6 in mind as I go.

The dispatcher's backend behavior will vary significantly for Mesos vs
standalone vs others. Presumably a base class with concrete implementations
will be introduced (a rough sketch follows the list below). To echo the FLIP-6
design as I understand it:

1) Standalone
   a) The dispatcher process starts an RM, dispatcher frontend, and "local" 
dispatcher backend at startup.
   b) Upon job submission, the local dispatcher backend creates an in-process 
JM actor for the job.
   c) The JM allocates slots as normal. The RM draws from its pool of
registered TMs, which grows and shrinks (only) due to external events.

2) Mesos
   a) The dispatcher process starts a dispatcher frontend and "Mesos" 
dispatcher backend at startup.
   b) Upon job submission, the Mesos dispatcher backend creates a Mesos task 
(dubbed an "AppMaster") which contains a JM/RM for the job.   
   c) The system otherwise functions as described in the Mesos design doc.
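
A minimal sketch of such a hierarchy (class and method names are invented for
illustration; they are not part of FLIP-6 itself):

import org.apache.flink.runtime.jobgraph.JobGraph;

// Hypothetical sketch only -- names and signatures are invented.
public abstract class DispatcherBackend {
    // Called by the dispatcher frontend when a job is submitted.
    public abstract void startJob(JobGraph job) throws Exception;
}

// Standalone: run an in-process JM actor for the job (1b above).
public class LocalDispatcherBackend extends DispatcherBackend {
    @Override
    public void startJob(JobGraph job) {
        // spawn a JM inside the dispatcher process; slots come from the
        // shared RM's pool of externally managed TMs
    }
}

// Mesos: launch a per-job "AppMaster" task hosting a JM/RM pair (2b above).
public class MesosDispatcherBackend extends DispatcherBackend {
    @Override
    public void startJob(JobGraph job) {
        // persist per-task planning info, then ask Mesos to launch the
        // AppMaster task for this job
    }
}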

*Client*
I'm concerned about the two code paths that the client uses to launch a job 
(with-dispatcher vs without-dispatcher).   Maybe it could be unified by saying 
that the client always calls the dispatcher, and that the dispatcher is 
hostable in either the client or in a separate process.  The only variance 
would be the client-to-dispatcher transport (local vs HTTP).
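
A sketch of that unification (names invented for illustration; this shows the
idea, not FLIP-6 API):

import org.apache.flink.runtime.jobgraph.JobGraph;

// The client always submits through a DispatcherGateway; only the
// transport behind the interface varies.
public interface DispatcherGateway {
    void submitJob(JobGraph job) throws Exception;
}

// Dispatcher hosted in the client process: submission is a local call.
public class LocalDispatcherGateway implements DispatcherGateway {
    @Override
    public void submitJob(JobGraph job) {
        // hand the job graph directly to the in-process dispatcher backend
    }
}

// Dispatcher in a separate process: the same call goes over HTTP.
public class HttpDispatcherGateway implements DispatcherGateway {
    private final String endpoint;
    public HttpDispatcherGateway(String endpoint) { this.endpoint = endpoint; }
    @Override
    public void submitJob(JobGraph job) {
        // serialize the job graph and POST it to `endpoint`
    }
}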

*RM*
On the issue of RM statefulness, we can say that the RM does not persist slot 
allocation (the ground truth is in the TM), but may persist other information 
(related to cluster manager interaction).  For example, the Mesos RM persists 
the assigned framework identifier and per-task planning information (as is 
highly recommended by the Mesos development guide).

On RM fencing, I was already wondering whether to add it to the Mesos RM, so it
is nice to see it being introduced more generally. My rationale is that the
dispatcher cannot guarantee that only a single RM is running, because orphaned
tasks are possible in certain Mesos failure situations. Similarly, I’m unsure
whether YARN provides a strong guarantee about the AM.
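
To illustrate what fencing typically looks like (a generic sketch, not Flink
code): each elected RM receives a fresh token, and messages carrying a stale
token are ignored, so an orphaned RM cannot act on the cluster.

import java.util.UUID;

// Generic fencing sketch: a stale leader holds an outdated token, so its
// messages are dropped even if its process is still alive.
class FencedReceiver {
    private volatile UUID currentToken;

    void onNewLeader(UUID token) {
        currentToken = token; // issued by the leader election service
    }

    void onMessage(UUID senderToken, Runnable action) {
        if (!senderToken.equals(currentToken)) {
            return; // message from a stale/orphaned leader: ignore it
        }
        action.run();
    }
}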

*User Code*
Having job code on the system classpath seems possible in only a subset of 
cases.   The variability may be complex.   How important is this optimization?

*Security Implications*
It should be noted that the standalone embodiment doesn't offer isolation 
between jobs.  The whole system will have a single security context (as it does 
now).   

Meanwhile, the ‘high-trust’ nature of the dispatcher in other scenarios is 
rightly emphasized.  The fact that user code shouldn't be run in the dispatcher 
process (except in standalone) must be kept in mind.   The design doc of 
FLINK-3929 (section C2) has more detail on that.


-Eron


> On Jul 28, 2016, at 2:22 AM, Maximilian Michels  wrote:
> 
> Hi Stephan,
> 
> Thanks for the nice wrap-up of ideas and discussions we had over the
> last months (not all on the mailing list though because we were just
> getting started with the FLIP process). The document is very
> comprehensive and explains the changes in great detail, even up to
> the message passing level.
> 
> What I really like about the FLIP is that we delegate multi-tenancy
> away from the JobManager to the resource management framework and the
> dispatchers. This will help to make the JobManager component cleaner
> and simpler. The prospect of having the user jars directly in the
> system classpath of the workers, instead of dealing with custom class
> loaders, is very nice.
> 
> The model we have for acquiring and releasing resources wouldn't work
> particularly well with all the new deployment options, so +1 on a new
> task slot request/offer system and +1 for making the ResourceManager
> responsible for TaskManager registration and slot management. This is
> well aligned with the initial idea of the ResourceManager component.
> 
> We definitely need good testing for these changes since the
> possibility of bugs increases with the additional number of messages
> introduced.
> 
> The only thing that bugs me is whether we make the Standalone mode a
> bit less nice to use. The initial bootstrapping of the nodes via the
> local dispatchers and the subsequent registration of TaskManagers and
> allocation of slots could cause some delay. It's not a major concern
> though because it will take little time 

Re: Discard out-of-order events

2016-07-29 Thread Maximilian Michels
@Gyula: This is documented in the JavaDoc of the `allowedLateness(..)`
method and in the docs:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#dealing-with-late-data
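
For reference, a minimal usage sketch against the 1.1 API (assuming a keyed
stream of tuples in event time; a lateness of 0 is the default):

import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Elements arriving after the watermark are dropped once the allowed
// lateness (0 by default) has passed.
input
    .keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .allowedLateness(Time.seconds(0)) // explicit here; 0 is the default
    .sum(1);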

@Kevin: Thanks for the explanation, I'll get back to you soon (sort of
in a rush).

Cheers,
Max

On Fri, Jul 29, 2016 at 10:57 AM, Gyula Fóra  wrote:
> Hi Max,
>
> So if I understand correctly the window operators now, by default,  discard
> late elements?
> Is this documented somewhere?
>
> Gyula
>
> Maximilian Michels  ezt írta (időpont: 2016. júl. 29., P,
> 10:40):
>
>> Hi!
>>
>> I'm not sure whether I understand your question. The purpose of Event
>> Time is to be able to process out-of-order events. Do you want to
>> discard late elements? In the upcoming Flink 1.1.0 you can set the
>> `allowedLateness` on a windowed stream. The default is 0, so late
>> elements are discarded; late elements are elements which arrive after
>> the Watermark has reached the operator.
>>
>> Cheers,
>> Max
>>
>> On Thu, Jul 28, 2016 at 6:49 PM, Kevin Jacobs 
>> wrote:
>> > Is it possible to discard events that are out-of-order (in terms of event
>> > time)?
>> >
>>


Re: [VOTE] Release Apache Flink 1.1.0 (RC1)

2016-07-29 Thread Aljoscha Krettek
When running "mvn clean verify" with Hadoop version 2.6.1 the
Zookeeper/Leader Election tests fail with this:

java.lang.NoSuchMethodError:
org.apache.curator.utils.PathUtils.validatePath(Ljava/lang/String;)Ljava/lang/String;
at
org.apache.curator.framework.imps.NamespaceImpl.<init>(NamespaceImpl.java:37)
at
org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:113)
at
org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:124)
at
org.apache.flink.runtime.util.ZooKeeperUtils.startCuratorFramework(ZooKeeperUtils.java:101)
at
org.apache.flink.runtime.util.ZooKeeperUtils.createLeaderRetrievalService(ZooKeeperUtils.java:143)
at
org.apache.flink.runtime.util.LeaderRetrievalUtils.createLeaderRetrievalService(LeaderRetrievalUtils.java:70)
at
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderRetrievalTest.testTimeoutOfFindConnectingAddress(ZooKeeperLeaderRetrievalTest.java:187)

I'll continue testing other parts and other Hadoop versions.

On Wed, 27 Jul 2016 at 11:51 Ufuk Celebi  wrote:

> Dear Flink community,
>
> Please vote on releasing the following candidate as Apache Flink version
> 1.1.0.
>
> I've CC'd u...@flink.apache.org as users are encouraged to help
> testing Flink 1.1.0 for their specific use cases. Please feel free to
> report issues and successful tests on dev@flink.apache.org.
>
> The commit to be voted on:
> 3a18463 (http://git-wip-us.apache.org/repos/asf/flink/commit/3a18463)
>
> Branch:
> release-1.1.0-rc1
> (
> https://git1-us-west.apache.org/repos/asf/flink/repo?p=flink.git;a=shortlog;h=refs/heads/release-1.1.0-rc1
> )
>
> The release artifacts to be voted on can be found at:
> http://people.apache.org/~uce/flink-1.1.0-rc1/
>
> The release artifacts are signed with the key with fingerprint 9D403309:
> http://www.apache.org/dist/flink/KEYS
>
> The staging repository for this release can be found at:
> https://repository.apache.org/content/repositories/orgapacheflink-1098
>
> There is also a Google doc to coordinate the testing efforts. This is
> a copy of the release document found in our Wiki:
>
> https://docs.google.com/document/d/1cDZGtnGJKLU1fLw8AE_FzkoDLOR8amYT2oc3mD0_lw4/edit?usp=sharing
>
> -
>
> Thanks to everyone who contributed to this release candidate.
>
> The vote is open for the next 3 days (not counting the weekend) and
> passes if a majority of at least three +1 PMC votes are cast.
>
> The vote ends on Monday August 1st, 2016.
>
> [ ] +1 Release this package as Apache Flink 1.1.0
> [ ] -1 Do not release this package, because ...
>


Re: FlinkKafkaConsumer09

2016-07-29 Thread Maximilian Michels
Thanks!

On Fri, Jul 29, 2016 at 11:43 AM, Gordon Tai (戴資力)  wrote:
> Hi Max,
>
> Sure, I was planning to do so, but wanted to see if it was a reasonable
> feature to add before opening a JIRA :)
> Here's the new JIRA: https://issues.apache.org/jira/browse/FLINK-4280
>
> Regards,
> Gordon
>
> On Fri, Jul 29, 2016 at 4:03 PM, Maximilian Michels  wrote:
>
>> Hi Tai,
>>
>> Should definitely be possible. Would you mind opening a JIRA issue
>> with the description you posted?
>>
>> Thanks,
>> Max
>>
>> On Thu, Jul 28, 2016 at 11:16 AM, Tai Gordon  wrote:
>> > Hi Kevin,
>> >
>> > Just a re-clarification: for Kafka 0.9 it would be “earliest”, &
>> > “smallest” for the older Kafka 0.8.
>> >
>> > I’m wondering whether or not it is reasonable to add a Flink-specific way
>> > to set the consumer’s starting position to “earliest” and “latest”,
>> > without respecting the external Kafka offset store. Perhaps we can change
>> > the current behaviour (checking committed offsets in Kafka as starting
>> > point) as a user option, and add new options to read from “earliest” and
>> > “latest” regardless of the groupId and externally committed offsets. I
>> > think this better matches how users usually interpret the functionality
>> > of setting starting positions, while also keeping the “auto.offset.reset”
>> > behaviour that frequent Kafka users are used to. Also, this would more
>> > clearly define that under the context of Flink, the external Kafka offset
>> > store is used only to expose the consumer’s progress to the outside
>> > world, and not used to manipulate how topics are read.
>> >
>> > Just an idea I have in mind, not sure if it would be a reasonable add.
>> > It’d be great to hear what others think of this.
>> >
>> > Regards,
>> > Gordon
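
A hypothetical sketch of what such Flink-specific options could look like (the
setter name is invented for illustration; it is not the consumer's actual API
at the time of this thread):

import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-collector");

FlinkKafkaConsumer09<String> consumer =
    new FlinkKafkaConsumer09<>("topic", new SimpleStringSchema(), props);

// Hypothetical option: always start from the earliest offset, ignoring
// offsets committed for this group id; committed offsets would then only
// expose progress to the outside world.
consumer.setStartFromEarliest();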
>> >
>> >
>> > On July 28, 2016 at 4:44:02 PM, Kevin Jacobs (kevin.jac...@cern.ch)
>> wrote:
>> >
>> > Thank you Gordon and Max,
>> >
>> > Thank you Gordon, that explains the behaviour a bit better to me. I am
>> > now adding the timestamp to the group ID and that is a good workaround
>> > for now. The "smallest" option is unfortunately not available in this
>> > version of the FlinkKafkaConsumer class.
>> >
>> > Cheers,
>> > Kevin
>> >
>> >
>> > On 28.07.2016 10:39, Maximilian Michels wrote:
>> >> Hi Kevin,
>> >>
>> >> You need to use properties.setProperty("auto.offset.reset",
>> >> "smallest") for Kafka 9 to start from the smallest offset. Note, that
>> >> in Kafka 8 you need to use properties.setProperty("auto.offset.reset",
>> >> "earliest") to achieve the same behavior.
>> >>
>> >> Kafka keeps track of the offsets per group id. If you have already
>> >> read from a topic with a certain group id and want to restart from the
>> >> smallest offset available, you need to generate a unique group id.
>> >>
>> >> Cheers,
>> >> Max
>> >>
>> >> On Thu, Jul 28, 2016 at 10:14 AM, Kevin Jacobs 
>> > wrote:
>> >>> Hi,
>> >>>
>> >>> I am currently facing strange behaviour of the FlinkKafkaConsumer09
>> > class. I
>> >>> am using Flink 1.0.3.
>> >>>
>> >>> These are my properties:
>> >>>
>> >>> val properties = new Properties()
>> >>> properties.setProperty("bootstrap.servers", config.urlKafka)
>> >>> properties.setProperty("group.id", COLLECTOR_NAME)
>> >>> properties.setProperty("auto.offset.reset", *"earliest"*)
>> >>>
>> >>> According to the new consumer API of Kafka, this should result in the
>> >>> following:
>> >>>
>> >>> /auto.offset.reset: * smallest : automatically reset the offset to the
>> >>> smallest offset/ (source:
>> >>> https://kafka.apache.org/documentation.html#newconsumerapi)
>> >>>
>> >>> However, it starts from the latest item in my topic. Is this a bug or
>> am
>> > I
>> >>> doing something wrong?
>> >>>
>> >>> Regards,
>> >>> Kevin
>> >>>
>>
>
>
>
> --
> Tzu-Li (Gordon) Tai


[jira] [Created] (FLINK-4284) DataSet/CEP link to non-existant "Linking with Flink" section

2016-07-29 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-4284:
---

 Summary: DataSet/CEP link to non-existant "Linking with Flink" 
section
 Key: FLINK-4284
 URL: https://issues.apache.org/jira/browse/FLINK-4284
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.1.0
Reporter: Chesnay Schepler


Relevant section for DataSet (apis/batch/index.md; L 57-60):

{code}
The following program is a complete, working example of WordCount. You can copy
& paste the code
to run it locally. You only have to include the correct Flink's library into 
your project
(see Section [Linking with Flink](#linking-with-flink)) and specify the 
imports. Then you are ready
to go!
{code}

Relevant section for CEP (apis/streaming/libs/cep.md; L 45-48):

{code}
## Getting Started

If you want to jump right in, you have to [set up a Flink program]({{ 
site.baseurl }}/apis/batch/index.html#linking-with-flink).
Next, you have to add the FlinkCEP dependency to the `pom.xml` of your project.
{code}

The CEP doc probably shouldn't refer to the DataSet documentation at all.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4283) ExecutionGraphRestartTest fails

2016-07-29 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-4283:
---

 Summary: ExecutionGraphRestartTest fails
 Key: FLINK-4283
 URL: https://issues.apache.org/jira/browse/FLINK-4283
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.1.0
 Environment: Ubuntu 14.04
W10
Reporter: Chesnay Schepler


I encounter reliable failures for the following tests:

testRestartAutomatically(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest)
  Time elapsed: 120.089 sec  <<< FAILURE!
java.lang.AssertionError: expected: but was:
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:743)
at org.junit.Assert.assertEquals(Assert.java:118)
at org.junit.Assert.assertEquals(Assert.java:144)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.restartAfterFailure(ExecutionGraphRestartTest.java:680)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.testRestartAutomatically(ExecutionGraphRestartTest.java:155)

taskShouldNotFailWhenFailureRateLimitWasNotExceeded(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest)
  Time elapsed: 2.055 sec  <<< FAILURE!
java.lang.AssertionError: expected: but was:
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:743)
at org.junit.Assert.assertEquals(Assert.java:118)
at org.junit.Assert.assertEquals(Assert.java:144)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.restartAfterFailure(ExecutionGraphRestartTest.java:680)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.taskShouldNotFailWhenFailureRateLimitWasNotExceeded(ExecutionGraphRestartTest.java:180)

testFailingExecutionAfterRestart(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest)
  Time elapsed: 120.079 sec  <<< FAILURE!
java.lang.AssertionError: expected: but was:
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:743)
at org.junit.Assert.assertEquals(Assert.java:118)
at org.junit.Assert.assertEquals(Assert.java:144)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.testFailingExecutionAfterRestart(ExecutionGraphRestartTest.java:397)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4282) Add Offset Parameter to WindowAssigners

2016-07-29 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-4282:
---

 Summary: Add Offset Parameter to WindowAssigners
 Key: FLINK-4282
 URL: https://issues.apache.org/jira/browse/FLINK-4282
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Reporter: Aljoscha Krettek


Currently, windows are always aligned to EPOCH, which basically means days are 
aligned with GMT. This is somewhat problematic for people living in different 
timezones.

An offset parameter would allow adapting the window assigner to the timezone.
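
A sketch of how such a parameter might be used (hypothetical API; the second
argument shifts the window alignment away from EPOCH):

{code}
// e.g. daily windows aligned to UTC-8 instead of GMT
stream
    .keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .sum(1);
{code}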



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink

2016-07-29 Thread Aljoscha Krettek
About processing time and timestamps:

The timestamp is either set in the source or in an
in-between TimestampAssigner that can be used with
DataStream.assignTimestampsAndWatermarks(). However, the timestamp in the
element is normally not a "processing-time timestamp". I think it might
make sense to split the functionality for the evictors into two parts: one
that implicitly sets a timestamp and one that uses these timestamps. It
could look like this:

DataStream<T> input = ...
// this makes the current processing time explicit in the tuples:
DataStream<Tuple2<Long, T>> withTimestamps =
    input.map(new ReifyProcessingTime());
withTimestamps
  .keyBy(...)
  .window(..)
  .evictor(new ProcessingTimeEvictor<T>())
  .apply(...)

where ProcessingTimeEvictor looks like this:

class ProcessingTimeEvictor<T> extends Evictor<Tuple2<Long, T>> {
  void evictBefore(Iterable<Tuple2<Long, T>>, ...);
  void evictAfter ...
}
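
For completeness, ReifyProcessingTime could be as simple as this (a minimal
sketch under the assumption that it just attaches the current processing time;
the mail does not spell it out):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Attach the current processing time to each element so that a downstream
// evictor can read it from the tuple.
class ReifyProcessingTime<T> implements MapFunction<T, Tuple2<Long, T>> {
  @Override
  public Tuple2<Long, T> map(T value) {
    return new Tuple2<>(System.currentTimeMillis(), value);
  }
}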

This would make everything that is happening explicit in the type
signatures and explicit for the user.

Cheers,
Aljoscha

On Thu, 28 Jul 2016 at 18:32 Aljoscha Krettek  wrote:

> Hi,
> in fact, changing it to Iterable would simplify things because then we
> would not have to duplicate code for the EvictingWindowOperator any more.
> It could be a very thin subclass of WindowOperator.
>
> Cheers,
> Aljoscha
>
> On Wed, 27 Jul 2016 at 03:56 Vishnu Viswanath <
> vishnu.viswanat...@gmail.com> wrote:
>
>> Hi Aljoscha,
>>
>> Regarding your concern - to not  expose the StreamRecord in the Evictor,
>> were you able to find any alternative?
>>
>> I tried to make the methods take Iterable input similar to the
>> WindowFunction, but that didn't work since we have to clear the state and
>> add the elements back to the state (to fix the bug mentioned in the
>> previous mail)
>>
>> If you think the interface that accepts Iterable<StreamRecord<T>> elements
>> is good enough, I have the changes ready.
>>
>> Thanks,
>> Vishnu
>>
>> On Mon, Jul 25, 2016 at 7:48 AM, Aljoscha Krettek 
>> wrote:
>>
>> > Hi,
>> > the elements are currently not being removed from the buffers. That's a
>> bug
>> > that we could fix while adding the new Evictor interface.
>> >
>> > Cheers,
>> > Aljoscha
>> >
>> > On Mon, 25 Jul 2016 at 13:00 Radu Tudoran 
>> wrote:
>> >
>> > > Hi Aljoscha,
>> > >
>> > > Can you point us to the way it is handled now. Is there anything else
>> for
>> > > the removing of elements other than the skip in
>> EvictingWindowOperator.
>> > Is
>> > > there something as it was before version 1.x where you had an explicit
>> > > remove from window buffers?
>> > >
>> > > Dr. Radu Tudoran
>> > > Research Engineer - Big Data Expert
>> > > IT R&D Division
>> > >
>> > >
>> > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> > > European Research Center
>> > > Riesstrasse 25, 80992 München
>> > >
>> > > E-mail: radu.tudo...@huawei.com
>> > > Mobile: +49 15209084330
>> > > Telephone: +49 891588344173
>> > >
>> > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> > > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
>> > > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
>> > > Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
>> > > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
>> > > Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
>> > > This e-mail and its attachments contain confidential information from
>> > > HUAWEI, which is intended only for the person or entity whose address
>> is
>> > > listed above. Any use of the information contained herein in any way
>> > > (including, but not limited to, total or partial disclosure,
>> > reproduction,
>> > > or dissemination) by persons other than the intended recipient(s) is
>> > > prohibited. If you receive this e-mail in error, please notify the
>> sender
>> > > by phone or email immediately and delete it!
>> > >
>> > >
>> > > -Original Message-
>> > > From: Aljoscha Krettek [mailto:aljos...@apache.org]
>> > > Sent: Monday, July 25, 2016 11:45 AM
>> > > To: dev@flink.apache.org
>> > > Subject: Re: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink
>> > >
>> > > Hi,
>> > > I think there is not yet a clear specification for how the actual
>> removal
>> > > of elements from the buffer will work. I think naively one can do:
>> > >
>> > > Iterable<E> currentElements = state.get()
>> > > evictor.evict(currentElements); // this will remove some stuff from
>> > there,
>> > > or mark for removal
>> > >
>> > > state.clear()
>> > > // the Iterable does not loop over the removed/marked elements
>> > > for (E element : currentElements) {
>> > >   state.add(element)
>> > > }
>> > >
>> > > This is very costly but the only way I see of doing this right now
>> with
>> > > every state backend.
>> > >
>> > > Cheers,
>> > > Aljoscha
>> > >
>> > > On Mon, 25 Jul 2016 at 09:46 Radu Tudoran 
>> > wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > Thanks for the clarification. Can 

[jira] [Created] (FLINK-4281) Wrap all Calcite Exceptions in Flink Exceptions

2016-07-29 Thread Timo Walther (JIRA)
Timo Walther created FLINK-4281:
---

 Summary: Wrap all Calcite Exceptions in Flink Exceptions
 Key: FLINK-4281
 URL: https://issues.apache.org/jira/browse/FLINK-4281
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Timo Walther


Some exceptions are already wrapped in Flink exceptions but there are still
exceptions thrown by Calcite. I would propose that all exceptions thrown by the
Table API are Flink exceptions, especially the FlinkPlannerImpl exceptions.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: FlinkKafkaConsumer09

2016-07-29 Thread 戴資力
Hi Max,

Sure, I was planning to do so, but wanted to see if it was a reasonable
feature to add before opening a JIRA :)
Here's the new JIRA: https://issues.apache.org/jira/browse/FLINK-4280

Regards,
Gordon

On Fri, Jul 29, 2016 at 4:03 PM, Maximilian Michels  wrote:

> Hi Tai,
>
> Should definitely be possible. Would you mind opening a JIRA issue
> with the description you posted?
>
> Thanks,
> Max
>
> On Thu, Jul 28, 2016 at 11:16 AM, Tai Gordon  wrote:
> > Hi Kevin,
> >
> > Just a re-clarification: for Kafka 0.9 it would be “earliest”, &
> > “smallest” for the older Kafka 0.8.
> >
> > I’m wondering whether or not it is reasonable to add a Flink-specific way
> > to set the consumer’s starting position to “earliest” and “latest”,
> > without respecting the external Kafka offset store. Perhaps we can change
> > the current behaviour (checking committed offsets in Kafka as starting
> > point) as a user option, and add new options to read from “earliest” and
> > “latest” regardless of the groupId and externally committed offsets. I
> > think this better matches how users usually interpret the functionality
> > of setting starting positions, while also keeping the “auto.offset.reset”
> > behaviour that frequent Kafka users are used to. Also, this would more
> > clearly define that under the context of Flink, the external Kafka offset
> > store is used only to expose the consumer’s progress to the outside
> > world, and not used to manipulate how topics are read.
> >
> > Just an idea I have in mind, not sure if it would be a reasonable add.
> > It’d be great to hear what others think of this.
> >
> > Regards,
> > Gordon
> >
> >
> > On July 28, 2016 at 4:44:02 PM, Kevin Jacobs (kevin.jac...@cern.ch)
> wrote:
> >
> > Thank you Gordon and Max,
> >
> > Thank you Gordon, that explains the behaviour a bit better to me. I am
> > now adding the timestamp to the group ID and that is a good workaround
> > for now. The "smallest" option is unfortunately not available in this
> > version of the FlinkKafkaConsumer class.
> >
> > Cheers,
> > Kevin
> >
> >
> > On 28.07.2016 10:39, Maximilian Michels wrote:
> >> Hi Kevin,
> >>
> >> You need to use properties.setProperty("auto.offset.reset",
> >> "smallest") for Kafka 9 to start from the smallest offset. Note, that
> >> in Kafka 8 you need to use properties.setProperty("auto.offset.reset",
> >> "earliest") to achieve the same behavior.
> >>
> >> Kafka keeps track of the offsets per group id. If you have already
> >> read from a topic with a certain group id and want to restart from the
> >> smallest offset available, you need to generate a unique group id.
> >>
> >> Cheers,
> >> Max
> >>
> >> On Thu, Jul 28, 2016 at 10:14 AM, Kevin Jacobs 
> > wrote:
> >>> Hi,
> >>>
> >>> I am currently facing strange behaviour of the FlinkKafkaConsumer09
> > class. I
> >>> am using Flink 1.0.3.
> >>>
> >>> These are my properties:
> >>>
> >>> val properties = new Properties()
> >>> properties.setProperty("bootstrap.servers", config.urlKafka)
> >>> properties.setProperty("group.id", COLLECTOR_NAME)
> >>> properties.setProperty("auto.offset.reset", *"earliest"*)
> >>>
> >>> According to the new consumer API of Kafka, this should result in the
> >>> following:
> >>>
> >>> /auto.offset.reset: * smallest : automatically reset the offset to the
> >>> smallest offset/ (source:
> >>> https://kafka.apache.org/documentation.html#newconsumerapi)
> >>>
> >>> However, it starts from the latest item in my topic. Is this a bug or
> am
> > I
> >>> doing something wrong?
> >>>
> >>> Regards,
> >>> Kevin
> >>>
>



-- 
Tzu-Li (Gordon) Tai


[jira] [Created] (FLINK-4279) [py] Set flink dependencies to provided

2016-07-29 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-4279:
---

 Summary: [py] Set flink dependencies to provided
 Key: FLINK-4279
 URL: https://issues.apache.org/jira/browse/FLINK-4279
 Project: Flink
  Issue Type: Improvement
  Components: Python API
Affects Versions: 1.1.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Discard out-of-order events

2016-07-29 Thread Kevin Jacobs

Good morning :-),

Thank you for your answer. Let me explain my problem more thoroughly
(maybe other options are possible here, not necessarily with allowedLateness).


The most compact description of my problem would be Stream Enrichment.
More concretely, suppose I have two streams, where I want to enrich one
stream with another:


val mainStream = env.fromElements("a", "a", "b", "a", "a", "b", "b",
  "a", "c", "b", "a", "c")

val infoStream = env.fromElements((1, "a", "It is F"), (2, "b", "It is B"),
  (3, "c", "It is C"), (4, "a", "Whoops, it is A"))



The goal here is to mix information from the infoStream into the
mainStream, so the enriched stream would contain the following tuples:


("a", "Whoops, it is A")
("a", "Whoops, it is A")
("b", "It is B")
("a", "Whoops, it is A")
("a", "Whoops, it is A")
("b", "It is B")
("b", "It is B")
("a", "Whoops, it is A")
("c", "It is C")
("b", "It is B")
("a", "Whoops, it is A")
("c", "It is C")


It is not a requirement that the enriched stream has the same ordering on the
elements as the mainStream. However, it is important that new elements in the
infoStream override older elements from the infoStream. You can see here that
(4, "a", "Whoops, it is A") arrived later than (1, "a", "It is F") (if you
look at the event time, which is the first element of every tuple). So, the
infoStream (at t=4) should contain the following tuples:


(4, "a", "Whoops, it is A")
(2, "b", "It is B")
(3, "c", "It is C")


So, what I thought of is iterating over the infoStream to keep only the
relevant records, and then coGrouping it with the mainStream to enrich it.
This approach works fine:


val mainStream = env.fromElements("a", "a", "b", "a", "a", "b", "b",
  "a", "c", "b", "a", "c")
val infoStream = env.fromElements((1, "a", "It is F"), (2, "b", "It is B"),
  (3, "c", "It is C"), (4, "a", "Whoops, it is A"))
  .keyBy(1)
  .iterate(iteration => {
    val filtered = iteration.keyBy(1).maxBy(0)
    (iteration, filtered)
  })

mainStream
  .coGroup(infoStream)
  .where[String]((x: String) => x)
  .equalTo(_._2)
  .window(TumblingProcessingTimeWindows.of(Time.milliseconds(50)))
  .apply {
    (first: Iterator[String], second: Iterator[(Int, String, String)],
     out: Collector[(String, String)]) => {
      val records = scala.collection.mutable.MutableList[(Int, String, String)]()

      for ((record: (Int, String, String)) <- second) {
        records += record
      }

      for ((key: String) <- first) {
        var bestDescription = "?"
        for (record <- records) {
          if (record._2 == key) {
            bestDescription = record._3
          }
        }
        out.collect((key, bestDescription))
      }
    }
  }
  .print()


My questions for you:
- Can I make this more efficient?
- Is there a way of mixing datasets and datastreams? That would be 
really awesome (for at least this use case).
- Is there a way to ensure checkpoints, since I am using an iterative 
stream here?
- Can I get rid of the TumblingProcessingTimeWindows? Because in fact, all of
this can be done by Apache Spark. It would be great if Apache Flink could
achieve a higher throughput rate than Apache Spark in this use case.


I am curious about your answers!

Cheers,
Kevin


On 29.07.2016 10:40, Maximilian Michels wrote:

Hi!

I'm not sure whether I understand your question. The purpose of Event
Time is to be able to process out-of-order events. Do you want to
discard late elements? In the upcoming Flink 1.1.0 you can set the
`allowedLateness` on a windowed stream. The default is 0, so late
elements are discarded; late elements are elements which arrive after
the Watermark has reached the operator.

Cheers,
Max

On Thu, Jul 28, 2016 at 6:49 PM, Kevin Jacobs  wrote:

Is it possible to discard events that are out-of-order (in terms of event
time)?





Re: Discard out-of-order events

2016-07-29 Thread Gyula Fóra
Hi Max,

So if I understand correctly the window operators now, by default,  discard
late elements?
Is this documented somewhere?

Gyula

Maximilian Michels  ezt írta (időpont: 2016. júl. 29., P,
10:40):

> Hi!
>
> I'm not sure whether I understand your question. The purpose of Event
> Time is to be able to process out-of-order events. Do you want to
> discard late elements? In the upcoming Flink 1.1.0 you can set the
> `allowedLateness` on a windowed stream. The default is 0, so late
> elements are discarded; late elements are elements which arrive after
> the Watermark has reached the operator.
>
> Cheers,
> Max
>
> On Thu, Jul 28, 2016 at 6:49 PM, Kevin Jacobs 
> wrote:
> > Is it possible to discard events that are out-of-order (in terms of event
> > time)?
> >
>


Re: Discard out-of-order events

2016-07-29 Thread Maximilian Michels
Hi!

I'm not sure whether I understand your question. The purpose of Event
Time is to be able to process out-of-order events. Do you want to
discard late elements? In the upcoming Flink 1.1.0 you can set the
`allowedLateness` on a windowed stream. The default is 0, so late
elements are discarded; late elements are elements which arrive after
the Watermark has reached the operator.

Cheers,
Max

On Thu, Jul 28, 2016 at 6:49 PM, Kevin Jacobs  wrote:
> Is it possible to discard events that are out-of-order (in terms of event
> time)?
>


Re: FlinkKafkaConsumer09

2016-07-29 Thread Maximilian Michels
Hi Tai,

Should definitely be possible. Would you mind opening a JIRA issue
with the description you posted?

Thanks,
Max

On Thu, Jul 28, 2016 at 11:16 AM, Tai Gordon  wrote:
> Hi Kevin,
>
> Just a re-clarification: for Kafka 0.9 it would be “earliest”, & “smallest”
> for the older Kafka 0.8.
>
> I’m wondering whether or not it is reasonable to add a Flink-specific way
> to set the consumer’s starting position to “earliest” and “latest”, without
> respecting the external Kafka offset store. Perhaps we can change the
> current behaviour (checking committed offsets in Kafka as starting point)
> as a user option, and add new options to read from “earliest” and “latest”
> regardless of the groupId and externally committed offsets. I think this
> better matches how users usually interpret the functionality of setting
> starting positions, while also keeping the “auto.offset.reset” behaviour
> that frequent Kafka users are used to. Also, this would more clearly
> define that under the context of Flink, the external Kafka offset store is
> used only to expose the consumer’s progress to the outside world, and not
> used to manipulate how topics are read.
>
> Just an idea I have in mind, not sure if it would be a reasonable add. It’d
> be great to hear what others think of this.
>
> Regards,
> Gordon
>
>
> On July 28, 2016 at 4:44:02 PM, Kevin Jacobs (kevin.jac...@cern.ch) wrote:
>
> Thank you Gordon and Max,
>
> Thank you Gordon, that explains the behaviour a bit better to me. I am
> now adding the timestamp to the group ID and that is a good workaround
> for now. The "smallest" option is unfortunately not available in this
> version of the FlinkKafkaConsumer class.
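
A minimal sketch of that workaround (rendered in Java; the broker address and
group-id prefix are placeholders):

import java.util.Properties;

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// A fresh group id per run means Kafka has no committed offsets for the
// group, so the reset policy takes effect again ("earliest" for the 0.9
// consumer, "smallest" for 0.8).
properties.setProperty("group.id", "collector-" + System.currentTimeMillis());
properties.setProperty("auto.offset.reset", "earliest");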
>
> Cheers,
> Kevin
>
>
> On 28.07.2016 10:39, Maximilian Michels wrote:
>> Hi Kevin,
>>
>> You need to use properties.setProperty("auto.offset.reset",
>> "smallest") for Kafka 9 to start from the smallest offset. Note, that
>> in Kafka 8 you need to use properties.setProperty("auto.offset.reset",
>> "earliest") to achieve the same behavior.
>>
>> Kafka keeps track of the offsets per group id. If you have already
>> read from a topic with a certain group id and want to restart from the
>> smallest offset available, you need to generate a unique group id.
>>
>> Cheers,
>> Max
>>
>> On Thu, Jul 28, 2016 at 10:14 AM, Kevin Jacobs 
> wrote:
>>> Hi,
>>>
>>> I am currently facing strange behaviour of the FlinkKafkaConsumer09
> class. I
>>> am using Flink 1.0.3.
>>>
>>> These are my properties:
>>>
>>> val properties = new Properties()
>>> properties.setProperty("bootstrap.servers", config.urlKafka)
>>> properties.setProperty("group.id", COLLECTOR_NAME)
>>> properties.setProperty("auto.offset.reset", *"earliest"*)
>>>
>>> According to the new consumer API of Kafka, this should result in the
>>> following:
>>>
>>> /auto.offset.reset: * smallest : automatically reset the offset to the
>>> smallest offset/ (source:
>>> https://kafka.apache.org/documentation.html#newconsumerapi)
>>>
>>> However, it starts from the latest item in my topic. Is this a bug or am
> I
>>> doing something wrong?
>>>
>>> Regards,
>>> Kevin
>>>