Re: ClassNotFoundException : org.apache.flink.api.common.operators.util.UserCodeObjectWrapper, while trying to run locally

2015-09-07 Thread Stephan Ewen
Effectively, the method with the blob-classloader / parent classloader is
exactly intended to do that: Start with the app class path and add some
jars in addition, if you need to.

On Mon, Sep 7, 2015 at 1:05 PM, Stephan Ewen <se...@apache.org> wrote:

> The BlobClassloader uses the App classloader as the parent. That should
> make the classloading go first to the app classpath (TaskManager's
> classpath) and then to the user-defined code.
> Is that broken somehow?
>
>
>
> On Sun, Sep 6, 2015 at 1:40 PM, gkrastev <joro.kr...@gmail.com> wrote:
>
>> Hi, I'm facing the exact same issue when running a custom Flink
>> interpreter in Zeppelin, so it's a case of a special classloader. I'm
>> adding the Flink jars to the Scala REPL's classpath manually, so I end up
>> with the following classloader chain:
>> 1. Boot classloader
>> 2. App classloader
>> 3. Scala classloader -> has Flink on the classpath
>> 4. Translating classloader for the Scala REPL
>> So the class is accessible in the vanilla Zeppelin shell. E.g. the
>> following code works fine:
>>
>> println(classOf[org.apache.flink.api.common.operators.util.UserCodeObjectWrapper[String]])
>>
>> However, the classloader chain in the Task Manager looks different:
>> 1. Boot classloader
>> 2. App classloader
>> 3. Flink's Blob classloader
>> And the following code fails:
>>
>> import org.apache.flink.api.scala._
>> env.fromCollection(1 to 100).reduce { _ + _ }.collect()
>>
>> because the Flink jars are no longer on the classpath. The question is,
>> why doesn't Flink pick up the current classloader/classpath when starting
>> Task Managers, and can I add entries to their classpath manually?
>> Robert Metzger wrote
>> > Can you give us some context on what you are trying to do? It sounds a
>> > lot like you would like to develop a Flink Plugin for Eclipse? And that
>> > plugin needs Flink dependencies? Sorry for asking so many questions, but
>> > the issue you are facing sounds a lot like a) a very special classloader
>> > or b) corrupt files. I think some more info on what you are doing might
>> > help us to understand the issue to give you better help.
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ClassNotFoundException-org-apache-flink-api-common-operators-util-UserCodeObjectWrapper-while-tryingy-tp5922p7870.html
>> Sent from the Apache Flink Mailing List archive. mailing list archive at
>> Nabble.com.
>>
>
>
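
For illustration, a minimal self-contained sketch of that parent-first delegation (the jar path here is only illustrative, and this is not the actual blob-classloader code):

import java.net.URL;
import java.net.URLClassLoader;

// Minimal sketch of the delegation model described above: a child loader for
// user-code jars whose parent is the application classloader, so lookups hit
// the app classpath first and only then the user jars. The path is illustrative.
public class ParentFirstDelegationSketch {

    public static void main(String[] args) throws Exception {
        URL[] userJars = { new URL("file:///tmp/user-code.jar") };

        ClassLoader appClassLoader = ParentFirstDelegationSketch.class.getClassLoader();
        ClassLoader blobStyleLoader = new URLClassLoader(userJars, appClassLoader);

        // Found via the parent if Flink is on the app classpath, otherwise via the
        // user jars; a ClassNotFoundException means neither classpath contains it.
        Class<?> clazz = Class.forName(
                "org.apache.flink.api.common.operators.util.UserCodeObjectWrapper",
                false,
                blobStyleLoader);

        System.out.println("Loaded by: " + clazz.getClassLoader());
    }
}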


Re: ClassNotFoundException : org.apache.flink.api.common.operators.util.UserCodeObjectWrapper, while trying to run locally

2015-09-07 Thread Stephan Ewen
The BlobClassloader uses the App classloader as the parent. That should
make the classloading go first to the app classpath (TaskManager's
classpath) and then to the user-defined code.
Is that broken somehow?



On Sun, Sep 6, 2015 at 1:40 PM, gkrastev  wrote:

> Hi, I'm facing the exact same issue when running a custom Flink interpreter
> in Zeppelin, so it's a case of a special classloader. I'm adding the Flink
> jars to the Scala REPL's classpath manually, so I end up with the following
> classloader chain:
> 1. Boot classloader
> 2. App classloader
> 3. Scala classloader -> has Flink on the classpath
> 4. Translating classloader for the Scala REPL
> So the class is accessible in the vanilla Zeppelin shell. E.g. the following
> code works fine:
>
> println(classOf[org.apache.flink.api.common.operators.util.UserCodeObjectWrapper[String]])
>
> However, the classloader chain in the Task Manager looks different:
> 1. Boot classloader
> 2. App classloader
> 3. Flink's Blob classloader
> And the following code fails:
>
> import org.apache.flink.api.scala._
> env.fromCollection(1 to 100).reduce { _ + _ }.collect()
>
> because the Flink jars are no longer on the classpath. The question is, why
> doesn't Flink pick up the current classloader/classpath when starting Task
> Managers, and can I add entries to their classpath manually?
> Robert Metzger wrote
> > Can you give us some context on what you are trying to do? It sounds a lot
> > like you would like to develop a Flink Plugin for Eclipse? And that plugin
> > needs Flink dependencies? Sorry for asking so many questions, but the issue
> > you are facing sounds a lot like a) a very special classloader or b)
> > corrupt files. I think some more info on what you are doing might help us
> > to understand the issue to give you better help.
>
>
>
>
>
> --
> View this message in context:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ClassNotFoundException-org-apache-flink-api-common-operators-util-UserCodeObjectWrapper-while-tryingy-tp5922p7870.html
> Sent from the Apache Flink Mailing List archive. mailing list archive at
> Nabble.com.
>


Re: Configuring UDFs with user defined parameters

2015-09-07 Thread Stephan Ewen
The JobConfig is a system level config. Would be nice to not expose them to
the user-level unless necessary.

What about using the ExecutionConfig, where you can add shared user-level
parameters?
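
For illustration, the ExecutionConfig route could look roughly like this (a sketch only; the StormConfig class and the parameter key are made up for the example):

import java.util.Collections;
import java.util.Map;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: shipping shared user-level parameters via the ExecutionConfig.
// "StormConfig" is an illustrative class name, not an existing Flink class.
public class GlobalParametersSketch {

    public static class StormConfig extends ExecutionConfig.GlobalJobParameters {
        private final Map<String, String> params;

        public StormConfig(Map<String, String> params) {
            this.params = params;
        }

        @Override
        public Map<String, String> toMap() {
            return params;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(
                new StormConfig(Collections.singletonMap("topology.debug", "false")));

        env.fromElements("a", "b", "c")
                .map(new RichMapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        // Any rich UDF can read the parameters back at runtime.
                        Map<String, String> params = getRuntimeContext()
                                .getExecutionConfig().getGlobalJobParameters().toMap();
                        return value + " (debug=" + params.get("topology.debug") + ")";
                    }
                })
                .print();

        env.execute("Global job parameters sketch");
    }
}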

On Mon, Sep 7, 2015 at 1:39 PM, Matthias J. Sax  wrote:

> Thanks for the input.
>
> However, I doubt that a member variable approach is feasible, because
> when the Storm topology is translated into a Flink program (in
> `FlinkBuilder.createTopology()`) the Storm configuration is not
> available yet. And adding the configuration later to each operator would
> be cumbersome.
>
> If there are no better ideas, I guess the current usage of
> JobConfiguration is the best way to handle it (because extending
> TaskConfiguration seems to be no option)
>
> -Matthias
>
> On 09/06/2015 10:51 PM, Aljoscha Krettek wrote:
> > Hi,
> > I think the possibility to use a Configuration object is a legacy from
> the
> > past where the API was a bit closer to how Hadoop works. In my opinion
> this
> > is not necessary anymore since User Code objects can just contain
> > configuration settings in fields.
> >
> > The feature for the Storm API could probably be implemented by just
> storing
> > a Configuration object in the user code function.
> >
> > Regards,
> > Aljoscha
> >
> > On Sun, 6 Sep 2015 at 18:29 Matthias J. Sax  wrote:
> >
> >> Hi,
> >>
> >> I observed, that DataSet API offers a nice way to configure
> >> UDF-Operators by providing the method ".withParameters()". However,
> >> Streaming API does not offer such a method.
> >>
> >> For a current PR (https://github.com/apache/flink/pull/1046) this
> >> feature would be very helpful.
> >>
> >> As a workaround, PR #1046 can also be finished using JobConfiguration.
> >> However, this seems to be somewhat unnatural. Furthermore, I think that
> >> this feature would be nice to have in general. What do you think about
> it?
> >>
> >> If we introduce this feature, we can either open a new JIRA or just
> >> include it into the current PR #1046. What would be the better way?
> >>
> >>
> >> -Matthias
> >>
> >>
> >
>
>


Re: Failing Test: KafkaITCase and KafkaProducerITCase

2015-09-07 Thread Stephan Ewen
I have a patch pending that should help with these timeout issues (and null
checks)...

On Mon, Sep 7, 2015 at 2:41 PM, Matthias J. Sax  wrote:

> Please look here:
>
> https://travis-ci.org/apache/flink/jobs/79086396
>
> > Failed tests:
> > KafkaITCase>KafkaTestBase.prepare:155 Test setup failed: Unable to
> connect to zookeeper server within timeout: 6000
> > KafkaProducerITCase>KafkaTestBase.prepare:155 Test setup failed: Unable
> to connect to zookeeper server within timeout: 6000
> >
> > Tests in error:
> > KafkaITCase>KafkaTestBase.shutDownServices:196 » NullPointer
> > KafkaProducerITCase>KafkaTestBase.shutDownServices:196 » NullPointer
>
> I did not find any JIRA for it.
>
>
> -Matthias
>
>


Releasing 0.10.0-milestone1

2015-09-08 Thread Stephan Ewen
Hi all!

Some days back we talked about releasing a 0.10.0-milestone1 release. The
master has advanced quite a bit (especially due to high-availability code).

I cherry picked the important additions to the release-0.10.0-milestone1
branch (fixes and Kafka consumer/producer rework).

How about releasing the branch now as an intermediate version for people to
try out while we stabilize the windows and HA code for the 0.10 release?

Greetings,
Stephan


Re: Releasing 0.10.0-milestone1

2015-09-08 Thread Stephan Ewen
Great!

I'd like to push one more commit later today.
A fix for https://issues.apache.org/jira/browse/FLINK-2632 would also be
highly appreciated by some users.

Anyone volunteering as release manager (for creating release candidates and
uploading them)?


On Tue, Sep 8, 2015 at 6:11 PM, Kostas Tzoumas <ktzou...@apache.org> wrote:

> +1 for a milestone release
>
> On Tue, Sep 8, 2015 at 5:43 PM, Robert Metzger <rmetz...@apache.org>
> wrote:
>
> > +1 for a "milestone1" release. We have a lot of good features in master
> > that people can benefit from.
> >
> > On Tue, Sep 8, 2015 at 5:10 PM, Maximilian Michels <m...@apache.org>
> wrote:
> >
> > > +1 for releasing a milestone release soon to encourage people to try
> > > out the new features.
> > >
> > > There is this bug: https://issues.apache.org/jira/browse/FLINK-2632
> > > which affects the Web Client's error and results display for jobs.
> > > Would be nice to fix it but IMHO it is not critical for the milestone
> > > release.
> > >
> > > On Tue, Sep 8, 2015 at 1:00 PM, Ufuk Celebi <u...@apache.org> wrote:
> > > >
> > > >> On 08 Sep 2015, at 12:01, Stephan Ewen <se...@apache.org> wrote:
> > > >>
> > > >> Hi all!
> > > >>
> > > >> Some day back we talked about releasing an 0.10.0-milestone1
> release.
> > > The
> > > >> master has advanced quite a bit (especially due to high-availability
> > > code).
> > > >>
> > > >> I cherry picked the important additions to the
> > release-0.10.0-milestone1
> > > >> branch (fixes and Kafka consumer/producer rework).
> > > >>
> > > >> How about releasing the branch now as an intermediate version for
> > > people to
> > > >> try out while we stabilize the windows and HA code for the 0.10
> > release?
> > > >
> > > > +1
> > > >
> > > > Thanks for cping the important changes. I’ve checked and there is
> > > nothing I would add at this point.
> > > >
> > > > Can anybody else take a look at it? Other than that, I think it’s
> good
> > > to go.
> > > >
> > > > – Ufuk
> > > >
> > >
> >
>


Re: Streaming KV store abstraction

2015-09-08 Thread Stephan Ewen
@Gyula

Can you explain a bit what this KeyValue store would do more than the
partitioned key/value state?

On Tue, Sep 8, 2015 at 2:49 PM, Gábor Gévay  wrote:

> Hello,
>
> As for use cases, in my old job at Ericsson we were building a
> streaming system that was processing data from telephone networks, and
> it was using key-value stores a LOT. For example, keeping track of
> various state info of the users (which cell are they currently
> connected to, what bearers do they have, ...); mapping from IDs of
> users in one subsystem of the telephone network to the IDs of the same
> users in an other subsystem; mapping from IDs of phone calls to lists
> of IDs of participating users; etc.
> So I imagine they would like this a lot. (At least, if they were
> considering moving to Flink :))
>
> Best,
> Gabor
>
>
>
>
> 2015-09-08 13:35 GMT+02:00 Gyula Fóra :
> > Hey All,
> >
> > The last couple of days I have been playing around with the idea of
> > building a streaming key-value store abstraction using stateful streaming
> > operators that can be used within Flink Streaming programs seamlessly.
> >
> > Operations executed on this KV store would be fault tolerant as it
> > integrates with the checkpointing mechanism, and if we add timestamps to
> > each put/get/... operation we can use the watermarks to create fully
> > deterministic results. This functionality is very useful for many
> > applications, and is very hard to implement properly with some dedicated
> > kv store.
> >
> > The KVStore abstraction could look as follows:
> >
> > KVStore<K, V> store = new KVStore<>();
> >
> > Operations:
> >
> > store.put(DataStream<KV<K, V>>)
> > store.get(DataStream<K>) -> DataStream<KV<K, V>>
> > store.remove(DataStream<K>) -> DataStream<KV<K, V>>
> > store.multiGet(DataStream<K[]>) -> DataStream<KV<K, V>[]>
> > store.getWithKeySelector(DataStream<R>, KeySelector<R, K>) ->
> > DataStream<KV<R, V>[]>
> >
> > For the resulting streams I used a special KV abstraction which lets us
> > return null values.
> >
> > The implementation uses a simple streaming operator for executing most of
> > the operations (for multi get there is an additional merge operator) with
> > either local or partitioned states for storing the key-value pairs (my
> > current prototype uses local states). And it can either execute
> operations
> > eagerly (which would not provide deterministic results), or buffer up
> > operations and execute them in order upon watermarks.
> >
> > As for use cases you can probably come up with many I will save that for
> > now :D
> >
> > I have a prototype implementation here that can execute the operations
> > described above (does not handle watermarks and time yet):
> >
> > https://github.com/gyfora/flink/tree/KVStore
> > And also an example job:
> >
> >
> https://github.com/gyfora/flink/blob/KVStore/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/KVStreamExample.java
> >
> > What do you think?
> > If you like it I will work on writing tests and it still needs a lot of
> > tweaking and refactoring. This might be something we want to include with
> > the standard streaming libraries at one point.
> >
> > Cheers,
> > Gyula
>
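
For context, the kind of hand-rolled stateful two-input operator that such a store would abstract away (spelled out later in this thread with a city/temperature example) might look roughly like the following sketch. It keeps a plain per-instance HashMap instead of Flink's fault-tolerant partitioned state, and all names are illustrative:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

// Sketch: stream 1 updates a per-city value ("put"), stream 2 looks it up to
// enrich its records ("get"). A real job would key both inputs by the city and
// use Flink's partitioned state instead of a plain per-instance HashMap.
public class EnrichWithLatestTemperature implements
        CoFlatMapFunction<Tuple2<String, Double>,          // (city, avgTemp) updates
                          Tuple2<String, Long>,            // (city, population) queries
                          Tuple3<String, Double, Long>> {  // (city, temp, population)

    private final Map<String, Double> latestTemp = new HashMap<>();

    @Override
    public void flatMap1(Tuple2<String, Double> update,
                         Collector<Tuple3<String, Double, Long>> out) {
        latestTemp.put(update.f0, update.f1);   // the "put" side of the store
    }

    @Override
    public void flatMap2(Tuple2<String, Long> query,
                         Collector<Tuple3<String, Double, Long>> out) {
        Double temp = latestTemp.get(query.f0); // the "get" side of the store
        if (temp != null) {
            out.collect(Tuple3.of(query.f0, temp, query.f1));
        }
    }
}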


Re: Driver Test Base issue

2015-08-25 Thread Stephan Ewen
Test for a fix is pending...

On Tue, Aug 25, 2015 at 1:30 PM, Stephan Ewen se...@apache.org wrote:

 Looking into this. Seems like a wrong use of assertions in a parallel
 thread, where errors are not propagated to the main JUnit thread.
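
The general pattern for that (a sketch only, not the actual DriverTestBase fix) is to capture the failure in the helper thread and rethrow it from the main JUnit thread:

import java.util.concurrent.atomic.AtomicReference;

import org.junit.Test;

// Sketch: an AssertionError thrown in a helper thread does not fail the test
// on its own, so it has to be captured and rethrown in the main JUnit thread.
public class ParallelAssertionSketch {

    @Test
    public void assertionFromHelperThreadFailsTheTest() throws Exception {
        final AtomicReference<Throwable> error = new AtomicReference<>();

        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // ... the checks that previously called Assert.fail() here ...
                } catch (Throwable t) {
                    error.set(t); // remember the failure instead of losing it
                }
            }
        });

        worker.start();
        worker.join();

        if (error.get() != null) {
            // Rethrow in the main thread so JUnit actually reports the failure.
            throw new AssertionError("Error in worker thread", error.get());
        }
    }
}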

 On Mon, Aug 24, 2015 at 6:53 PM, Sachin Goel sachingoel0...@gmail.com
 wrote:

 Hi Max
 I have already created a Jira for this.
 https://issues.apache.org/jira/browse/FLINK-2528
 The failing test was testCancelMatchTaskWhileMatching(), but I think it
 has
 more to do with the failure in canceling in general. I'll point you to
 some
 build logs. It might take me some time to find them since I've triggered
 quite a few builds since my email.

 ​Cheers!
 Sachin​

 -- Sachin Goel
 Computer Science, IIT Delhi
 m. +91-9871457685

 On Mon, Aug 24, 2015 at 3:38 PM, Maximilian Michels m...@apache.org
 wrote:

  Hi Sachin,
 
  Thanks for reporting. Which of the test cases failed in the MapTaskTest
 and
  the MatchTaskTest?
 
  Best,
  Max
 
  On Tue, Aug 18, 2015 at 5:35 PM, Sachin Goel sachingoel0...@gmail.com
  wrote:
 
   There appears to be some issue in DriverTestBase. I have observed two
   failures recently, once in MatchTaskTest and MapTaskTest, both with
 the
   following trace:
  
  
   Exception in thread Thread-154 java.lang.AssertionError: Canceling
 task
   failed: java.lang.NullPointerException
   at
  
  
 
 org.apache.flink.runtime.operators.testutils.DriverTestBase.cancel(DriverTestBase.java:271)
   at
  
  
 
 org.apache.flink.runtime.operators.testutils.TaskCancelThread.run(TaskCancelThread.java:60)
  
   at org.junit.Assert.fail(Assert.java:88)
   at
  
  
 
 org.apache.flink.runtime.operators.testutils.TaskCancelThread.run(TaskCancelThread.java:68)
  
   Regards
   Sachin
  
   -- Sachin Goel
   Computer Science, IIT Delhi
   m. +91-9871457685
  
 





Re: look for help about task hooks of storm-compatibility

2015-09-09 Thread Stephan Ewen
Hi!

I think there are four ways to do this:

1) Create a new abstract class "SerializableBaseTaskHook" that extends
BaseTaskHook and implements java.io.Serializable. Then write the object into
bytes and put it into the config.

2) Offer in the StormCompatibilityAPI a method "public <X extends BaseTaskHook
& Serializable> void addHook(X hook)", which means you can only pass elements
that are both serializable and hooks.

3) Require the user to write the classes with a public zero-argument
constructor and add only the class name to the config. Start the class via
"Class.forName(className, true, userCodeClassLoader).newInstance()".

4) Try to serialize with another Framework, like Kryo. This is not
guaranteed to work, however.
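
A rough sketch of what option 1 could look like (the config key and helper names are illustrative and not tied to the actual Storm or Flink interfaces):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.Base64;
import java.util.Map;

// Sketch of option 1: serialize a Serializable hook on the client, ship it as a
// config entry, and restore it in the task against the user-code classloader.
// The config key and method names are illustrative, not part of any existing API.
public final class SerializedHookConfig {

    private static final String HOOK_CONFIG_KEY = "flink.storm.serialized.hook";

    // Client side: encode the hook object and put it into the config map.
    public static void addHook(Map<String, Object> config, Serializable hook) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(hook);
        }
        config.put(HOOK_CONFIG_KEY, Base64.getEncoder().encodeToString(bos.toByteArray()));
    }

    // Task side: decode the bytes, resolving classes via the user-code classloader.
    public static Object readHook(Map<String, Object> config, final ClassLoader userCodeClassLoader)
            throws IOException, ClassNotFoundException {
        byte[] bytes = Base64.getDecoder().decode((String) config.get(HOOK_CONFIG_KEY));
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
                return Class.forName(desc.getName(), false, userCodeClassLoader);
            }
        }) {
            return ois.readObject();
        }
    }
}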

I hope any of those approaches works in your situation!

Greetings,
Stephan




On Wed, Sep 9, 2015 at 6:09 AM, Fangfengbin  wrote:

> Hi
>
> @mjsax @StephanEwen @rmetzger
> Can you give me some suggestion about my idea of how to transfer
> user-defined class code to task?
> Thank you very much!
>
> Regards
> Fengbin Fang
>
> #
>
> Dear all,
>
> I am working on task hooks of storm-compatibility. Storm supports adding hooks
> through the Storm configuration using the "topology.auto.task.hooks"
> config. Users can use user-defined hook class names as the value of
> "topology.auto.task.hooks" in the Configuration map. These hooks are
> automatically registered in every spout or bolt.
>
> Example like this:
> public class Hooks extends BaseTaskHook {
>     @Override
>     public void boltExecute(BoltExecuteInfo info) {
>         ...;
>     }
> }
>
> public static void main(final String[] args) throws Exception {
>     final FlinkTopologyBuilder builder = ExclamationTopology.buildTopology();
>     Config config = new Config();
>     config.put(Config.TOPOLOGY_AUTO_TASK_HOOKS,
>             "org.apache.flink.stormcompatibility.util.Hook");
>
>     // execute program locally
>     final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
>     cluster.submitTopology(topologyId, config, builder.createTopology());
> }
>
> My question is how to transfer the user-defined class code to the task.
> I have an idea: create the user-defined hook object in the
> FlinkClient and serialize it into bytes, then put the bytes into the Storm
> configuration instead of the user-defined hook class names.
> That way all spouts/bolts can get the user-defined hook object from the Storm
> configuration, and we can keep a cleaner separation between the
> storm-compatibility and core code.
> But Storm's BaseTaskHook does not implement Serializable! So the user-defined
> hook classes must implement Serializable. This is the difference.
>
> Can you give me some suggestion? Thank you very much!!
>
> Regards
> Fengbin Fang
>
>
>
>
>
>


Re: Build failure with maven-junction-plugin

2015-09-10 Thread Stephan Ewen
Did you do a "clean" together with the "install"? Then it should work.

The problem occurs when you switch between versions where the link is set
(0.9+) and versions prior to the link (< 0.9) ...

Stephan


On Thu, Sep 10, 2015 at 1:37 PM, Matthias J. Sax  wrote:

> I could resolve this by manually deleting the link
> "/home/mjsax/workspace_flink_asf/flink-asf/build-target"
>
> But I still don't understand why this is necessary...
>
> On 09/10/2015 12:14 PM, Matthias J. Sax wrote:
> > Hi,
> >
> > I am picking up on this old thread, because I have the same error now. I
> > just created a new branch from a freshly rebased master branch.
> >
> > mvn -DskipTests clean install
> >
> >> Failed to execute goal com.pyx4j:maven-junction-plugin:1.0.3:link
> (default) on project flink-dist: Can't create junction source
> [/home/mjsax/workspace_flink_asf/flink-asf/flink-dist/target/flink-0.10-SNAPSHOT-bin/flink-0.10-SNAPSHOT]
> -> [/home/mjsax/workspace_flink_asf/flink-asf/flink-dist/../build-target]
> -> [Help 1]
> >
> > This email thread did not show a fix/reason for this issue.
> >
> >
> > -Matthias
> >
> > On 06/03/2015 05:24 PM, Theodore Vasiloudis wrote:
> >> mvn clean package -Dmaven.javadoc.skip=true -DskipTests
> >>
> >> Called from the flink root.
> >>
> >> On Wed, Jun 3, 2015 at 5:21 PM, Robert Metzger 
> wrote:
> >>
> >>> The WARNING is okay.
> >>>
> >>> How did you call maven, and from which directory (flink root or
> >>> flink-dist?)
> >>>
> >>> On Wed, Jun 3, 2015 at 5:10 PM, Theodore Vasiloudis <
> >>> theodoros.vasilou...@gmail.com> wrote:
> >>>
>  Hello,
> 
>  I'm having some problems with building the project. The error I get is
> >>> for
>  the flink-dist module is:
> 
>  [ERROR] Failed to execute goal
> com.pyx4j:maven-junction-plugin:1.0.3:link
> > (default) on project flink-dist: Can't create junction source
> >
> 
> >>>
> [/long/dir/here//flink-dist/target/flink-0.9-SNAPSHOT-bin/flink-0.9-SNAPSHOT]
> > -> [/long/dir/here/build-target/flink-dist/../build-target] -> [Help
> 1]
> >
> 
>  Before that I get this warning in maven-assembly-plugin:2.4:single
> (bin)
> >>> @
>  flink-dist ---:
> 
>  [WARNING] Assembly file:
> > long/dir/here/flink-dist/target/flink-0.9-SNAPSHOT-bin is not a
> regular
> > file (it may be a directory). It cannot be attached to the project
> >>> build
> > for installation or deployment.
> >
> 
>  Has anyone else encountered this? Any ideas on how it can be fixed?
> 
> 
> 
> 
>  --
>  View this message in context:
> 
> >>>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Build-failure-with-maven-junction-plugin-tp6056.html
>  Sent from the Apache Flink Mailing List archive. mailing list archive
> at
>  Nabble.com.
> 
> >>>
> >>
> >
>
>


Re: Java 8 JDK Issue

2015-09-16 Thread Stephan Ewen
Do you know between what classes the exception is? What is the original
class and what the cast target class?

Are they both the same, and not castable because of different copies being
loaded from different classloaders, or are they really different types?

On Wed, Sep 16, 2015 at 10:36 AM, Matthias J. Sax  wrote:

> Hi,
>
> I just hit an issue with the Java 8 JDK. It occurs in a new test I added in a
> current PR.
>
> > Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 8.793
> sec <<< FAILURE! - in
> org.apache.flink.stormcompatibility.wrappers.StormWrapperSetupHelperTest
> >
> testCreateTopologyContext(org.apache.flink.stormcompatibility.wrappers.StormWrapperSetupHelperTest)
> Time elapsed: 6.087 sec  <<< ERROR!
> > java.lang.ClassCastException: class sun.security.provider.ConfigFile
> >   at java.lang.Class.asSubclass(Class.java:3396)
> >   at
> javax.security.auth.login.Configuration$2.run(Configuration.java:254)
> >   at
> javax.security.auth.login.Configuration$2.run(Configuration.java:247)
> >   at java.security.AccessController.doPrivileged(Native Method)
> >   at
> javax.security.auth.login.Configuration.getConfiguration(Configuration.java:246)
> >   at
> org.apache.storm.zookeeper.server.ServerCnxnFactory.configureSaslLogin(ServerCnxnFactory.java:174)
> >   at
> org.apache.storm.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:87)
> >   at
> backtype.storm.zookeeper$mk_inprocess_zookeeper$fn__1697$fn__1699.invoke(zookeeper.clj:198)
> >   at
> backtype.storm.zookeeper$mk_inprocess_zookeeper$fn__1697.invoke(zookeeper.clj:197)
> >   at
> backtype.storm.zookeeper$mk_inprocess_zookeeper.doInvoke(zookeeper.clj:195)
> >   at clojure.lang.RestFn.invoke(RestFn.java:410)
> >   at
> backtype.storm.testing$mk_local_storm_cluster.doInvoke(testing.clj:123)
> >   at clojure.lang.RestFn.invoke(RestFn.java:421)
> >   at backtype.storm.LocalCluster$_init.invoke(LocalCluster.clj:28)
> >   at backtype.storm.LocalCluster.<init>(Unknown Source)
> >   at
> org.apache.flink.stormcompatibility.wrappers.StormWrapperSetupHelperTest.testCreateTopologyContext(StormWrapperSetupHelperTest.java:190)
>
> Please see here
>   https://travis-ci.org/mjsax/flink/jobs/80576847
>
> I have no clue what the problem might be. Searching on the Internet did
> not help...
>
> Does anyone have an idea what the problem might be?
>
>
> -Matthias
>
>


Re: Configuring UDFs with user defined parameters

2015-09-15 Thread Stephan Ewen
Yes, exactly...

On Tue, Sep 15, 2015 at 1:15 AM, Matthias J. Sax <mj...@apache.org> wrote:

> Thanks, now I understand how to do it:
>
> We just use
> > env.getConfig().setGlobalJobParameters(new StormConfig());
>
> with "StormConfig extends GlobalJobParameters"
>
> We can access this configuration in SourceFunction via
> > getRuntimeContext().getExecutionConfig().getGlobalJobParameters()
>
> and in StreamOperator via
> > executionConfig.getGlobalJobParameters()
>
> -Matthias
>
>
> On 09/15/2015 12:31 AM, Fabian Hueske wrote:
> > Ah, here's the discussion I was looking for :-)
> > I think Stephan refers to ExecutionConfig.setGlobalJobParameters().
> >
> > 2015-09-15 0:25 GMT+02:00 Matthias J. Sax <mj...@apache.org>:
> >
> >> It might sound stupid. But how could such a configuration be set?
> >>
> >> StreamExecutionEnvironment only offerst ".getConfig()"
> >>
> >> -Matthias
> >>
> >> On 09/07/2015 03:05 PM, Stephan Ewen wrote:
> >>> The JobConfig is a system level config. Would be nice to not expose
> them
> >> to
> >>> the user-level unless necessary.
> >>>
> >>> What about using the ExecutionConfig, where you can add shared
> user-level
> >>> parameters?
> >>>
> >>> On Mon, Sep 7, 2015 at 1:39 PM, Matthias J. Sax <mj...@apache.org>
> >> wrote:
> >>>
> >>>> Thanks for the input.
> >>>>
> >>>> However, I doubt that a member variable approach is feasible, because
> >>>> when the Storm topology is translated into a Flink program (in
> >>>> `FlinkBuilder.createTopology()`) the Storm configuration is not
> >>>> available yet. And adding the configuration later to each operator
> would
> >>>> be cumbersome.
> >>>>
> >>>> If there are no better ideas, I guess the current usage of
> >>>> JobConfiguration is the best way to handle it (because extending
> >>>> TaskConfiguration seems to be no option)
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 09/06/2015 10:51 PM, Aljoscha Krettek wrote:
> >>>>> Hi,
> >>>>> I think the possibility to use a Configuration object is a legacy
> from
> >>>> the
> >>>>> past where the API was a bit closer to how Hadoop works. In my
> opinion
> >>>> this
> >>>>> is not necessary anymore since User Code objects can just contain
> >>>>> configuration settings in fields.
> >>>>>
> >>>>> The feature for the Storm API could probably be implemented by just
> >>>> storing
> >>>>> a Configuration object in the user code function.
> >>>>>
> >>>>> Regards,
> >>>>> Aljoscha
> >>>>>
> >>>>> On Sun, 6 Sep 2015 at 18:29 Matthias J. Sax <mj...@apache.org>
> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> I observed, that DataSet API offers a nice way to configure
> >>>>>> UDF-Operators by providing the method ".withParameters()". However,
> >>>>>> Streaming API does not offer such a method.
> >>>>>>
> >>>>>> For a current PR (https://github.com/apache/flink/pull/1046) this
> >>>>>> feature would be very helpful.
> >>>>>>
> >>>>>> As a workaround, PR #1046 can also be finished using
> JobConfiguration.
> >>>>>> However, this seems to be somewhat unnatural. Furthermore, I think
> >> that
> >>>>>> this feature would be nice to have in general. What do you think
> about
> >>>> it?
> >>>>>>
> >>>>>> If we introduce this feature, we can either open a new JIRA or just
> >>>>>> include it into the current PR #1046. What would be the better way?
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>


Re: Streaming KV store abstraction

2015-09-15 Thread Stephan Ewen
y-value store
> > > logic.
> > > > > But
> > > > > > once you have one(or more) update stream and many get streams
> this
> > > > > > implementation will not work. So either the user end up
> replicating
> > > the
> > > > > > whole state in multiple connected operators, or custom implement
> > some
> > > > > > inefficient wrapper class to take care of all the put/get
> > operations.
> > > > > >
> > > > > > The Idea behind this is to give a very simple abstraction for
> this
> > > type
> > > > > of
> > > > > > processing that uses the flink runtime efficiently instead of
> > relying
> > > > on
> > > > > > custom implementations.
> > > > > >
> > > > > > Let me give you a stupid example:
> > > > > >
> > > > > > You receive Temperature data in the form of (city, temperature),
> > and
> > > > you
> > > > > > are computing a rolling avg for each city.
> > > > > > Now you have 2 other incoming streams: first is a stream of some
> > > other
> > > > > info
> > > > > > about the city let's say population (city, population) and you
> want
> > > to
> > > > > > combine it with the last known avg temperature to produce (city,
> > > temp,
> > > > > pop)
> > > > > > triplets. The second stream is a pair of cities (city,city) and
> you
> > > are
> > > > > > interested in the difference of the temperature.
> > > > > >
> > > > > > For enriching the (city, pop) to (city,temp,pop) you would
> probably
> > > > use a
> > > > > > CoFlatMap and store the last known rolling avg as state. For
> > > computing
> > > > > the
> > > > > > (city,city) temperature difference it is a little more difficult,
> > as
> > > > you
> > > > > > need to get the temperature for both cities then combine in a
> > second
> > > > > > operator. If you don't want to replicate your state, you have to
> > > > combine
> > > > > > these two problems to a common wrapper type and execute them on a
> > > same
> > > > > > operator which will keep the avg state.
> > > > > >
> > > > > > With the KVStore abstraction this is very simple:
> > > > > > you create a KVStore<City, Temp>
> > > > > > For enriching you use kvStore.getWithKeySelector() which will
> give
> > > you
> > > > > > ((city,pop), temp) pairs and you are done. For computing the
> > > difference,
> > > > > you
> > > > > > can use kvStore.multiget(...) and get for the 2 cities at the
> same
> > > > time.
> > > > > > The kv store will abstract away the getting of the 2 keys
> > separately
> > > > and
> > > > > > merging them so it will return [(city1, t1), (city2,t2)].
> > > > > >
> > > > > > This might be slightly artificial example but I think it makes
> the
> > > > point.
> > > > > > Implementing these jobs efficiently is not trivial for the users
> > but
> > > I
> > > > > > think it is a very common problem.
> > > > > >
> > > > > > Stephan Ewen <se...@apache.org> wrote (on Tue, Sep 8, 2015 at
> > > > > > 14:53):
> > > > > >
> > > > > > > @Gyula
> > > > > > >
> > > > > > > Can you explain a bit what this KeyValue store would do more
> then
> > > the
> > > > > > > partitioned key/value state?
> > > > > > >
> > > > > > > On Tue, Sep 8, 2015 at 2:49 PM, Gábor Gévay <gga...@gmail.com>
> > > > wrote:
> > > > > > >
> > > > > > > > Hello,
> > > > > > > >
> > > > > > > > As for use cases, in my old job at Ericsson we were building
> a
> > > > > > > > streaming system that was processing data from telephone
> > > networks,
> > > > > and
> > > > > > > > it was using key-value stores a LOT. For example, keeping

Re: Java 8 JDK Issue

2015-09-16 Thread Stephan Ewen
No clue, sorry.

The Storm Mailing list may be able to help you, it looks like deep into
their code...

On Wed, Sep 16, 2015 at 9:00 PM, Matthias J. Sax <mj...@apache.org> wrote:

> Hi,
>
> using Java 8 locally, I was able to reproduce the problem. I dug into it
> and could figure out the following. However, I cannot make any sense out
> of it. I have no experience with Java security stuff...
>
> Anyone?
>
> -Matthias
>
> 
>
> In class
> > javax.security.auth.login.Configuration.getConfiguration()
>
> in line 235 to 242 (for Java 7) and
> in line 232 to 239 (for Java 8)
>
> this code
>
> > String config_class = null;
> > config_class = AccessController.doPrivileged
> >     (new PrivilegedAction<String>() {
> >     public String run() {
> >         return java.security.Security.getProperty
> >             ("login.configuration.provider");
> >     }
> > });
>
> results for "config_class":
>   Java 7: com.sun.security.auth.login.ConfigFile
>   Java 8: sun.security.provider.ConfigFile
>
> Later this leads to ClassCastException (in line 254 to 257 for Java 7;
> and line 251 to 254 for Java 8)
>
> > Class<? extends Configuration> implClass = Class.forName(
> > finalClass, false,
> > Thread.currentThread().getContextClassLoader()
> > ).asSubclass(Configuration.class);
>
> The call stack is also somewhat different.
>
> Java 7:
>
> > Configuration.getConfiguration() line: 235
> > NIOServerCnxnFactory(ServerCnxnFactory).configureSaslLogin() line: 174
> > NIOServerCnxnFactory.configure(InetSocketAddress, int) line: 87
> > zookeeper.clj line: 198
> > zookeeper.clj line: 197
> > zookeeper.clj line: 195
> > zookeeper$mk_inprocess_zookeeper(RestFn).invoke(Object) line: 410
> > testing.clj line: 123
> > testing$mk_local_storm_cluster(RestFn).invoke(Object, Object) line: 421
> > LocalCluster.clj line: 28
> > LocalCluster.<init>() line: not available
> > StormWrapperSetupHelperTest.testCreateTopologyContext() line: 190
>
> Java 8:
>
> > Configuration$2.run() line: 254 [local variables unavailable]
> > Configuration$2.run() line: 247
> > AccessController.doPrivileged(PrivilegedExceptionAction) line: not
> available [native method]
> > Configuration.getConfiguration() line: 246 [local variables unavailable]
> > NIOServerCnxnFactory(ServerCnxnFactory).configureSaslLogin() line: 174
> > NIOServerCnxnFactory.configure(InetSocketAddress, int) line: 87
> > zookeeper.clj line: 198
> > zookeeper.clj line: 197
> > zookeeper.clj line: 195
> > zookeeper$mk_inprocess_zookeeper(RestFn).invoke(Object) line: 410
> > testing.clj line: 123
> > testing$mk_local_storm_cluster(RestFn).invoke(Object, Object) line: 421
> > LocalCluster.clj line: 28
> > LocalCluster.<init>() line: not available
> > StormWrapperSetupHelperTest.testCreateTopologyContext() line: 190
>
> 
>
>
>
>
> On 09/16/2015 12:56 PM, Matthias J. Sax wrote:
> > Not right now... The ClassCastException occurs deeply inside Storm. I
> > guess it is a Storm issue.
> >
> > I just instantiate a (Storm)LocalCluster and it fails internally.
> >
> >> LocalCluster cluster = new LocalCluster();
> >
> > Maybe I need to try it locally with Java 8 JDK to be able to debug it...
> > I will pick up this thread again if I have more information.
> >
> > As a fall back, we might want to deactivate this test for Java 8 (if
> > this is possible).
> >
> >
> > -Matthias
> >
> >
> > On 09/16/2015 12:34 PM, Stephan Ewen wrote:
> >> Do you know between what classes the exception is? What is the original
> >> class and what the cast target class?
> >>
> >> Are they both the same, and not castable because of different copies
> being
> >> loaded from different classloaders, or are they really different types?
> >>
> >> On Wed, Sep 16, 2015 at 10:36 AM, Matthias J. Sax <mj...@apache.org>
> wrote:
> >>
> >>> Hi,
> >>>
> >>> I just hit an issue with the Java 8 JDK. It occurs in a new test I added in
> a
> >>> current PR.
> >>>
> >>>> Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 8.793
> >>> sec <<< FAILURE! - in
> >>>
> org.apache.flink.stormcompatibility.wrappers.StormWrapperSetupHelperTest
> >>>>
> >>>
> testCreateTopologyContext(org

Tests - Unit Tests versus Integration Tests

2015-09-17 Thread Stephan Ewen
Hi all!

The build time of Flink with all tests is nearing 1h on Travis for the
shortest run.
It is good that we do extensive testing, there are many mechanisms that
need that.

I have also seen that a lot of fixes that could be tested in a UnitTest
style are actually tested as a full Flink program (Integration test style)

While these tests are always easier to write, they have two problems:
  - They bring up the build time by about 5 secs per test
  - They are often not as targeted to the problem as a UnitTest

I would like to encourage everyone to keep this in mind and do Unit tests
in the cases where they are the preferred choice. Please also keep that in
mind when reviewing pull requests.

For Example:
  - API / TypeInformation changes can be very well tested without running
the program. Simply create the program and test the operator's type info.
  - Custom functions can be very well tested in isolation
  - Input/Output formats actually test well in UnitTests.

Integration tests need to be used when verifying behavior across components
/ layers, so keep using them when they need to be used.
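
For example, the first bullet above can be covered by a plain unit test along these lines (a sketch; class and test names are made up):

import static org.junit.Assert.assertEquals;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.junit.Test;

// Sketch: the operator's result type can be checked by only building the
// program; env.execute() is never called, so no cluster is spun up.
public class MapReturnTypeTest {

    @Test
    public void mapProducesLongTypeInfo() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Long> result = env.fromElements("1", "2", "3")
                .map(new MapFunction<String, Long>() {
                    @Override
                    public Long map(String value) {
                        return Long.parseLong(value);
                    }
                });

        assertEquals(BasicTypeInfo.LONG_TYPE_INFO, result.getType());
    }
}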


Greetings,
Stephan


Re: Releasing 0.10.0-milestone1

2015-09-11 Thread Stephan Ewen
Sure, that is fine with me, you are the release manager ;-)

I don't want to say let's not include anything else, just saying that we
need a cutoff point, as we have been postponing this for a while...

On Fri, Sep 11, 2015 at 2:51 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> We're waiting for a fix of FLINK-2637 (TypeInfo equals + hashcode).
> Once this is in, we do the first release candidate, OK?
>
> 2015-09-11 14:48 GMT+02:00 Stephan Ewen <se...@apache.org>:
>
> > Fixes come up on a daily base. I think at some point we need to declare
> > freeze, or this thing will never be released.
> >
> > @Fabian: Since you are release manager, do you want to pick the point to
> > declare freeze (and pick what should be included)?
> >
> > On Fri, Sep 11, 2015 at 2:41 PM, Matthias J. Sax <mj...@apache.org>
> wrote:
> >
> > > I would have an important fix too.
> > >
> > > https://issues.apache.org/jira/browse/FLINK-2658
> > > https://github.com/apache/flink/pull/1122
> > >
> > > Or is it too late?
> > >
> > >
> > > On 09/10/2015 06:54 PM, Robert Metzger wrote:
> > > > Since the vote has not been started yet, we might consider including
> > this
> > > > critical fix for the Kafka consumer into 0.10-milestone1:
> > > > https://github.com/apache/flink/pull/1117
> > > >
> > > > On Thu, Sep 10, 2015 at 1:54 PM, Matthias J. Sax <mj...@apache.org>
> > > wrote:
> > > >
> > > >> I just opened a PR for FLINK-2632: "Web Client does not respect
> the
> > > >> class loader of submitted jobs"
> > > >>
> > > >> https://github.com/apache/flink/pull/1114
> > > >>
> > > >> -Matthias
> > > >>
> > > >>
> > > >> On 09/10/2015 11:49 AM, Fabian Hueske wrote:
> > > >>> All these bugs reported by users and its just four of them of
> which 2
> > > are
> > > >>> already fixed.
> > > >>> It should be possible to fix them in a day.
> > > >>>
> > > >>> 2015-09-10 11:19 GMT+02:00 Robert Metzger <rmetz...@apache.org>:
> > > >>>
> > > >>>> Thank you very much for taking care of the release management!
> > > >>>>
> > > >>>> I'm not sure whether it makes sense to block a "preview" release
> on
> > so
> > > >> many
> > > >>>> bug fixes. It feels like 80% of our commits are bugfixes or
> > stability
> > > >>>> improvements anyways, so we would wait infinitely to release the
> > > >> milestone
> > > >>>> if we wait for all of them.
> > > >>>>
> > > >>>> For example for the class loader issue in the web client issue,
> Max
> > > (who
> > > >>>> initially brought the issue to our attention) also said: "Would be
> > > nice
> > > >> to
> > > >>>> fix it but IMHO it is not critical for the milestone release."
> > > >>>>
> > > >>>>
> > > >>>> I would vote for merging Till's
> > > >> https://github.com/apache/flink/pull/1101
> > > >>>> and then start with the RC0 vote.
> > > >>>> If there are more bug fixes available and the RC0 vote fails
> (which
> > is
> > > >>>> likely ;) ), we can include them as well.
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> On Thu, Sep 10, 2015 at 11:04 AM, Fabian Hueske <
> fhue...@gmail.com>
> > > >> wrote:
> > > >>>>
> > > >>>>> I can take the role of the release manager and push the 0.10
> > > milestone
> > > >>>>> release forward.
> > > >>>>>
> > > >>>>> I added a list of pending fixes for 0.10 to the wiki:
> > > >>>>> https://cwiki.apache.org/confluence/display/FLINK/0.10+Release
> > > >>>>> Please extend the list (or respond to this thread) if you have
> more
> > > >> fixes
> > > >>>>> that should go into the release.
> > > >>>>>
> > > >>>>> From the currently open issues only FLINK-2632 (Web Client does
> not
> > > >>>> respect
> > > >&g

Re: Releasing 0.10.0-milestone1

2015-09-11 Thread Stephan Ewen
Fixes come up on a daily base. I think at some point we need to declare
freeze, or this thing will never be released.

@Fabian: Since you are release manager, do you want to pick the point to
declare freeze (and pick what should be included)?

On Fri, Sep 11, 2015 at 2:41 PM, Matthias J. Sax <mj...@apache.org> wrote:

> I would have an important fix too.
>
> https://issues.apache.org/jira/browse/FLINK-2658
> https://github.com/apache/flink/pull/1122
>
> Or is it too late?
>
>
> On 09/10/2015 06:54 PM, Robert Metzger wrote:
> > Since the vote has not been started yet, we might consider including this
> > critical fix for the Kafka consumer into 0.10-milestone1:
> > https://github.com/apache/flink/pull/1117
> >
> > On Thu, Sep 10, 2015 at 1:54 PM, Matthias J. Sax <mj...@apache.org>
> wrote:
> >
> >> I just opened a PR for FLINK-2632: "Web Client does not respect the
> >> class loader of submitted jobs"
> >>
> >> https://github.com/apache/flink/pull/1114
> >>
> >> -Matthias
> >>
> >>
> >> On 09/10/2015 11:49 AM, Fabian Hueske wrote:
> >>> All these bugs reported by users and its just four of them of which 2
> are
> >>> already fixed.
> >>> It should be possible to fix them in a day.
> >>>
> >>> 2015-09-10 11:19 GMT+02:00 Robert Metzger <rmetz...@apache.org>:
> >>>
> >>>> Thank you very much for taking care of the release management!
> >>>>
> >>>> I'm not sure whether it makes sense to block a "preview" release on so
> >> many
> >>>> bug fixes. It feels like 80% of our commits are bugfixes or stability
> >>>> improvements anyways, so we would wait infinitely to release the
> >> milestone
> >>>> if we wait for all of them.
> >>>>
> >>>> For example for the class loader issue in the web client issue, Max
> (who
> >>>> initially brought the issue to our attention) also said: "Would be
> nice
> >> to
> >>>> fix it but IMHO it is not critical for the milestone release."
> >>>>
> >>>>
> >>>> I would vote for merging Till's
> >> https://github.com/apache/flink/pull/1101
> >>>> and then start with the RC0 vote.
> >>>> If there are more bug fixes available and the RC0 vote fails (which is
> >>>> likely ;) ), we can include them as well.
> >>>>
> >>>>
> >>>>
> >>>> On Thu, Sep 10, 2015 at 11:04 AM, Fabian Hueske <fhue...@gmail.com>
> >> wrote:
> >>>>
> >>>>> I can take the role of the release manager and push the 0.10
> milestone
> >>>>> release forward.
> >>>>>
> >>>>> I added a list of pending fixes for 0.10 to the wiki:
> >>>>> https://cwiki.apache.org/confluence/display/FLINK/0.10+Release
> >>>>> Please extend the list (or respond to this thread) if you have more
> >> fixes
> >>>>> that should go into the release.
> >>>>>
> >>>>> From the currently open issues only FLINK-2632 (Web Client does not
> >>>> respect
> >>>>> the class loader of submitted jobs) is unassigned.
> >>>>> Anybody around to pick this one up?
> >>>>>
> >>>>> Cheers, Fabian
> >>>>>
> >>>>> 2015-09-09 12:00 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
> >>>>>
> >>>>>> I forgot to mention that there is also a bug with the `StreamFold`
> >>>>> operator
> >>>>>> which we might consider fixing for the milestone release. I've
> opened
> >> a
> >>>>> PR
> >>>>>> for it.
> >>>>>>
> >>>>>> https://issues.apache.org/jira/browse/FLINK-2631
> >>>>>> https://github.com/apache/flink/pull/1101
> >>>>>>
> >>>>>>
> >>>>>> On Wed, Sep 9, 2015 at 10:58 AM, Gyula Fóra <gyf...@apache.org>
> >> wrote:
> >>>>>>
> >>>>>>> This sounds good +1 from me as well :)
> >>>>>>> Till Rohrmann <till.rohrm...@gmail.com> wrote (on Wed, Sep 9, 2015
> >>>>>>> at 10:40):
> >>>>>>>
>

Re: Add a module for "manual" tests

2015-09-29 Thread Stephan Ewen
@Robert: This does not mean that we move all connector tests to that
package, but only the ones that we cannot fully control (like ElasticSearch)

On Tue, Sep 29, 2015 at 3:49 PM, Robert Metzger <rmetz...@apache.org> wrote:

> I agree that some tests are pretty resource intensive and/or unstable...
>
> However I fear that we are ending up fixing our connectors for every
> release (potentially delaying releases and forcing people to fix tests for
> changes they didn't introduce)
> We could try out adding another Travis build which is only executing those
> heavy tests...
>
>
>
> On Tue, Sep 29, 2015 at 1:40 PM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
> > +1 for your proposal Stephan. Seems like a good idea to improve build
> > stability as well as guarding components like the ElasticSearchSink by a
> > test, even though it’s not regularly executed.
> > ​
> >
> > On Tue, Sep 29, 2015 at 12:34 PM, Stephan Ewen <se...@apache.org> wrote:
> >
> > > Hi all!
> > >
> > > We have by now quite some tests that are not really suitable for
> > execution
> > > as part of the regular build process.
> > >
> > >   - Some tests just take very long, for example some of the sorter
> tests
> > > with multiple recursive merges.
> > >
> > >   - Some tests depend on external components. These components are
> > > sometimes not stable enough for reliable tests. An example is the
> > > ElasticSearchITCase
> > >
> > > How about we add a dedicated module for these tests, that we execute
> > during
> > > release testing, but not as part of the regular build tests?
> > >
> > > Greetings,
> > > Stephan
> > >
> >
>


Re: Release Flink 0.10

2015-09-29 Thread Stephan Ewen
+1 here as well

On Tue, Sep 29, 2015 at 12:03 PM, Fabian Hueske  wrote:

> +1 for moving directly to 0.10.
>
> 2015-09-29 11:40 GMT+02:00 Maximilian Michels :
>
> > Hi Kostas,
> >
> > I think it makes sense to cancel the proposed 0.10-milestone release.
> > We are not far away from completing all essential features of the 0.10
> > release. After we manage to complete those, we can test and release
> > 0.10.
> >
> > The 0.10 release will be a major step towards the 1.0 release and,
> > therefore, all new features of 0.10 should get enough exposure until
> > we release 1.0.
> >
> > Cheers,
> > Max
> >
> > On Tue, Sep 29, 2015 at 11:26 AM, Kostas Tzoumas 
> > wrote:
> > > Hi everyone,
> > >
> > > I would like to propose to cancel the 0.10-milestone release and go
> > > directly for a 0.10 release as soon as possible.
> > >
> > > My opinion would be to focus this release on:
> > > - Graduating the streaming API out of staging (depends on some open
> pull
> > > requests)
> > > - Master high availability
> > > - New monitoring framework
> > > - Graduating Gelly out of staging
> > >
> > > Flink 1.0 will probably come after 0.10, which gives us time to fix
> open
> > > issues and freeze APIs.
> > >
> > > What do you think?
> > >
> > > Best,
> > > Kostas
> >
>


Re: Pulling Streaming out of staging and project restructure

2015-10-01 Thread Stephan Ewen
+1 for Robert's comments.

On Thu, Oct 1, 2015 at 3:16 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Big +1 for graduating streaming out of staging. It is widely used, also in
> production and we are spending a lot of effort into hardening it.
> I also agree with the proposed new maven module structure.
>
> We have to carefully test the reworked structure for the scripts which are
> generating the hadoop1 and the scala 2.11 poms (they are transformed using
> a bunch of bash scripts). I can do that once the PR is open.
>
> @Chesnay: I would be fine with including the language binding into python
> > Where would new projects reside in, that previously would have been put
> into flink-staging?
>
> flink-contrib
>
>
> @Kostas: I understand the idea behind your suggested renaming, but that's
> just a name. I don't think its going to influence how people are seeing
> Flink: It doesn't feel like second class when adding "flink-streaming-core"
> to the dependencies to me.
> Also, the "flink-datastream-scala" module would depend on
> "flink-dataset-scala", which is kind of weird.
>
>
> I'm wondering whether we should remove the "flink-test-utils" module. I
> don't think its really necessary, because we can put the test jars into the
> flink-tests project and include them using the "test-jar" dependency.
>
>
> On Thu, Oct 1, 2015 at 2:27 PM, Kostas Tzoumas <ktzou...@apache.org>
> wrote:
>
> > +1
> >
> > I wanted to suggest that we rename modules to fully accept streaming as
> > first class, qualifying also "batch" as "batch" (e.g., flink-java -->
> > flink-dataset-java, flink-streaming --> flink-datastream, etc).
> >
> > This would break maven dependencies (temporary hell :-) so it's not a
> > decision to take lightly. I'm not strongly advocating for it.
> >
> >
> > On Thu, Oct 1, 2015 at 12:44 PM, Chesnay Schepler <ches...@apache.org>
> > wrote:
> >
> > > I like it in general. But while we're at it, what is the purpose of the
> > > flink-tests project, or rather which tests belong there instead of the
> > > individual projects?
> > >
> > > Where would new projects reside in, that previously would have been put
> > > into flink-staging?
> > >
> > > Lastly, I'd like to merge flink-language-binding into flink-python. I
> can
> > > go more into detail but the gist of it is that the abstraction just
> > doesn't
> > > work.
> > >
> > >
> > > On 01.10.2015 12:40, Márton Balassi wrote:
> > >
> > >> Great to see streaming graduating. :)
> > >>
> > >> I like the outline, both getting rid of staging, having the examples
> > >> together and generally flattening the structure are very reasonable to
> > me.
> > >>
> > >> You have listed flink-streaming-examples under
> > flink-streaming-connectors
> > >> and left out some less prominent maven modules, but I assume the first
> > is
> > >> accidental while the second is intentional to make the list a bit
> > briefer.
> > >>
> > >> Best,
> > >>
> > >> Marton
> > >>
> > >>
> > >> On Thu, Oct 1, 2015 at 12:25 PM, Stephan Ewen <se...@apache.org>
> wrote:
> > >>
> > >> Hi all!
> > >>>
> > >>> We are making good headway with reworking the last parts of the
> Window
> > >>> API.
> > >>> After that, the streaming API should be good to be pulled out of
> > staging.
> > >>>
> > >>> Since we are reorganizing the projects as part of that, I would
> shift a
> > >>> bit
> > >>> more to bring things a bit more up to date.
> > >>>
> > >>> In this restructure, I would like to get rid of the "flink-staging"
> > >>> project. Anyone who only uses the maven artifacts sees no difference
> > >>> whether a project is in "staging" or not, so it does not help much to
> > >>> have
> > >>> that directory structure.
> > >>> On the other hand, projects have a tendency to linger in staging
> > forever
> > >>> (like avro, spargel, hbase, jdbc, ...)
> > >>>
> > >>> The new structure could be
> > >>>
> > >>> flink-core
> > >>> flink-java
> > >>> flink-scala
> > >>> flink-streaming-core
>

Pulling Streaming out of staging and project restructure

2015-10-01 Thread Stephan Ewen
Hi all!

We are making good headway with reworking the last parts of the Window API.
After that, the streaming API should be good to be pulled out of staging.

Since we are reorganizing the projects as part of that, I would shift a bit
more to bring things a bit more up to date.

In this restructure, I would like to get rid of the "flink-staging"
project. Anyone who only uses the maven artifacts sees no difference
whether a project is in "staging" or not, so it does not help much to have
that directory structure.
On the other hand, projects have a tendency to linger in staging forever
(like avro, spargel, hbase, jdbc, ...)

The new structure could be

flink-core
flink-java
flink-scala
flink-streaming-core
flink-streaming-scala

flink-runtime
flink-runtime-web
flink-optimizer
flink-clients

flink-shaded
  -> flink-shaded-hadoop
  -> flink-shaded-hadoop2
  -> flink-shaded-include-yarn-tests
  -> flink-shaded-curator

flink-examples
  -> (have all examples, Scala and Java, Batch and Streaming)

flink-batch-connectors
  -> flink-avro
  -> flink-jdbc
  -> flink-hadoop-compatibility
  -> flink-hbase
  -> flink-hcatalog

flink-streaming-connectors
  -> flink-connector-twitter
  -> flink-streaming-examples
  -> flink-connector-flume
  -> flink-connector-kafka
  -> flink-connector-elasticsearch
  -> flink-connector-rabbitmq
  -> flink-connector-filesystem

flink-libraries
  -> flink-gelly
  -> flink-gelly-scala
  -> flink-ml
  -> flink-table
  -> flink-language-binding
  -> flink-python


flink-scala-shell

flink-test-utils
flink-tests
flink-fs-tests

flink-contrib
  -> flink-storm-compatibility
  -> flink-storm-compatibility-examples
  -> flink-streaming-utils
  -> flink-tweet-inputformat
  -> flink-operator-stats
  -> flink-tez

flink-quickstart
  -> flink-quickstart-java
  -> flink-quickstart-scala
  -> flink-tez-quickstart

flink-yarn
flink-yarn-tests

flink-dist

flink-benchmark


Let me know if that makes sense!

Greetings,
Stephan


Re: Pulling Streaming out of staging and project restructure

2015-10-02 Thread Stephan Ewen
@matthias +1 for that approach

On Fri, Oct 2, 2015 at 11:21 AM, Matthias J. Sax <mj...@apache.org> wrote:

> It think, rename "flink-storm-compatibility-core" to just "flink-storm"
> would be the cleanest solution.
>
> So in flink-contrib there would be two modules:
>   - flink-storm
>   - flink-storm-examples
>
> Please let me know if you have any objection about it.
>
> -Matthias
>
> On 10/02/2015 10:45 AM, Matthias J. Sax wrote:
> > Sure. Will do that.
> >
> > -Matthias
> >
> > On 10/02/2015 10:35 AM, Stephan Ewen wrote:
> >> @Matthias: How about getting rid of the storm-compatibility-parent and
> >> making the core and examples projects directly projects in "contrib"
> >>
> >> On Fri, Oct 2, 2015 at 10:34 AM, Till Rohrmann <trohrm...@apache.org>
> wrote:
> >>
> >>> +1 for the new project structure. Getting rid of our code dump is a
> good
> >>> thing.
> >>>
> >>> On Fri, Oct 2, 2015 at 10:25 AM, Maximilian Michels <m...@apache.org>
> >>> wrote:
> >>>
> >>>> +1 Matthias, let's limit the overhead this has for the module
> >>> maintainers.
> >>>>
> >>>> On Fri, Oct 2, 2015 at 12:17 AM, Matthias J. Sax <mj...@apache.org>
> >>> wrote:
> >>>>> I will commit something to flink-storm-compatibility tomorrow that
> >>>>> contains some internal package restructuring. I think, renaming the
> >>>>> three modules in this commit would be a smart move as both changes
> >>>>> result in merge conflicts when rebasing open PRs. Thus we can limit
> >>> this
> >>>>> pain to a single time. If no objections, I will commit those changes
> >>>>> tomorrow.
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 10/01/2015 09:52 PM, Henry Saputra wrote:
> >>>>>> +1
> >>>>>>
> >>>>>> I like the idea moving "staging" projects into appropriate modules.
> >>>>>>
> >>>>>> While we are at it, I would like to propose changing "
> >>>>>> flink-hadoop-compatibility" to "flink-hadoop". It is in my bucket
> list
> >>>>>> but would be nice if it is part of re-org.
> >>>>>> Supporting Hadoop in the connector implicitly means compatibility
> with
> >>>> Hadoop.
> >>>>>> Also same thing with "flink-storm-compatibility" to "flink-storm".
> >>>>>>
> >>>>>> - Henry
> >>>>>>
> >>>>>> On Thu, Oct 1, 2015 at 3:25 AM, Stephan Ewen <se...@apache.org>
> >>> wrote:
> >>>>>>> Hi all!
> >>>>>>>
> >>>>>>> We are making good headway with reworking the last parts of the
> >>> Window
> >>>> API.
> >>>>>>> After that, the streaming API should be good to be pulled out of
> >>>> staging.
> >>>>>>>
> >>>>>>> Since we are reorganizing the projects as part of that, I would
> shift
> >>>> a bit
> >>>>>>> more to bring things a bit more up to date.
> >>>>>>>
> >>>>>>> In this restructure, I would like to get rid of the "flink-staging"
> >>>>>>> project. Anyone who only uses the maven artifacts sees no
> difference
> >>>>>>> whether a project is in "staging" or not, so it does not help much
> to
> >>>> have
> >>>>>>> that directory structure.
> >>>>>>> On the other hand, projects have a tendency to linger in staging
> >>>> forever
> >>>>>>> (like avro, spargel, hbase, jdbc, ...)
> >>>>>>>
> >>>>>>> The new structure could be
> >>>>>>>
> >>>>>>> flink-core
> >>>>>>> flink-java
> >>>>>>> flink-scala
> >>>>>>> flink-streaming-core
> >>>>>>> flink-streaming-scala
> >>>>>>>
> >>>>>>> flink-runtime
> >>>>>>> flink-runtime-web
> >>>>>>> flink-optimizer
> >>>>>>> flink-clien

Re: Release Flink 0.10

2015-10-02 Thread Stephan Ewen
I would actually like to remove the old one, but I am okay with keeping it
and activating the new one by default

On Fri, Oct 2, 2015 at 3:49 PM, Robert Metzger  wrote:

> The list from Kostas also contained the new JobManager front end.
>
> Do we want to enable it by default in the 0.10 release?
>
> Are we going to keep the old interface, or are we removing it?
>
> I'm voting for enabling the new one by default and keeping the old one for
> the next release.
> What do you think?
>
>
> On Tue, Sep 29, 2015 at 8:41 PM, Henry Saputra 
> wrote:
>
> > Oops, I meant Spargel [1] =)
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-master/libs/spargel_guide.html
> >
> > On Tue, Sep 29, 2015 at 7:59 AM, Henry Saputra 
> > wrote:
> > > +1 to the idea.
> > >
> > > I also think we need to remove Pregel if Gelly wants to graduate. It is
> > > already deprecated in 0.9.
> > >
> > > - Henry
> > >
> > >
> > > On Tuesday, September 29, 2015, Kostas Tzoumas 
> > wrote:
> > >>
> > >> Hi everyone,
> > >>
> > >> I would like to propose to cancel the 0.10-milestone release and go
> > >> directly for a 0.10 release as soon as possible.
> > >>
> > >> My opinion would be to focus this release on:
> > >> - Graduating the streaming API out of staging (depends on some open
> pull
> > >> requests)
> > >> - Master high availability
> > >> - New monitoring framework
> > >> - Graduating Gelly out of staging
> > >>
> > >> Flink 1.0 will probably come after 0.10, which gives us time to fix
> open
> > >> issues and freeze APIs.
> > >>
> > >> What do you think?
> > >>
> > >> Best,
> > >> Kostas
> >
>


Re: An update on the DataStream API refactoring WiP

2015-10-02 Thread Stephan Ewen
I added two comments to the pull request that this is based on...

On Fri, Oct 2, 2015 at 2:47 PM, Robert Metzger  wrote:

> I suspect: "- Deletion of "DataSet.forward() and .global()"" is a typo, you
> meant DataStream ?
>
> On Fri, Oct 2, 2015 at 2:44 PM, Kostas Tzoumas 
> wrote:
>
> > Oh, and of course, support for event time. I might be forgetting more,
> feel
> > free to add to the list
> >
> >
> > On Fri, Oct 2, 2015 at 2:40 PM, Kostas Tzoumas 
> > wrote:
> >
> > > Hi folks,
> > >
> > > Currently, Aljoscha, Stephan, and I are reworking the DataStream API as
> > > discussed before. Things are a bit in-flight right now with several
> > commits
> > > and pull requests, and the current master containing code from both the
> > old
> > > and the new API.
> > >
> > > I want to give you an idea of how the new API will look. This is a
> > > very rough draft of the new documentation page (also a WiP):
> > >
> > >
> >
> https://www.dropbox.com/sh/t5nvlx7meadppnp/AAD5sEIH5S3QNYTiMsyE9KBva?dl=0
> > >
> > > Compared to the current API, the major changes include:
> > >
> > > - Different syntax (and implementation) for windows. Old constructs
> will
> > > be replaced by the new ones. The new syntax resembles Google's Dataflow
> > > model, but contains "shortcuts" as syntactic sugar for common cases
> > >
> > > - Different syntax (and implementation) of "grouping". New terminology
> > > will be KeyedDataStream (and "keyBy") which will replace
> > GroupedDataStream.
> > >
> > > - Reduced functionality in ConnectedDataStream - only map and flatMap
> > >
> > > - New syntax (and implementation) for window joins, removal of cross
> > >
> > > - No changes in iterations besides deleting the "long milliseconds"
> > > argument
> > >
> > > - No changes in state
> > >
> > > - Deletion of "DataSet.forward() and .global()"
> > >
> > > - Windows can only come after keyBy, otherwise they are DOP-1 operators
> > > and are defined as "windowAll"
> > >
> > > Best,
> > > Kostas
> > >
> >
>
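
For orientation, a rough sketch (illustrative only, not part of the draft document)
of how the keyBy-based windowing described in the quoted list might look from user
code. Method names such as timeWindow are the kind of "shortcut" mentioned above
and may differ from the final API; the socket host/port are placeholders.

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class KeyedWindowSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder input: one word per line from a socket.
        DataStream<String> words = env.socketTextStream("localhost", 9999);

        DataStream<Tuple2<String, Integer>> counts = words
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String word) {
                        return new Tuple2<>(word, 1);
                    }
                })
                .keyBy(0)                                  // "keyBy" replaces the old grouping
                .timeWindow(Time.of(5, TimeUnit.SECONDS))  // shortcut for a keyed tumbling time window
                .sum(1);

        counts.print();
        env.execute("keyed window sketch");
    }
}

A window without a preceding keyBy would be the parallelism-1 "windowAll" case
mentioned at the end of the list.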


Re: Hash-based aggregation

2015-10-02 Thread Stephan Ewen
I think that roughly, an approach like the compacting hash table is the
right one.
Go ahead and take a stab at it, if you want, ping us if you run into
obstacles.

Here are a few thoughts on the hash-aggregator from discussions between
Fabian and me:

1) It may be worth having specialized implementations for aggregates of
constant-length values, such as counts or sum/max/min of types like
int/long/double. The value can be updated in place, so there is never a need
for compaction logic. The compaction logic is, in the end, trickier than it
seems at first glance; it took quite a few cycles to eliminate most bugs.

2) I would try to tailor the hash aggregator to a "combiner" functionality
initially. That means no spilling on memory shortage, but eviction of
pre-aggregated results into the output stream. That is probably easier to
do and the most powerful improvement over the current capabilities.

Happy coding!

Greetings,
Stephan
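
For illustration, a minimal sketch of the "combiner" behavior described in
point (2), expressed as a plain user function rather than the managed-memory
hash table itself: a bounded in-memory map that pre-aggregates counts and, on
"memory shortage", evicts its content into the output stream instead of
spilling. The maxEntries threshold is an assumption standing in for real
memory accounting.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class HashCountCombiner implements MapPartitionFunction<String, Tuple2<String, Long>> {

    private final int maxEntries;

    public HashCountCombiner(int maxEntries) {
        this.maxEntries = maxEntries;
    }

    @Override
    public void mapPartition(Iterable<String> keys, Collector<Tuple2<String, Long>> out) {
        Map<String, Long> counts = new HashMap<>();

        for (String key : keys) {
            Long current = counts.get(key);
            counts.put(key, current == null ? 1L : current + 1L);

            // "Memory shortage": emit the pre-aggregated results and start over.
            if (counts.size() >= maxEntries) {
                flush(counts, out);
            }
        }
        // Emit whatever is left at the end of the partition.
        flush(counts, out);
    }

    private static void flush(Map<String, Long> counts, Collector<Tuple2<String, Long>> out) {
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            out.collect(new Tuple2<>(e.getKey(), e.getValue()));
        }
        counts.clear();
    }
}

A downstream grouped reduce would still have to merge the partial counts per
key, just as with the existing combiners.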




On Thu, Oct 1, 2015 at 8:33 PM, Gábor Gévay  wrote:

> Hello,
>
> I would really like to see FLINK-2237 solved.
> I would implement this feature over the weekend, if the
> CompactingHashTable can be used to solve it (see my comment there).
> Could you please give me some advice on whether is this a viable
> approach, or you perhaps see some difficulties that I'm not aware of?
>
> Best,
> Gabor
>


Rethink the "always copy" policy for streaming topologies

2015-10-02 Thread Stephan Ewen
Hi all!

Now that we are coming to the next release, I wanted to make sure we
finalize the decision on that point, because it would be nice to not break
the behavior of system afterwards.

Right now, when tasks are chained together, the system always copies the
elements between the different tasks in the same chain.

I think this policy was established under the assumption that copies do not
cost anything, given our own test examples, which mainly use immutable
types like Strings, boxed primitives, ..

In practice, a lot of data types are actually quite expensive to copy.

For example, a rather common data type in the event analysis of web-sources
is JSON Object.
Flink treats this as a generic type. Depending on its concrete
implementation, Kryo may have to perform a serialization copy, which means
encoding into bytes (JSON encoding, charset encoding) and decoding again.

This has a massive impact on the out-of-the-box performance of the system.
Given that, I was wondering whether we should set the default policy to "not
copying".

That is basically the behavior of the batch API, and there has so far never
been an issue with that (people running into the trap of overwritten
mutable elements).

What do you think?

Stephan


Re: Rethink the "always copy" policy for streaming topologies

2015-10-02 Thread Stephan Ewen
@Martin:

I think you were a user of the Batch API before we made the non-reuse mode
the default mode.
By now, when you use a GroupReduceFunction or a MapPartitionFunction or so,
you need not do any cloning or copying. All functions that receive groups
will always get fresh elements.

This chaining issue concerns only MapFunction (and FlatMapFunction), where
users keep lists to remember elements across invocations of the MapFunction.
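
To make the pitfall concrete, here is a small illustrative sketch (not from the
thread) of such a function: it remembers the last few records across
invocations, so under a non-copying chain it has to copy each input defensively
before buffering it, because the runtime may reuse and mutate the input object.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class RememberingFlatMap
        extends RichFlatMapFunction<Tuple2<String, Long>, List<Tuple2<String, Long>>> {

    private final int bufferSize;
    private final List<Tuple2<String, Long>> lastSeen = new ArrayList<>();

    public RememberingFlatMap(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    @Override
    public void flatMap(Tuple2<String, Long> in, Collector<List<Tuple2<String, Long>>> out) {
        // Defensive copy: without it, all buffered entries may end up pointing to
        // the same reused (and later overwritten) object.
        lastSeen.add(in.copy());

        if (lastSeen.size() > bufferSize) {
            lastSeen.remove(0);
        }
        out.collect(new ArrayList<>(lastSeen));
    }
}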


On Fri, Oct 2, 2015 at 6:27 PM, Martin Neumann <mneum...@sics.se> wrote:

> It seems like I'm one of the few people that run into the mutable elements
> trap on the Batch API from time to time. At the moment I always clone when
> I'm not 100% sure to avoid hunting the bugs later. So far I was happy to
> learn that this is not a problem in Streaming, but that's just me.
>
> When working with groupBy and partition functions, it's easy to forget that
> there is one function instance per operator, not per partition. So if you write
> your code with the mindset that each partition is a separate object, reduce on
> the operator level becomes really annoying.
> Since the mapping between partitions and operators is usually hidden, this makes
> debugging harder, especially in cases where the test data produces a
> single partition per operator and the real deployment does not.
>
> *To summarize:*
> I'm not against reusing objects as long as there is something that helps
> ease the pitfalls. This could be coding guidelines, debugging tools or best
> practices.
>
>
> On Fri, Oct 2, 2015 at 5:53 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > Hi all!
> >
> > Now that we are coming to the next release, I wanted to make sure we
> > finalize the decision on that point, because it would be nice to not
> break
> > the behavior of system afterwards.
> >
> > Right now, when tasks are chained together, the system copies the
> elements
> > always between different tasks in the same chain.
> >
> > I think this policy was established under the assumption that copies do
> not
> > cost anything, given our own test examples, which mainly use immutable
> > types like Strings, boxed primitives, ..
> >
> > In practice, a lot of data types are actually quite expensive to copy.
> >
> > For example, a rather common data type in the event analysis of
> web-sources
> > is JSON Object.
> > Flink treats this as a generic type. Depending on its concrete
> > implementation, Kryo may have perform a serialization copy, which means
> > encoding into bytes (JSON encoding, charset encoding) and decoding again.
> >
> > This has a massive impact on the out-of-the-box performance of the
> system.
> > Given that, I was wondering whether we should set to default policy to
> "not
> > copying".
> >
> > That is basically the behavior of the batch API, and there has so far
> never
> > been an issue with that (people running into the trap of overwritten
> > mutable elements).
> >
> > What do you think?
> >
> > Stephan
> >
>


Re: Java type erasure and object reuse

2015-09-18 Thread Stephan Ewen
Good problem...

We were thinking for a while to make the input and output type serializers
available from the RuntimeContext.
That way you could call "T copy = serializer.copy(inValue)".

The "copy()" method on the copyable value is actually a good addition
nonetheless!
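
Until the serializers are exposed through the RuntimeContext, one possible
workaround (a sketch, not an established pattern in the code base) is to hand
the TypeSerializer to the function at job-assembly time, for example via
dataSet.getType().createSerializer(executionConfig), and use serializer.copy()
where type erasure prevents calling new:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.util.Collector;

public class CollectingGroupReduce<T> implements GroupReduceFunction<T, List<T>> {

    // Serializers are java.io.Serializable, so the instance ships with the function.
    private final TypeSerializer<T> serializer;

    public CollectingGroupReduce(TypeSerializer<T> serializer) {
        this.serializer = serializer;
    }

    @Override
    public void reduce(Iterable<T> values, Collector<List<T>> out) {
        List<T> collected = new ArrayList<>();
        for (T value : values) {
            // Defensive copy of the possibly reused input object.
            collected.add(serializer.copy(value));
        }
        out.collect(collected);
    }
}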


On Thu, Sep 17, 2015 at 10:31 PM, Greg Hogan  wrote:

> What is best practice for handling Java type erasure in user defined
> functions? Is there a means by which the TypeInformation can be accessed
> from a RichFunction? My temporary solution was to add a "T copy()" method
> to the CopyableValue interface.
>
> A common use case is a GroupReduceFunction that needs to collect objects.
> With object reuse we need to make a copy and with type erasure we cannot
> call new.
>
> Greg Hogan
>


Re: [jira] [Created] (FLINK-2695) KafkaITCase.testConcurrentProducerConsumerTopology failed on Travis

2015-09-23 Thread Stephan Ewen
The tests use a ZooKeeper mini cluster and multiple Kafka MiniClusters.

It appears that these are not very stable in our test setup. Let's see what
we can do to improve reliability there.

1) As a first step, I would suggest reducing the number of concurrent tests
to one for this project, as that prevents multiple concurrent mini clusters
from competing for compute resources.

2) The method "SimpleConsumerThread.getLastOffset()" should probably
re-retrieve the leader, or we should allow the program more recovery
retries...

Greetings,
Stephan
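
As a rough illustration of the "more recovery retries" idea (this is not the
actual fetcher code, and all names are made up): a small helper that retries a
lookup a bounded number of times with a backoff, where the passed-in callable
is expected to re-resolve the leader on every attempt.

import java.util.concurrent.Callable;

public final class RetryingLookup {

    public static <T> T retry(Callable<T> lookup, int maxAttempts, long backoffMillis)
            throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // E.g. "find the current leader, then fetch the last offset from it".
                return lookup.call();
            } catch (Exception e) {
                last = e;
                Thread.sleep(backoffMillis);
            }
        }
        throw new Exception("Lookup failed after " + maxAttempts + " attempts", last);
    }
}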


On Wed, Sep 23, 2015 at 4:04 AM, Li, Chengxiang 
wrote:

> Found more KafkaITCase failure at:
> https://travis-ci.org/apache/flink/jobs/81592146
>
> Failed tests:
>
> KafkaITCase.testConcurrentProducerConsumerTopology:50->KafkaConsumerTestBase.runSimpleConcurrentProducerConsumerTopology:334->KafkaTestBase.tryExecute:313
> Test failed: The program execution failed: Job execution failed.
> Tests in error:
>
> KafkaITCase.testCancelingEmptyTopic:57->KafkaConsumerTestBase.runCancelingOnEmptyInputTest:594
> »
>
> KafkaITCase.testCancelingFullTopic:62->KafkaConsumerTestBase.runCancelingOnFullInputTest:529
> »
>
> KafkaITCase.testMultipleSourcesOnePartition:89->KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest:450
> » ProgramInvocation
>
> KafkaITCase.testOffsetInZookeeper:45->KafkaConsumerTestBase.runOffsetInZookeeperValidationTest:205->KafkaConsumerTestBase.writeSequence:938
> » ProgramInvocation
>
> KafkaITCase.testOneToOneSources:79->KafkaConsumerTestBase.runOneToOneExactlyOnceTest:356
> » ProgramInvocation
>
> It happens only on the test mode of JDK: oraclejdk8
> PROFILE="-Dhadoop.version=2.5.0 -Dmaven.javadoc.skip=true".
>
> Thanks
> Chengxiang
>
> -Original Message-
> From: Till Rohrmann (JIRA) [mailto:j...@apache.org]
> Sent: Thursday, September 17, 2015 11:02 PM
> To: dev@flink.apache.org
> Subject: [jira] [Created] (FLINK-2695)
> KafkaITCase.testConcurrentProducerConsumerTopology failed on Travis
>
> Till Rohrmann created FLINK-2695:
> 
>
>  Summary: KafkaITCase.testConcurrentProducerConsumerTopology
> failed on Travis
>  Key: FLINK-2695
>  URL: https://issues.apache.org/jira/browse/FLINK-2695
>  Project: Flink
>   Issue Type: Bug
> Reporter: Till Rohrmann
> Priority: Critical
>
>
> The {{KafkaITCase.testConcurrentProducerConsumerTopology}} failed on
> Travis with
>
> {code}
> ---
>  T E S T S
> ---
> Running org.apache.flink.streaming.connectors.kafka.KafkaProducerITCase
> Running org.apache.flink.streaming.connectors.kafka.KafkaITCase
> Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 14.296 sec
> - in org.apache.flink.streaming.connectors.kafka.KafkaProducerITCase
> 09/16/2015 17:19:36 Job execution switched to status RUNNING.
> 09/16/2015 17:19:36 Source: Custom Source -> Sink: Unnamed(1/1)
> switched to SCHEDULED
> 09/16/2015 17:19:36 Source: Custom Source -> Sink: Unnamed(1/1)
> switched to DEPLOYING
> 09/16/2015 17:19:36 Source: Custom Source -> Sink: Unnamed(1/1)
> switched to RUNNING
> 09/16/2015 17:19:36 Source: Custom Source -> Sink: Unnamed(1/1)
> switched to FINISHED
> 09/16/2015 17:19:36 Job execution switched to status FINISHED.
> 09/16/2015 17:19:36 Job execution switched to status RUNNING.
> 09/16/2015 17:19:36 Source: Custom Source -> Map -> Flat Map(1/1)
> switched to SCHEDULED
> 09/16/2015 17:19:36 Source: Custom Source -> Map -> Flat Map(1/1)
> switched to DEPLOYING
> 09/16/2015 17:19:36 Source: Custom Source -> Map -> Flat Map(1/1)
> switched to RUNNING
> 09/16/2015 17:19:36 Source: Custom Source -> Map -> Flat Map(1/1)
> switched to FAILED
> java.lang.Exception: Could not forward element to next operator
> at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:171)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Could not forward element to next
> operator
> at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingChainingOutput.collect(OutputHandler.java:332)
> at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingChainingOutput.collect(OutputHandler.java:316)
> at
> 

Re: Build get stuck at BarrierBufferMassiveRandomTest

2015-09-23 Thread Stephan Ewen
I have pushed it, yes. If you rebase onto the latest master, it should work.

If you can verify that it still hangs, can you post a stack trace dump?

Thanks,
Stephan


On Wed, Sep 23, 2015 at 12:37 PM, Vasiliki Kalavri <
vasilikikala...@gmail.com> wrote:

> @Stephan, have you pushed that fix for SocketClientSinkTest? Local builds
> still hang for me :S
>
> On 21 September 2015 at 22:55, Vasiliki Kalavri <vasilikikala...@gmail.com
> >
> wrote:
>
> > Yes, you're right. BarrierBufferMassiveRandomTest has actually finished
> > :-)
> > Sorry for the confusion! I'll wait for your fix then, thanks!
> >
> > On 21 September 2015 at 22:51, Stephan Ewen <se...@apache.org> wrote:
> >
> >> I am actually very happy that it is not the
> >> "BarrierBufferMassiveRandomTest", that would be hell to debug...
> >>
> >> On Mon, Sep 21, 2015 at 10:51 PM, Stephan Ewen <se...@apache.org>
> wrote:
> >>
> >> > Ah, actually it is a different test. I think you got confused by the
> >> > sysout log, because multiple parallel tests print there (that makes it
> >> not
> >> > always obvious which one hangs).
> >> >
> >> > The test is the "SocketClientSinkTest.testSocketSinkRetryAccess()"
> test.
> >> > You can see that by looking in which test case the "main" thread is
> >> stuck,
> >> >
> >> > This test is very unstable, but, fortunately, I made a fix 1h ago and
> it
> >> > is being tested on Travis right now :-)
> >> >
> >> > Cheers,
> >> > Stephan
> >> >
> >> >
> >> >
> >> > On Mon, Sep 21, 2015 at 10:23 PM, Vasiliki Kalavri <
> >> > vasilikikala...@gmail.com> wrote:
> >> >
> >> >> Locally yes.
> >> >>
> >> >> Here's the stack trace:
> >> >>
> >> >>
> >> >> 2015-09-21 22:22:46
> >> >> Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.75-b04 mixed
> >> mode):
> >> >>
> >> >> "Attach Listener" daemon prio=5 tid=0x7ff9d104e800 nid=0x4013
> >> waiting
> >> >> on condition [0x]
> >> >>java.lang.Thread.State: RUNNABLE
> >> >>
> >> >> "Service Thread" daemon prio=5 tid=0x7ff9d3807000 nid=0x4c03
> >> runnable
> >> >> [0x]
> >> >>java.lang.Thread.State: RUNNABLE
> >> >>
> >> >> "C2 CompilerThread1" daemon prio=5 tid=0x7ff9d2001000 nid=0x4a03
> >> >> waiting on condition [0x]
> >> >>java.lang.Thread.State: RUNNABLE
> >> >>
> >> >> "C2 CompilerThread0" daemon prio=5 tid=0x7ff9d201e000 nid=0x4803
> >> >> waiting on condition [0x]
> >> >>java.lang.Thread.State: RUNNABLE
> >> >>
> >> >> "Signal Dispatcher" daemon prio=5 tid=0x7ff9d3012800 nid=0x451b
> >> >> runnable [0x]
> >> >>java.lang.Thread.State: RUNNABLE
> >> >>
> >> >> "Finalizer" daemon prio=5 tid=0x7ff9d4005800 nid=0x3303 in
> >> >> Object.wait() [0x00011430d000]
> >> >>java.lang.Thread.State: WAITING (on object monitor)
> >> >> at java.lang.Object.wait(Native Method)
> >> >> - waiting on <0x0007ef504858> (a
> java.lang.ref.ReferenceQueue$Lock)
> >> >> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:135)
> >> >> - locked <0x0007ef504858> (a java.lang.ref.ReferenceQueue$Lock)
> >> >> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:151)
> >> >> at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
> >> >>
> >> >> "Reference Handler" daemon prio=5 tid=0x7ff9d480b000 nid=0x3103
> in
> >> >> Object.wait() [0x00011420a000]
> >> >>java.lang.Thread.State: WAITING (on object monitor)
> >> >> at java.lang.Object.wait(Native Method)
> >> >> - waiting on <0x0007ef504470> (a java.lang.ref.Reference$Lock)
> >> >> at java.lang.Object.wait(Object.java:503)
> >> >> at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:133)
> >> >> - locked <0x0007ef504470> (a jav

Re: Build get stuck at BarrierBufferMassiveRandomTest

2015-09-23 Thread Stephan Ewen
:309)
> > at
> >
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
> > at
> >
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
> > at
> >
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
> > at
> >
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
> > at
> >
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
> > at
> >
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
> > at
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
> >
> > "VM Thread" prio=5 tid=0x7faebb82e800 nid=0x2f03 runnable
> >
> > "GC task thread#0 (ParallelGC)" prio=5 tid=0x7faeb9806800 nid=0x1e03
> > runnable
> >
> > "GC task thread#1 (ParallelGC)" prio=5 tid=0x7faebb00 nid=0x2103
> > runnable
> >
> > "GC task thread#2 (ParallelGC)" prio=5 tid=0x7faebb001000 nid=0x2303
> > runnable
> >
> > "GC task thread#3 (ParallelGC)" prio=5 tid=0x7faebb001800 nid=0x2503
> > runnable
> >
> > "GC task thread#4 (ParallelGC)" prio=5 tid=0x7faebb002000 nid=0x2703
> > runnable
> >
> > "GC task thread#5 (ParallelGC)" prio=5 tid=0x7faebb002800 nid=0x2903
> > runnable
> >
> > "GC task thread#6 (ParallelGC)" prio=5 tid=0x7faebb003800 nid=0x2b03
> > runnable
> >
> > "GC task thread#7 (ParallelGC)" prio=5 tid=0x7faeb9809000 nid=0x2d03
> > runnable
> >
> > "VM Periodic Task Thread" prio=5 tid=0x7faeb980e000 nid=0x4f03
> waiting
> > on condition
> >
> > JNI global references: 195
> >
> >
> >
> >
> > On 23 September 2015 at 13:35, Stephan Ewen <se...@apache.org> wrote:
> >
> >> I have pushed it, yes. If you rebase onto the latest master, it should
> >> work.
> >>
> >> If you can verify that it still hangs, can you post a stack trace dump?
> >>
> >> Thanks,
> >> Stephan
> >>
> >>
> >> On Wed, Sep 23, 2015 at 12:37 PM, Vasiliki Kalavri <
> >> vasilikikala...@gmail.com> wrote:
> >>
> >>> @Stephan, have you pushed that fix for SocketClientSinkTest? Local
> builds
> >>> still hang for me :S
> >>>
> >>> On 21 September 2015 at 22:55, Vasiliki Kalavri <
> >> vasilikikala...@gmail.com
> >>>>
> >>> wrote:
> >>>
> >>>> Yes, you're right. BarrierBufferMassiveRandomTest has actually
> finished
> >>>> :-)
> >>>> Sorry for the confusion! I'll wait for your fix then, thanks!
> >>>>
> >>>> On 21 September 2015 at 22:51, Stephan Ewen <se...@apache.org> wrote:
> >>>>
> >>>>> I am actually very happy that it is not the
> >>>>> "BarrierBufferMassiveRandomTest", that would be hell to debug...
> >>>>>
> >>>>> On Mon, Sep 21, 2015 at 10:51 PM, Stephan Ewen <se...@apache.org>
> >>> wrote:
> >>>>>
> >>>>>> Ah, actually it is a different test. I think you got confused by the
> >>>>>> sysout log, because multiple parallel tests print there (that makes
> >> it
> >>>>> not
> >>>>>> always obvious which one hangs).
> >>>>>>
> >>>>>> The test is the "SocketClientSinkTest.testSocketSinkRetryAccess()"
> >>> test.
> >>>>>> You can see that by looking in which test case the "main" thread is
> >>>>> stuck,
> >>>>>>
> >>>>>> This test is very unstable, but, fortunately, I made a fix 1h ago
> >> and
> >>> it
> >>>>>> is being tested on Travis right now :-)
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Stephan
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Sep 21, 2015 at 10:23 PM, Vasiliki Kalavri <
> >>>>>> vasilikikala...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Locally yes.
> >>>>>>>
> >>>>>>> Here's the stack tr

Re: Extending and improving our "How to contribute" page

2015-09-23 Thread Stephan Ewen
Thanks, Fabian for driving this!

I agree with your points.

Concerning Vasia's comment to not raise the bar too high:
That is true, the requirements should be reasonable. We can definitely tag
issues as "simple", which means they do not require a design document. The
design document should be more for new features and need not be very detailed.

We could also do the inverse, meaning we explicitly tag certain issues as
"requires design document".

Greetings,
Stephan




On Wed, Sep 23, 2015 at 5:05 PM, Vasiliki Kalavri  wrote:

> Hi,
>
> I agree with you Fabian. Clarifying these issues in the "How to Contribute"
> guide will save lots of time both to reviewers and contributors. It is a
> really disappointing situation when someone spends time implementing
> something and their PR ends up being rejected because either the feature
> was not needed or the implementation details were never agreed on.
>
> That said, I think we should also make sure that we don't raise the bar too
> high for simple contributions.
>
> Regarding (1) and (2), I think we should clarify what kind of
> additions/changes require this process to be followed. e.g. do we need to
> discuss additions for which JIRAs already exist? Ideas described in the
> roadmaps? Adding a new algorithm to Gelly/Flink-ML?
>
> Regarding (3), maybe we can all suggest some examples/patterns that we've
> seen when reviewing PRs and then choose the most common (or all).
>
> (4) sounds good to me.
>
> Cheers,
> Vasia.
>
> On 23 September 2015 at 15:08, Kostas Tzoumas  wrote:
>
> > Big +1.
> >
> > For (1), a discussion in JIRA would also be an option IMO
> >
> > For (2), let us come up with few examples on what constitutes a feature
> > that needs a design doc, and what should be in the doc (IMO
> > architecture/general approach, components touched, interfaces changed)
> >
> >
> >
> > On Wed, Sep 23, 2015 at 2:24 PM, Fabian Hueske 
> wrote:
> >
> > > Hi everybody,
> > >
> > > I guess we all have noticed that the Flink community is quickly growing
> > and
> > > more and more contributions are coming in. Recently, a few
> contributions
> > > proposed new features without being discussed on the mailing list. Some
> > of
> > > these contributions were not accepted in the end. In other cases, pull
> > > requests had to be heavily reworked because the approach taken was not
> > the
> > > best one. These are situations which should be avoided because both the
> > > contributor as well as the person who reviewed the contribution
> invested
> > a
> > > lot of time for nothing.
> > >
> > > I had a look at our “How to contribute” and “Coding guideline” pages
> and
> > > think, we can improve them. I see basically two issues:
> > >
> > >   1. The documents do not explain how to propose and discuss new
> features
> > > and improvements.
> > >   2. The documents are quite technical and the structure could be
> > improved,
> > > IMO.
> > >
> > > I would like to improve these pages and propose the following
> additions:
> > >
> > >   1. Request contributors and committers to start discussions on the
> > > mailing list for new features. This discussion should help to figure
> out
> > > whether such a new feature is a good fit for Flink and give first
> > pointers
> > > for a design to implement it.
> > >   2. Require contributors and committers to write design documents for
> > all
> > > new features and major improvements. These documents should be attached
> > to
> > > a JIRA issue and follow a template which needs to be defined.
> > >   3. Extend the “Coding Style Guides” and add patterns that are
> commonly
> > > remarked in pull requests.
> > >   4. Restructure the current pages into three pages: a general guide
> for
> > > contributions and two guides for how to contribute to code and website
> > with
> > > all technical issues (repository, IDE setup, build system, etc.)
> > >
> > > Looking forward for your comments,
> > > Fabian
> > >
> >
>


Re: Flink's Checking and uploading JAR files Issue

2015-09-24 Thread Stephan Ewen
I think there is not yet any mechanism, but it would be a good addition, I
agree.

Between JobManager and TaskManagers, the JARs are cached. The TaskManagers
receive hashes of the JARs only, and only load them if they do not already
have them. The same mechanism should be used for the Client to upload JARs
to the JobManager - that way, they would be transferred only once.

For now, a workaround is to put the user JARs directly into the "lib"
directory of the Flink installation. That way they are available to every
worker without any uploads per job. Your RemoteExecutionEnvironment would
then not need to ship any JARs at all.

Would the workaround work for you for now?

Greetings,
Stephan
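
A minimal sketch of that workaround (host, port, and paths are placeholders):
with the user-code classes already in the "lib" folder of every Flink node, the
remote environment is created without attaching any JAR files.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class RemoteEnvWithoutJars {

    public static void main(String[] args) throws Exception {
        // No JAR files passed: the user classes are expected to be in "lib" on all nodes.
        ExecutionEnvironment env =
                ExecutionEnvironment.createRemoteEnvironment("jobmanager-host", 6123);

        DataSet<String> lines = env.readTextFile("hdfs:///path/to/input");

        lines.filter(line -> line.contains("ERROR"))
             .writeAsText("hdfs:///path/to/output");

        env.execute("filter job without per-job JAR upload");
    }
}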


On Thu, Sep 24, 2015 at 1:31 PM, Hanan Meyer  wrote:

> Hello All
>
> I use Flink in order to filter data from Hdfs and write it back as CSV.
>
> I keep getting the "Checking and uploading JAR files" phase on every DataSet
> filtering action or execution environment execution.
>
> I use ExecutionEnvironment.createRemoteEnvironment(ip+jars..) because I
> launch Flink from a J2EE Application Server.
>
> The JAR serialization and transfer takes a huge part of the
> execution time.
> Is there a way to force Flink to pass the JARs only once?
>
> Please advise
>
> Thanks,
>
> Hanan Meyer
>


Re: Build get stuck at BarrierBufferMassiveRandomTest

2015-09-23 Thread Stephan Ewen
Turned out it was due to different behavior of sockets under Ubuntu
(Debian) and OS X (BSD).

That's why it did not happen for Travis and me...

+1 for OS diversity among committers :-)

On Wed, Sep 23, 2015 at 7:50 PM, Ufuk Celebi <u...@apache.org> wrote:

> I’ve pushed a fix.
>
> > On 23 Sep 2015, at 16:28, Paris Carbone <par...@kth.se> wrote:
> >
> > It hangs for me too at the same test when doing "clean verify"
> >
> >> On 23 Sep 2015, at 16:09, Stephan Ewen <se...@apache.org> wrote:
> >>
> >> Okay, will look into this is a bit today...
> >>
> >> On Wed, Sep 23, 2015 at 4:04 PM, Ufuk Celebi <u...@apache.org> wrote:
> >>
> >>> Same here.
> >>>
> >>>> On 23 Sep 2015, at 13:50, Vasiliki Kalavri <vasilikikala...@gmail.com
> >
> >>> wrote:
> >>>>
> >>>> Hi,
> >>>>
> >>>> It's the latest master I'm trying to build, but it still hangs.
> >>>> Here's the trace:
> >>>>
> >>>> -
> >>>> 2015-09-23 13:48:41
> >>>> Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.75-b04 mixed
> >>> mode):
> >>>>
> >>>> "Attach Listener" daemon prio=5 tid=0x7faeb984a000 nid=0x3707
> waiting
> >>>> on condition [0x]
> >>>> java.lang.Thread.State: RUNNABLE
> >>>>
> >>>> "Service Thread" daemon prio=5 tid=0x7faeb9808000 nid=0x4d03
> runnable
> >>>> [0x]
> >>>> java.lang.Thread.State: RUNNABLE
> >>>>
> >>>> "C2 CompilerThread1" daemon prio=5 tid=0x7faebb00e800 nid=0x4b03
> >>>> waiting on condition [0x]
> >>>> java.lang.Thread.State: RUNNABLE
> >>>>
> >>>> "C2 CompilerThread0" daemon prio=5 tid=0x7faebb840800 nid=0x4903
> >>>> waiting on condition [0x]
> >>>> java.lang.Thread.State: RUNNABLE
> >>>>
> >>>> "Signal Dispatcher" daemon prio=5 tid=0x7faeba806800 nid=0x3d0f
> >>>> runnable [0x]
> >>>> java.lang.Thread.State: RUNNABLE
> >>>>
> >>>> "Finalizer" daemon prio=5 tid=0x7faebb836800 nid=0x3303 in
> >>>> Object.wait() [0x00014eff8000]
> >>>> java.lang.Thread.State: WAITING (on object monitor)
> >>>> at java.lang.Object.wait(Native Method)
> >>>> - waiting on <0x000138a84858> (a
> java.lang.ref.ReferenceQueue$Lock)
> >>>> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:135)
> >>>> - locked <0x000138a84858> (a java.lang.ref.ReferenceQueue$Lock)
> >>>> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:151)
> >>>> at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
> >>>>
> >>>> "Reference Handler" daemon prio=5 tid=0x7faebb004000 nid=0x3103 in
> >>>> Object.wait() [0x00014eef5000]
> >>>> java.lang.Thread.State: WAITING (on object monitor)
> >>>> at java.lang.Object.wait(Native Method)
> >>>> - waiting on <0x000138a84470> (a java.lang.ref.Reference$Lock)
> >>>> at java.lang.Object.wait(Object.java:503)
> >>>> at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:133)
> >>>> - locked <0x000138a84470> (a java.lang.ref.Reference$Lock)
> >>>>
> >>>> "main" prio=5 tid=0x7faeb9009800 nid=0xd03 runnable
> >>> [0x00010f1c]
> >>>> java.lang.Thread.State: RUNNABLE
> >>>> at java.net.PlainSocketImpl.socketAccept(Native Method)
> >>>> at
> >>>
> java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
> >>>> at java.net.ServerSocket.implAccept(ServerSocket.java:530)
> >>>> at java.net.ServerSocket.accept(ServerSocket.java:498)
> >>>> at
> >>>>
> >>>
> org.apache.flink.streaming.api.functions.sink.SocketClientSinkTest.testSocketSinkRetryAccess(SocketClientSinkTest.java:315)
> >>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>> at
> >>>>
> >>>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >>>> at
> >

Re: [Proposal] Create a separate sub module for benchmark test

2015-09-22 Thread Stephan Ewen
Sounds like a nice idea!

Do you want to make this a new maven project as part of the Flink
repository, or create a dedicated repository for that?

BTW: We are currently not mixing microbenchmarks with test execution. The
code for these benchmarks resides in the test scope of the projects (so it
is not packaged), but it is not executed as part of the UnitTests or
IntegrationTests.

Greetings,
Stephan
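
For reference, a minimal JMH benchmark skeleton of the kind such a module could
contain. The measured workload here is just a placeholder; a real benchmark
would exercise Flink code paths such as serializers or the memory manager, and
the module would also need the JMH dependencies (e.g. jmh-core and the
jmh-generator-annprocess annotation processor).

import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 5)
@Measurement(iterations = 10)
@Fork(1)
public class ExampleBenchmark {

    private final int[] data = new int[1024];

    @Benchmark
    public long sumArray() {
        // Placeholder workload; JMH handles warm-up, forking, and measurement.
        long sum = 0;
        for (int v : data) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) throws Exception {
        Options opts = new OptionsBuilder()
                .include(ExampleBenchmark.class.getSimpleName())
                .build();
        new Runner(opts).run();
    }
}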


On Tue, Sep 22, 2015 at 12:22 PM, Li, Chengxiang 
wrote:

> Hi, folks
> During my work on Flink, I found several micro benchmarks spread across
> different modules. These benchmarks are meant to be run manually, but they are
> annotated with JUnit annotations, so they get executed during the unit tests as
> well. There are some shortcomings in the current implementation:
>
> 1.   Benchmarks test performance instead of features and, normally, take
> much more time than unit tests. Mixing benchmarks with unit tests would
> inflate the CI check time.
>
> 2.   Mixed in with other tests, with no warm-up and no standalone process,
> these benchmark results may not be very accurate.
> Although it looks easy, there are actually many pitfalls in benchmarking, so
> I suggest we create a new sub-module for all benchmark tests and import JMH (
> http://openjdk.java.net/projects/code-tools/jmh/) as the benchmark
> framework. With the help of JMH, we should get:
>
> 1.   More available metrics.
>
> 2.   More accurate results.
>
> 3.   Focus on the benchmark logic only, with no need to worry about the
> measurement logic.
>
> Thanks
> Chengxiang
>


Re: Build get stuck at BarrierBufferMassiveRandomTest

2015-09-21 Thread Stephan Ewen
I am actually very happy that it is not the
"BarrierBufferMassiveRandomTest", that would be hell to debug...

On Mon, Sep 21, 2015 at 10:51 PM, Stephan Ewen <se...@apache.org> wrote:

> Ah, actually it is a different test. I think you got confused by the
> sysout log, because multiple parallel tests print there (that makes it not
> always obvious which one hangs).
>
> The test is the "SocketClientSinkTest.testSocketSinkRetryAccess()" test.
> You can see that by looking at which test case the "main" thread is stuck in.
>
> This test is very unstable, but, fortunately, I made a fix 1h ago and it
> is being tested on Travis right now :-)
>
> Cheers,
> Stephan
>
>
>
> On Mon, Sep 21, 2015 at 10:23 PM, Vasiliki Kalavri <
> vasilikikala...@gmail.com> wrote:
>
>> Locally yes.
>>
>> Here's the stack trace:
>>
>>
>> 2015-09-21 22:22:46
>> Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.75-b04 mixed mode):
>>
>> "Attach Listener" daemon prio=5 tid=0x7ff9d104e800 nid=0x4013 waiting
>> on condition [0x]
>>java.lang.Thread.State: RUNNABLE
>>
>> "Service Thread" daemon prio=5 tid=0x7ff9d3807000 nid=0x4c03 runnable
>> [0x]
>>java.lang.Thread.State: RUNNABLE
>>
>> "C2 CompilerThread1" daemon prio=5 tid=0x7ff9d2001000 nid=0x4a03
>> waiting on condition [0x]
>>java.lang.Thread.State: RUNNABLE
>>
>> "C2 CompilerThread0" daemon prio=5 tid=0x7ff9d201e000 nid=0x4803
>> waiting on condition [0x]
>>java.lang.Thread.State: RUNNABLE
>>
>> "Signal Dispatcher" daemon prio=5 tid=0x7ff9d3012800 nid=0x451b
>> runnable [0x]
>>java.lang.Thread.State: RUNNABLE
>>
>> "Finalizer" daemon prio=5 tid=0x7ff9d4005800 nid=0x3303 in
>> Object.wait() [0x00011430d000]
>>java.lang.Thread.State: WAITING (on object monitor)
>> at java.lang.Object.wait(Native Method)
>> - waiting on <0x0007ef504858> (a java.lang.ref.ReferenceQueue$Lock)
>> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:135)
>> - locked <0x0007ef504858> (a java.lang.ref.ReferenceQueue$Lock)
>> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:151)
>> at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
>>
>> "Reference Handler" daemon prio=5 tid=0x7ff9d480b000 nid=0x3103 in
>> Object.wait() [0x00011420a000]
>>java.lang.Thread.State: WAITING (on object monitor)
>> at java.lang.Object.wait(Native Method)
>> - waiting on <0x0007ef504470> (a java.lang.ref.Reference$Lock)
>> at java.lang.Object.wait(Object.java:503)
>> at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:133)
>> - locked <0x0007ef504470> (a java.lang.ref.Reference$Lock)
>>
>> "main" prio=5 tid=0x7ff9d480 nid=0xd03 runnable
>> [0x00010b764000]
>>java.lang.Thread.State: RUNNABLE
>> at java.net.PlainSocketImpl.socketAccept(Native Method)
>> at
>> java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
>> at java.net.ServerSocket.implAccept(ServerSocket.java:530)
>> at java.net.ServerSocket.accept(ServerSocket.java:498)
>> at
>>
>> org.apache.flink.streaming.api.functions.sink.SocketClientSinkTest.testSocketSinkRetryAccess(SocketClientSinkTest.java:315)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>>
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>>
>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>> at
>>
>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>> at
>>
>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>> at
>>
>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>> at
>>
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>> at
>>
>> org.junit.runners.BlockJUnit4ClassRunner.r

Re: Build get stuck at BarrierBufferMassiveRandomTest

2015-09-21 Thread Stephan Ewen
eWithRerun(JUnit4Provider.java:173)
> at
>
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
> at
>
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
> at
>
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
> at
>
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
> at
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
>
> "VM Thread" prio=5 tid=0x7ff9d4005000 nid=0x2f03 runnable
>
> "GC task thread#0 (ParallelGC)" prio=5 tid=0x7ff9d2005800 nid=0x1f03
> runnable
>
> "GC task thread#1 (ParallelGC)" prio=5 tid=0x7ff9d180 nid=0x2103
> runnable
>
> "GC task thread#2 (ParallelGC)" prio=5 tid=0x7ff9d1804800 nid=0x2303
> runnable
>
> "GC task thread#3 (ParallelGC)" prio=5 tid=0x00007ff9d1805000 nid=0x2503
> runnable
>
> "GC task thread#4 (ParallelGC)" prio=5 tid=0x7ff9d1805800 nid=0x2703
> runnable
>
> "GC task thread#5 (ParallelGC)" prio=5 tid=0x7ff9d1806800 nid=0x2903
> runnable
>
> "GC task thread#6 (ParallelGC)" prio=5 tid=0x7ff9d1807000 nid=0x2b03
> runnable
>
> "GC task thread#7 (ParallelGC)" prio=5 tid=0x7ff9d1807800 nid=0x2d03
> runnable
>
> "VM Periodic Task Thread" prio=5 tid=0x7ff9d1006000 nid=0x4e03 waiting
> on condition
>
> JNI global references: 193
>
>
> On 21 September 2015 at 22:13, Stephan Ewen <se...@apache.org> wrote:
>
> > This happened locally on your machine?
> >
> > Can you dump the stack-trace and post it? "jps  >
> > stacktrace.txt" or so...
> >
> > On Mon, Sep 21, 2015 at 10:09 PM, Vasiliki Kalavri <
> > vasilikikala...@gmail.com> wrote:
> >
> > > Hi squirrels,
> > >
> > > I've been meaning to merge a PR (#1520), but my local maven build gets
> > > stuck at
> > > org.apache.flink.streaming.runtime.io.BarrierBufferMassiveRandomTest.
> > > It looks like a deadlock.. The build just hangs there and top shows no
> > > CPU/memory load. Anyone else has experienced the same? I'm on OS X
> 10.10.
> > >
> > > Thanks!
> > > -Vasia.
> > >
> >
>


Tests in the streaming API

2015-09-19 Thread Stephan Ewen
Hi!

I just saw that all tests in the streaming API are declared as unit tests,
even though the vast majority are integration tests (they spawn mini clusters).

That leads to problems, because the streaming test mini clusters do not
properly clean up after themselves and unit tests reuse JVMs (for speed,
they are expected to be quick).

The streaming API is under quite some rework right now anyway; for all new
and changed tests, we absolutely need to fix this!

Greetings,
Stephan


Re: Task Parallelism in a Cluster

2015-12-08 Thread Stephan Ewen
Hi Ali!

In the case you have, the sequence of source-map-filter ... forms a
pipeline.

You mentioned that you set the parallelism to 16, so there should be 16
pipelines. These pipelines should be completely independent.

Looking at the way the scheduler is implemented, independent pipelines
should be spread across machines. But when you execute that in parallel,
you say all 16 pipelines end up on the same machine?

Can you share with us the rough code of your program? Or a Screenshot from
the runtime dashboard that shows the program graph?


If your cluster is basically for that one job only, you could try and set
the number of slots to 4 for each machine. Then you have 16 slots in total
and each node would run one of the 16 pipelines.


Greetings,
Stephan


On Wed, Dec 2, 2015 at 4:06 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:

> There is no shuffle operation in my flow. Mine actually looks like this:
>
> Source: Custom Source -> Flat Map -> (Filter -> Flat Map -> Map -> Map ->
> Map, Filter)
>
>
> Maybe it’s treating this whole flow as one pipeline and assigning it to a
> slot. What I really wanted was to have the custom source I built to have
> running instances on all nodes. I’m not really sure if that’s the right
> approach, but if we could add this as a feature that’d be great, since
> having more than one node running the same pipeline guarantees the
> pipeline is never offline.
>
> -Ali
>
> On 2015-12-02, 4:39 AM, "Till Rohrmann" <trohrm...@apache.org> wrote:
>
> >If I'm not mistaken, then the scheduler has already a preference to spread
> >independent pipelines out across the cluster. At least he uses a queue of
> >instances from which it pops the first element if it allocates a new slot.
> >This instance is then appended to the queue again, if it has some
> >resources
> >(slots) left.
> >
> >I would assume that you have a shuffle operation involved in your job such
> >that it makes sense for the scheduler to deploy all pipelines to the same
> >machine.
> >
> >Cheers,
> >Till
> >On Dec 1, 2015 4:01 PM, "Stephan Ewen" <se...@apache.org> wrote:
> >
> >> Slots are like "resource groups" which execute entire pipelines. They
> >> frequently have more than one operator.
> >>
> >> What you can try as a workaround is decrease the number of slots per
> >> machine to cause the operators to be spread across more machines.
> >>
> >> If this is a crucial issue for your use case, it should be simple to
> >>add a
> >> "preference to spread out" to the scheduler...
> >>
> >> On Tue, Dec 1, 2015 at 3:26 PM, Kashmar, Ali <ali.kash...@emc.com>
> >>wrote:
> >>
> >> > Is there a way to make a task cluster-parallelizable? I.e. Make sure
> >>the
> >> > parallel instances of the task are distributed across the cluster.
> >>When I
> >> > run my flink job with a parallelism of 16, all the parallel tasks are
> >> > assigned to the first task manager.
> >> >
> >> > - Ali
> >> >
> >> > On 2015-11-30, 2:18 PM, "Ufuk Celebi" <u...@apache.org> wrote:
> >> >
> >> > >
> >> > >> On 30 Nov 2015, at 17:47, Kashmar, Ali <ali.kash...@emc.com>
> wrote:
> >> > >> Do the parallel instances of each task get distributed across the
> >> > >>cluster or is it possible that they all run on the same node?
> >> > >
> >> > >Yes, slots are requested from all nodes of the cluster. But keep in
> >>mind
> >> > >that multiple tasks (forming a local pipeline) can be scheduled to
> >>the
> >> > >same slot (1 slot can hold many tasks).
> >> > >
> >> > >Have you seen this?
> >> > >
> >> >
> >>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/jo
> >>b
> >> > >_scheduling.html
> >> > >
> >> > >> If they can all run on the same node, what happens when that node
> >> > >>crashes? Does the job manager recreate them using the remaining open
> >> > >>slots?
> >> > >
> >> > >What happens: The job manager tries to restart the program with the
> >>same
> >> > >parallelism. Thus if you have enough free slots available in your
> >> > >cluster, this works smoothly (so yes, the remaining/available slots
> >>are
> >> > >used)
> >> > >
> >> > >With a YARN cluster the task manager containers are restarted
> >> > >automatically. In standalone mode, you have to take care of this
> >> yourself.
> >> > >
> >> > >
> >> > >Does this help?
> >> > >
> >> > >– Ufuk
> >> > >
> >> >
> >> >
> >>
>
>
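
For reference, a rough sketch of a source that runs with the configured
parallelism, so that every node gets its own instance as wished for in the
quoted message. Host, port, and the line-based protocol are placeholders, and
error handling and reconnects are omitted.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class ParallelSocketSource extends RichParallelSourceFunction<String> {

    private final String host;
    private final int port;
    private volatile boolean running = true;

    public ParallelSocketSource(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Each parallel subtask opens its own connection to the server.
        int subtask = getRuntimeContext().getIndexOfThisSubtask();

        try (Socket socket = new Socket(host, port);
             BufferedReader reader =
                     new BufferedReader(new InputStreamReader(socket.getInputStream()))) {

            String line;
            while (running && (line = reader.readLine()) != null) {
                ctx.collect("subtask-" + subtask + ": " + line);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}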


Re: Lack of review on PRs

2015-12-06 Thread Stephan Ewen
Hi Sachin!

Thank you for honestly speaking your mind on this issue.
I agree that handling ML pull requests is particularly challenging right
now for various reasons.

There are currently very few active committers that have the background to
actually review these pull requests. The whole work basically hinges on
Till and Chiwan. As far as I know, both of them work on many things, and
have only a certain time they can spend on ML. With regard to ML, Flink is
simply a bit understaffed.

The ML parts are hard to develop. They require people who understand math,
know how to write good code, and can think deeply about the parallel systems
aspects of Flink. A lot of ML pull requests had to be reviewed multiple times
because they did not bring these aspects together.

Finally, the ML pull requests are very time-consuming to review, because
the reviewer usually has to learn the implemented algorithm (or its
parallel adaptation) before being able to properly review it. As a result,
while most other pull requests can be reviewed with a bit of time alongside
the rest of the weekly agenda, ML pull requests need a lot of dedicated time.


The things I see that can really improve the situation are these:

1) Try to get more people in the community to help bring these pull
requests into shape. While the group of ML-savvy committers is still small,
it would be good if contributors could help each other out there as well,
by reviewing the pull requests and helping to get them into shape. Pull
requests that are high quality by the time one of the committers looks over
them can be merged relatively quickly.

2) Honestly reject some pull requests: focus the time on the promising
ML pull requests and reject earlier those that are far from good shape,
giving the rejected pull requests some coarser feedback about
which areas to improve rather than detailed comments on each part.
While it is nice to try to shepherd in as many pull requests as possible
(and the community really tries to do that), it may not be possible in that
area as long as there are still not enough ML people.

Maybe Till and Chiwan can share their thoughts on this.


Greetings,
Stephan


On Sun, Dec 6, 2015 at 5:44 PM, Chiwan Park  wrote:

> Hi Sachin,
>
> I’m sorry for your unsatisfying experience with the lack of review.
>
> As you know, because there are few committers (including me) whose
> interest is ML, reviews of PRs can be slow to complete. But I admit my
> fault in leaving the PRs unreviewed for 3-5 months.
>
> I’ll try to review your PRs as soon as possible.
>
> > On Dec 6, 2015, at 1:00 AM, Sachin Goel 
> wrote:
> >
> > Hi all
> > Sorry about a weekend email.
> >
> > This email is to express my displeasure over the lack of any review on my
> > PRs on extending the ML library. Five of my PRs have been without any
> > review for times varying from 3-5 months now.
> > When I took up the task of extending the ML library by implementing core
> > algorithms such as Decision Tree and k-means clustering [with several
> > initialization schemes], I had hoped that the community will be actively
> > involved in it since ML is a very important component of any big data
> > system these days. However, it appears I have been wrong.
> > Surely, the initial reviews required a lot of changes from my side over
> > coding style mistakes [first time programmer in Scala], and
> > less-than-optimal implementations. I like to think that I have learned a
> > lot about maintaining better coding style compatible with Flink code
> base,
> > and spending time to optimize my work due to this.
> > However, if a PR requires work, that doesn't automatically disqualify it
> > from being reviewed actively, since the author has spent a lot of time on
> > it and has voluntarily taken up the task of contributing.
> >
> > Machine learning is my core area of interest and I am able to contribute
> > much more to the library; however, a lack of review after repeated
> > reminders automatically discourages me from picking up more issues.
> >
> > However minor some of my commits maybe, I have been actively involved in
> > the development work [with a total of 29 commits.]. I have also spent a
> lot
> > of time in release testing and diagnosing-slash-fixing lots of issues
> with
> > Web Dashboard. However, as with any contributor, my main goal is to
> > contribute to my area of interest, while also diversifying my work by
> > fixing other issues.
> >
> > The PRs are 710, 757, 861, 918 and 1032. I propose the following order
> for
> > anyone who wants to review my work:
> > 1032 [very simple feature.]
> > 918 [very short PR]
> > 861 [followed by 710 after a complete rebase] [major work for Histograms
> > and Decision Trees]
> > 757 [major work for K-Means clustering and initialization schemes]
> >
> > If I have come across as rude, I apologize.
> >
> > Happy reviewing and thanks for bearing with me. :)
> >
> > Cheers!
> > 

Re: Monitoring backpressure

2015-12-07 Thread Stephan Ewen
I discussed this quite a bit with other people.

It is not totally straightforward. One could try and measure exhaustion of
the output buffer pools, but that fluctuates a lot - it would need some
work to get a stable metric from that...

If you have a profiler that you can attach to the processes, you could
check whether a lot of time is spent within the "requestBufferBlocking()"
method of the buffer pool...

Stephan
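
In the absence of a profiler, one could approximate this with plain stack-trace
sampling from inside the JVM. A rough, illustrative sketch (the method name to
look for is the one mentioned above; everything else is made up, and the ratio
is only a crude indicator):

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class BackPressureSampler {

    /** Returns roughly how many threads per sampling round sat in requestBufferBlocking(). */
    public static double sampleBlockedRatio(int samples, long intervalMillis)
            throws InterruptedException {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        int hits = 0;

        for (int i = 0; i < samples; i++) {
            for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
                for (StackTraceElement frame : info.getStackTrace()) {
                    if ("requestBufferBlocking".equals(frame.getMethodName())) {
                        hits++;
                        break;
                    }
                }
            }
            Thread.sleep(intervalMillis);
        }
        return samples == 0 ? 0.0 : (double) hits / samples;
    }
}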


On Mon, Dec 7, 2015 at 9:45 AM, Gyula Fóra  wrote:

> Hey guys,
>
> Is there any way to monitor the backpressure in the Flink job? I find it
> hard to debug slow operators because of the backpressure mechanism so it
> would be good to get some info out of the network layer on what exactly
> caused the backpressure.
>
> For example:
>
> task1 -> task2 -> task3 -> task4
>
> I want to figure out whether task 2 or task 3 is slow.
>
> Any ideas?
>
> Thanks,
> Gyula
>


Re: flink-dist packaging including unshaded classes

2015-12-09 Thread Stephan Ewen
Hi!

Did you change anything in the POM files, with respect to Guava, or add
another dependency that might transitively pull Guava?

Stephan


On Tue, Dec 8, 2015 at 9:25 PM, Nick Dimiduk  wrote:

> Hi there,
>
> I'm attempting to build locally a flink based on release-0.10.0 +
> FLINK-3147. When I build from this sandbox, the resulting flink-dist.jar
> contains both shanded and unshaded jars. In my case, this results in a
> runtime conflict in my application, where com.google.common.base.Stopwatch
> from both Guava-12 and Guava-18 are in my classpath.
>
> Is there some additional profile required to build a dist package with only
> the shaded jars?
>
> Thanks,
> Nick
>
> $ tar xvzf flink-0.10.0-bin-hadoop27-scala_2.11.tgz
> $ cd flink-0.10.0
> $ $ unzip -t lib/flink-dist_2.11-0.10.0.jar | grep Stopwatch
> testing:
> org/apache/flink/shaded/com/google/common/base/Stopwatch$1.class   OK
> testing: org/apache/flink/shaded/com/google/common/base/Stopwatch.class
>   OK
> testing:
>
> org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch$1.class
>   OK
> testing:
>
> org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch.class
>   OK
> testing:
> org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch$1.class
>  OK
> testing:
> org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch.class   OK
> testing: com/google/inject/internal/util/$Stopwatch.class   OK
>
> vs.
>
> $ git status
> HEAD detached from release-0.10.0
> $ git log --decorate=short --oneline | head -n3
> dccdbd8 (HEAD) [FLINK-3147] HadoopOutputFormatBase should expose fields as
> protected
> ab2cca4 (tag: release-0.10.0, origin/release-0.10.0-rc8,
> ndimiduk/release-0.10.0-rc8) Commit for release 0.10.0
> c0fe305 [FLINK-2992] Remove use of SerializationUtils
> $ mvn clean install -DskipTests
> ...
> $ cd flink-dist/target/flink-0.10.0-bin/flink-0.10.0
> $ unzip -t lib/flink-dist-0.10.0.jar | grep Stopwatch
> testing:
> org/apache/flink/shaded/com/google/common/base/Stopwatch$1.class   OK
> testing: org/apache/flink/shaded/com/google/common/base/Stopwatch.class
>   OK
> testing:
>
> org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch$1.class
>   OK
> testing:
>
> org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch.class
>   OK
> testing:
> org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch$1.class
>  OK
> testing:
> org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch.class   OK
> testing: com/google/inject/internal/util/$Stopwatch.class   OK
> testing: com/google/common/base/Stopwatch$1.class   OK
> testing: com/google/common/base/Stopwatch.class   OK
> testing:
> com/google/common/util/concurrent/RateLimiter$SleepingStopwatch$1.class
>  OK
> testing:
> com/google/common/util/concurrent/RateLimiter$SleepingStopwatch.class   OK
>


Re: Task Parallelism in a Cluster

2015-12-11 Thread Stephan Ewen
Hi Ali!

I see, so the tasks on 192.168.200.174 and 192.168.200.175 apparently do not
make progress and do not even recognize the end-of-stream point.

I expect that the streams on 192.168.200.174 and 192.168.200.175 are
back-pressured to a stand-still. Since no network is involved, the reason
for the back pressure are probably the sinks.

What kind of data sink are you using (in the addSink() function)?
Can you check if that one starts to fully block on machines
192.168.200.174 and 192.168.200.175 ?

Greetings,
Stephan



On Fri, Dec 11, 2015 at 4:50 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:

> Hi Stephan,
>
> I got a request to share the image with someone and I assume it was you.
> You should be able to see it now. This seems to be the main issue I have
> at this time. I've tried running the job on the cluster with a parallelism
> of 16, 24, 36, and even went up to 48. I see all the parallel pipelines
> working for a bit and then some of them just stop, I’m not sure if they’re
> stuck or not. Here’s another screenshot:
> http://postimg.org/image/gr6ogxqjj/
>
> Two things you’ll notice:
> 1. Pipelines on 192.168.200.174 and 192.168.200.175 have stopped doing
> anything at one point and only 192.168.200.173 is doing all the work.
> 2. Pipelines on 192.168.200.174 and 192.168.200.175 don’t have an end time
> even though the job should be finished (the screenshot was taken after the
> source was closed).
>
> I’m not sure if this helps or not, but here are some properties from the
> flink-conf.yaml:
>
> jobmanager.heap.mb: 8192
> taskmanager.heap.mb: 49152
> taskmanager.numberOfTaskSlots: 16
> parallelism.default: 1
>
> state.backend: filesystem
> state.backend.fs.checkpointdir: file:///tmp/flink-checkpoints
>
> taskmanager.network.numberOfBuffers: 3072
>
> recovery.mode: zookeeper
> recovery.zookeeper.quorum:
> 192.168.200.173:2181,192.168.200.174:2181,192.168.200.175:2181
> recovery.zookeeper.storageDir: file:///tmp/zk-recovery
> recovery.zookeeper.path.root: /opt/flink-0.10.0
>
> I appreciate all the help.
>
>
> Thanks,
> Ali
>
>
> On 2015-12-10, 10:16 AM, "Stephan Ewen" <se...@apache.org> wrote:
>
> >Hi Ali!
> >
> >Seems like the Google Doc has restricted access, I tells me I have no
> >permission to view it...
> >
> >Stephan
> >
> >
> >On Wed, Dec 9, 2015 at 8:49 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
> >
> >> Hi Stephan,
> >>
> >> Here’s a link to the screenshot I tried to attach earlier:
> >>
> >> https://drive.google.com/open?id=0B0_jTR8-IvUcMEdjWGFmYXJYS28
> >>
> >> It looks to me like the distribution is fairly skewed across the nodes,
> >> even though they’re executing the same pipeline.
> >>
> >> Thanks,
> >> Ali
> >>
> >>
> >> On 2015-12-09, 12:36 PM, "Stephan Ewen" <se...@apache.org> wrote:
> >>
> >> >Hi!
> >> >
> >> >The parallel socket source looks good.
> >> >I think you forgot to attach the screenshot, or the mailing list
> >>dropped
> >> >the attachment...
> >> >
> >> >Not sure if I can diagnose that without more details. The sources all
> >>do
> >> >the same. Assuming that the server distributes the data evenly across
> >>all
> >> >connected sockets, and that the network bandwidth ends up being
> >>divided in
> >> >a fair way, all pipelines should run be similarly "eager".
> >> >
> >> >Greetings,
> >> >Stephan
> >> >
> >> >
> >> >On Wed, Dec 9, 2015 at 5:22 PM, Kashmar, Ali <ali.kash...@emc.com>
> >>wrote:
> >> >
> >> >> Hi Stephan,
> >> >>
> >> >> That was my original understanding, until I realized that I was not
> >> >>using
> >> >> a parallel socket source. I had a custom source that extended
> >> >> SourceFunction which always runs with parallelism = 1. I looked
> >>through
> >> >> the API and found the ParallelSourceFunction interface so I
> >>implemented
> >> >> that and voila, now all 3 nodes in the cluster are actually receiving
> >> >> traffic on socket connections.
> >> >>
> >> >> Now that I’m running it successfully end to end, I’m trying to
> >>improve
> >> >>the
> >> >> performance. Can you take a look at the attached screen shot and
> >>tell me
> >> >> if the distribution of wor

Re: Apache Tinkerpop & Geode Integration?

2015-12-16 Thread Stephan Ewen
I am not very familiar with Gremlin, but I remember a brainstorming session
with Martin Neumann on porting Cypher (the neo4j query language) to Flink.
We looked at Cypher queries for filtering and traversing the graph.

It looked like it would work well. If I remember correctly, we could even model
recursive conditions on traversals pretty well with delta iterations.

If Gremlin's use cases are anything like Cypher, I could ping Martin and
see if we can collect some of those ideas again.

Stephan
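
As a reminder of what modeling a traversal with delta iterations can look like,
here is a rough reachability sketch on the DataSet API. The graph layout, the
start vertex, and the iteration bound are made up, and a real Cypher/Gremlin
translation would of course be more involved.

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class ReachabilitySketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Made-up directed graph as (source, target) edges.
        DataSet<Tuple2<Long, Long>> edges = env.fromElements(
                new Tuple2<>(1L, 2L), new Tuple2<>(2L, 3L),
                new Tuple2<>(3L, 4L), new Tuple2<>(5L, 6L));

        // Start the traversal from vertex 1.
        DataSet<Tuple1<Long>> start = env.fromElements(new Tuple1<>(1L));

        // Solution set = vertices reached so far, workset = frontier of the last step.
        DeltaIteration<Tuple1<Long>, Tuple1<Long>> iteration =
                start.iterateDelta(start, 100, 0);

        // Expand the frontier by one hop along the edges.
        DataSet<Tuple1<Long>> candidates = iteration.getWorkset()
                .join(edges).where(0).equalTo(0)
                .with(new FlatJoinFunction<Tuple1<Long>, Tuple2<Long, Long>, Tuple1<Long>>() {
                    @Override
                    public void join(Tuple1<Long> vertex, Tuple2<Long, Long> edge,
                                     Collector<Tuple1<Long>> out) {
                        out.collect(new Tuple1<>(edge.f1));
                    }
                });

        // Keep only vertices that are not yet in the solution set.
        DataSet<Tuple1<Long>> newFrontier = candidates.distinct()
                .coGroup(iteration.getSolutionSet()).where(0).equalTo(0)
                .with(new CoGroupFunction<Tuple1<Long>, Tuple1<Long>, Tuple1<Long>>() {
                    @Override
                    public void coGroup(Iterable<Tuple1<Long>> candidate,
                                        Iterable<Tuple1<Long>> alreadyReached,
                                        Collector<Tuple1<Long>> out) {
                        if (!alreadyReached.iterator().hasNext()) {
                            for (Tuple1<Long> c : candidate) {
                                out.collect(c);
                            }
                        }
                    }
                });

        // New vertices are added to the solution set and become the next frontier.
        DataSet<Tuple1<Long>> reachable = iteration.closeWith(newFrontier, newFrontier);
        reachable.print();
    }
}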

On Tue, Dec 15, 2015 at 5:35 PM, Vasiliki Kalavri  wrote:

> Hi Dr. Fabian,
>
> thanks a lot for your answer!
>
>
> On 15 December 2015 at 15:42, Fabian Hueske  wrote:
>
> > Hi Vasia,
> >
> > I agree, Gremlin definitely looks like an interesting API for Flink.
> > I'm not sure how it relates to Gelly. I guess Gelly would (initially) be
> > more tightly integrated with the DataSet API whereas Gremlin would be a
> > connector for other languages. Any ideas on this?
> >
>
> The idea is to provide a FlinkGraphComputer which will use Gelly's
> iterations to compile the Gremlin query language to Flink.
> In my previous email, I linked to our discussion over at the Tinkerpop
> mailing list, where you can find more details on this. By adding the
> FlinkGraphComputer, we basically get any graph query language that compiles
> to the Gremlin VM for free.
>
>
> >
> > Another question would be whether the connector should go into Flink or
> > Tinkerpop. For example, the Spark, Giraph, and Neo4J connectors are all
> > included in Tinkerpop.
> > This should be discussed with the Tinkerpop community.
> >
> >
> I'm copying from the Tinkerpop mailing list thread (link for full thread in
> my previous email):​
>
>
> *In the past, TinkerPop used to be a "dumping ground" for all
> implementations, but we decided for TinkerPop3 that we would only have
> "reference implementations" so users can play, system providers can learn,
> and ultimately, system providers would provide TinkerPop support in their
> distribution. As such, we would like to have FlinkGraphComputer distributed
> with Flink. If that sounds like something your project would be comfortable
> with, I think we can provide a JIRA/PR for FlinkGraphComputer (as well as
> any necessary documentation). We can start with a JIRA ticket to get things
> going. Thoughts?*
>
>
> ​This is why I brought the conversation over here, so I hear the opinions
> of the Flink community on this :)​
>
>
>
> > Best, Fabian
> >
>
>
> -Vasia.​
>
>
>
> >
> >
> > 2015-12-14 18:33 GMT+01:00 Vasiliki Kalavri :
> >
> > > Ping squirrels! Any thoughts/opinions on this?
> > >
> > > On 9 December 2015 at 20:40, Vasiliki Kalavri <
> vasilikikala...@gmail.com
> > >
> > > wrote:
> > >
> > > > Hello squirrels,
> > > >
> > > > I have been discussing with the Apache Tinkerpop [1] community
> > regarding
> > > > an integration with Flink/Gelly.
> > > > You can read our discussion in [2].
> > > >
> > > > Tinkerpop has a graph traversal machine called Gremlin, which
> supports
> > > > many high-level graph processing languages and runs on top of
> different
> > > > systems (e.g. Giraph, Spark, Graph DBs). You can read more in this
> > great
> > > > blog post [3].
> > > >
> > > > The idea is to provide a FlinkGraphComputer implementation, which
> will
> > > add
> > > > Gremlin support to Flink.
> > > >
> > > > I believe Tinkerpop is a great project and I would love to see an
> > > > integration with Gelly.
> > > > Before we move forward, I would like your input!
> > > > To me, it seems that this addition would nicely fit in flink-contrib,
> > > > where we also have connectors to other projects.
> > > > If you agree, I will go ahead and open a JIRA about it.
> > > >
> > > > Thank you!
> > > > -Vasia.
> > > >
> > > > [1]: https://tinkerpop.incubator.apache.org/
> > > > [2]:
> > > >
> > >
> >
> https://mail-archives.apache.org/mod_mbox/incubator-tinkerpop-dev/201511.mbox/%3ccanva_a390l7g169r8sn+ej1-yfkbudlnd4td6atwnp0uza-...@mail.gmail.com%3E
> > > > [3]:
> > > >
> > >
> >
> http://www.datastax.com/dev/blog/the-benefits-of-the-gremlin-graph-traversal-machine
> > > >
> > > > On 25 November 2015 at 16:54, Vasiliki Kalavri <
> > > vasilikikala...@gmail.com>
> > > > wrote:
> > > >
> > > >> Hi James,
> > > >>
> > > >> I've just subscribed to the Tinkerpop dev mailing list. Could you
> > please
> > > >> send a reply to the thread, so then I can reply to it?
> > > >> I'm not sure how I can reply to the thread otherwise...
> > > >> I also saw that there is a grafos.ml project thread. I could also
> > > >> provide some input there :)
> > > >>
> > > >> Thanks!
> > > >> -Vasia.
> > > >>
> > > >> On 25 November 2015 at 15:09, James Thornton <
> > james.thorn...@gmail.com>
> > > >> wrote:
> > > >>
> > > >>> Hi Vasia -
> > > >>>
> > > >>> Yes, a FlinkGraphComputer should be a straight-forward first step.
> > > Also,
> > > >>> on
> > > >>> the Apache Tinkerpop dev mailing 

Re: [DISCUSS] Improving State/Timers/Windows

2015-12-14 Thread Stephan Ewen
A lot of this makes sense, but I am not sure about renaming
"OperatorState". The other name is nicer, but why make users' lives hard
just for a name?


On Mon, Dec 14, 2015 at 10:46 AM, Maximilian Michels  wrote:

> Hi Aljoscha,
>
> Thanks for the informative technical description.
>
> >  - function state: this is the state that you get when a user function
> implements the Checkpointed interface. it is not partitioned
> >  - operator state: This is the state that a StreamOperator can snapshot,
> it is similar to the function state, but for operators. it is not
> partitioned
> > - partitioned state: state that is scoped to the key of the incoming
> element, in Flink, this is (confusingly) called OperatorState and KvState
> (internally)
>
> Let's clean that up! Let's rename the OperatorState interface to KvState.
>
> > Both stream operators and user functions can have partitioned state, and
> the namespace is the same, i.e. the state can clash. The partitioned state
> will stay indefinitely if not manually cleared.
>
> I suppose operators currently have to take care to use a unique
> identifier for the state such that it doesn't clash with the user
> function. Wouldn't be too hard to introduce a scoping here.
>
> Your proposal makes sense. It seems like this is a rather delicate
> change which improves the flexibility of the streaming API. What is
> the motivation behind this? I suppose you are thinking of improvements
> to the session capabilities of the streaming API.
>
> > If we want to also implement the current WindowOperator on top of these
> generic facilities we need to have a way to scope state not only by key but
> also by windows (or better, some generic state scope).
>
> This is currently handled by the WindowOperator itself and would then
> be delegated to the enhanced state interface? Makes sense if we want
> to make use of the new state interface. Again, is it just cleaner or
> does this enable new type of applications?
>
> Cheers,
> Max
>
> On Thu, Dec 10, 2015 at 4:47 PM, Aljoscha Krettek 
> wrote:
> > Hi All,
> > I want to discuss some ideas about improving the primitives/operations
> that Flink offers for user-state, timers and windows and how these concepts
> can be unified.
> >
> > It has come up a lot lately that people have very specific requirements
> regarding the state that they keep and it seems necessary to allow users
> to set their own custom timers (on processing time and watermark time
> (event-time)) to do both expiration of state and implementation of custom
> windowing semantics. While we’re at this, we might also think about
> cleaning up the state handling a bit.
> >
> > Let me first describe the status quo, so that we’re all on the same
> page. There are three types of state:
> >  - function state: this is the state that you get when a user function
> implements the Checkpointed interface. it is not partitioned
> >  - operator state: This is the state that a StreamOperator can snapshot,
> it is similar to the function state, but for operators. it is not
> partitioned
> >  - partitioned state: state that is scoped to the key of the incoming
> element, in Flink, this is (confusingly) called OperatorState and KvState
> (internally)
> >
> > (Operator is the low-level concept, user functions are usually invoked
> by the operator, for example StreamMap is the operator that handles a
> MapFunction.)
> >
> > Function state and operator state is not partitioned, meaning that it
> becomes difficult when we want to implement dynamic scale-in/scale-out.
> With partitioned state it can be redistributed when changing the degree of
> parallelism.
> >
> > Both stream operators and user functions can have partitioned state, and
> the namespace is the same, i.e. the state can clash. The partitioned state
> will stay indefinitely if not manually cleared.
> >
> > On to timers, operators can register processing-time callbacks, they can
> react to watermarks to implement event-time callbacks. They have to
> implement the logic themselves, however. For example, the WindowOperator
> has custom code to keep track of watermark timers and for reacting to
> watermarks. User functions have no way of registering timers. Also, timers
> are not scoped to any key. So if you register a timer while processing an
> element of a certain key, when the timer fires you don’t know what key was
> active when registering the timer. This might be necessary for cleaning up
> state for certain keys, or to trigger processing for a certain key only,
> for example with session windows of some kind.
> >
> > Now, on to new stuff. I propose to add a timer facility that can be used
> by both operators and user functions. Both partitioned state and timers
> should be aware of keys and if a timer fires the partitioned state should
> be scoped to the same key that was active when the timer was registered.
> >
> > One last bit. If we want to also implement the current WindowOperator on
> 

Re: Flink shell in Jupyter

2015-12-17 Thread Stephan Ewen
I think Till has done some advanced Python / Flink / Zeppelin integration
(to use Python plotting libs) for a talk at some point.

@Till: Do you still have the code? Could you share it with Gyula?

On Wed, Dec 16, 2015 at 4:22 PM, Gyula Fóra  wrote:

> Hey Guys,
>
> Has anyone tried to setup the Flink scala shell with Jupyter? I would
> assume the logic is similar to Zeppelin.
>
> The reason I am asking this because we have a Jupyter cluster that runs
> python and scala (2.11 I believe) and Spark works on it, so we figured it
> would be good to add support for Flink as well so data scientists can then
> use that as well.
>
> I am of course willing to help with this, both with development and testing
> in a real production environment :)
>
> Cheers,
> Gyula
>


Re: [DISCUSS] Time Behavior in Streaming Jobs (Event-time/processing-time)

2015-12-18 Thread Stephan Ewen
I am also in favor of option (2).

We could also pass the TimeCharacteristic to, for example, the
"SlidingTimeWindows". Then there is one class, users can explicitly choose
the characteristic they want, and when nothing is specified, the
default time characteristic is chosen.
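
Purely as an illustration of that idea (the extra parameter is hypothetical,
this API does not exist today), the example from the quoted mail could then be
written without relying on the environment default:

// hypothetical: the window assigner is told explicitly which time characteristic to use
DataStream<Tuple2<String, Integer>> quickResults = input
        .keyBy(0)
        .window(SlidingTimeWindows.of(Time.seconds(5), Time.seconds(1), TimeCharacteristic.ProcessingTime))
        .sum(1);

DataStream<Tuple2<String, Integer>> slowResults = input
        .keyBy(0)
        .window(SlidingTimeWindows.of(Time.seconds(5), Time.seconds(1), TimeCharacteristic.EventTime))
        .sum(1);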

On Thu, Dec 17, 2015 at 11:41 AM, Maximilian Michels  wrote:

> Hi Aljoscha,
>
> I'm in favor of option 2: Keep the setStreamTimeCharacteristic to set
> the default time behavior. Then add a method to the operators to set a
> custom time behavior.
>
> The problem explanatory in SlidingTimeWindows:
>
> @Override
> public Trigger
> getDefaultTrigger(StreamExecutionEnvironment env) {
>if (env.getStreamTimeCharacteristic() ==
> TimeCharacteristic.ProcessingTime) {
>   return ProcessingTimeTrigger.create();
>} else {
>   return EventTimeTrigger.create();
>}
> }
>
> That just needs to be fixed, e.g. by having a dedicated
> setTimeCharacteristic(..) on the operator.
>
> +1 for removing AbstractTime, EvenTime, and ProcessingTime.
>
> Cheers,
> Max
>
> On Wed, Dec 16, 2015 at 3:26 PM, Aljoscha Krettek 
> wrote:
> > Hi,
> > I thought a bit about how to improve the handling of time in Flink,
> mostly as it relates to windows. The problem is that mixing processing-time
> and event-time windows in one topology is very hard (impossible) right now.
> Let me explain it with this example:
> >
> > val env: StreamExecutionEnvironment = …
> >
> > env.setStreamTimeCharacteristic(EventTime)
> >
> > val input = 
> >
> > val quickResults = input
> >   .keyBy(…)
> >   .window(TumblingTimeWindows.of(Time.seconds(5))
> >   .trigger(ProcessingTimeTrigger.create())
> >   .sum(1)
> >
> > val slowResults = input
> >   .keyBy(…)
> >   .window(TumblingTimeWindows.of(Time.seconds(5))
> >   // .trigger(EventTimeTrigger.create()) this is the default trigger, so
> no need to set it, really
> >   .sum(1)
> >
> > The idea is that you want to have fast, but possibly inaccurate, results
> using processing time and correct, but maybe slower, results using
> event-time windowing.
> >
> > The problem is that the current API tries to solve two problems:
> >  1. We want to have a way to just say “time window” and then let the
> system instantiate the correct window-operator based on the time
> characteristic
> >  2. We want to have flexibility to allow users to mix ’n match
> processing-time and event-time windows
> >
> > The above example does not work because both operators would assign
> elements to windows based on the event-time timestamp. The first window
> therefore triggers event-time windows by processing time, which has
> unexpected (wrong) results.
> >
> > I see three solutions to this:
> >  1. Remove setStreamTimeCharacteristic(), let users always specify
> exactly what kind of windowing they want
> >  2. Keep setStreamTimeCharacteristic() but only employ the magic that
> decides on the window operator for the special .timeWindow() call. Have two
> different window assigners (two per window type, that is TumblingWindows,
> SlidingTimeWindows, SessionWindows, ...), one for processing-time and one
> for event-time that allow users to accurately specify what they want
> >  3. Keep setStreamTimeCharacteristic() and have three window assigners
> per window type, one for processing-time, one for event-time and one that
> automatically decides based on the time characteristic
> >
> > What do you think?
> >
> > On a side note, I would also suggest to remove AbstractTime, EventTime,
> and ProcessingTime and just keep Time for specifying time.
> >
> > Cheers,
> > Aljoscha
>


Re: Flink and Clojure

2015-12-10 Thread Stephan Ewen
Flink's classloading is different from Hadoop's.

In Hadoop, the entire JVM is started with all classes (including the user
jar) on the classpath already. In Flink, jars are added dynamically to
running JVMs with custom class loaders. That way, running worker/master
processes can accept new jars without restarts, which is important for low
latency, shells, etc.

Flink itself respects these classloaders whenever dynamically looking up a
class. It may be that Clojure is written such that it can only dynamically
instantiate what is on the original classpath.
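
As an illustration of that lookup pattern, here is a generic Java sketch (not
actual Flink or Clojure internals) of dynamic class loading that respects the
thread context class loader:

public final class DynamicLoading {

    // resolve a class via the thread context class loader, falling back to the
    // system class loader if no context class loader is set
    public static Class<?> loadUserClass(String className) throws ClassNotFoundException {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        if (cl == null) {
            cl = ClassLoader.getSystemClassLoader();
        }
        return Class.forName(className, true, cl);
    }
}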



On Fri, Dec 11, 2015 at 1:31 AM, Nick Dimiduk <ndimi...@apache.org> wrote:

> As far as the jvm is concerned, clojure is just another library. You should
> be able to package it up like any other dependency and submit the job.
> That's always how it worked in Hadoop/MR anyway...
>
> On Thu, Dec 10, 2015 at 3:22 PM, Matthias J. Sax <mj...@apache.org> wrote:
>
> > Thanks for this idea.
> >
> > I extended my pom to include clojure-1.5.1.jar in my program jar.
> > However, the problem is still there... I did some research on the
> > Internet, and it seems I need to mess around with Clojure's class
> > loading strategy...
> >
> > -Matthias
> >
> > On 12/10/2015 06:47 PM, Nick Dimiduk wrote:
> > > I think Mattias's project is using maven though -- there's a pom in the
> > > project that doesn't look generated. If you want to do it from lein,
> > maybe
> > > my old lein-hadoop [0] plugin can help?
> > >
> > > [0]: https://github.com/ndimiduk/lein-hadoop
> > >
> > > On Thu, Dec 10, 2015 at 8:54 AM, Robert Metzger <rmetz...@apache.org>
> > wrote:
> > >
> > >> I had the same though as Nick. Maybe Leiningen allows to somehow
> build a
> > >> fat-jar containing the clojure standard library.
> > >>
> > >> On Thu, Dec 10, 2015 at 5:51 PM, Nick Dimiduk <ndimi...@apache.org>
> > wrote:
> > >>
> > >>> What happens when you follow the packaging examples provided in the
> > flink
> > >>> quick start archetypes? These have the maven-foo required to package
> an
> > >>> uberjar suitable for flink submission. Can you try adding that step
> to
> > >> your
> > >>> pom.xml?
> > >>>
> > >>> On Thursday, December 10, 2015, Stephan Ewen <se...@apache.org>
> wrote:
> > >>>
> > >>>> This is a problem in Java.
> > >>>> I think you cannot dynamically modify the initial system class
> loader.
> > >>>>
> > >>>> What most apps do is check for the thread context class loader when
> > >>>> dynamically loading classes. We can check and make sure that one is
> > >> set,
> > >>>> but if Clojure does not respect that, we have a problem.
> > >>>> Then Clojure is not built for dynamic class loading.
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Thu, Dec 10, 2015 at 5:15 PM, Matthias J. Sax <mj...@apache.org
> > >>>> <javascript:;>> wrote:
> > >>>>
> > >>>>> Would it make sense (if possible?) for Flink to add the user jar
> > >>>>> dynamically to its own classpath so Clojure can find it? Or
> somehow
> > >>>>> modify Clojure's class loader?
> > >>>>>
> > >>>>> The jars in lib are added to the classpath at startup. This makes
> it
> > >>>>> practically impossible to execute a Flink program that is written
> in
> > >>>>> Clojure right now...
> > >>>>>
> > >>>>>
> > >>>>> On 12/10/2015 05:09 PM, Aljoscha Krettek wrote:
> > >>>>>> Clojure is not considering the user-jar when trying to load the
> > >>> class.
> > >>>>>>
> > >>>>>>> On 10 Dec 2015, at 17:05, Matthias J. Sax <mj...@apache.org
> > >>>> <javascript:;>> wrote:
> > >>>>>>>
> > >>>>>>> Hi Squirrels,
> > >>>>>>>
> > >>>>>>> I was playing with a Flink Clojure WordCount example today.
> > >>>>>>>
> https://github.com/mjsax/flink-external/tree/master/flink-clojure
> > >>>>>>>
> > >>>>>>> After building the project with "mvn package" I tried to submit
> it
> > >

Re: Task Parallelism in a Cluster

2015-12-10 Thread Stephan Ewen
Hi Ali!

Seems like the Google Doc has restricted access, it tells me I have no
permission to view it...

Stephan


On Wed, Dec 9, 2015 at 8:49 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:

> Hi Stephan,
>
> Here’s a link to the screenshot I tried to attach earlier:
>
> https://drive.google.com/open?id=0B0_jTR8-IvUcMEdjWGFmYXJYS28
>
> It looks to me like the distribution is fairly skewed across the nodes,
> even though they’re executing the same pipeline.
>
> Thanks,
> Ali
>
>
> On 2015-12-09, 12:36 PM, "Stephan Ewen" <se...@apache.org> wrote:
>
> >Hi!
> >
> >The parallel socket source looks good.
> >I think you forgot to attach the screenshot, or the mailing list dropped
> >the attachment...
> >
> >Not sure if I can diagnose that without more details. The sources all do
> >the same. Assuming that the server distributes the data evenly across all
> >connected sockets, and that the network bandwidth ends up being divided in
>a fair way, all pipelines should be similarly "eager".
> >
> >Greetings,
> >Stephan
> >
> >
> >On Wed, Dec 9, 2015 at 5:22 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
> >
> >> Hi Stephan,
> >>
> >> That was my original understanding, until I realized that I was not
> >>using
> >> a parallel socket source. I had a custom source that extended
> >> SourceFunction which always runs with parallelism = 1. I looked through
> >> the API and found the ParallelSourceFunction interface so I implemented
> >> that and voila, now all 3 nodes in the cluster are actually receiving
> >> traffic on socket connections.
> >>
> >> Now that I’m running it successfully end to end, I’m trying to improve
> >>the
> >> performance. Can you take a look at the attached screen shot and tell me
> >> if the distribution of work amongst the pipelines is normal? I feel like
> >> some pipelines are lot lazier than others, even though the cluster nodes
> >> are exactly the same.
> >>
> >> By the way, here’s the class I wrote. It would be useful to have this
> >> available in Flink distro:
> >>
> >> public class ParallelSocketSource implements ParallelSourceFunction<String> {
> >>
> >>     private static final long serialVersionUID = -271094428915640892L;
> >>     private static final Logger LOG =
> >>             LoggerFactory.getLogger(ParallelSocketSource.class);
> >>
> >>     private volatile boolean running = true;
> >>     private String host;
> >>     private int port;
> >>
> >>     public ParallelSocketSource(String host, int port) {
> >>         this.host = host;
> >>         this.port = port;
> >>     }
> >>
> >>     @Override
> >>     public void run(SourceContext<String> ctx) throws Exception {
> >>         try (Socket socket = new Socket(host, port);
> >>                 BufferedReader reader = new BufferedReader(
> >>                         new InputStreamReader(socket.getInputStream()))) {
> >>             String line = null;
> >>             while (running && ((line = reader.readLine()) != null)) {
> >>                 ctx.collect(line);
> >>             }
> >>         } catch (IOException ex) {
> >>             LOG.error("error reading from socket", ex);
> >>         }
> >>     }
> >>
> >>     @Override
> >>     public void cancel() {
> >>         running = false;
> >>     }
> >> }
> >>
> >> Regards,
> >> Ali
> >>
> >>
> >> On 2015-12-08, 3:35 PM, "Stephan Ewen" <se...@apache.org> wrote:
> >>
> >> >Hi Ali!
> >> >
> >> >In the case you have, the sequence of source-map-filter ... forms a
> >> >pipeline.
> >> >
> >> >You mentioned that you set the parallelism to 16, so there should be 16
> >> >pipelines. These pipelines should be completely independent.
> >> >
> >> >Looking at the way the scheduler is implemented, independent pipelines
> >> >should be spread across machines. But when you execute that in
> >>parallel,
> >> >you say all 16 pipelines end up on the same machine?
> >> >
> >> >Can you share with us the rough cod

Re: Externalizing the Flink connectors

2015-12-10 Thread Stephan Ewen
I like this a lot. It has multiple advantages:

  - Obviously more frequent connector updates without being forced to go to
a snapshot version
  - Reduce complexity and build time of the core flink repository

We should make sure that for example 0.10.x connectors always work with
0.10.x flink core releases.

Would we lose test coverage by putting the connectors into a separate
repository/maven project?



On Thu, Dec 10, 2015 at 3:45 PM, Fabian Hueske  wrote:

> Sounds like a good idea to me.
>
> +1
>
> Fabian
>
> 2015-12-10 15:31 GMT+01:00 Maximilian Michels :
>
> > Hi squirrels,
> >
> > By this time, we have numerous connectors which let you insert data
> > into Flink or output data from Flink.
> >
> > On the streaming side we have
> >
> > - RollingSink
> > - Flume
> > - Kafka
> > - Nifi
> > - RabbitMQ
> > - Twitter
> >
> > On the batch side we have
> >
> > - Avro
> > - Hadoop compatibility
> > - HBase
> > - HCatalog
> > - JDBC
> >
> >
> > Many times we would have liked to release updates to the connectors or
> > even create new ones in between Flink releases. This is currently not
> > possible because the connectors are part of the main repository.
> >
> > Therefore, I have created a new repository at
> > https://git-wip-us.apache.org/repos/asf/flink-connectors.git. The idea
> > is to externalize the connectors to this repository. We can then
> > update and release them independently of the main Flink repository. I
> > think this will give us more flexibility in the development process.
> >
> > What do you think about this idea?
> >
> > Cheers,
> > Max
> >
>


Re: Diagnosing TaskManager disappearance

2015-12-14 Thread Stephan Ewen
Hi!

The Netty memory usage usually stays much lower than 2 * network memory (that
is a theoretical upper bound).

Netty needs memory of the size of two buffers on the sender and receiver side,
per TCP connection.
Since Flink usually multiplexes many Channels (that need network buffers)
through the same TCP connection, the
amount of memory actually needed by Netty is much lower than the
theoretical value.

That being said, we have also seen another case where the Netty memory size
is not trivially small.

A way to alleviate that is to allocate Flink's network memory off heap and
have the buffers implement Netty's ByteBuf interface.
That way, the sender side Netty channels will not need any additional
memory at all (direct zero copy Flink Buffer to Network stack), which is
quite a win already.

For the receiver side, it is a tad bit trickier, but afaik the receiver
Netty needs only memory in the size of one buffer per TCP channel,
so even without fixing this, we would be down to 1/3 of the required Netty
memory.


I think @uce has some thoughts about this as well...

Greetings,
Stephan





On Sat, Dec 12, 2015 at 12:53 PM, Greg Hogan <c...@greghogan.com> wrote:

> The TaskManagers were nixed by the OOM killer.
>
>   [63896.699500] Out of memory: Kill process 12892 (java) score 910 or
> sacrifice child
>   [63896.702018] Killed process 12892 (java) total-vm:47398740kB,
> anon-rss:28487812kB, file-rss:8kB
>
> The cluster is comprised of AWS c4.8xlarge instances which have 60 GiB of
> memory across two NUMA nodes (~32.2 GB each). Pertinent TaskManager
> configuration:
>
>   taskmanager.memory.off-heap: true
>   taskmanager.memory.segment-size: 16384
>   taskmanager.heap.mb: 18000
>   taskmanager.network.numberOfBuffers: 414720
>
> This was allocating 18 GB plus up to 6.8 GB for network buffers. As Max
> noted in FLINK-2865, "I think the maximum number of network memory can
> never exceed 2 * (network memory). In this case all network buffers would
> be inside the Netty buffer pool." Doubling the 6.8 GB exceeds the node
> memory.
>
> Would disabling off-heap memory cause the network buffers to be re-used by
> Netty and save half of the network buffer memory? I created FLINK-3164
> which would reduce the number of necessary network buffers.
>
> Greg Hogan
>
> On Fri, Oct 30, 2015 at 12:33 PM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
> > The logging of the TaskManager stops 3 seconds before the JobManager
> > detects that the connection to the TaskManager is failed. If the clocks
> are
> > remotely in sync and the TaskManager is still running, then we should
> also
> > see logging statements for the time after the connection has failed.
> > Therefore, I would also suspect that something happened to the
> TaskManager
> > JVM.
> >
> > Cheers,
> > Till
> >
> > On Fri, Oct 30, 2015 at 3:43 AM, Robert Metzger <rmetz...@apache.org>
> > wrote:
> >
> > > So is the TaskManager JVM still running after the JM detected that the
> TM
> > > has gone?
> > >
> > > If not, can you check the kernel log (dmesg) to see whether Linux OOM
> > > killer stopped the process? (if its a kill, the JVM might not be able
> to
> > > log anything anymore)
> > >
> > > On Thu, Oct 29, 2015 at 9:27 PM, Stephan Ewen <se...@apache.org>
> wrote:
> > >
> > > > Thanks for sharing the logs, Greg!
> > > >
> > > > Okay, so the TaskManager does not crash, but the Remote Failure
> > Detector
> > > of
> > > > Akka marks the connection between JobManager and TaskManager as
> broken.
> > > >
> > > > The TaskManager is not doing much GC, so it is not a long JVM freeze
> > that
> > > > causes heartbeats to time out...
> > > >
> > > > I am wondering at this point whether this is an issue in Akka,
> > > specifically
> > > > the remote death watch that we use to let the JobManager recognize
> > > > disconnected TaskManagers.
> > > >
> > > > One thing you could try is actually to comment out the line where the
> > > > JobManager starts the death watch for the TaskManager and see if they
> > can
> > > > still successfully exchange messages (tasks finished, find inputs,
> > > > schedule) and the program completes. That would indicate that the
> Akka
> > > > Death Watch is flawed and that we should probably do our own
> heartbeats
> > > > instead.
> > > >
> > > > Greetings,
> > > > Stephan
> > > >
> > > >
> > > > On Thu, Oct 29, 2015 at 11:44 AM, Aljoscha 

Re: Task Parallelism in a Cluster

2015-12-09 Thread Stephan Ewen
Hi!

The parallel socket source looks good.
I think you forgot to attach the screenshot, or the mailing list dropped
the attachment...

Not sure if I can diagnose that without more details. The sources all do
the same. Assuming that the server distributes the data evenly across all
connected sockets, and that the network bandwidth ends up being divided in
a fair way, all pipelines should be similarly "eager".

Greetings,
Stephan


On Wed, Dec 9, 2015 at 5:22 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:

> Hi Stephan,
>
> That was my original understanding, until I realized that I was not using
> a parallel socket source. I had a custom source that extended
> SourceFunction which always runs with parallelism = 1. I looked through
> the API and found the ParallelSourceFunction interface so I implemented
> that and voila, now all 3 nodes in the cluster are actually receiving
> traffic on socket connections.
>
> Now that I’m running it successfully end to end, I’m trying to improve the
> performance. Can you take a look at the attached screen shot and tell me
> if the distribution of work amongst the pipelines is normal? I feel like
> some pipelines are lot lazier than others, even though the cluster nodes
> are exactly the same.
>
> By the way, here’s the class I wrote. It would be useful to have this
> available in Flink distro:
>
> public class ParallelSocketSource implements ParallelSourceFunction<String> {
>
>     private static final long serialVersionUID = -271094428915640892L;
>     private static final Logger LOG =
>             LoggerFactory.getLogger(ParallelSocketSource.class);
>
>     private volatile boolean running = true;
>     private String host;
>     private int port;
>
>     public ParallelSocketSource(String host, int port) {
>         this.host = host;
>         this.port = port;
>     }
>
>     @Override
>     public void run(SourceContext<String> ctx) throws Exception {
>         try (Socket socket = new Socket(host, port);
>                 BufferedReader reader = new BufferedReader(
>                         new InputStreamReader(socket.getInputStream()))) {
>             String line = null;
>             while (running && ((line = reader.readLine()) != null)) {
>                 ctx.collect(line);
>             }
>         } catch (IOException ex) {
>             LOG.error("error reading from socket", ex);
>         }
>     }
>
>     @Override
>     public void cancel() {
>         running = false;
>     }
> }
>
> Regards,
> Ali
>
>
> On 2015-12-08, 3:35 PM, "Stephan Ewen" <se...@apache.org> wrote:
>
> >Hi Ali!
> >
> >In the case you have, the sequence of source-map-filter ... forms a
> >pipeline.
> >
> >You mentioned that you set the parallelism to 16, so there should be 16
> >pipelines. These pipelines should be completely independent.
> >
> >Looking at the way the scheduler is implemented, independent pipelines
> >should be spread across machines. But when you execute that in parallel,
> >you say all 16 pipelines end up on the same machine?
> >
> >Can you share with us the rough code of your program? Or a Screenshot from
> >the runtime dashboard that shows the program graph?
> >
> >
> >If your cluster is basically for that one job only, you could try and set
> >the number of slots to 4 for each machine. Then you have 16 slots in total
> >and each node would run one of the 16 pipelines.
> >
> >
> >Greetings,
> >Stephan
> >
> >
> >On Wed, Dec 2, 2015 at 4:06 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
> >
> >> There is no shuffle operation in my flow. Mine actually looks like this:
> >>
> >> Source: Custom Source -> Flat Map -> (Filter -> Flat Map -> Map -> Map
> >>->
> >> Map, Filter)
> >>
> >>
> >> Maybe it’s treating this whole flow as one pipeline and assigning it to
> >>a
> >> slot. What I really wanted was to have the custom source I built to have
> >> running instances on all nodes. I’m not really sure if that’s the right
> >> approach, but if we could add this as a feature that’d be great, since
> >> having more than one node running the same pipeline guarantees the
> >> pipeline is never offline.
> >>
> >> -Ali
> >>
> >> On 2015-12-02, 4:39 AM, "Till Rohrmann" <trohrm...@apache.org> wrote:
> >>
> >> >If I'm not mistaken, then the 

Re: ClassNotFoundException : org.apache.flink.api.common.operators.util.UserCodeObjectWrapper, while trying to run locally

2015-12-30 Thread Stephan Ewen
I agree, this would be nice to support in Flink.

The important parts are on the client side (which may be embedded). The
classloaders are used there as part of the de-serialization of messages
that contain user-defined types (such as in collect() or in accumulators
and in exception reporting).

How are you starting the program in your web-app, by the way?

If you would be up for contributing this fix, that would be great! One
could approach this in two ways:

  - Simple, but not sure if completely future proof: we always take the
context classloader as the parent, and if that is null, the system class
loader

  - More elaborate: We allow to set a root classloader on the
ExecutionEnvironment. That one should be passed as parent class loader to
the Remote Executor and the Client.
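
A minimal sketch of the first variant in plain Java (just to illustrate the
fallback; "jarUrls" stands for the user-code jar URLs, and the exact place to
hook this into Flink would still need to be worked out):

// choose the parent class loader for the user-code class loader
ClassLoader parent = Thread.currentThread().getContextClassLoader();
if (parent == null) {
    parent = ClassLoader.getSystemClassLoader();
}
ClassLoader userCodeClassLoader = new URLClassLoader(jarUrls, parent);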


Greetings,
Stephan


On Mon, Dec 21, 2015 at 4:41 PM, peterpanne  wrote:

> I identified the root-cause of the problem:
>
> When the FlinkUserCodeClassLoader is created, it simply instantiates the
> URLClassLoader with the system classloader as parent. However, in the
> web-application setting, the current thread's webapp classloader should be
> taken. I tried to change the class slightly to take the "right" thread's
> class loader as parent:
>
>     private static class FlinkUserCodeClassLoader extends URLClassLoader {
>         public FlinkUserCodeClassLoader(URL[] urls) {
>             super(urls, Thread.currentThread().getContextClassLoader());
>         }
>     }
>
> Unfortunately there are multiple places where the system-classloader is
> taken instead of the thread class loader:
>
> flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java
> That solved my problem for the moment.
> However, I am not sure if that introduces problems somewhere else. In
> theory it should not be critical, since the web-app classloader simply
> introduces an additional level of classes to look for.
>
> It would be cool if that problem could be resolved in Flink ;-)
>
>
>
>
>
> --
> View this message in context:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ClassNotFoundException-org-apache-flink-api-common-operators-util-UserCodeObjectWrapper-while-tryingy-tp5922p9653.html
> Sent from the Apache Flink Mailing List archive. mailing list archive at
> Nabble.com.
>


Re: Release tag for 0.10.1

2016-01-08 Thread Stephan Ewen
Hi Nick!

We have not pushed a release tag, but have a frozen release-0.10.1-RC1
branch (https://github.com/apache/flink/tree/release-0.10.1-rc1)
A tag would be great, agree!

Flink's core does not depend on Hadoop. The parts that reference
Hadoop (HDFS filesystem, YARN, MapReduce function/format compatibility) are
not using any Hadoop version specific code (with the exception of some YARN
functions, which are reflectively invoked). So it should work across
versions nicely. The main friction we saw was version clashes of
transitive dependencies.

The Flink CI builds include building Flink with Hadoop 2.5.0, see here:
https://github.com/apache/flink/blob/master/.travis.yml

Greetings,
Stephan



On Fri, Jan 8, 2016 at 6:54 PM, Nick Dimiduk  wrote:

> An only-slightly related question: Is Flink using Hadoop version specific
> features in some way? IIRC, the basic APIs should be compatible back as far
> as 2.2. I'm surprised to see builds of flink explicitly against many hadoop
> versions, but 2.5.x is excluded.
>
> -n
>
> On Fri, Jan 8, 2016 at 9:45 AM, Nick Dimiduk  wrote:
>
> > Hi Devs,
> >
> > It seems no release tag was pushed to 0.10.1. I presume this was an
> > oversight. Is there some place I can look to see from which sha the
> 0.10.1
> > release was built? Are the RC vote threads the only cannon in this
> matter?
> >
> > Thanks,
> > Nick
> >
>


Re: [gelly] Spargel model rework

2016-01-06 Thread Stephan Ewen
+1 for the renaming!
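
For context on the Either discussion quoted below, here is a minimal sketch of
such a Java Either type (illustration only; the TypeInformation/TypeSerializer
with the byte flag that Fabian describes is omitted):

// an Either value holds either a Left (e.g. a Message) or a Right (e.g. the VertexState)
public abstract class Either<L, R> {

    public abstract boolean isLeft();

    public L left()  { throw new IllegalStateException("not a Left"); }
    public R right() { throw new IllegalStateException("not a Right"); }

    public static <L, R> Either<L, R> left(final L value) {
        return new Either<L, R>() {
            @Override public boolean isLeft() { return true; }
            @Override public L left() { return value; }
        };
    }

    public static <L, R> Either<L, R> right(final R value) {
        return new Either<L, R>() {
            @Override public boolean isLeft() { return false; }
            @Override public R right() { return value; }
        };
    }
}

A serializer for it would then write a single byte flag indicating which side is
set, followed by the corresponding value, as suggested in the quoted discussion.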

On Wed, Jan 6, 2016 at 8:01 PM, Vasiliki Kalavri <vasilikikala...@gmail.com>
wrote:

> issue created: https://issues.apache.org/jira/browse/FLINK-3207
>
> If anyone has any other suggestion about the renaming, let me know :)
>
> -V.
>
> On 5 January 2016 at 11:52, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> > Nice to hear. :D
> >
> > I think you can go ahead and add the Jira. About the renaming: I also
> > think that it would make sense to do it.
> > > On 04 Jan 2016, at 19:48, Vasiliki Kalavri <vasilikikala...@gmail.com>
> > wrote:
> > >
> > > Hello squirrels and happy new year!
> > >
> > > I'm reviving this thread to share some results and discuss next steps.
> > >
> > > Using the Either type I was able to get rid of redundant messages and
> > > vertex state. During the past few weeks, I have been running
> experiments,
> > > which show that the performance of this "Pregel" model has improved a
> > lot :)
> > > In [1], you can see the speedup of GSA and Pregel over Spargel, for
> SSSP
> > > and Connected Components (CC), for the Livejournal (68m edges), Orkut
> > (117m
> > > edges) and Wikipedia (340m edges) datasets.
> > >
> > > Regarding next steps, if no objections, I will open a Jira for adding a
> > > Pregel iteration abstraction to Gelly. The Gelly guide has to be
> updated
> > to
> > > reflect the spectrum of iteration abstractions that we have discussed
> in
> > > this thread, i.e. Pregel -> Spargel (Scatter-Gather) -> GSA.
> > >
> > > I think it might also be a good idea to do some renaming. Currently, we
> > > call the Spargel iteration "vertex-centric", which fits better to the
> > > Pregel abstraction. I propose we rename the spargel iteration into
> > > "scatter-gather" or "signal-collect" (where it was first introduced
> [2]).
> > > Any other ideas?
> > >
> > > Thanks,
> > > -Vasia.
> > >
> > > [1]:
> > >
> >
> https://drive.google.com/file/d/0BzQJrI2eGlyYRTRjMkp1d3R6eVE/view?usp=sharing
> > > [2]: http://link.springer.com/chapter/10.1007/978-3-642-17746-0_48
> > >
> > > On 11 November 2015 at 11:05, Stephan Ewen <se...@apache.org> wrote:
> > >
> > >> See: https://issues.apache.org/jira/browse/FLINK-3002
> > >>
> > >> On Wed, Nov 11, 2015 at 10:54 AM, Stephan Ewen <se...@apache.org>
> > wrote:
> > >>
> > >>> "Either" an "Optional" types are quite useful.
> > >>>
> > >>> Let's add them to the core Java API.
> > >>>
> > >>> On Wed, Nov 11, 2015 at 10:00 AM, Vasiliki Kalavri <
> > >>> vasilikikala...@gmail.com> wrote:
> > >>>
> > >>>> Thanks Fabian! I'll try that :)
> > >>>>
> > >>>> On 10 November 2015 at 22:31, Fabian Hueske <fhue...@gmail.com>
> > wrote:
> > >>>>
> > >>>>> You could implement a Java Either type (similar to Scala's Either)
> > >> that
> > >>>>> either has a Message or the VertexState and a corresponding
> > >>>> TypeInformation
> > >>>>> and TypeSerializer that serializes a byte flag to indicate which
> both
> > >>>> types
> > >>>>> is used.
> > >>>>> It might actually make sense, to add a generic Either type to the
> > Java
> > >>>> API
> > >>>>> in general (similar to the Java Tuples with resemble the Scala
> > >> Tuples).
> > >>>>>
> > >>>>> Cheers, Fabian
> > >>>>>
> > >>>>> 2015-11-10 22:16 GMT+01:00 Vasiliki Kalavri <
> > >> vasilikikala...@gmail.com
> > >>>>> :
> > >>>>>
> > >>>>>> Hi,
> > >>>>>>
> > >>>>>> after running a few experiments, I can confirm that putting the
> > >>>> combiner
> > >>>>>> after the flatMap is indeed more efficient.
> > >>>>>>
> > >>>>>> I ran SSSP and Connected Components with Spargel, GSA, and the
> > >> Pregel
> > >>>>> model
> > >>>>>> and the results are the following:
> > >>>>>>
> > >

Re: Add CEP library to Flink

2016-01-08 Thread Stephan Ewen
Looks super cool, Till!

Especially the section about the Patterns is great.
For the other parts, I was wondering about the overlap with the TableAPI
and the SQL efforts.

I was thinking that a first version could really focus on the Patterns and
make the assumption that they are always applied on a KeyedStream.
That way the effort would focus on the most important new addition, and we
could evaluate whether we could reuse the TableAPI for the other
grouping/windowing/etc parts.

Basically, have initially something like this:

Pattern pattern = Pattern
    .next("e1").where( (evt) -> evt.id == 42 )
    .followedBy("e2").where( (evt) -> evt.id == 1337 )
    .within(Time.minutes(10));

KeyedStream ks = input.keyBy( (evt) -> evt.getId() );

CEP.pattern(ks, pattern).select( new PatternSelectFunction() { ... } );


All other parts could still be constructed around that in the end.

Any thoughts?


Greetings,
Stephan







On Fri, Jan 8, 2016 at 5:50 PM, Gordon Tai (戴資力)  wrote:

> A definite +1 for this feature, thanks for your effort Till!
> Really look forward to the POC foundation and would like to help contribute
> where-ever possible.
>
> Pattern matching along with event time support seems to be another major
> breakthrough for stream processing framework options currently on the
> table.
>
> At our company, we've been using Flink to implement pattern matching very
> similar to the use cases detailed in Till's design doc for adtech related
> applications. A comprehensive and expressive DSL for these applications
> will be fantastic.
>
> On Sat, Jan 9, 2016 at 12:36 AM, Ufuk Celebi  wrote:
>
> >
> > > On 08 Jan 2016, at 15:54, Till Rohrmann  wrote:
> > >
> > > Hi everybody,
> > >
> > > recently we've seen an increased interest in complex event processing
> > (CEP)
> > > by Flink users. Even though most functionality is already there to
> solve
> > > many use cases it would still be helpful for most users to have an easy
> > to
> > > use library. Having such a library which allows to define complex event
> > > patterns would increase Flink's user range to the CEP community. Once
> > > having laid the foundation, I'm optimistic that people will quickly
> pick
> > it
> > > up and further extend it.
> > >
> > > The major contribution of this library would be to add an efficient
> > > non-deterministic finite automaton which can detect complex event
> > patterns.
> > > For everything else, Flink already has most of the functionality in
> > place.
> > >
> > > I've drafted a design document for the first version. Please review it
> > and
> > > comment:
> > >
> > >
> >
> https://docs.google.com/document/d/15iaBCZkNcpqSma_qrF0GUyobKV_JttEDVuhNd0Y1aAU/edit?usp=sharing
> >
> > Thanks for sharing, Till! I think that this will be a very valuable
> > addition to Flink. Looking forward to it. :-)
> >
> > – Ufuk
> >
> >
>
>
> --
> Tzu-Li (Gordon) Tai
> Data Engineer @ VMFive
> vmfive.com
>


Re: Naive question

2016-01-08 Thread Stephan Ewen
Hi!

This looks like a mismatch between the Scala dependency in Flink and Scala
in your Eclipse. Make sure you use the same for both. By default, Flink
references Scala 2.10.

If your IDE is set up for Scala 2.11, set the Scala version variable in the
Flink root pom.xml also to 2.11
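
For illustration, the Scala version is controlled by properties in the root
pom.xml along these lines (property names and exact versions may differ in your
checkout, so treat this as a hint rather than the exact Flink build
configuration):

<properties>
    <!-- keep these in sync with the Scala version of your IDE / Scala installation -->
    <scala.version>2.11.7</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>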

Greetings,
Stephan




On Fri, Jan 8, 2016 at 12:06 PM, Vasudevan, Ramkrishna S <
ramkrishna.s.vasude...@intel.com> wrote:

> I have been trying to install, learn and understand Flink. I am using
> Scala- EclipseIDE as my IDE.
>
> I have downloaded the flink source code, compiled and created the project.
>
> My work laptop is Windows based and I don't have eclipse based workstation
> but I do have linux boxes for running and testing things.
>
> Some of the examples given in Flink source code do run directly from
> Eclipse but when I try to run the Wordcount example from Eclipse I get this
> error
>
> Exception in thread "main" java.lang.NoSuchMethodError:
> scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;
>  at akka.actor.ActorCell$.(ActorCell.scala:336)
>  at akka.actor.ActorCell$.(ActorCell.scala)
>  at akka.actor.RootActorPath.$div(ActorPath.scala:159)
>  at akka.actor.LocalActorRefProvider.(ActorRefProvider.scala:464)
>  at akka.actor.LocalActorRefProvider.(ActorRefProvider.scala:452)
>  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
>  at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown
> Source)
>  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown
> Source)
>  at java.lang.reflect.Constructor.newInstance(Unknown Source)
>  at
> akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
>  at scala.util.Try$.apply(Try.scala:191)
>  at
> akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
>  at
> akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
>  at
> akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
>  at scala.util.Success.flatMap(Try.scala:230)
>  at
> akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
>  at akka.actor.ActorSystemImpl.liftedTree1$1(ActorSystem.scala:585)
>  at akka.actor.ActorSystemImpl.(ActorSystem.scala:578)
>  at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
>  at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
>  at akka.actor.ActorSystem$.create(ActorSystem.scala:67)
>  at
> org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:84)
>  at
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.startJobManagerActorSystem(FlinkMiniCluster.scala:196)
>  at
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.singleActorSystem$lzycompute$1(FlinkMiniCluster.scala:225)
>  at org.apache.flink.runtime.minicluster.FlinkMiniCluster.org
> $apache$flink$runtime$minicluster$FlinkMiniCluster$$singleActorSystem$1(FlinkMiniCluster.scala:225)
>  at
> org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:230)
>  at
> org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:228)
>  at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>  at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>  at scala.collection.immutable.Range.foreach(Range.scala:166)
>  at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>  at
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:228)
>  at
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:219)
>  at
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:104)
>  at
> org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:80)
>
> I know this is a naïve question but I would like to get some help in order
> to overcome this issue. I tried various options like setting scala-2.10 as
> the compiler for the project (then it shows completely different error) and
> many of the projects don't even compile. But with 2.11 version I get the
> above stack trace. Any help here is welcome.
>
> Regards
> Ram
>


Re: Union a data stream with a product of itself

2015-11-25 Thread Stephan Ewen
"stream.union(stream.map(..))" should definitely be possible. Not sure why
this is not permitted.
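
For reference, the pattern in question looks roughly like this in the Java
DataStream API (a minimal sketch; host, port and the map function are just
placeholders):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        // union of a stream with a mapped version of itself
        DataStream<String> both = lines.union(lines.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        }));

        both.print();
        env.execute("union example");
    }
}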

"stream.union(stream)" would contain each element twice, so should either
give an error or actually union (or duplicate) elements...

Stephan


On Wed, Nov 25, 2015 at 10:42 AM, Gyula Fóra  wrote:

> Yes, I am not sure if this the intentional behaviour. I think you are
> supposed to be able to do the things you described.
>
> stream.union(stream.map(..)) and things like this are fair operations. Also
> maybe stream.union(stream) should just give stream instead of an error.
>
> Could someone comment on this who knows the reasoning behind the current
> mechanics?
>
> Gyula
>
> Vasiliki Kalavri  ezt írta (időpont: 2015. nov.
> 24., K, 16:46):
>
> > Hi squirrels,
> >
> > when porting the gelly streaming code from 0.9 to 0.10 today with Paris,
> we
> > hit an exception in union: "*A DataStream cannot be unioned with
> itself*".
> >
> > The code raising this exception looks like this:
> > stream.union(stream.map(...)).
> >
> > Taking a look into the union code, we see that it's now not allowed to
> > union a stream, not only with itself, but with any product of itself.
> >
> > First, we are wondering, why is that? Does it make building the stream
> > graph easier in some way?
> > Second, we might want to give a better error message there, e.g. "*A
> > DataStream cannot be unioned with itself or a product of itself*", and
> > finally, we should update the docs, which currently state that union a
> > stream with itself is allowed and that "*If you union a data stream with
> > itself you will still only get each element once.*"
> >
> > Cheers,
> > -Vasia.
> >
>


Re: [VOTE] Release Apache Flink 0.10.1 (release-0.10.0-rc1)

2015-11-25 Thread Stephan Ewen
@Till I think the avro test data file is okay, the "no binaries" policy
refers to binary executables, as far as I know.

On Wed, Nov 25, 2015 at 2:54 PM, Till Rohrmann <till.rohrm...@gmail.com>
wrote:

> Checked checksums for src release and Hadoop 2.7 Scala 2.10 release
>
> Checked binaries in source release
> - contains ./flink-staging/flink-avro/src/test/resources/testdata.avro
>
> License
> - no new files added which are relevant for licensing
>
> Build Flink and run tests from source release for Hadoop 2.5.1
>
> Checked that log files don't contain exceptions and out files are
> empty
>
> Run all examples with Hadoop 2.7 Scala 2.10 binaries via FliRTT tool on 4
> node standalone cluster and YARN cluster
>
> Tested planVisualizer
>
> Tested flink command line client
> - tested info command
> - tested -p option
>
> Tested cluster HA in standalone mode => working
>
> Tested cluster HA on YARN (2.7.1) => working
>
> Except for the avro testdata file which is contained in the source release,
> I didn't find anything.
>
> +1 for releasing and removing the testdata file for the next release.
>
> On Wed, Nov 25, 2015 at 2:33 PM, Robert Metzger <rmetz...@apache.org>
> wrote:
>
> > +1
> >
> > - Build a maven project with the staging repository
> > - started Flink on YARN on a CDH 5.4.5 / Hadoop 2.6.0-cdh5.4.5 cluster
> with
> > YARN and HDFS HA
> > - ran some kafka (0.8.2.0) read / write experiments
> > - job cancellation with yarn is working ;)
> >
> > I found the following issue while testing:
> > https://issues.apache.org/jira/browse/FLINK-3078 but it was already in
> > 0.10.0 and its not super critical bc the JobManager container will be
> > killed by YARN after a few minutes.
> >
> >
> > I'll extend the vote until tomorrow Thursday, November 26.
> >
> >
> > On Tue, Nov 24, 2015 at 1:54 PM, Stephan Ewen <se...@apache.org> wrote:
> >
> > > @Gyula: I think it affects users, so should definitely be fixed very
> soon
> > > (either 0.10.1 or 0.10.2)
> > >
> > > Still checking whether Robert's current version fix solves it now, or
> > > not...
> > >
> > > On Tue, Nov 24, 2015 at 1:46 PM, Vyacheslav Zholudev <
> > > vyacheslav.zholu...@gmail.com> wrote:
> > >
> > > > I can confirm that the build works fine when increasing a max number
> of
> > > > opened files. Sorry for confusion.
> > > >
> > > >
> > > >
> > > > --
> > > > View this message in context:
> > > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-Apache-Flink-0-10-1-release-0-10-0-rc1-tp9296p9327.html
> > > > Sent from the Apache Flink Mailing List archive. mailing list archive
> > at
> > > > Nabble.com.
> > > >
> > >
> >
>


Re: [VOTE] Release Apache Flink 0.10.1 (release-0.10.0-rc1)

2015-11-25 Thread Stephan Ewen
+1

 - License and Notice are good
 - ran all tests (including manual tests) for hadoop 2.3.0 - Scala 2.10
 - ran all tests for hadoop 2.7.0 - Scala 2.11
 - ran all examples, several on larger external data
 - checked web frontend
 - checked quickstart archetypes


On Tue, Nov 24, 2015 at 1:54 PM, Stephan Ewen <se...@apache.org> wrote:

> @Gyula: I think it affects users, so should definitely be fixed very soon
> (either 0.10.1 or 0.10.2)
>
> Still checking whether Robert's current version fix solves it now, or
> not...
>
> On Tue, Nov 24, 2015 at 1:46 PM, Vyacheslav Zholudev <
> vyacheslav.zholu...@gmail.com> wrote:
>
>> I can confirm that the build works fine when increasing a max number of
>> opened files. Sorry for confusion.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-Apache-Flink-0-10-1-release-0-10-0-rc1-tp9296p9327.html
>> Sent from the Apache Flink Mailing List archive. mailing list archive at
>> Nabble.com.
>>
>
>


Re: The null in Flink

2015-11-26 Thread Stephan Ewen
sts in both side Join key, refer to #i,
> NULL == NULL return false, no output for NULL Join key.
> >  v.  NULL in Scalar expression, expression within
> NULL(eg. 1 + NULL) return NULL.
> >  vi. NULL in Boolean expression, add an extra result:
> UNKNOWN, more semantic for Boolean expression in reference #1.
> >  vii. More related function support, like COALESCE, NVL,
> NANVL, and so on.
> >
> > 3. NULL value storage in Table API.
> >Just set null to Row field value. To mark NULL value in serialized
> binary record data, normally it use extra flag for each field to mark
> whether its value is NULL, which would change the data layout of Row
> object. So any logic that access serialized Row data directly should
> updated to sync with new data layout, for example, many methods in
> RowComparator.
> >
> > Reference:
> > 1. Nulls: Nothing to worry about:
> http://www.oracle.com/technetwork/issue-archive/2005/05-jul/o45sql-097727.html
> .
> > 2. Null related functions:
> > https://oracle-base.com/articles/misc/null-related-functions
> >
> > -Original Message-
> > From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf
> > Of Stephan Ewen
> > Sent: Thursday, June 18, 2015 8:43 AM
> > To: dev@flink.apache.org
> > Subject: Re: The null in Flink
> >
> > Hi!
> >
> > I think we actually have two discussions here, both of them important:
> >
> > --
> > 1) Null values in the Programming Language APIs
> > --
> >
> > Fields in composite types may simply be null pointers.
> >
> > In object types:
> >- primitives members are naturally non-nullable
> >- all other members are nullable
> >
> > => If you want to avoid the overhead of nullability, go with primitive
> types.
> >
> > In Tuples, and derives types (Scala case classes):
> >- Fields are non-nullable.
> >
> > => The reason here is that we initially decided to keep tuples as a very
> fast data type. Because tuples cannot hold primitives in Java/Scala, we
> would not have a way to make fast non-nullable fields. The performance of
> nullable fields affects the key-operations, especially on normalized keys.
> > We can work around that with some effort, but have not done it so far.
> >
> > => In Scala, the Option types is a natural way of elegantly working
> around that.
> >
> >
> > --
> > 2) Null values in the high-level (logial) APIs
> > --
> >
> > This is mainly what Ted was referring to, if I understood him correctly.
> >
> > Here, we need to figure out what form of null-value semantics to follow in the
> Table API and, later, in SQL.
> >
> > Besides deciding what semantics to follow here in the logical APIs, we
> need to decide what these values convert to/from when switching between
> logical/physical APIs.
> >
> >
> >
> >
> >
> >
> > On Mon, Jun 15, 2015 at 10:07 AM, Ted Dunning <ted.dunn...@gmail.com>
> wrote:
> >
> >> On Mon, Jun 15, 2015 at 8:45 AM, Maximilian Michels <m...@apache.org>
> >> wrote:
> >>
> >>> Just to give an idea what null values could cause in Flink:
> >> DataSet.count()
> >>> returns the number of elements of all values in a Dataset (null or
> >>> not) while #834 would ignore null values and aggregate the DataSet
> >>> without
> >> them.
> >> Compare R's na.action.
> >>
> >> http://www.ats.ucla.edu/stat/r/faq/missing.htm
> >>
>
>


Re: [DISCUSS] Release Flink 0.10.1 soon

2015-11-18 Thread Stephan Ewen
+1 for a timely 0.10.1 release

I would like to add FLINK-2974 - periodic Kafka offset committer for the case
where checkpointing is deactivated.

On Wed, Nov 18, 2015 at 12:34 PM, Vasiliki Kalavri <
vasilikikala...@gmail.com> wrote:

> Hey,
>
> I would also add FLINK-3012 and FLINK-3036 (both pending PRs).
>
> Thanks!
> -Vasia.
>
> On 18 November 2015 at 12:24, Robert Metzger  wrote:
>
> > Hi,
> >
> > I was wondering whether we should release Flink 0.10.1 soon, as there are
> > some issues we've identified:
> >
> > (pending PRs)
> > - FLINK-3032: Flink does not start on Hadoop 2.7.1 (HDP), due to class
> > conflict
> > - FLINK-3011, 3019, 3028 Cancel jobs in RESTARTING state
> > - FLINK-3021 Fix class loading issue for streaming sources
> > - FLINK-2989 job cancel button doesn't work on YARN
> >
> > (merged)
> > - FLINK-2977 Using reflection to load HBase Kerberos tokens
> > - FLINK-3024 Fix TimestampExtractor.getCurrentWatermark() Behaviour
> > - FLINK-2967 Increase timeout for LOCAL_HOST address detection strategy
> > - FLINK-3025 [kafka consumer] Bump transitive ZkClient dependency
> >
> >
> > Anything else that you would like to add?
> >
> > Do you think we can manage to merge the four PRs until tomorrow and then
> > start the RC?
> >
>


Re: Streaming stateful operator with hashmap

2015-11-18 Thread Stephan Ewen
For initializing the Map manually, I meant making "null" the default value
and writing the code like

HashMap<InputType, MicroModel> map = state.value();
if (map == null) {
  map = new HashMap<>();
}

rather than expecting the state to always clone a new empty map for you
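
For completeness, after mutating the map you would write it back, so the
change becomes part of the next snapshot (a sketch; "input" and "model" are
placeholders):

map.put(input, model);   // mutate the local copy
state.update(map);       // write it back into the key/value state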

On Thu, Nov 12, 2015 at 11:29 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> you can do it using the register* methods on StreamExecutionEnvironment.
> So, for example:
>
> // set up the execution environment
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.registerType(InputType.class);
> env.registerType(MicroModel.class);
>
> If you want to have custom Kryo Serializers for those types you can also
> do:
>
> env.registerTypeWithKryoSerializer(InputType.class,
> MyInputTypeSerializer.class);
>
> I hope this gets you on the right track. :D
>
> Cheers,
> Aljoscha
>
> > On 11 Nov 2015, at 21:14, Martin Neumann <mneum...@sics.se> wrote:
> >
> > Thanks for the help.
> >
> > TypeExtractor.getForObject(modelMapInit) did the job. It's possible that
> it's
> > an IDE problem that .getClass() did not work. Intellij is a bit fiddly
> with
> > those things.
> >
> > 1) Making null the default value and initializing manually is probably
> more
> >> efficient, because otherwise the empty map would have to be cloned each
> >> time the default value is returned, which adds avoidable overhead.
> >
> >
> > What do you mean by initialize manually? Can I do that directly in the open
> > function or are we talking about checking for null in the FlatMap and
> > initializing there? In general the program is supposed to constantly run
> > once deployed, so I can get away with a little slower setup.
> >
> > 2) The HashMap type will most likely go through Kryo, so for efficiency,
> >> make sure you register the types "InputType" and "MicroModel" on the
> >> execution environment.
> >>Here you need to do that manually, because they are type erased and
> >> Flink cannot auto-register them.
> >
> >
> > Can you point me to an example on how to do this?
> >
> > cheers Martin
> >
> >
> > On Wed, Nov 11, 2015 at 4:52 PM, Stephan Ewen <se...@apache.org> wrote:
> >
> >> It should suffice to do something like
> >>
> >> "getRuntimeContext().getKeyValueState("microModelMap", new
> >> HashMap<InputType,MicroModel>().getClass(), null);"
> >>
> >> Two more comments:
> >>
> >> 1) Making null the default value and initializing manually is probably
> more
> >> efficient, because otherwise the empty map would have to be cloned each
> >> time the default value is returned, which adds avoidable overhead.
> >>
> >> 2) The HashMap type will most likely go through Kryo, so for efficiency,
> >> make sure you register the types "InputType" and "MicroModel" on the
> >> execution environment.
> >>Here you need to do that manually, because they are type erased and
> >> Flink cannot auto-register them.
> >>
> >> Greetings,
> >> Stephan
> >>
> >>
> >>
> >> On Wed, Nov 11, 2015 at 4:32 PM, Gyula Fóra <gyula.f...@gmail.com>
> wrote:
> >>
> >>> Hey,
> >>>
> >>> Yes what you wrote should work. You can alternatively use
> >>> TypeExtractor.getForObject(modelMapInit) to extract the type
> information.
> >>>
> >>> I also like to implement my custom type info for Hashmaps and the other
> >>> types and use that.
> >>>
> >>> Cheers,
> >>> Gyula
> >>>
> >>> Martin Neumann <mneum...@sics.se> wrote (on Wed, 11 Nov 2015, 16:30):
> >>>
> >>>> Hej,
> >>>>
> >>>> What is the correct way of initializing a state-full operator that is
> >>> using
> >>>> a hashmap? modelMapInit.getClass() does not work neither does
> >>>> HashMap.class. Do I have to implement my own TypeInformation class or
> >> is
> >>>> there a simpler way?
> >>>>
> >>>> cheers Martin
> >>>>
> >>>> private OperatorState<HashMap<InputType,MicroModel>> microModelMap;
> >>>>
> >>>> @Override
> >>>> public void open(Configuration parameters) throws Exception {
> >>>>HashMap<InputType,MicroModel> modelMapInit = new HashMap<>();
> >>>>this.microModelMap =
> >>>> getRuntimeContext().getKeyValueState("microModelMap",
> >>>> modelMapInit.getClass() , modelMapInit);
> >>>> }
> >>>>
> >>>
> >>
>
>


Re: Fixing the ExecutionConfig

2015-11-18 Thread Stephan Ewen
I had pretty much in mind what Aljoscha suggested.

On Thu, Nov 12, 2015 at 11:37 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> IMHO it’s not possible to have streaming/batch specific ExecutionConfig
> since the user functions share a common interface, i.e.
> getRuntimeContext().getExecutionConfig() simply returns the same type for
> both.
>
> What could be done is to migrate batch/streaming specific stuff to the
> ExecutionEnvironment and keep the ExecutionConfig strictly for stuff that
> applies to both execution modes.
> > On 12 Nov 2015, at 11:35, Maximilian Michels <m...@apache.org> wrote:
> >
> > +1 for separating concerns by having a StreamExecutionConfig and a
> > BatchExecutionConfig with inheritance from ExecutionConfig for general
> > options. Not sure about the pre-flight and runtime options. I think
> > they are ok in one config.
> >
> > On Wed, Nov 11, 2015 at 1:24 PM, Robert Metzger <rmetz...@apache.org>
> wrote:
> >> I think now (before the 1.0 release) is the right time to clean it up.
> >>
> >> Are you suggesting to have two execution configs for batch and
> streaming?
> >>
> >> I'm not sure if we need to distinguish between pre-flight and runtime
> >> options: From a user's perspective, it doesn't matter. For example the
> >> serializer settings are evaluated during pre-flight but they have a
> impact
> >> during execution.
> >>
> >>
> >>
> >>
> >>
> >>
> >> On Wed, Nov 11, 2015 at 11:59 AM, Stephan Ewen <se...@apache.org>
> wrote:
> >>
> >>> Hi all!
> >>>
> >>> The ExecutionConfig is a bit of a strange thing right now. It looks
> like it
> >>> became the place where everyone just put the stuff they want to somehow
> >>> push from the client to runtime, plus a random assortment of config
> flags.
> >>>
> >>> As a result:
> >>>
> >>>  - The ExecutionConfig is available in batch and streaming, but has a
> >>> number of fields that are very streaming specific, like the watermark
> >>> interval, etc.
> >>>
> >>>  - Several fields that are purely pre-flight time relevant are in
> there,
> >>> like whether to use the closure cleaner, or whether to force Avro or
> Kryo
> >>> serializers for POJOs.
> >>>
> >>> Any interest in cleaning this up? Because these messy classes simply
> grow
> >>> ever more messy unless we establish a proper definition of what its
> >>> concerns and non-concerns are...
> >>>
> >>> Greetings,
> >>> Stephan
> >>>
>
>


Re: [DISCUSS] Release Flink 0.10.1 soon

2015-11-20 Thread Stephan Ewen
Let me look at FLINK-2974 (open PR) to see if it can be merged...

On Thu, Nov 19, 2015 at 10:09 PM, Robert Metzger <rmetz...@apache.org>
wrote:

> Looks like we didn't manage to merge everything today.
>
> (pending PRs)
> - FLINK-3021 Fix class loading issue for streaming sources
> - FLINK-2974 Add periodic offset committer for Kafka
>
> (merged)
> - FLINK-2977 Using reflection to load HBase Kerberos tokens
> - FLINK-3024 Fix TimestampExtractor.getCurrentWatermark() Behaviour
> - FLINK-2967 Increase timeout for LOCAL_HOST address detection strategy
> - FLINK-3025 [kafka consumer] Bump transitive ZkClient dependency
> (merged since my first email)
> - FLINK-2989 job cancel button doesn't work on YARN
> - FLINK-3032: Flink does not start on Hadoop 2.7.1 (HDP), due to class
> conflict
> - FLINK-3011, 3019, 3028 Cancel jobs in RESTARTING state
>
>
> Is anybody volunteering to be a release manager?
>
>
> On Thu, Nov 19, 2015 at 10:16 AM, Suneel Marthi <smar...@apache.org>
> wrote:
>
> > Flink-3017, Flink-3022, and some hotfixes you had put in over the past few
> > days.
> >
> > Flink-3041 and Flink-3043 are pending PRs at the moment.
> >
> >
> > On Thu, Nov 19, 2015 at 4:10 AM, Till Rohrmann <till.rohrm...@gmail.com>
> > wrote:
> >
> > > If they cover things which are also wrongly documented in 0.10, then
> they
> > > should be merged to 0.10-release as well.
> > >
> > > On Thu, Nov 19, 2015 at 10:01 AM, Suneel Marthi <
> suneel.mar...@gmail.com
> > >
> > > wrote:
> > >
> > > > @Till Should the recent updates to docs/apis/streaming-guide.md be
> > also
> > > > merged to 'release 0.10'? There are other related PRs pending.
> > > >
> > > > On Thu, Nov 19, 2015 at 3:42 AM, Till Rohrmann <trohrm...@apache.org
> >
> > > > wrote:
> > > >
> > > > > Yes forgot about merging them to release-0.10. Will do it right
> away.
> > > > > On Nov 19, 2015 9:40 AM, "Robert Metzger" <rmetz...@apache.org>
> > wrote:
> > > > >
> > > > > > @Suneel, I think
> https://issues.apache.org/jira/browse/FLINK-2949
> > > is a
> > > > > new
> > > > > > feature. The 0.10.1 release is intended as a bugfix release only.
> > > > > >
> > > > > > FLINK-3013 and FLINK-3036 have been merged to master only. Are
> > there
> > > > > plans
> > > > > > to merge it to "release-0.10" as well?
> > > > > >
> > > > > > The list of issues now looks as follows:
> > > > > >
> > > > > > (pending PRs)
> > > > > > - FLINK-3032: Flink does not start on Hadoop 2.7.1 (HDP), due to
> > > class
> > > > > > conflict
> > > > > > - FLINK-3011, 3019, 3028 Cancel jobs in RESTARTING state
> > > > > > - FLINK-3021 Fix class loading issue for streaming sources
> > > > > > - FLINK-2974 Add periodic offset committer for Kafka
> > > > > >
> > > > > >
> > > > > > (merged)
> > > > > > - FLINK-2977 Using reflection to load HBase Kerberos tokens
> > > > > > - FLINK-3024 Fix TimestampExtractor.getCurrentWatermark()
> Behaviour
> > > > > > - FLINK-2967 Increase timeout for LOCAL_HOST address detection
> > > strategy
> > > > > > - FLINK-3025 [kafka consumer] Bump transitive ZkClient dependency
> > > > > > (merged since my first email)
> > > > > > - FLINK-2989 job cancel button doesn't work on YARN
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, Nov 18, 2015 at 2:35 PM, Ufuk Celebi <u...@apache.org>
> > wrote:
> > > > > >
> > > > > > > @Suneel: I think that's OK for the next major release :)
> > > > > > >
> > > > > > > On Wed, Nov 18, 2015 at 2:17 PM, Suneel Marthi <
> > > > > suneel.mar...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Would be nice to have Flink-2949 in the mix, but I won't be
> > able
> > > to
> > > > > get
> > > > > > > to
> > > > > > > > it until early next week.
> > > > > > > >
> > > > > > > &

Re: [DISCUSS] Release Flink 0.10.1 soon

2015-11-20 Thread Stephan Ewen
From my experience, frequent bugfix releases are highly appreciated by
users.

There are some pretty serious fixes people are waiting for, and we can
certainly do a 0.10.2  in a bit, if people find more issues.

On Fri, Nov 20, 2015 at 11:25 AM, Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi all,
>
> Wouldn't you think that it would make sense to wait a week or so to find all
> the hot issues with the current release?
>
> To me it feels a little bit like rushing this out and we will have almost
> the same situation afterwards.
>
> I might be wrong but I think people should get a chance to try this out.
>
> In any case I would +1 for the quick release if everyone else thinks that's
> the way, these are just my thoughts.
>
> Gyula
> On Fri, Nov 20, 2015 at 11:13 AM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
> > Actually, I still have another bug related to the optimizer which I would
> > like to include if possible. The problem is that the optimizer is not
> able
> > to push properties properly out of a bulk iteration which in some cases
> can
> > lead to rejected Flink jobs.
> >
> > On Fri, Nov 20, 2015 at 11:10 AM, Robert Metzger <rmetz...@apache.org>
> > wrote:
> >
> > > Great, thank you!
> > >
> > > Let me know if there is any issue, I'll address it asap. The PR is not
> > > building anymore because you've pushed an update to the Kafka
> > > documentation. I can rebase and merge the PR once you give me green
> light
> > > ;)
> > >
> > > Till has merged FLINK-3021, so we might be able to have a first RC
> today.
> > >
> > >
> > > On Fri, Nov 20, 2015 at 11:05 AM, Stephan Ewen <se...@apache.org>
> wrote:
> > >
> > > > Let me look at FLINK-2974 (open PR) to see if it can be merged...
> > > >
> > > > On Thu, Nov 19, 2015 at 10:09 PM, Robert Metzger <
> rmetz...@apache.org>
> > > > wrote:
> > > >
> > > > > Looks like we didn't manage to merge everything today.
> > > > >
> > > > > (pending PRs)
> > > > > - FLINK-3021 Fix class loading issue for streaming sources
> > > > > - FLINK-2974 Add periodic offset committer for Kafka
> > > > >
> > > > > (merged)
> > > > > - FLINK-2977 Using reflection to load HBase Kerberos tokens
> > > > > - FLINK-3024 Fix TimestampExtractor.getCurrentWatermark() Behaviour
> > > > > - FLINK-2967 Increase timeout for LOCAL_HOST address detection
> > strategy
> > > > > - FLINK-3025 [kafka consumer] Bump transitive ZkClient dependency
> > > > > (merged since my first email)
> > > > > - FLINK-2989 job cancel button doesn't work on YARN
> > > > > - FLINK-3032: Flink does not start on Hadoop 2.7.1 (HDP), due to
> > class
> > > > > conflict
> > > > > - FLINK-3011, 3019, 3028 Cancel jobs in RESTARTING state
> > > > >
> > > > >
> > > > > Is anybody volunteering to be a release manager?
> > > > >
> > > > >
> > > > > On Thu, Nov 19, 2015 at 10:16 AM, Suneel Marthi <
> smar...@apache.org>
> > > > > wrote:
> > > > >
> > > > > > Flink-3017, Flink-3022, and some hotfixes you had put in over the
> > past
> > > > few
> > > > > > days.
> > > > > >
> > > > > > Flink-3041 and Flink-3043 are pending PRs at the moment.
> > > > > >
> > > > > >
> > > > > > On Thu, Nov 19, 2015 at 4:10 AM, Till Rohrmann <
> > > > till.rohrm...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > If they cover things which are also wrongly documented in 0.10,
> > > then
> > > > > they
> > > > > > > should be merged to 0.10-release as well.
> > > > > > >
> > > > > > > On Thu, Nov 19, 2015 at 10:01 AM, Suneel Marthi <
> > > > > suneel.mar...@gmail.com
> > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > @Till Should the recent updates to docs/apis/
> > streaming-guide.md
> > > be
> > > > > > also
> > > > > > > > merged to 'release 0.10'? There are other related PRs
> pending.
> > > > > > > >
> > > > > > > > On 

Re: ReduceByKeyAndWindow in Flink

2015-11-23 Thread Stephan Ewen
One addition: You can set the system to use "ingestion time", which gives
you event time with auto-generated timestamps and watermarks, based on the
time that the events are seen in the sources.

That way you have the same simplicity as processing time, and you get the
window alignment that Aljoscha described (second total max window has the
same elements as initial max-per-key window).
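
A minimal sketch of that setup (hedged; "env" and "input" are placeholders,
and the window size is arbitrary):

// timestamps and watermarks are attached automatically at the sources
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

DataStream<Tuple2<String, Integer>> maxPerKey = input
    .keyBy(0)
    .timeWindow(Time.seconds(10))
    .max(1);

DataStream<Tuple2<String, Integer>> globalMax = maxPerKey
    .timeWindowAll(Time.seconds(10))
    .max(1);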

On Mon, Nov 23, 2015 at 12:49 PM, Aljoscha Krettek 
wrote:

> Hi,
> @Konstantin: are you using event-time or processing-time windows? If you
> are using processing time, then you can only do it the way Fabian
> suggested. The problem here is, however, that the .keyBy().reduce()
> combination would emit a new maximum for every element that arrives there
> and you never know when you saw the final element, i.e. the maximum.
>
> If you are using event-time, then you are indeed lucky because then you
> can use what Gyula suggested and you won’t have latency, if I’m correct.
> The reason is that the watermark that flushes out the windows in the first
> (keyed window) will also flush out the elements in the all-window. So the
> keyed window will do computations, send along the elements and then after
> it is done it will forward the watermark. This watermark will immediately
> trigger computation of the all-window for the same time period.
>
> Cheers,
> Aljoscha
> > On 23 Nov 2015, at 11:51, Gyula Fóra  wrote:
> >
> > Yes, you are right I think we should have some nice abstractions for
> doing this.
> >
> > Before the rewrite of the windowing runtime to support out-of-order
> events, we had abstractions for supporting this but that code was not
> feasible from a performance perspective. (The result of a keyed window
> reduce used to be a window containing all the aggregates and one could then
> just aggregate again on the result without specifying the window again)
> >
> > Maybe we could implement similar abstractions on the new window runtime,
> I think that would be really awesome.
> >
> > Gyula
> >
> > Konstantin Knauf wrote (on Mon, 23 Nov 2015, 11:40):
> > Thanks!
> >
> > @Fabian: Yepp, but this still results in multiple outputs per window,
> > because the maximum is emitted for every key.
> >
> > @Gyula: Yepp, that's the second bullet point from my question ;) The way
> > I implemented it, it basically doubles the latency, because the
> > timeWindowAll has to wait for the next timeWindow before it can close
> > the previous one. So if the first timeWindow is 10s, it takes 20s until
> > you have a result, although it cant change after 10s. You know what I
> mean?
> >
> > Cheers,
> >
> > Konstantin
> >
> > On 23.11.2015 11:32, Gyula Fóra wrote:
> > > Hi,
> > >
> > > Alright it seems there are multiple ways of doing this.
> > >
> > > I would do something like:
> > >
> > > ds.keyBy(key)
> > > .timeWindow(w)
> > > .reduce(...)
> > > .timeWindowAll(w)
> > > .reduce(...)
> > >
> > > Maybe Aljoscha could jump in here :D
> > >
> > > Cheers,
> > > Gyula
> > >
> > > Fabian Hueske wrote
> > > (on Mon, 23 Nov 2015, 11:21):
> > >
> > > If you set the key to the time attribute, the "old" key is no
> longer
> > > valid.
> > > The streams are organized by time and only one aggregate for each
> > > window-time should be computed.
> > >
> > > This should do what you are looking for:
> > >
> > > DataStream
> > >   .keyBy(_._1) // key by orginal key
> > >   .timeWindow(..)
> > >   .apply(...)  // extract window end time: (origKey, time, agg)
> > >   .keyBy(_._2) // key by time field
> > >   .maxBy(_._3) // value with max agg field
> > >
> > > Best, Fabian
> > >
> > > 2015-11-23 11:00 GMT+01:00 Konstantin Knauf:
> > >
> > > Hi Fabian,
> > >
> > > thanks for your answer. Yes, that's what I want.
> > >
> > > The solution you suggest is what I am doing right now (see last
> > > of the
> > > bullet point in my question).
> > >
> > > But given your example. I would expect the following output:
> > >
> > > (key: 1, w-time: 10, agg: 17)
> > > (key: 2, w-time: 10, agg: 20)
> > > (key: 1, w-time: 20, agg: 30)
> > > (key: 1, w-time: 20, agg: 30)
> > > (key: 1, w-time: 20, agg: 30)
> > >
> > > Because the reduce function is evaluated for every incoming
> > > event (i.e.
> > > each key), right?
> > >
> > > Cheers,
> > >
> > > Konstantin
> > >
> > > On 23.11.2015 10:47, Fabian Hueske wrote:
> > > > Hi Konstantin,
> > > >
> > > > let me first summarize to make sure I understood what you
> are looking for.
> > > > You computed an aggregate over a keyed event-time window and
> you are
> > > > looking for the maximum 

Re: how to write dataset in a file?

2015-11-22 Thread Stephan Ewen
You can configure the system to always create a directory (not just on
parallelism > 1),
 see "fs.output.always-create-directory" under
https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html#file-systems
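
That is, in flink-conf.yaml:

# always write sink output into a directory, even with parallelism 1
fs.output.always-create-directory: true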

The behavior we support right now is pretty much what people coming from
the Hadoop world are used to, that's why it behaves the way it does.

Greetings,
Stephan


On Sun, Nov 22, 2015 at 8:49 AM, jun aoki  wrote:

> Thank you guys for helping me understand!
> Precisely I was able to control the behavior on my research work with your
> help.
>
> Does anybody think, however, the behavior is not straightforward? (At least
> there is another guy on StackOverflow who misunderstood it the same way I did)
>
> I'd like to ask the community if they like my suggestions
> 1. Make the method signatures writeAsText(String directoryPath) and
> writeAsCsv(String directoryPath) (not filePath but directoryPath) and have them
> ALWAYS create a directory instead of sometimes a file and sometimes a
> directory depending on the sink's parallelism.
> This creates a directory with a sole "1" file even when
> parallelism is set to 1.
> This is more consistent and leaves no confusion about what it does.
>
> 2. And create new methods called writeAsTextFile(String filePath) and
> writeAsCsvFile(String filePath) which ALWAYS create a file and there is no
> directory. In order to make this happen, either its sink's parallelism is
> implicitly set to 1, or all data from all workers is collected into one dataset
> behind the scenes.
>
> What do you guys think?
>
>
> On Sat, Nov 21, 2015 at 6:18 AM, Matthias J. Sax  wrote:
>
> > I would not set
> >
> > > ExecutionEnvironment env =
> > ExecutionEnvironment.createLocalEnvironment().setParallelism(1);
> >
> > because this changes the default parallelism of *all* operators to one.
> > Instead, only set the parallelism of the **sink** to one (as described
> > here:
> >
> >
> https://stackoverflow.com/questions/32580970/writeascsv-and-writeastext-is-unexpected/32581813#32581813
> > )
> >
> > filteredData.writeAsText("file:///output1.txt").setParallelism(1);
> >
> > -Matthias
> >
> > On 11/21/2015 02:23 PM, Márton Balassi wrote:
> > > Additionally as having multiple files under /output1.txt is standard in
> > the
> > > Hadoop ecosystem you can transparently read all the files with
> > > env.readTextFile("/output1.txt").
> > >
> > > You can also set parallelism on individual operators (e.g the file
> > writer)
> > > if you really need a single output.
> > >
> > > On Fri, Nov 20, 2015, 21:27 Suneel Marthi  wrote:
> > >
> > >> You can write to a single output file by setting parallelism == 1
> > >>
> > >>  So final ExecutionEnvironment env = ExecutionEnvironment.
> > >> createLocalEnvironment().setParallelism(1);
> > >>
> > >> The reason you see multiple output files is that each worker is
> > writing
> > >> to a different file.
> > >>
> > >> On Fri, Nov 20, 2015 at 10:06 PM, jun aoki  wrote:
> > >>
> > >>> Hi Flink community
> > >>>
> > >>> I know I'm mistaken but could not find what I want.
> > >>>
> > >>> final ExecutionEnvironment env =
> > >>> ExecutionEnvironment.createLocalEnvironment();
> > >>> DataSet<String> data = env.readTextFile("file:///text1.txt");
> > >>> FilterFunction<String> filter = new MyFilterFunction();  // looks for
> > >>> lines that start with "[ERROR]"
> > >>> DataSet<String> filteredData = data.filter(filter);
> > >>> filteredData.writeAsText("file:///output1.txt");
> > >>> env.execute();
> > >>>
> > >>> Then I expect to get a single file /output1.txt , but actually get
> > >>> /output1.txt/1, /output1.txt/2, /output1.txt/3...
> > >>> I assumed I was getting a single file because the method signature
> says
> > >>> writeAsText(String filePath).  <-- filePath instead of directoryPath
> > >>> Also the Javadoc comment sounds like I assumed right.
> > >>>
> > >>>
> > >>
> >
> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java#L1354
> > >>>
> > >>> Can anyone tell if the method signature and document should be fixed?
> > or
> > >> if
> > >>> I am missing some configuration?
> > >>>
> > >>> --
> > >>> -jun
> > >>>
> > >>
> > >
> >
> >
>
>
> --
> -jun
>


Re: [VOTE] Release Apache Flink 0.10.1 (release-0.10.0-rc1)

2015-11-24 Thread Stephan Ewen
Hi Slava!

I think the problem with your build is the file handles. It shows in
various places:

Exception in thread "main" java.lang.InternalError:
java.io.FileNotFoundException:
/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/localedata.jar
(Too many open files in system)

Caused by: java.io.IOException: Too many open files in system
at sun.nio.ch.KQueueArrayWrapper.init(Native Method)
at sun.nio.ch.KQueueArrayWrapper.<init>(KQueueArrayWrapper.java:98)
at sun.nio.ch.KQueueSelectorImpl.<init>(KQueueSelectorImpl.java:87)
at sun.nio.ch.KQueueSelectorProvider.openSelector(KQueueSelectorProvider.java:42)
at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:126)


Can you check this on your system? I'll try to compile with the same flags
on my system as well...



On Tue, Nov 24, 2015 at 11:07 AM, Vyacheslav Zholudev <
vyacheslav.zholu...@gmail.com> wrote:

> I'm having trouble building release-0.10.1-rc1 with parameters:
> mvn clean install -Dhadoop.version=2.6.0.2.2.6.0-2800 -Pvendor-repos
>
> Env: maven 3, JDK 7, MacOS 10.10.5
>
> Attached maven log when it started to produce failing tests.
>
> P.S. I had to kill the build process since it got stuck (probably due to
> some long waiting interval)
>
> mvn.log
> <
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/file/n9315/mvn.log
> >
>
>
>
> --
> View this message in context:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-Apache-Flink-0-10-1-release-0-10-0-rc1-tp9296p9315.html
> Sent from the Apache Flink Mailing List archive. mailing list archive at
> Nabble.com.
>


Re: Sliding Window Join (without duplicates)

2015-11-24 Thread Stephan Ewen
Since sessions are built per key, they hold groups of elements (per key) that are close
enough together in time. They will, however, treat the closeness
transitively...

On Tue, Nov 24, 2015 at 11:33 AM, Matthias J. Sax <mj...@apache.org> wrote:

> Stephan is right. A tumbling window does not help. The last tuple of
> window n and the first tuple of window n+1 are "close" to each other and
> should be joined for example.
>
> From a SQL-like point of view this is a very common case expressed as:
>
> SELECT * FROM s1,s2 WHERE s1.key = s2.key AND |s1.ts - s2.ts| < window-size
>
> I would not expect to get any duplicates here.
>
> Basically, the window should move by one tuple (for each stream) and
> join with all tuples from the other stream that are within the time
> range (window size) where the ts of this new tuple defines the boundaries
> of the window (ie, there are no "fixed" window boundaries as defined by
> a time-slide).
>
> Not sure how a "session window" can help here... I guess using the most
> generic window API allows to define slide by one tuple and window size X
> seconds. But I don't know how duplicates could be avoided...
>
> -Matthias
>
> On 11/24/2015 11:04 AM, Stephan Ewen wrote:
> > I understand Matthias' point. You want to join elements that occur
> within a
> > time range of each other.
> >
> > In a tumbling window, you have strict boundaries and a pair of elements
> > that arrives such that one element is before the boundary and one after,
> > they will not join. Hence the sliding windows.
> >
> > What may be a solution here is a "session window" join...
> >
> > On Tue, Nov 24, 2015 at 10:33 AM, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> >> Hi,
> >> I’m not sure this is a problem. If a user specifies sliding windows then
> >> one element can (and will) end up in several windows. If these are
> joined
> >> then there will be multiple results. If the user does not want multiple
> >> windows then tumbling windows should be used.
> >>
> >> IMHO, this is quite straightforward. But let’s see what others have to
> say.
> >>
> >> Cheers,
> >> Aljoscha
> >>> On 23 Nov 2015, at 20:36, Matthias J. Sax <mj...@apache.org> wrote:
> >>>
> >>> Hi,
> >>>
> >>> it seems that a join on the data streams with an overlapping sliding
> >>> window produces duplicates in the output. The default implementation
> >>> internally just uses two nested loops over both windows to compute the
> >>> result.
> >>>
> >>> How can duplicates be avoided? Is there any way after all right now? If
> >>> not, should be add this?
> >>>
> >>>
> >>> -Matthias
> >>>
> >>
> >>
> >
>
>


Re: Sliding Window Join (without duplicates)

2015-11-24 Thread Stephan Ewen
I understand Matthias' point. You want to join elements that occur within a
time range of each other.

In a tumbling window, you have strict boundaries and a pair of elements
that arrives such that one element is before the boundary and one after,
they will not join. Hence the sliding windows.

What may be a solution here is a "session window" join...
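
For illustration, a rough sketch of the join under discussion (window assigner
and Time classes as named in the 0.10/1.0 APIs, if I remember correctly; the
"Event" type, the streams and the key are placeholders). With a 10s window
sliding by 5s, every element belongs to two windows, so a matching pair that is
close in time is emitted once per shared window:

stream1.join(stream2)
    .where(new KeySelector<Event, String>() {
        @Override
        public String getKey(Event e) { return e.key; }
    })
    .equalTo(new KeySelector<Event, String>() {
        @Override
        public String getKey(Event e) { return e.key; }
    })
    .window(SlidingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .apply(new JoinFunction<Event, Event, String>() {
        @Override
        public String join(Event left, Event right) {
            return left.key;   // emitted once per window both elements fall into
        }
    });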

On Tue, Nov 24, 2015 at 10:33 AM, Aljoscha Krettek 
wrote:

> Hi,
> I’m not sure this is a problem. If a user specifies sliding windows then
> one element can (and will) end up in several windows. If these are joined
> then there will be multiple results. If the user does not want multiple
> windows then tumbling windows should be used.
>
> IMHO, this is quite straightforward. But let’s see what others have to say.
>
> Cheers,
> Aljoscha
> > On 23 Nov 2015, at 20:36, Matthias J. Sax  wrote:
> >
> > Hi,
> >
> > it seems that a join on the data streams with an overlapping sliding
> > window produces duplicates in the output. The default implementation
> > internally just uses two nested loops over both windows to compute the
> > result.
> >
> > How can duplicates be avoided? Is there any way after all right now? If
> > not, should be add this?
> >
> >
> > -Matthias
> >
>
>


Re: withParameters() for Streaming API

2015-11-24 Thread Stephan Ewen
I was also thinking of deprecating that. With that, RichFunctions should
change "open(Configuration)" --> "open()".

That would be heavily API breaking, so I'm a bit hesitant there...
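
In the meantime, the usual workaround is to pass parameters through the
function's constructor, which works the same way for DataSet and DataStream
programs (a sketch, names made up):

public class ThresholdFilter extends RichFilterFunction<Integer> {

    private final int threshold;   // passed at construction time, no Configuration needed

    public ThresholdFilter(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public boolean filter(Integer value) {
        return value > threshold;
    }
}

// usage:
stream.filter(new ThresholdFilter(42));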

On Tue, Nov 24, 2015 at 2:48 PM, Timo Walther  wrote:

> Thanks for the hint Matthias.
> So actually the parameter of the open() method is useless? IMHO that does
> not look like a nice API design...
> We should try to keep DataSet and DataStream API in sync.
> Does it make sense to deprecate withParameters() for 1.0?
>
> Timo
>
>
> On 24.11.2015 14:31, Matthias J. Sax wrote:
>
>> We had this discussion a while ago.
>>
>> If I recall correctly, "withParameters()" is not encouraged to be used in
>> DataSet either.
>>
>> This is the thread:
>>
>> https://mail-archives.apache.org/mod_mbox/flink-dev/201509.mbox/%3C55EC69CD.1070003%40apache.org%3E
>>
>> -Matthias
>>
>> On 11/24/2015 02:14 PM, Timo Walther wrote:
>>
>>> Hi all,
>>>
>>> I want to set the Configuration of a streaming operator and access it
>>> via the open method of the RichFunction.
>>> There is no possibility to set the Configuration of the open method at
>>> the moment, right? Can I open an issue for a withParameters() equivalent
>>> for the Streaming API?
>>>
>>> Regards,
>>> Timo
>>>
>>>
>


Re: Add BigDecimal and BigInteger as types

2015-11-19 Thread Stephan Ewen
Ah, support as efficient key types is a fair argument, yes!

On Thu, Nov 19, 2015 at 2:30 PM, Timo Walther <twal...@apache.org> wrote:

> I could imagine that some applications also want to group or join by a
> BigInteger or sort by BigDecimal. All DBMSs support these types by default.
> I'm not from the industry but there is a need for that I think.
>
>
> On 18.11.2015 18:21, Stephan Ewen wrote:
>
>> I agree that they are important.
>>
>> They are currently generic types and handled by Kryo, which has (AFAIK)
>> proper serializers for them. Are there more benefits of native support
>> (other than more compact serialization) that you are thinking of?
>>
>> On Wed, Nov 18, 2015 at 5:55 PM, Timo Walther <twal...@apache.org> wrote:
>>
>> Hey everyone,
>>>
>>> I'm not sure if we already had a discussion about it but as we are
>>> currently adding new types like the Either type, I would like to discuss
>>> it
>>> again. I think especially for business or scientific applications it
>>> makes
>>> sense to support the BigInteger and BigDecimal types natively. In my
>>> opinion they are as important as Date or Void and should be added as
>>> BasicTypes. I need them for the SQL prototype (FLINK-2099) but I think
>>> people working with the Table API or Java/Scala API would also benefit
>>> from
>>> it.
>>>
>>> What do you think?
>>>
>>> Regards,
>>> Timo
>>>
>>>
>


Re: [DISCUSSION] Type hints versus TypeInfoParser

2015-11-19 Thread Stephan Ewen
Ah, I see. Sounds like a bit of a corner case to me, actually.

On Thu, Nov 19, 2015 at 2:20 PM, Timo Walther <twal...@apache.org> wrote:

> All that you have mentioned is implemented in the TypeExtractor. I just
> mean corner cases e.g. if you have a POJO
>
> public class MyPojo {
> public Object field1;
> public Object field2;
> public Tuple2 field3;
> }
>
> Where the TypeExtractor can not analyze anything. Then you may want to
> provide the TypeInfo manually. TypeInfoParser makes it easy to specify the
> types of the fields of POJOs manually (but only as an internal feature).
> But as I said, this is just a corner case.
>
> Timo
>
>
>
> On 18.11.2015 18:43, Stephan Ewen wrote:
>
>> I think the TypeHints case can cover this:
>>
>> public class MyPojo<T, R> {
>>  public T field1;
>>  public R field2;
>> }
>>
>> If you say '.returns(new TypeHint<MyPojo<String, Double>>() {})' this
>> creates an anonymous subclass of the TypeHint, which has the types that T
>> and R bind to, which allows one to construct the POJO type info properly.
>> (Not sure if all that is implemented in the TypeExtractor, though).
>>
>> What do you think?
>>
>> Stephan
>>
>>
>>
>>
>> On Wed, Nov 18, 2015 at 6:03 PM, Timo Walther <twal...@apache.org> wrote:
>>
>> If the TypeExtractor is not able to handle the fields of a Pojo correctly,
>>> the String parser is quite useful to say
>>> "org.my.Pojo

Re: Weird test-source issue

2016-01-08 Thread Stephan Ewen
Hmm, strange issue indeed.

So, checkpoints are definitely triggered (log message by coordinator to
trigger checkpoint) but are not completing?
Can you check which is the first checkpoint to complete? Is it Checkpoint
1, or a later one (indicating that checkpoint 1 was somehow subsumed).

Can you check in the stacktrace on which lock the checkpoint runnables are
waiting, and who is holding that lock?

Two thoughts:

1) What I mistakenly did once in one of my tests is to have the sleep() in
a downstream task. That would simply prevent the fast generated data
elements (and the inline checkpoint barriers) from passing through and
completing the checkpoint.

2) Is this another issue with the non-fair lock? Does the checkpoint
runnable simply not get the lock before the checkpoint? Not sure why it
would suddenly work after the failure. We could try and swap the lock
Object by a "ReentrantLock(true)" and see what would happen.
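
Just to spell out what I mean by swapping the lock (a sketch, not the actual
source code):

// today: a plain monitor, acquisition order is not fair
final Object checkpointLock = new Object();
synchronized (checkpointLock) {
    // emit elements / draw the snapshot
}

// experiment: a fair lock hands itself to waiting threads in FIFO order
final ReentrantLock fairCheckpointLock = new ReentrantLock(true);
fairCheckpointLock.lock();
try {
    // emit elements / draw the snapshot
} finally {
    fairCheckpointLock.unlock();
}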


Stephan


On Fri, Jan 8, 2016 at 11:49 AM, Gyula Fóra  wrote:

> Hey,
>
> I have encountered a weird issue in a checkpointing test I am trying to
> write. The logic is the same as with the previous checkpointing tests,
> there is a OnceFailingReducer.
>
> My problem is that before the reducer fails, my job cannot take any
> snapshots. The Runnables executing the checkpointing logic in the sources
> keep waiting on some lock.
>
> After the failure and the restart, everything is fine and the checkpointing
> can succeed properly.
>
> Also if I remove the failure from the reducer, the job doesn't take any
> snapshots (waiting on lock) and the job will finish.
>
> Here is the code:
>
> https://github.com/gyfora/flink/blob/d1f12c2474413c9af357b6da33f1fac30549fbc3/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/TfileStateCheckpointingTest.java#L83
>
> I assume there is no problem with the source as the Thread.sleep(..) is
> outside of the synchronized block. (and as I said after the failure it
> works fine).
>
> Any ideas?
>
> Thanks,
> Gyula
>


Re: [DISCUSS] Git force pushing and deletion of branches

2016-01-13 Thread Stephan Ewen
+1

Important to protect the master.

I think we should also protect Tags.

On Wed, Jan 13, 2016 at 11:36 AM, Gyula Fóra  wrote:

> +1 for protecting the master branch.
>
> I also don't see any reason why anyone should force push there
>
> Gyula
>
> Fabian Hueske  ezt írta (időpont: 2016. jan. 13., Sze,
> 11:07):
>
> > Hi everybody,
> >
> > Lately, ASF Infra has changed the write permissions of all Git
> repositories
> > twice.
> >
> > Originally, it was not possible to force push into the master branch.
> > A few weeks ago, infra disabled also force pushing into other branches.
> >
> > Now, this has changed again after the issue was discussed with the ASF
> > board.
> > The current situation is the following:
> > - force pushing is allowed on all branches, including master
> > - branches and tags can be deleted (not sure if this applies as well for
> > the master branch)
> > - "the 'protected' portions of git to primarily focus on refs/tags/rel -
> > thus any tags under rel, will have their entire commit history."
> >
> > I am not 100% sure which exact parts of the repository are protected now
> as
> > I am not very much into the details of Git.
> > However, I believe we need to create new tags under rel for our previous
> > releases to protect them.
> >
> > In addition, I would like to propose to ask Infra to add protection for
> the
> > master branch. I can only recall very few situations where changes had to
> > be reverted. I am much more in favor of a revert commit now and then
> > compared to a branch that can be arbitrarily changed.
> >
> > What do you think about this?
> >
> > Best, Fabian
> >
>


Re: Naive question

2016-01-12 Thread Stephan Ewen
@Chiwan: Is this still up to date from your experience?

https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/ide_setup.html

On Tue, Jan 12, 2016 at 12:04 PM, Chiwan Park <chiwanp...@apache.org> wrote:

> Hi Ram,
>
> Because there are some Scala IDE (Eclipse) plugins needed, I recommend
> avoiding the `mvn eclipse:eclipse` command. Could you try just running `mvn clean
> install -DskipTests` and importing the project into Scala IDE directly? In the
> middle of the importing process, Scala IDE suggests the plugins needed.
>
> And which version of Scala IDE are you using?
>
> > On Jan 12, 2016, at 7:58 PM, Vasudevan, Ramkrishna S <
> ramkrishna.s.vasude...@intel.com> wrote:
> >
> > Yes. I added it as a Maven project only. I did mvn eclipse:eclipse to
> create the project and also built the code using mvn clean install
> -DskipTests.
> >
> > Regards
> > Ram
> >
> > -Original Message-
> > From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of
> Stephan Ewen
> > Sent: Tuesday, January 12, 2016 4:10 PM
> > To: dev@flink.apache.org
> > Subject: Re: Naive question
> >
> > Sorry to hear that it did not work out with Eclipse at all in the end,
> even with all adjustments.
> >
> > Just making sure: You imported Flink as a Maven project, not manually
> adding the big Flink dependency JAR?
> >
> > On Tue, Jan 12, 2016 at 5:15 AM, Vasudevan, Ramkrishna S <
> ramkrishna.s.vasude...@intel.com> wrote:
> >
> >> Thanks to all. I tried with Scala Eclipse IDE with all these
> >> 'change-scala-version.sh'. But in vain.
> >>
> >> So I switched over to Intellij and things work fine over there. I am
> >> new to Intellij so will try using it.
> >>
> >> Once again thanks for helping me out.
> >>
> >> Regards
> >> Ram
> >>
> >> -Original Message-
> >> From: Chiwan Park [mailto:chiwanp...@apache.org]
> >> Sent: Monday, January 11, 2016 4:37 PM
> >> To: dev@flink.apache.org
> >> Subject: Re: Naive question
> >>
> >> Hi Ram,
> >>
> >> If you want to build Flink with Scala 2.10, just checkout Flink
> >> repository from github or download source code from homepage, run `mvn
> >> clean install -DskipTests` and import projects to your IDE. If you
> >> want to build Flink with Scala 2.11, you have to run
> >> `tools/change-scala-version.sh 2.11` before build the project. You can
> >> revert Scala version change by running `tools/change-scala-version.sh
> 2.10`.
> >>
> >> About IDE, Flink community recommends IntelliJ IDEA because Scala IDE
> >> have some problems in Java/Scala mixed project like Flink. But I
> >> tested importing Flink project with Scala IDE 4.3.0, Scala 2.11.7 and
> >> Flink 0.10.0 source code. Note that you should import the project as
> maven project.
> >>
> >> By the way, the community welcomes any questions. Please feel free to
> >> post questions. :)
> >>
> >>> On Jan 11, 2016, at 7:30 PM, Vasudevan, Ramkrishna S <
> >> ramkrishna.s.vasude...@intel.com> wrote:
> >>>
> >>> Thank you very much for the reply.
> >>> I tried different ways and when I tried setting up the root pom.xml
> >>> to
> >>> 2.11
> >>>
> >>>  <scala.version>2.11.6</scala.version>
> >>>  <scala.binary.version>2.11</scala.binary.version>
> >>>
> >>> I got the following error
> >>> [INFO]
> >>> 
> >>> --
> >>> -- [ERROR] Failed to execute goal on project flink-scala: Could not
> >>> resolve dependencies for project
> >>> org.apache.flink:flink-scala:jar:1.0-SNAPSHOT: Could not find
> >>> artifact
> >>> org.scalamacros:quasiquotes_2.11:jar:2.0.1 in central
> >>> (http://repo.maven.apache.org/maven2) -> [Help 1]
> >>>
> >>> If I leave the scala.binary.version to be at 2.10 and the scala
> >>> version to be at 2.11.6 then I get the following problem [INFO]
> >>> C:\flink\flink\flink-runtime\src\test\scala:-1: info: compiling
> >>> [INFO] Compiling 366 source files to
> >>> C:\flink\flink\flink-runtime\target\test-classes at 1452508064750
> >>> [ERROR]
> >>> C:\flink\flink\flink-runtime\src\test\scala\org\apache\flink\runtime\jobmanager\JobManagerITCase.scala:700: error: can't expa

Re: Naive question

2016-01-12 Thread Stephan Ewen
Sorry to hear that it did not work out with Eclipse at all in the end, even
with all adjustments.

Just making sure: You imported Flink as a Maven project, not manually
adding the big Flink dependency JAR?

On Tue, Jan 12, 2016 at 5:15 AM, Vasudevan, Ramkrishna S <
ramkrishna.s.vasude...@intel.com> wrote:

> Thanks to all. I tried with Scala Eclipse IDE with all these
> 'change-scala-version.sh'. But in vain.
>
> So I switched over to Intellij and things work fine over there. I am new to
> Intellij so will try using it.
>
> Once again thanks for helping me out.
>
> Regards
> Ram
>
> -Original Message-
> From: Chiwan Park [mailto:chiwanp...@apache.org]
> Sent: Monday, January 11, 2016 4:37 PM
> To: dev@flink.apache.org
> Subject: Re: Naive question
>
> Hi Ram,
>
> If you want to build Flink with Scala 2.10, just checkout Flink repository
> from github or download source code from homepage, run `mvn clean install
> -DskipTests` and import projects to your IDE. If you want to build Flink
> with Scala 2.11, you have to run `tools/change-scala-version.sh 2.11`
> before build the project. You can revert Scala version change by running
> `tools/change-scala-version.sh 2.10`.
>
> About IDE, Flink community recommends IntelliJ IDEA because Scala IDE have
> some problems in Java/Scala mixed project like Flink. But I tested
> importing Flink project with Scala IDE 4.3.0, Scala 2.11.7 and Flink 0.10.0
> source code. Note that you should import the project as maven project.
>
> By the way, the community welcomes any questions. Please feel free to post
> questions. :)
>
> > On Jan 11, 2016, at 7:30 PM, Vasudevan, Ramkrishna S <
> ramkrishna.s.vasude...@intel.com> wrote:
> >
> > Thank you very much for the reply.
> > I tried different ways and when I tried setting up the root pom.xml to
> > 2.11
> >
> >   <scala.version>2.11.6</scala.version>
> >   <scala.binary.version>2.11</scala.binary.version>
> >
> > I got the following error
> > [INFO]
> > --
> > -- [ERROR] Failed to execute goal on project flink-scala: Could not
> > resolve dependencies for project
> > org.apache.flink:flink-scala:jar:1.0-SNAPSHOT: Could not find artifact
> > org.scalamacros:quasiquotes_2.11:jar:2.0.1 in central
> > (http://repo.maven.apache.org/maven2) -> [Help 1]
> >
> > If I leave the scala.binary.version to be at 2.10 and the scala version
> > to be at 2.11.6 then I get the following problem [INFO]
> > C:\flink\flink\flink-runtime\src\test\scala:-1: info: compiling [INFO]
> > Compiling 366 source files to
> > C:\flink\flink\flink-runtime\target\test-classes at 1452508064750
> > [ERROR]
> > C:\flink\flink\flink-runtime\src\test\scala\org\apache\flink\runtime\jobmanager\JobManagerITCase.scala:700: error: can't expand macros
> > compiled by previous versions of Scala
> > [ERROR]   assert(cachedGraph2.isArchived)
> > [ERROR]   ^
> >
> > So I am not quite sure how to proceed with this. If I try to change the
> version of Scala to 2.10 in the IDE then I get a lot of compilation issues.
> Is there any way to overcome this?
> >
> > Once again thanks a lot and apologies for the naïve question.
> >
> > Regards
> > Ram
> > -Original Message-
> > From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf
> > Of Stephan Ewen
> > Sent: Friday, January 8, 2016 5:01 PM
> > To: dev@flink.apache.org
> > Subject: Re: Naive question
> >
> > Hi!
> >
> > This looks like a mismatch between the Scala dependency in Flink and
> > Scala in your Eclipse. Make sure you use the same for both. By
> > default, Flink references Scala 2.10
> >
> > If your IDE is set up for Scala 2.11, set the Scala version variable
> > in the Flink root pom.xml also to 2.11
> >
> > Greetings,
> > Stephan
> >
> >
> >
> >
> > On Fri, Jan 8, 2016 at 12:06 PM, Vasudevan, Ramkrishna S <
> ramkrishna.s.vasude...@intel.com> wrote:
> >
> >> I have been trying to install, learn and understand Flink. I am using
> >> the Scala Eclipse IDE as my IDE.
> >>
> >> I have downloaded the Flink source code, compiled it and created the
> project.
> >>
> >> My work laptop is Windows based and I don't have an Eclipse-based
> >> workstation, but I do have Linux boxes for running and testing things.
> >>
> >> Some of the examples given in Flink source code do run directly from
> >> Eclipse but when I try to run the Wordcoun

Re: New to Apache Flink

2016-01-12 Thread Stephan Ewen
Actually, the latest master should have this fix. The Dashboard ignores the
log file if it is not found...
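
So the workaround quoted below should shrink to something like this (a sketch
in Java; the Scala version is analogous):

Configuration config = new Configuration();
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
LocalStreamEnvironment env = new LocalStreamEnvironment(config);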

On Tue, Jan 12, 2016 at 10:32 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Arjun,
>
> yes, it is possible to start the web dashboard also when running jobs from
> the IDE.
> It is a bit hacky though...
>
> You can do it by creating a LocalStreamExecutionEnvironment as follows:
>
> val config = new Configuration()
> config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
> config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY,
> "/path/to/some/random/text/file")
> new LocalStreamEnvironment(config)
>
> The dashboard will try to access the job manager log file and terminate if
> it cannot be found.
> Since the JM log file is not present when running from the IDE, we need to
> point it to some other (text) file.
> I hope this will be fixed soon.
>
> Best, Fabian
>
>
>
> 2016-01-12 2:27 GMT+01:00 Arjun Rao <sporty.ar...@gmail.com>:
>
> > Thanks for the replies and help. Stephan, the Maven shortcut worked like
> a
> > charm :).
> >
> > - As for the 50ms window duration, when I was running the WindowWordCount
> > example with a duration of 5ms, I encountered this error stack trace:
> >
> > Exception in thread "main" java.lang.IllegalArgumentException: Window
> > length must be at least 50 msecs
> > at
> >
> >
> org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.<init>(AbstractAlignedProcessingTimeWindowOperator.java:84)
> > at
> >
> >
> org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator.<init>(AggregatingProcessingTimeWindowOperator.java:40)
> > at
> >
> >
> org.apache.flink.streaming.api.datastream.WindowedStream.createFastTimeOperatorIfValid(WindowedStream.java:616)
> > at
> >
> >
> org.apache.flink.streaming.api.datastream.WindowedStream.reduce(WindowedStream.java:146)
> > at
> >
> >
> org.apache.flink.streaming.api.datastream.WindowedStream.aggregate(WindowedStream.java:556)
> > at
> >
> >
> org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:373)
> >
> > - As for logging separate datastreams individually, the reason I ask is
> for
> > debugging purposes. Let us say that I have 5 different streaming jobs
> and 1
> > taskmanager. If run out of the box, all these 5 jobs will log to the same
> > taskmanager log. My question is, is there a way to configure the logging
> > framework( or some other component perhaps) in such a way, that we are
> able
> > to log the different jobs separately to their own files, so it can be
> > easier to debug?
> >
> > - With respect to web page display in embedded mode, I apologize for the
> > confusion in terminology. When I say embedded, I mean running the flink
> > jobs through my IDE, with the flink jars in my local classpath. Is there
> a
> > way to look at the web page in this mode? Without having to deploy the
> > application jar to the flink cluster. Especially for  the DataStream
> mode.
> >
> > Thanks!
> >
> > Best,
> > Arjun
> >
> > On Thu, Jan 7, 2016 at 10:37 AM, Stephan Ewen <se...@apache.org> wrote:
> >
> > > For the generated classes like
> > > "org.apache.flink.api.io.avro.generated.Address;",
> > > IntelliJ has a shortcut: Just right-click on the project, select "Maven
> > ->
> > > generate sources and update folders".
> > >
> > > That should do the trick...
> > >
> > > On Thu, Jan 7, 2016 at 11:55 AM, Till Rohrmann <trohrm...@apache.org>
> > > wrote:
> > >
> > > > Hi Arjun,
> > > >
> > > > welcome to the Flink community :-)
> > > >
> > > > On Thu, Jan 7, 2016 at 5:40 AM, Arjun Rao <sporty.ar...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I am new to Apache Flink and I really like the look of the API. I
> > have
> > > > been
> > > > > working with Storm for the past year and have questions about the
> > > > > DataStream API among others.
> > > > >
> > > > > 1. What are the interactions of the actor system in the flink
> > > ecosystem?
> > > > > Where can I find more information?
> > > > >
> > > >
> > > > Actors are used internally by the system for the communication
> between
> >

Re: [PROPOSAL] Structure the Flink Open Source Development

2016-06-08 Thread Stephan Ewen
I am adding a dedicated component for "Checkpointing". It would include the
checkpoint coordinator, barriers, threads, state handles and recovery.

I think that part is big and complex enough to warrant its own shepherd. I
would volunteer for that and be happy to also have a second shepherd.

On Tue, Jun 7, 2016 at 7:51 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Okay, it seems that we agree on the Shepherd name.
>
> Also, it seems that everyone agrees to the proposed shepherds so far.
>
> The "Client" component still needs a shepherd. Are there any volunteers?
>
> On Fri, Jun 3, 2016 at 12:07 PM, Chiwan Park <chiwanp...@apache.org>
> wrote:
>
> > Hi all,
> >
> > +1 for shepherd
> > I would like to add me to shepherd for FlinkML.
> >
> > Regards,
> > Chiwan Park
> >
> > > On Jun 3, 2016, at 3:29 AM, Henry Saputra <henry.sapu...@gmail.com>
> > wrote:
> > >
> > > +1 for shepherd
> > >
> > > I would prefer using that term rather than maintainer. It is being used
> > in
> > > Incubator PMC to help them keeping healthy development in podlings.
> > >
> > > The term "maintainer" kind of being scrutinized in ASF communities, in
> > > recent episodes happening in Spark community.
> > >
> > > - Henry
> > >
> > > On Wed, Jun 1, 2016 at 12:00 PM, Stephan Ewen <se...@apache.org>
> wrote:
> > >
> > >> I like the name "shepherd". It implies a non-authoritative role, and
> > implies
> > >> guidance, which is very fitting.
> > >>
> > >> I also think there is no problem with having a "component shepherd"
> and
> > a
> > >> "pull request shepherd".
> > >>
> > >> Stephan
> > >>
> > >>
> > >> On Wed, Jun 1, 2016 at 7:11 PM, Fabian Hueske <fhue...@gmail.com>
> > wrote:
> > >>
> > >>> I think calling the role maintainer is not a good idea.
> > >>> The Spark community had a maintainer process which they just voted to
> > >>> remove. From my understanding, a maintainer in Spark had a more
> active
> > >> role
> > >>> than the role we are currently discussing.
> > >>>
> > >>> I would prefer to not call the role "maintainer" to make clear that
> the
> > >>> responsibilities are different (less active) and mainly observing.
> > >>>
> > >>> 2016-06-01 13:14 GMT+02:00 Ufuk Celebi <u...@apache.org>:
> > >>>
> > >>>> Thanks! I like the idea of renaming it.  I'm fine with shepherd and
> I
> > >>>> also like Vasia's suggestion "champion".
> > >>>>
> > >>>> I would like to add "Distributed checkpoints" as a separate
> component
> > >>>> to track development for check- and savepoints.
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Wed, Jun 1, 2016 at 10:59 AM, Aljoscha Krettek <
> > aljos...@apache.org
> > >>>
> > >>>> wrote:
> > >>>>> Btw, in Jira, if we clean up our components we can also set a
> > >> component
> > >>>>> Lead that would get notified of issues for that component.
> > >>>>>
> > >>>>> On Wed, 1 Jun 2016 at 10:43 Chesnay Schepler <ches...@apache.org>
> > >>> wrote:
> > >>>>>
> > >>>>>> I'd also go with maintainer.
> > >>>>>>
> > >>>>>> On 01.06.2016 10:32, Aljoscha Krettek wrote:
> > >>>>>>> Hi,
> > >>>>>>> I think maintainer is also fine if we clearly specify that they
> > >> are
> > >>>> not
> > >>>>>>> meant as dictators or gatekeepers of the component that they are
> > >>>>>>> responsible for.
> > >>>>>>>
> > >>>>>>> -Aljoscha
> > >>>>>>>
> > >>>>>>> On Wed, 1 Jun 2016 at 09:48 Vasiliki Kalavri <
> > >>>> vasilikikala...@gmail.com>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hi,
> > >>>>>>>>
> > >>>>>>>> we could go for something like "sponsor"

Re: Broadcast data sent increases with # slots per TM

2016-06-09 Thread Stephan Ewen
Till is right. Broadcast joins currently materialize once per slot.
Originally, the purely push-based runtime was not good enough to handle it
differently.

By now, we could definitely handle BC Vars differently (only one slot per
TM requests the data).
For BC Joins, the hash tables do not coordinate spilling currently, which
means that we cannot do multiple joins through the same hash table.
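
For readers of the thread, a rough sketch of the two constructs being compared
(DataSet API; "env", the data sets and the broadcast name are placeholders):

// 1) Broadcast variable: attached to one operator, read via the RuntimeContext
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);

data.map(new RichMapFunction<String, String>() {
        private List<Integer> bc;

        @Override
        public void open(Configuration parameters) {
            bc = getRuntimeContext().getBroadcastVariable("smallSet");
        }

        @Override
        public String map(String value) {
            return value + "/" + bc.size();
        }
    })
    .withBroadcastSet(toBroadcast, "smallSet");

// 2) Broadcast join: the small input is replicated to every parallel join task
large.join(small, JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND)
     .where(0)
     .equalTo(0);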


On Thu, Jun 9, 2016 at 10:17 AM, Till Rohrmann  wrote:

> If I'm not mistaken, then broadcast variables and broadcast inputs of joins
> follow different code paths. Broadcast variables use additional input
> channels and are read before the actual driver code runs. In contrast to
> that, a join operation is a two input operator where the join driver
> decides how to handle the inputs (which one to read first as build input).
>
> This also entails that the broadcast variable optimization, where each task
> manager holds the data only once and copies of the data are discarded (but
> they are transmitted multiple times to the TM), does not apply to the
> broadcast join inputs. Here you should see a slightly worse performance
> degradation with your initial benchmark if you increase the number of
> slots.
>
> Cheers,
> Till
>
> On Wed, Jun 8, 2016 at 9:14 PM, Alexander Alexandrov <
> alexander.s.alexand...@gmail.com> wrote:
>
> > > As far as I know, the reason why the broadcast variables are
> implemented
> > that way is that the senders would have to know which sub-tasks are
> > deployed to which TMs.
> >
> > As the broadcast variables are realized as additionally attached
> "broadcast
> > channels", I am assuming that the same behavior will apply for broadcast
> > joins as well.
> >
> > Is this the case?
> >
> > Regards,
> > Alexander
> >
> >
> > 2016-06-08 17:13 GMT+02:00 Kunft, Andreas :
> >
> > > Hi Till,
> > >
> > > thanks for the fast answer.
> > > I'll think about a concrete way of implementing and open an JIRA.
> > >
> > > Best
> > > Andreas
> > > 
> > > From: Till Rohrmann 
> > > Sent: Wednesday, June 8, 2016 15:53
> > > To: dev@flink.apache.org
> > > Subject: Re: Broadcast data sent increases with # slots per TM
> > >
> > > Hi Andreas,
> > >
> > > your observation is correct. The data is sent to each slot and the
> > > receiving TM only materializes one copy of the data. The rest of the
> data
> > > is discarded.
> > >
> > > As far as I know, the reason why the broadcast variables are
> implemented
> > > that way is that the senders would have to know which sub-tasks are
> > > deployed to which TMs. Only then, you can decide for which sub-tasks
> you
> > > can send the data together. Since the output emitters are agnostic to
> the
> > > actual deployment, the necessary information would have to be forwarded
> > to
> > > them.
> > >
> > > Another problem is that if you pick one of the sub-tasks to receive the
> > > broadcast set, then you have to make sure, that this sub-task has read
> > and
> > > materialized the broadcast set before the other sub-tasks start
> working.
> > > One could maybe send to one sub-task first the broadcast set and then
> to
> > > all other sub-tasks, after one has sent the BC set, a kind of
> acknowledge
> > > record. That way, the other sub-tasks would block until the broadcast
> set
> > > has been completely transmitted. But here one has to make sure that the
> > > sub-task receiving the BC set has been deployed and is not queued up
> for
> > > scheduling.
> > >
> > > So there are some challenges to solve in order to optimize the BC sets.
> > > Currently, there is nobody working on it. If you want to start working
> on
> > > it, then I would recommend to open a JIRA and start writing a design
> > > document for it.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, Jun 8, 2016 at 1:45 PM, Kunft, Andreas <
> > andreas.ku...@tu-berlin.de
> > > >
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > >
> > > > we experience some unexpected increase of data sent over the network
> > for
> > > > broadcasts with increasing number of slots per Taskmanager.
> > > >
> > > >
> > > > We provided a benchmark [1]. It not only increases the size of data
> > sent
> > > > over the network but also hurts performance as seen in the
> preliminary
> > > > results below. In this results cloud-11 has 25 nodes and ibm-power
> has
> > 8
> > > > nodes with scaling the number of slots per node from 1 - 16.
> > > >
> > > >
> > > > +---+--+-+
> > > > | suite | name | median_time |
> > > > +===+==+=+
> > > > | broadcast.cloud-11| broadcast.01 |8796 |
> > > > | broadcast.cloud-11| broadcast.02 |   14802 |
> > > > | broadcast.cloud-11| broadcast.04 |   30173 |
> > > > | broadcast.cloud-11| broadcast.08 |   56936 |
> > > > | broadcast.cloud-11| broadcast.16 |  

Re: [PROPOSAL] Structure the Flink Open Source Development

2016-06-09 Thread Stephan Ewen
Should state backends and checkpointing go together?

The two of us could be shepherds for that. Till would be another person
(but he has a lot of components already).

On Wed, Jun 8, 2016 at 9:22 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> I think it would make sense to also move "State Backends" out from
> "Runtime". This is also quite complex on it's own. I would of course
> volunteer for this and I think Stephan, who is the current proposal for
> "Runtime" would also be good.
>
> On Wed, 8 Jun 2016 at 19:22 Stephan Ewen <se...@apache.org> wrote:
>
> > I am adding a dedicated component for "Checkpointing". It would include
> the
> > checkpoint coordinator, barriers, threads, state handles and recovery.
> >
> > I think that part is big and complex enough to warrant its own shepherd.
> I
> > would volunteer for that and be happy to also have a second shepherd.
> >
> > On Tue, Jun 7, 2016 at 7:51 PM, Robert Metzger <rmetz...@apache.org>
> > wrote:
> >
> > > Okay, it seems that we agree on the Shepherd name.
> > >
> > > Also, it seems that everyone agrees to the proposed shepherds so far.
> > >
> > > The "Client" component still needs a shepherd. Are there any
> volunteers?
> > >
> > > On Fri, Jun 3, 2016 at 12:07 PM, Chiwan Park <chiwanp...@apache.org>
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > +1 for shepherd
> > > > I would like to add me to shepherd for FlinkML.
> > > >
> > > > Regards,
> > > > Chiwan Park
> > > >
> > > > > On Jun 3, 2016, at 3:29 AM, Henry Saputra <henry.sapu...@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > +1 for shepherd
> > > > >
> > > > > I would prefer using that term rather than maintainer. It is being
> > used
> > > > in
> > > > > Incubator PMC to help them keeping healthy development in podlings.
> > > > >
> > > > > The term "maintainer" kind of being scrutinized in ASF communities,
> > in
> > > > > recent episodes happening in Spark community.
> > > > >
> > > > > - Henry
> > > > >
> > > > > On Wed, Jun 1, 2016 at 12:00 PM, Stephan Ewen <se...@apache.org>
> > > wrote:
> > > > >
> > > > >> I like the name "shepherd". It implies a non-authorative role, and
> > > > implies
> > > > >> guidance, which is very fitting.
> > > > >>
> > > > >> I also think there is no problem with having a "component
> shepherd"
> > > and
> > > > a
> > > > >> "pull request shepherd".
> > > > >>
> > > > >> Stephan
> > > > >>
> > > > >>
> > > > >> On Wed, Jun 1, 2016 at 7:11 PM, Fabian Hueske <fhue...@gmail.com>
> > > > wrote:
> > > > >>
> > > > >>> I think calling the role maintainer is not a good idea.
> > > > >>> The Spark community had a maintainer process which they just
> voted
> > to
> > > > >>> remove. From my understanding, a maintainer in Spark had a more
> > > active
> > > > >> role
> > > > >>> than the role we are currently discussing.
> > > > >>>
> > > > >>> I would prefer to not call the role "maintainer" to make clear
> that
> > > the
> > > > >>> responsibilities are different (less active) and mainly
> observing.
> > > > >>>
> > > > >>> 2016-06-01 13:14 GMT+02:00 Ufuk Celebi <u...@apache.org>:
> > > > >>>
> > > > >>>> Thanks! I like the idea of renaming it.  I'm fine with shepherd
> > and
> > > I
> > > > >>>> also like Vasia's suggestion "champion".
> > > > >>>>
> > > > >>>> I would like to add "Distributed checkpoints" as a separate
> > > component
> > > > >>>> to track development for check- and savepoints.
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> On Wed, Jun 1, 2016 at 10:59 AM, Aljoscha Krettek <
> > > > aljos...@apache.org
> >

Adding a Histogram Metric

2016-06-10 Thread Stephan Ewen
A recent discussion brought up the point of adding a "histogram" metric
type to Flink. This open thread is to gather some more of the requirements
for that metric.

The most important question is whether users need Flink to offer specific
implementations of "Histogram", like for example the "
com.codahale.metrics.Histogram", or if a "org.apache.flink.metrics.Histogram"
interface would work as well.
The histogram could still be reported for example via dropwizard reporters.

*Option (1):* If a Flink Histogram works as well, it would be simple to add
one. The dropwizard reporter would need to wrap the Flink Histogram for
reporting.

*Option (2)*: If the code needs the specific Dropwizard Histogram type,
then one would need a wrapper class that makes a Flink Histogram look like
a dropwizard histogram.

--

As a bit of background for the discussion, here are some thoughts behind
the way that Metrics are currently implemented in Flink.

  - The metric types in Flink are independent from libraries like
"dropwizard" to reduce dependencies and retain freedom to swap
implementations.

  - Metric reporting allows to reuse reporters from dropwizard

  - Some Flink metric implementations are also more lightweight than for
example in dropwizard. Counters for example are not thread safe, but do not
impose memory barriers. That is important for metrics deep in the streaming
runtime.
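
To make the options a bit more concrete, here is a rough sketch. Nothing of
this exists in Flink yet; the interface and class names are hypothetical, and
only com.codahale.metrics.Histogram is the real Dropwizard type:

// Option (1): a minimal Flink-side histogram interface.
public interface Histogram {
    void update(long value);
    long getCount();
}

// One possible adapter: expose an existing Dropwizard histogram behind the
// Flink-side interface, so code that already produces Dropwizard histograms
// can still be reported through Flink. The wrapper for Option (2) would go in
// the opposite direction (make a Flink Histogram look like a Dropwizard one).
// In a real code base these would be separate files.
public class DropwizardHistogramWrapper implements Histogram {

    private final com.codahale.metrics.Histogram dropwizardHistogram;

    public DropwizardHistogramWrapper(com.codahale.metrics.Histogram histogram) {
        this.dropwizardHistogram = histogram;
    }

    @Override
    public void update(long value) {
        dropwizardHistogram.update(value);
    }

    @Override
    public long getCount() {
        return dropwizardHistogram.getCount();
    }
}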


Re: incremental Checkpointing , Rocksdb HA

2016-06-10 Thread Stephan Ewen
Hi!

Incremental checkpointing is still being worked on. Aljoscha, Till, and I
have thought this through a lot and now have a pretty good
understanding how we want to do it with respect to coordination,
savepoints, restore, garbage collecting unneeded checkpoints, etc.

We want to put this into a design doc as soon as possible, and we'd be
happy to take input and discussion on the design. Please stay tuned for a
little bit...

Greetings,
Stephan


On Thu, Jun 9, 2016 at 8:42 PM, Nick Dimiduk  wrote:

> IIRC, all the above support data locality from back in the MR days. Not
> sure how much data you're planning to checkpoint though -- is locality
> really that important for transient processor state?
>
> On Thu, Jun 9, 2016 at 11:06 AM, CPC  wrote:
>
> > Cassandra backend would be interesting especially  if flink could benefit
> > from cassandra data locality. Cassandra/spark integration is using this
> for
> > information to schedule spark tasks.
> >
> > On 9 June 2016 at 19:55, Nick Dimiduk  wrote:
> >
> > > You might also consider support for a Bigtable
> > > backend: HBase/Accumulo/Cassandra. The data model should be similar
> > > (identical?) to RocksDB and you get HA, recoverability, and support for
> > > really large state "for free".
> > >
> > > On Thursday, June 9, 2016, Chen Qin  wrote:
> > >
> > > > Hi there,
> > > >
> > > > What is progress on incremental checkpointing? Does flink dev has
> plan
> > to
> > > > work on this or JIRA to track this? super interested to know.
> > > >
> > > > I also research and consider use rocksdbstatebackend without running
> > HDFS
> > > > cluster nor talk to S3. Some primitive idea is to use ZK to store /
> > > notify
> > > > state propagation progress and propagate via implement chain
> > replication
> > > on
> > > > top of YARN provisioned storage node.
> > > >
> > > > Thanks,
> > > > Chen
> > > >
> > >
> >
>


Re: Adding a Histogram Metric

2016-06-13 Thread Stephan Ewen
I think it is totally fine to add a "ThreadsafeCounter" that uses an atomic
long internally.
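
A possible shape for that, as a sketch (the class name and methods are just
illustrative, mirroring the usual counter interface):

import java.util.concurrent.atomic.AtomicLong;

// Thread-safe counter backed by an AtomicLong; unlike the lightweight
// non-thread-safe counter, every update goes through an atomic operation.
public class ThreadsafeCounter {

    private final AtomicLong count = new AtomicLong();

    public void inc() {
        count.incrementAndGet();
    }

    public void inc(long n) {
        count.addAndGet(n);
    }

    public void dec() {
        count.decrementAndGet();
    }

    public void dec(long n) {
        count.addAndGet(-n);
    }

    public long getCount() {
        return count.get();
    }
}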

On Sat, Jun 11, 2016 at 7:25 PM, Steve Cosenza <scose...@twitter.com> wrote:

> Also, we may be able to avoid the need for concurrent metrics by
> configuring each Finagle source to use a single thread. We'll investigate
> the performance implications this week and update you with the results.
>
> Thanks,
> Steve
>
>
> On Friday, June 10, 2016, Steve Cosenza <scose...@twitter.com> wrote:
>
>> +Chris Hogue who is also working on operationalizing Flink with me
>>
>> Hi Stephan,
>>
>> Thanks for the background on your current implementations!
>>
>> While we don't require a specific implementation for histogram, counter,
>> or gauge, it just became clear that we'll need threadsafe versions of all
>> three of these metrics. This is because our messaging source is implemented
>> using Finagle, and Finagle expects to be able to emit stats concurrently
>> from its managed threads.
>>
>> That being said, if adding threadsafe versions of the Flink counters is
>> not an option, we'd also be fine with directly reading and writing from the
>> singleton Codahale MetricsRegistry that you start up in each TaskManager.
>>
>> Thanks,
>> Steve
>>
>> On Fri, Jun 10, 2016 at 7:10 AM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> A recent discussion brought up the point of adding a "histogram" metric
>>> type to Flink. This open thread is to gather some more of the requirements
>>> for that metric.
>>>
>>> The most important question is whether users need Flink to offer
>>> specific implementations of "Histogram", like for example the "
>>> com.codahale.metrics.Histogram", or if a "
>>> org.apache.flink.metrics.Histogram" interface would work as well.
>>> The histogram could still be reported for example via dropwizard
>>> reporters.
>>>
>>> *Option (1):* If a Flink Histogram works as well, it would be simple to
>>> add one. The dropwizard reporter would need to wrap the Flink Histogram for
>>> reporting.
>>>
>>> *Option (2)*: If the code needs the specific Dropwizard Histogram type,
>>> then one would need a wrapper class that makes a Flink Histogram look like
>>> a dropwizard histogram.
>>>
>>> --
>>>
>>> As a bit of background for the discussion, here are some thoughts behind
>>> the way that Metrics are currently implemented in Flink.
>>>
>>>   - The metric types in Flink are independent from libraries like
>>> "dropwizard" to reduce dependencies and retain freedom to swap
>>> implementations.
>>>
>>>   - Metric reporting allows to reuse reporters from dropwizard
>>>
>>>   - Some Flink metric implementations are also more lightweight than for
>>> example in dropwizard. Counters for example are not thread safe, but do not
>>> impose memory barriers. That is important for metrics deep in the streaming
>>> runtime.
>>>
>>>
>>>
>>
>
> --
> -Steve
>
> Sent from Gmail Mobile
>


Re: Side-effects of DataSet::count

2016-05-30 Thread Stephan Ewen
Hi Eron!

Yes, the idea is to actually switch all executions to a backtracking
scheduling mode. That simultaneously solves both fine grained recovery and
lazy execution, where later stages build on prior stages.

With all the work around streaming, we have not gotten to this so far, but
it is one feature still in the list...

Greetings,
Stephan


On Mon, May 30, 2016 at 9:55 PM, Eron Wright  wrote:

> Thinking out loud now…
>
> Is the job graph fully mutable?   Can it be cleared?   For example,
> shouldn’t the count method remove the sink after execution completes?
>
> Can numerous job graphs co-exist within a single driver program?How
> would that relate to the session concept?
>
> Seems the count method should use ‘backtracking’ schedule mode, and only
> execute the minimum needed to materialize the count sink.
>
> > On May 29, 2016, at 3:08 PM, Márton Balassi 
> wrote:
> >
> > Hey Eron,
> >
> > Yes, DataSet#collect and count methods implicitly trigger a JobGraph
> > execution, thus they also trigger writing to any previously defined
> sinks.
> > The idea behind this behavior is to enable interactive querying (the one
> > that you are used to get from a shell environment) and it is also a great
> > debugging tool.
> >
> > Best,
> >
> > Marton
> >
> > On Sun, May 29, 2016 at 11:28 PM, Eron Wright  wrote:
> >
> >> I was curious as to how the `count` method on DataSet worked, and was
> >> surprised to see that it executes the entire program graph.   Wouldn’t
> this
> >> cause undesirable side-effects like writing to sinks?Also strange
> that
> >> the graph is mutated with the addition of a sink (that isn’t
> subsequently
> >> removed).
> >>
> >> Surveying the Flink code, there aren’t many situations where the program
> >> graph is implicitly executed (`collect` is another).   Nonetheless, this
> >> has deepened my appreciation for how dynamic the application might be.
> >>
> >> // DataSet.java
> >> public long count() throws Exception {
> >>   final String id = new AbstractID().toString();
> >>
> >>   output(new Utils.CountHelper<T>(id)).name("count()");
> >>
> >>   JobExecutionResult res = getExecutionEnvironment().execute();
> >>   return res.<Long> getAccumulatorResult(id);
> >> }
> >> Eron
>
>


Re: Side-effects of DataSet::count

2016-05-31 Thread Stephan Ewen
Hi!

There was some preliminary work on this. By now, the requirements have
grown a bit. The backtracking needs to handle

   - Scheduling for execution (the point raised here), possibly resuming
from available intermediate results
   - Recovery from partially executed programs, where operators either
execute completely or not at all (batch style)
  - Recover from intermediate result since latest completed checkpoint
  - Eventually even recover superstep-based iterations.

So the design needs to be extended slightly. We do not have a design
writeup for this, but I agree, it would be great to have one.
I have a pretty good general idea about this, let me see if I can get to
that next week.

In general, for such things (long standing ideas and designs), we should
have something like Kafka has with its KIPs (Kafka Improvement Proposal) -
a place to collect them, refine them over time, and
see how people react to them or step up to implement them. We could call
them 3Fs (Flink Feature Forms) ;-)

Greetings,
Stephan


On Tue, May 31, 2016 at 1:02 AM, Greg Hogan <c...@greghogan.com> wrote:

> Hi Stephan,
>
> Is there a design document, prior discussion, or background material on
> this enhancement? Am I correct in understanding that this only applies to
> DataSet since streams run indefinitely?
>
> Thanks,
> Greg
>
> On Mon, May 30, 2016 at 5:49 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > Hi Eron!
> >
> > Yes, the idea is to actually switch all executions to a backtracking
> > scheduling mode. That simultaneously solves both fine grained recovery
> and
> > lazy execution, where later stages build on prior stages.
> >
> > With all the work around streaming, we have not gotten to this so far,
> but
> > it is one feature still in the list...
> >
> > Greetings,
> > Stephan
> >
> >
> > On Mon, May 30, 2016 at 9:55 PM, Eron Wright <ewri...@live.com> wrote:
> >
> > > Thinking out loud now…
> > >
> > > Is the job graph fully mutable?   Can it be cleared?   For example,
> > > shouldn’t the count method remove the sink after execution completes?
> > >
> > > Can numerous job graphs co-exist within a single driver program?How
> > > would that relate to the session concept?
> > >
> > > Seems the count method should use ‘backtracking’ schedule mode, and
> only
> > > execute the minimum needed to materialize the count sink.
> > >
> > > > On May 29, 2016, at 3:08 PM, Márton Balassi <
> balassi.mar...@gmail.com>
> > > wrote:
> > > >
> > > > Hey Eron,
> > > >
> > > > Yes, DataSet#collect and count methods implicitly trigger a JobGraph
> > > > execution, thus they also trigger writing to any previously defined
> > > sinks.
> > > > The idea behind this behavior is to enable interactive querying (the
> > one
> > > > that you are used to get from a shell environment) and it is also a
> > great
> > > > debugging tool.
> > > >
> > > > Best,
> > > >
> > > > Marton
> > > >
> > > > On Sun, May 29, 2016 at 11:28 PM, Eron Wright <ewri...@live.com>
> > wrote:
> > > >
> > > >> I was curious as to how the `count` method on DataSet worked, and
> was
> > > >> surprised to see that it executes the entire program graph.
>  Wouldn’t
> > > this
> > > >> cause undesirable side-effects like writing to sinks?Also
> strange
> > > that
> > > >> the graph is mutated with the addition of a sink (that isn’t
> > > subsequently
> > > >> removed).
> > > >>
> > > >> Surveying the Flink code, there aren’t many situations where the
> > program
> > > >> graph is implicitly executed (`collect` is another).   Nonetheless,
> > this
> > > >> has deepened my appreciation for how dynamic the application might
> be.
> > > >>
> > > >> // DataSet.java
> > > >> public long count() throws Exception {
> > > >>   final String id = new AbstractID().toString();
> > > >>
> > > >>   output(new Utils.CountHelper<T>(id)).name("count()");
> > > >>
> > > >>   JobExecutionResult res = getExecutionEnvironment().execute();
> > > >>   return res.<Long> getAccumulatorResult(id);
> > > >> }
> > > >> Eron
> > >
> > >
> >
>


Re: [ANNOUNCE] Build Issues Solved

2016-05-31 Thread Stephan Ewen
Hi Chiwan!

I think the Execution environment is not shared, because what the
TestEnvironment sets is a Context Environment Factory. Every time you call
"ExecutionEnvironment.getExecutionEnvironment()", you get a new environment.

Stephan


On Tue, May 31, 2016 at 11:53 AM, Chiwan Park <chiwanp...@apache.org> wrote:

> I’ve created a JIRA issue [1] related to KNN test cases. I will send a PR
> for it.
>
> From my investigation [2], the cluster for ML tests has only one taskmanager
> with 4 slots. Is 2048 insufficient for the total number of network buffers? I
> still think the problem is sharing ExecutionEnvironment between test cases.
>
> [1]: https://issues.apache.org/jira/browse/FLINK-3994
> [2]:
> https://github.com/apache/flink/blob/master/flink-test-utils/src/test/scala/org/apache/flink/test/util/FlinkTestBase.scala#L56
>
> Regards,
> Chiwan Park
>
> > On May 31, 2016, at 6:05 PM, Maximilian Michels <m...@apache.org> wrote:
> >
> > Thanks Stephan for the synopsis of our last weeks test instability
> > madness. It's sad to see the shortcomings of Maven test plugins but
> > another lesson learned is that our testing infrastructure should get a
> > bit more attention. We have reached a point several times where our
> >>> tests were inherently unstable. Now we saw that even more problems
> > were hidden in the dark. I would like to see more maintenance
> > dedicated to testing.
> >
> > @Chiwan: Please, no hotfix! Please open a JIRA issue and a pull
> > request with a systematic fix. Those things are too crucial to be
> > fixed on the go. The problems is that Travis reports the number of
> > processors to be "32" (which is used for the number of task slots in
> > local execution). The network buffers are not adjusted accordingly. We
> > should set them correctly in the MiniCluster. Also, we could define an
> > upper limit to the number of task slots for tests.
> >
> > On Tue, May 31, 2016 at 10:59 AM, Chiwan Park <chiwanp...@apache.org>
> wrote:
> >> I think that the tests fail because of sharing ExecutionEnvironment
> between test cases. I’m not sure why it is problem, but it is only
> difference between other ML tests.
> >>
> >> I created a hotfix and pushed it to my repository. When it seems fixed
> [1], I’ll merge the hotfix to master branch.
> >>
> >> [1]: https://travis-ci.org/chiwanpark/flink/builds/134104491
> >>
> >> Regards,
> >> Chiwan Park
> >>
> >>> On May 31, 2016, at 5:43 PM, Chiwan Park <chiwanp...@apache.org>
> wrote:
> >>>
> >>> Maybe it seems about KNN test case which is merged into yesterday.
> I’ll look into ML test.
> >>>
> >>> Regards,
> >>> Chiwan Park
> >>>
> >>>> On May 31, 2016, at 5:38 PM, Ufuk Celebi <u...@apache.org> wrote:
> >>>>
> >>>> Currently, an ML test is reliably failing and occasionally some HA
> >>>> tests. Is someone looking into the ML test?
> >>>>
> >>>> For HA, I will revert a commit, which might cause the HA
> >>>> instabilities. Till is working on a proper fix as far as I know.
> >>>>
> >>>> On Tue, May 31, 2016 at 3:50 AM, Chiwan Park <chiwanp...@apache.org>
> wrote:
> >>>>> Thanks for the great work! :-)
> >>>>>
> >>>>> Regards,
> >>>>> Chiwan Park
> >>>>>
> >>>>>> On May 31, 2016, at 7:47 AM, Flavio Pompermaier <
> pomperma...@okkam.it> wrote:
> >>>>>>
> >>>>>> Awesome work guys!
> >>>>>> And even more thanks for the detailed report...This troubleshooting
> summary
> >>>>>> will be undoubtedly useful for all our maven projects!
> >>>>>>
> >>>>>> Best,
> >>>>>> Flavio
> >>>>>> On 30 May 2016 23:47, "Ufuk Celebi" <u...@apache.org> wrote:
> >>>>>>
> >>>>>>> Thanks for the effort, Max and Stephan! Happy to see the green
> light again.
> >>>>>>>
> >>>>>>> On Mon, May 30, 2016 at 11:03 PM, Stephan Ewen <se...@apache.org>
> wrote:
> >>>>>>>> Hi all!
> >>>>>>>>
> >>>>>>>> After a few weeks of terrible build issues, I am happy to
> announce that
> >>>>>>> the
> >>>>>>>> build works again properly, and we actuall

Re: buffering in operators, implementing statistics

2016-05-26 Thread Stephan Ewen
Hi Stavros!

I think what Aljoscha wants to say is that the community is a bit hard
pressed reviewing new and complex things right now.
There are a lot of threads going on already.

If you want to work on this, why not make your own GitHub project
"Approximate algorithms on Apache Flink" or so?

Greetings,
Stephan



On Wed, May 25, 2016 at 3:02 PM, Aljoscha Krettek 
wrote:

> Hi,
> that link was interesting, thanks! As I said though, it's probably not a
> good fit for Flink right now.
>
> The things that I feel are important right now are:
>
>  - dynamic scaling: the ability of a streaming pipeline to adapt to changes
> in the amount of incoming data. This is tricky with stateful operations and
> long-running pipelines. For Spark this is easier to do because every
> mini-batch is individually scheduled and they can therefore be scheduled on
> differing numbers of machines.
>
>  - an API for joining static (or slowly evolving) data with streaming data:
> this has been coming up in different forms on the mailing lists and when
> talking with people. Apache Beam solves this with "side inputs". In Flink
> we want to add something as well, maybe along the lines of side inputs or
> maybe something more specific for the case of pure joins.
>
>  - working on managed memory: In Flink we were always very conscious about
> how memory was used, we were using our own abstractions for dealing with
> memory and efficient serialization. We call this the "managed memory"
> abstraction. Spark recently also started going in this direction with
> Project Tungsten. For the streaming API there are still some places where
> we could make things more efficient by working on the managed memory more,
> for example, there is no state backend that uses managed memory. We are
> either completely on the Java Heap or use RocksDB there.
>
>  - stream SQL: this is obvious and everybody wants it.
>
>  - A generic cross-runner API: This is what Apache Beam (née Google
> Dataflow) does. It is very interesting to write a program once and then be
> able to run it on different runners. This brings more flexibility for
> users. It's not clear how this will play out in the long run but it's very
> interesting to keep an eye on.
>
> For most of these the Flink community is in various stages of implementing
> it, so that's good. :-)
>
> Cheers,
> Aljoscha
>
> On Mon, 23 May 2016 at 17:48 Stavros Kontopoulos  >
> wrote:
>
> > Hey Aljoscha,
> >
> > Thnax for the useful comments. I have recently looked at spark sketches:
> >
> >
> http://www.slideshare.net/databricks/sketching-big-data-with-spark-randomized-algorithms-for-largescale-data-analytics
> > So there must be value in this effort.
> > In my experience counting in general is a common need for large data
> sets.
> > For example people often in a non streaming setting use redis for
> > its hyperlolog algo.
> >
> > What are other areas you find more important or of higher priority for
> the
> > time being?
> >
> > Best,
> > Stavros
> >
> > On Mon, May 23, 2016 at 6:18 PM, Aljoscha Krettek 
> > wrote:
> >
> > > Hi,
> > > no such changes are planned right now. The separation between the keys
> is
> > > very strict in order to make the windowing state re-partitionable so
> that
> > > we can implement dynamic rescaling of the parallelism of a program.
> > >
> > > The WindowAll is only used for specific cases where you need a Trigger
> > that
> > > sees all elements of the stream. I personally don't think it is very
> > useful
> > > because it is not scaleable. In theory, for time windows this can be
> > > parallelized but it is not currently done in Flink.
> > >
> > > Do you have a specific use case for the count-min sketch in mind. If
> not,
> > > maybe our energy is better spent on producing examples with real-world
> > > applicability. I'm not against having an example for a count-min
> sketch,
> > > I'm just worried that you might put your energy into something that is
> > not
> > > useful to a lot of people.
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > > On Fri, 20 May 2016 at 20:13 Stavros Kontopoulos <
> > st.kontopou...@gmail.com
> > > >
> > > wrote:
> > >
> > > > Hi thnx for the feedback.
> > > >
> > > > So there is a limitation due to parallel windows implementation.
> > > > No intentions to change that somehow to accommodate similar
> > estimations?
> > > >
> > > > WindowAll in practice is used as step in the pipeline? I mean since
> its
> > > > inherently not parallel cannot scale correct?
> > > > Although there is an exception: "Only for special cases, such as
> > aligned
> > > > time windows is it possible to perform this operation in parallel"
> > > > Probably missing something...
> > > >
> > > > I could try do the example stuff (and open a new feature on jira for
> > > that).
> > > > I will also vote for closing the old issue too since there is no
> other
> > > way
> > > > at least for the time being...
> > > >
> > 

Re: Junit Issue while testing Kafka Source

2016-05-26 Thread Stephan Ewen
The SuccessException does not really have a dependency.

It is just a special Exception class that you throw in your code when you
want to stop.
The code that calls "env.execute()" catches the exception and checks
whether the failure cause is that special exception.
Flink propagates the exceptions from the workers back to the client.
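
As a minimal sketch of the pattern (SuccessException and the helper method are
user-defined here, not Flink classes; only env.execute() is the actual API):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class SuccessExceptionUtil {

    // marker exception, thrown from a sink / map function once the expected data was seen
    public static class SuccessException extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    // runs the job and treats a SuccessException anywhere in the cause chain as "test passed"
    public static void executeUntilSuccess(StreamExecutionEnvironment env) throws Exception {
        try {
            env.execute();
            throw new AssertionError("job finished without signalling success");
        } catch (Exception e) {
            // Flink wraps the user exception while propagating it from the workers
            // back to the client, so walk the cause chain for the marker
            for (Throwable t = e; t != null; t = t.getCause()) {
                if (t instanceof SuccessException) {
                    return;
                }
            }
            throw e;
        }
    }
}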

Greetings,
Stephan

On Thu, May 26, 2016 at 12:37 PM, Vinay Patil <vinay18.pa...@gmail.com>
wrote:

> Hi Stephan,
>
> Yes using DeserializationSchema solution will definitely work.
> I am not able to get the dependency for SuccessException.
> Any help on this
>
> Regards,
> Vinay Patil
>
> *+91-800-728-4749*
>
> On Thu, May 26, 2016 at 3:32 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > Hi!
> >
> > On Flink 1.0, there is the "flink-test-utils_2.10" dependency that has
> > some useful things.
> >
> > The "SuccessException" seems a quite common thing - I have seen that in
> > other infinite program tests as well (Google Dataflow / Beam)
> >
> > Another way you can architect tests is to have an element in the stream
> > that signals end-of-stream. The DeserializationSchema can check for that
> > and return "end of stream".
> >
> > Greetings,
> > Stephan
> >
> >
> >
> > On Thu, May 26, 2016 at 11:55 AM, Vinay Patil <vinay18.pa...@gmail.com>
> > wrote:
> >
> > > Hi Aljoscha,
> > >
> > > Thank you for answering.
> > > Throwing SuccessException is a good idea , however when I am adding
> > > following dependency, no classes are getting added to the jar:
> > >
> > >
> > > <dependency>
> > >     <groupId>org.apache.flink</groupId>
> > >     <artifactId>flink-tests_2.10</artifactId>
> > >     <version>1.0.3</version>
> > > </dependency>
> > > 
> > >
> > > Is there any other dependency that I have to add ? I have also added
> > > test-utils dependency.
> > >
> > > I am trying the following in my test case :
> > > 1) Consuming data from Kafka using FlinkKafkaConsumer and passing it to
> > map
> > > as Tuple2
> > > 2) In the map function I am just checking if Tuple2 contains data, if
> > yes,
> > > throw the exception("success")
> > > 3) This way I am verifying that the configuration is correct and that
> we
> > > are able to read from kafka.
> > >
> > > Am I doing it right, is there any better approach ?
> > >
> > > Regards,
> > > Vinay Patil
> > >
> > > *+91-800-728-4749*
> > >
> > > On Thu, May 26, 2016 at 1:01 PM, Aljoscha Krettek <aljos...@apache.org
> >
> > > wrote:
> > >
> > > > Hi,
> > > > what we are doing in most internal tests is to verify in a sink
> whether
> > > the
> > > > data is correct and then throw a SuccessException. This brings down
> the
> > > job
> > > > and we check whether we catch a SuccessException to verify that the
> > test
> > > > was successful. Look, for example, at the ValidatingSink in
> > > > EventTimeWindowCheckpointingITCase in the Flink source.
> > > >
> > > > Cheers,
> > > > Aljoscha
> > > >
> > > > On Thu, 26 May 2016 at 01:58 Nick Dimiduk <ndimi...@gmail.com>
> wrote:
> > > >
> > > > > I'm also curious for a solution here. My test code executes the
> flow
> > > > from a
> > > > > separate thread. Once i've joined on all my producer threads and
> I've
> > > > > verified the output, I simply interrupt the flow thread. This spews
> > > > > exceptions, but it all appears to be harmless.
> > > > >
> > > > > Maybe there's a better way? I think you'd need some "death pill" to
> > > send
> > > > > into the stream that signals its termination.
> > > > >
> > > > > On Tue, May 24, 2016 at 7:29 PM, Vinay Patil <
> > vinay18.pa...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I am able to read from a topic using FlinkKafkaConsumer and
> return
> > > the
> > > > > > result, however  when I am testing this scenario in Junit the
> > result
> > > is
> > > > > > getting printed(kafkaStream.print()) but  I am not able to exit
> the
> > > > Job,
> > > > > > env.execute keeps running,
> > > > > > I tried to return env.execute from method but that did not work
> > > either.
> > > > > >
> > > > > > 1) Is there any way to end the execution of job forcefully.
> > > > > > 2) How do I test if the data has come from topic
> > > > > >
> > > > > >- One way I think of is to get the output of stream.print()
> in a
> > > > > >PrintStream and check the result.(but not able to test this
> > since
> > > > job
> > > > > is
> > > > > >not getting exited)
> > > > > >
> > > > > > Please help with these issues
> > > > > >
> > > > > > Regards,
> > > > > > Vinay Patil
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: Junit Issue while testing Kafka Source

2016-05-26 Thread Stephan Ewen
Hi!

On Flink 1.0, there is the "flink-test-utils_2.10" dependency that has
some useful things.

The "SuccessException" seems a quite common thing - I have seen that in
other infinite program tests as well (Google Dataflow / Beam)

Another way you can architect tests is to have an element in the stream
that signals end-of-stream. The DeserializationSchema can check for that
and return "end of stream".
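
A rough sketch of that approach for String records (the marker value and class
name are made up; the exact package of DeserializationSchema can differ between
Flink versions, so treat the imports as approximate):

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class EndMarkerStringSchema implements DeserializationSchema<String> {

    private static final long serialVersionUID = 1L;

    private static final String END_MARKER = "##END-OF-TEST##";

    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // once this returns true, the source finishes and the job shuts down normally
        return END_MARKER.equals(nextElement);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

The test's producer then simply writes the marker as its last record.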

Greetings,
Stephan



On Thu, May 26, 2016 at 11:55 AM, Vinay Patil 
wrote:

> Hi Aljoscha,
>
> Thank you for answering.
> Throwing SuccessException is a good idea, however when I am adding the
> following dependency, no classes are getting added to the jar:
>
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-tests_2.10</artifactId>
>     <version>1.0.3</version>
> </dependency>
> 
>
> Is there any other dependency that I have to add ? I have also added
> test-utils dependency.
>
> I am trying the following in my test case :
> 1) Consuming data from Kafka using FlinkKafkaConsumer and passing it to map
> as Tuple2
> 2) In the map function I am just checking if Tuple2 contains data, if yes,
> throw the exception("success")
> 3) This way I am verifying that the configuration is correct and that we
> are able to read from kafka.
>
> Am I doing it right, is there any better approach ?
>
> Regards,
> Vinay Patil
>
> *+91-800-728-4749*
>
> On Thu, May 26, 2016 at 1:01 PM, Aljoscha Krettek 
> wrote:
>
> > Hi,
> > what we are doing in most internal tests is to verify in a sink whether
> the
> > data is correct and then throw a SuccessException. This brings down the
> job
> > and we check whether we catch a SuccessException to verify that the test
> > was successful. Look, for example, at the ValidatingSink in
> > EventTimeWindowCheckpointingITCase in the Flink source.
> >
> > Cheers,
> > Aljoscha
> >
> > On Thu, 26 May 2016 at 01:58 Nick Dimiduk  wrote:
> >
> > > I'm also curious for a solution here. My test code executes the flow
> > from a
> > > separate thread. Once i've joined on all my producer threads and I've
> > > verified the output, I simply interrupt the flow thread. This spews
> > > exceptions, but it all appears to be harmless.
> > >
> > > Maybe there's a better way? I think you'd need some "death pill" to
> send
> > > into the stream that signals its termination.
> > >
> > > On Tue, May 24, 2016 at 7:29 PM, Vinay Patil 
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I am able to read from a topic using FlinkKafkaConsumer and return
> the
> > > > result, however  when I am testing this scenario in Junit the result
> is
> > > > getting printed(kafkaStream.print()) but  I am not able to exit the
> > Job,
> > > > env.execute keeps running,
> > > > I tried to return env.execute from method but that did not work
> either.
> > > >
> > > > 1) Is there any way to end the execution of job forcefully.
> > > > 2) How do I test if the data has come from topic
> > > >
> > > >- One way I think of is to get the output of stream.print() in a
> > > >PrintStream and check the result.(but not able to test this since
> > job
> > > is
> > > >not getting exited)
> > > >
> > > > Please help with these issues
> > > >
> > > > Regards,
> > > > Vinay Patil
> > > >
> > >
> >
>


[ANNOUNCE] Build Issues Solved

2016-05-30 Thread Stephan Ewen
Hi all!

After a few weeks of terrible build issues, I am happy to announce that the
build works again properly, and we actually get meaningful CI results.

Here is a story in many acts, from builds deep red to bright green joy.
Kudos to Max, who did most of this troubleshooting. This evening, Max and
I debugged the final issue and got the build back on track.

--
The Journey
--

(1) Failsafe Plugin

The Maven Failsafe Build Plugin had a critical bug due to which failed
tests did not result in a failed build.

That is a pretty bad bug for a plugin whose only task is to run tests and
fail the build if a test fails.

After we recognized that, we upgraded the Failsafe Plugin.


(2) Failsafe Plugin Dependency Issues

After the upgrade, the Failsafe Plugin behaved differently and did not
interoperate with Dependency Shading any more.

Because of that, we switched to the Surefire Plugin.


(3) Fixing all the issues introduced in the meantime

Naturally, a number of test instabilities had been introduced, which needed
to be fixed.


(4) Yarn Tests and Test Scope Refactoring

In the meantime, a Pull Request was merged that moved the Yarn Tests to the
test scope.
Because the configuration searched for tests in the "main" scope, no Yarn
tests were executed for a while, until the scope was fixed.


(5) Yarn Tests and JMX Metrics

After the Yarn Tests were re-activated, we saw them fail due to warnings
created by the newly introduced metrics code. We could fix that by updating
the metrics code and temporarily not registering JMX beans for all metrics.


(6) Yarn / Surefire Deadlock

Finally, some Yarn tests failed reliably in Maven (though not in the IDE).
It turned out that those test a command line interface that interacts with
the standard input stream.

The newly deployed Surefire Plugin uses standard input as well, for
communication with forked JVMs. Since Surefire internally locks the
standard input stream, the Yarn CLI cannot poll the standard input stream
without locking up and stalling the tests.

We adjusted the tests and now the build happily builds again.

-
Conclusions:
-

  - CI is terribly crucial. It took us weeks to deal with the fallout of
having a period of unreliable CI.

  - Maven could do a better job. A bug as crucial as the one that started
our problem should not occur in a test plugin like surefire. Also, the
constant change of semantics and dependency scopes is annoying. The
semantic changes are subtle, but for a build as complex as Flink, they make
a difference.

  - File-based communication is rarely a good idea. The bug in the failsafe
plugin was caused by improper file-based communication, and so were some of
the instabilities we discovered.

Greetings,
Stephan


PS: Some issues and mysteries remain for us to solve: When we allow our
metrics subsystem to register JMX beans, we see some tests failing due to
spontaneous JVM process kills. Whoever has a pointer there, please ping us!


Re: [ANNOUNCE] Build Issues Solved

2016-05-31 Thread Stephan Ewen
You are right, Chiwan.

I think the pattern you use should be supported, though. It would be
good to check whether the job executes more than is necessary at the
point of the "collect()" calls.
That would explain the network buffer issue then...

On Tue, May 31, 2016 at 12:18 PM, Chiwan Park <chiwanp...@apache.org> wrote:

> Hi Stephan,
>
> Yes, right. But KNNITSuite calls
> ExecutionEnvironment.getExecutionEnvironment only once [1]. I’m testing
> with moving method call of getExecutionEnvironment to each test case.
>
> [1]:
> https://github.com/apache/flink/blob/master/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/nn/KNNITSuite.scala#L45
>
> Regards,
> Chiwan Park
>
> > On May 31, 2016, at 7:09 PM, Stephan Ewen <se...@apache.org> wrote:
> >
> > Hi Chiwan!
> >
> > I think the Execution environment is not shared, because what the
> > TestEnvironment sets is a Context Environment Factory. Every time you
> call
> > "ExecutionEnvironment.getExecutionEnvironment()", you get a new
> environment.
> >
> > Stephan
> >
> >
> > On Tue, May 31, 2016 at 11:53 AM, Chiwan Park <chiwanp...@apache.org>
> wrote:
> >
> >> I’ve created a JIRA issue [1] related to KNN test cases. I will send a
> PR
> >> for it.
> >>
> >> From my investigation [2], cluster for ML tests have only one
> taskmanager
> >> with 4 slots. Is 2048 insufficient for total number of network numbers?
> I
> >> still think the problem is sharing ExecutionEnvironment between test
> cases.
> >>
> >> [1]: https://issues.apache.org/jira/browse/FLINK-3994
> >> [2]:
> >>
> https://github.com/apache/flink/blob/master/flink-test-utils/src/test/scala/org/apache/flink/test/util/FlinkTestBase.scala#L56
> >>
> >> Regards,
> >> Chiwan Park
> >>
> >>> On May 31, 2016, at 6:05 PM, Maximilian Michels <m...@apache.org>
> wrote:
> >>>
> >>> Thanks Stephan for the synopsis of our last weeks test instability
> >>> madness. It's sad to see the shortcomings of Maven test plugins but
> >>> another lesson learned is that our testing infrastructure should get a
> >>> bit more attention. We have reached a point several times where our
> >>> tests were inherently unstable. Now we saw that even more problems
> >>> were hidden in the dark. I would like to see more maintenance
> >>> dedicated to testing.
> >>>
> >>> @Chiwan: Please, no hotfix! Please open a JIRA issue and a pull
> >>> request with a systematic fix. Those things are too crucial to be
> >>> fixed on the go. The problems is that Travis reports the number of
> >>> processors to be "32" (which is used for the number of task slots in
> >>> local execution). The network buffers are not adjusted accordingly. We
> >>> should set them correctly in the MiniCluster. Also, we could define an
> >>> upper limit to the number of task slots for tests.
> >>>
> >>> On Tue, May 31, 2016 at 10:59 AM, Chiwan Park <chiwanp...@apache.org>
> >> wrote:
> >>>> I think that the tests fail because of sharing ExecutionEnvironment
> >> between test cases. I’m not sure why it is problem, but it is only
> >> difference between other ML tests.
> >>>>
> >>>> I created a hotfix and pushed it to my repository. When it seems fixed
> >> [1], I’ll merge the hotfix to master branch.
> >>>>
> >>>> [1]: https://travis-ci.org/chiwanpark/flink/builds/134104491
> >>>>
> >>>> Regards,
> >>>> Chiwan Park
> >>>>
> >>>>> On May 31, 2016, at 5:43 PM, Chiwan Park <chiwanp...@apache.org>
> >> wrote:
> >>>>>
> >>>>> Maybe it seems about KNN test case which is merged into yesterday.
> >> I’ll look into ML test.
> >>>>>
> >>>>> Regards,
> >>>>> Chiwan Park
> >>>>>
> >>>>>> On May 31, 2016, at 5:38 PM, Ufuk Celebi <u...@apache.org> wrote:
> >>>>>>
> >>>>>> Currently, an ML test is reliably failing and occasionally some HA
> >>>>>> tests. Is someone looking into the ML test?
> >>>>>>
> >>>>>> For HA, I will revert a commit, which might cause the HA
> >>>>>> instabilities. Till is working on a proper fix as far as I know.
> >>>>>

Re: Collision of task number values for the same task

2016-05-31 Thread Stephan Ewen
It could be that

(a) The task failed and was restarted.

(b) The program has multiple steps (collect(), print()), so that parts of
the graph get re-executed.

(c) You have two operators with the same name that become tasks with the
same name.

Do any of those explanations make sense in your setting?
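
For (b), a small illustration; both eager calls below trigger their own
execution of the plan, so the shared part (source + filter) runs twice and its
tasks show up twice in the logs (the program itself is made up):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class ReExecutionSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Long> evens = env
                .generateSequence(1, 1000)
                .filter(n -> n % 2 == 0);

        long numEvens = evens.count();              // first execution of the plan
        System.out.println("evens: " + numEvens);
        evens.print();                              // second execution of the same plan
    }
}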

Stephan


On Tue, May 31, 2016 at 12:48 PM, Alexander Alexandrov <
alexander.s.alexand...@gmail.com> wrote:

> Sure, you can find them attached here (both jobmanager and taskmanager,
> the problem was observed in the jobmanager logs).
>
> If needed I can also share the binary to reproduce the issue.
>
> I think the problem is related to the fact that the input splits are
> lazily assigned to the task slots, and it seems that in case of 8 splits
> for 4 slots, we get each (x/y) combination twice.
>
> Moreover, I am currently analyzing the structure of the log files, and it
> seems that the task ID is not reported consistently across the different
> messages [1,2,3]. This makes the implementation of an ETL job that extracts
> the statistics from the log and feeds them into a database quite hard.
>
> Would it be possible to push a fix which adds the task ID consistently
> across all messages in the 1.0.x line? If yes, I will open a JIRA and work
> on that this week.
> I would like to get feedback from other people who are parsing jobmanager
> / taskmanager logs on that in order to avoid possible backwards
> compatibility issues with job analysis tools on the release line.
>
> [1]
> https://github.com/apache/flink/blob/da23ee38e5b36ddf26a6a5a807efbbbcbfe1d517/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L370-L371
> [2]
> https://github.com/apache/flink/blob/da23ee38e5b36ddf26a6a5a807efbbbcbfe1d517/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L991-L992
>
> Regards,
> A.
>
>
> 2016-05-31 12:01 GMT+02:00 Ufuk Celebi :
>
>> On Tue, May 31, 2016 at 11:53 AM, Alexander Alexandrov
>>  wrote:
>> > Can somebody shed a light on the execution semantics of the scheduler
>> which
>> > will explain this behavior?
>>
>> The execution IDs are unique per execution attempt. Having two tasks
>> with the same subtask index running at the same time is unexpected.
>>
>> Can you share the complete logs, please?
>>
>> – Ufuk
>>
>
>


Re: PojoComparator question

2016-05-31 Thread Stephan Ewen
The "compareSerialized" should probably internally always reuse instances,
where possible.
Since these are never passed into user code or anything, that should be
okay to do.

On Tue, May 31, 2016 at 11:52 AM, Aljoscha Krettek 
wrote:

> Hi,
> I think this is an artifact from the past. Using the "non-reuse"
> deserialize seems more correct, especially in the presence of subclasses.
>
> Best,
> Aljoscha
>
> On Mon, 30 May 2016 at 19:13 Gábor Horváth  wrote:
>
> > Hi!
> >
> > While I was working on code generation support for PojoComparators, I
> > stumbled upon the compareSerialized method [1]. It first creates two new
> > instances and then it is using the reusing overloads of the serializer.
> > Calling the non-reusing overload would create the instance anyways. Is
> > there a reason why the reusing overload is used here?
> >
> > Regards,
> > Gábor
> >
> > [1]
> >
> >
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java#L277
> >
>


Re: Pulling Streaming out of staging and project restructure

2016-01-11 Thread Stephan Ewen
Hi Flavio!

You can always do "mvn -DskipTests clean package". That compiles tests, but
does not execute them.

Stephan


On Mon, Jan 11, 2016 at 11:34 AM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> I hope that it's not too late to suggest adding to the restructuring also
> https://issues.apache.org/jira/browse/FLINK-1827 so as to be able to
> compile Flink without also compiling the tests (-Dmaven.test.skip=true) and
> save a lot of time...
> It should be fairly easy to fix that.
>
> Best,
> Flavio
>
> On Wed, Jan 6, 2016 at 4:22 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Stephan already opened a PR for restructuring the examples:
> >
> > --> https://github.com/apache/flink/pull/1482
> >
> > Otherwise +1!
> >
> > 2016-01-06 16:21 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
> >
> > > The renaming hasn't been done completely. I would like to fix this
> before
> > > the 1.0.0 release.
> > >
> > > I think the following issues are still open:
> > > - Merge flink-java-examples, flink-scala-examples and
> > > flink-streaming-examples into "flink-examples" + build the streaming
> > > examples for the binary build as well.
> > > - move flink-table to flink-libraries
> > > - move some modules from flink-staging to flink-batch-connectors
> > >
> > >
> > > If nobody disagrees, I'll open a PR for these changes.
> > > I created a JIRA for this discussion:
> > > https://issues.apache.org/jira/browse/FLINK-3205
> > >
> > >
> > > On Fri, Oct 2, 2015 at 7:58 PM, fhueske <fhue...@gmail.com> wrote:
> > >
> > > > +1
> > > >
> > > >
> > > > From: Henry Saputra
> > > > Sent: Friday, October 2, 2015 19:34
> > > > To: dev@flink.apache.org
> > > > Subject: Re: Pulling Streaming out of staging and project restructure
> > > >
> > > >
> > > > +1
> > > >
> > > > On Friday, October 2, 2015, Matthias J. Sax <mj...@apache.org>
> wrote:
> > > >
> > > > > It think, rename "flink-storm-compatibility-core" to just
> > "flink-storm"
> > > > > would be the cleanest solution.
> > > > >
> > > > > So in flink-contrib there would be two modules:
> > > > >   - flink-storm
> > > > >   - flink-storm-examples
> > > > >
> > > > > Please let me know if you have any objection about it.
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 10/02/2015 10:45 AM, Matthias J. Sax wrote:
> > > > > > Sure. Will do that.
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > > On 10/02/2015 10:35 AM, Stephan Ewen wrote:
> > > > > >> @Matthias: How about getting rid of the
> storm-compatibility-parent
> > > and
> > > > > >> making the core and examples projects directly projects in
> > "contrib"
> > > > > >>
> > > > > >> On Fri, Oct 2, 2015 at 10:34 AM, Till Rohrmann <
> > > trohrm...@apache.org
> > > > > <javascript:;>> wrote:
> > > > > >>
> > > > > >>> +1 for the new project structure. Getting rid of our code dump
> > is a
> > > > > good
> > > > > >>> thing.
> > > > > >>>
> > > > > >>> On Fri, Oct 2, 2015 at 10:25 AM, Maximilian Michels <
> > > m...@apache.org
> > > > > <javascript:;>>
> > > > > >>> wrote:
> > > > > >>>
> > > > > >>>> +1 Matthias, let's limit the overhead this has for the module
> > > > > >>> maintainers.
> > > > > >>>>
> > > > > >>>> On Fri, Oct 2, 2015 at 12:17 AM, Matthias J. Sax <
> > > mj...@apache.org
> > > > > <javascript:;>>
> > > > > >>> wrote:
> > > > > >>>>> I will commit something to flink-storm-compatibility tomorrow
> > > that
> > > > > >>>>> contains some internal package restructuring. I think,
> renaming
> > > the
> > > > > >>>>> three modules in this commit would be a smart move as both
> > > changes
> > >

Dripping the Flink-on-Tez code for Flink 1.0

2016-01-08 Thread Stephan Ewen
Hi all!

Currently, Flink has a module to run batch program code on Tez rather than
on Flink's own distributed execution engine.

I would suggest that we drop this code for the next release (1.0) as part
of a code consolidation:

  - There seems to be little interest in either the Flink or the Tez
community in using and expanding this functionality.

  - The original motivation (better exploit resource elasticity in YARN)
will no longer be valid in the near future. I am re-working the YARN
integration currently to make it more elastic and make it possible to run
Flink on Mesos.

  - The Flink-on-Tez code is rather at POC status. Testing it at large scale
and adding all the missing features will take more effort than making
Flink's own YARN integration resource elastic.


Please let me know what you think!
Especially @Kostas, since you wrote the initial POC, I'd be interested in
your opinion.

Greetings,
Stephan


Re: Dripping the Flink-on-Tez code for Flink 1.0

2016-01-10 Thread Stephan Ewen
I was typing with fat fingers again. Meant "dropping code" of course, not
"dripping" ;-)

On Sun, Jan 10, 2016 at 11:55 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> +1
>
> 2016-01-08 17:35 GMT+01:00 Till Rohrmann <trohrm...@apache.org>:
>
> > +1 since it increase maintainability of the code base if it is not really
> > used and thus removed.
> >
> > On Fri, Jan 8, 2016 at 5:33 PM, Ufuk Celebi <u...@apache.org> wrote:
> >
> > > +1
> > >
> > > I wanted to make a similar proposal.
> > >
> > > – Ufuk
> > >
> > > > On 08 Jan 2016, at 17:03, Kostas Tzoumas <ktzou...@apache.org>
> wrote:
> > > >
> > > > for clarification, I was talking about dropping the code, I am unsure
> > > about
> > > > the consequences of dripping code :-)
> > > >
> > > > On Fri, Jan 8, 2016 at 4:57 PM, Kostas Tzoumas <ktzou...@apache.org>
> > > wrote:
> > > >
> > > >> +1 from my side
> > > >>
> > > >> Flink on Tez never got a lot of user traction. It served well as a
> > > >> prototype of "this is possible", but since the core functionality
> will
> > > be
> > > >> subsumed by making Flink on YARN resource elastic, I don't see any
> > > reason
> > > >> we should have it as part of the Flink codebase.
> > > >>
> > > >> Best,
> > > >> Kostas
> > > >>
> > > >> On Fri, Jan 8, 2016 at 4:43 PM, Stephan Ewen <se...@apache.org>
> > wrote:
> > > >>
> > > >>> Hi all!
> > > >>>
> > > >>> Currently, Flink has a module to run batch program code on Tez
> rather
> > > than
> > > >>> Flink's own distributed execution engine.
> > > >>>
> > > >>> I would suggest that we drop this code for the next release (1.0)
> as
> > > part
> > > >>> of a code consolidation:
> > > >>>
> > > >>>  - There seems little in both the Flink and the Tez community to
> use
> > > and
> > > >>> expand this functionality.
> > > >>>
> > > >>>  - The original motivation (better exploit resource elasticity in
> > YARN)
> > > >>> will no longer be valid in the near future. I am re-working the
> YARN
> > > >>> integration currently to make it more elastic and make it possible
> to
> > > run
> > > >>> Flink on Mesos.
> > > >>>
> > > >>>  - The Flink-on-Tez code is rather POC status. Large scale testing
> it
> > > and
> > > >>> making adding all missing features will take more effort than
> making
> > > >>> Flink's own YARN integration resource elastic.
> > > >>>
> > > >>>
> > > >>> Please let me know what you think!
> > > >>> Especially @Kostas, since you wrote the initial POC, I'd be
> > interested
> > > in
> > > >>> your opinion.
> > > >>>
> > > >>> Greetings,
> > > >>> Stephan
> > > >>>
> > > >>
> > > >>
> > >
> > >
> >
>


Re: Release tag for 0.10.1

2016-01-10 Thread Stephan Ewen
All builds are built equal against Hadoop, but a specific Hadoop version is
still part of the Flink lib folder when downloaded.

Cross Hadoop-version compatibility is not good, i.e., when Flink has Hadoop
2.4 in the classpath, it does not work with a 2.6 YARN installation. That
is why we pre-build multiple versions.

Best,
Stephan


On Sat, Jan 9, 2016 at 3:19 AM, Nick Dimiduk <ndimi...@gmail.com> wrote:

> Yes, a tag would be very good practice, IMHO. Those of us who need to run
> release + patches appreciate the release hygiene :)
>
> If all builds are created equal re: Hadoop versions, I recommend against
> publishing Hadoop-specific tarballs on the downloads page; it left me quite
> confused, as I'm sure it would other users.
>
> On Friday, January 8, 2016, Stephan Ewen <se...@apache.org> wrote:
>
> > Hi Nick!
> >
> > We have not pushed a release tag, but have a frozen release-0.10.1-RC1
> > branch (https://github.com/apache/flink/tree/release-0.10.1-rc1)
> > A tag would be great, agree!
> >
> > Flink does in its core not depend on Hadoop. The parts that reference
> > Hadoop (HDFS filesystem, YARN, MapReduce function/format compatibility)
> are
> > not using any Hadoop version specific code (with the exception of some
> YARN
> > functions, which are reflectively invoked). So it should work across
> > versions nicely. The main friction we saw were version clashes of
> > transitive dependencies.
> >
> > The Flink CI builds include building Flink with Hadoop 2.5.0, see here:
> > https://github.com/apache/flink/blob/master/.travis.yml
> >
> > Greetings,
> > Stephan
> >
> >
> >
> > On Fri, Jan 8, 2016 at 6:54 PM, Nick Dimiduk <ndimi...@apache.org
> > <javascript:;>> wrote:
> >
> > > An only-slightly related question: Is Flink using Hadoop version
> specific
> > > features in some way? IIRC, the basic APIs should be compatible back as
> > far
> > > as 2.2. I'm surprised to see builds of flink explicitly against many
> > hadoop
> > > versions, but 2.5.x is excluded.
> > >
> > > -n
> > >
> > > On Fri, Jan 8, 2016 at 9:45 AM, Nick Dimiduk <ndimi...@apache.org
> > <javascript:;>> wrote:
> > >
> > > > Hi Devs,
> > > >
> > > > It seems no release tag was pushed to 0.10.1. I presume this was an
> > > > oversight. Is there some place I can look to see from which sha the
> > > 0.10.1
> > > > release was built? Are the RC vote threads the only cannon in this
> > > matter?
> > > >
> > > > Thanks,
> > > > Nick
> > > >
> > >
> >
>


Re: FileNotFoundException thrown by BlobCache when running "mvn test" against flink-runtime 0.10 for Scala 2.11

2016-01-15 Thread Stephan Ewen
Hi!

I cannot access the gist.

There are some tests that check proper error reporting in that case.
Let's make sure there is nothing funky, in the sense that this also occurs
outside of where it is tested for and goes unrecognized by the tests.

Can you share more of the log?

Thanks,
Stephan


On Fri, Jan 15, 2016 at 9:52 AM, Robert Metzger  wrote:

> I think the exceptions are fine. We have many tests which also check
> certain error conditions. I guess the exceptions are expected by the tests.
>
> On Fri, Jan 15, 2016 at 1:20 AM, Prez Cannady 
> wrote:
>
> > My bad.  Gist with exception is here:
> >
> > https://gist.github.com/revprez/5a730a45674f0fc6e52b <
> > https://gist.github.com/revprez/5a730a45674f0fc6e52b>
> >
> > Prez Cannady
> > p: 617 500 3378
> > e: revp...@opencorrelate.org 
> > GH: https://github.com/opencorrelate 
> > LI: https://www.linkedin.com/in/revprez <
> > https://www.linkedin.com/in/revprez>
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > > On Jan 14, 2016, at 7:16 PM, Prez Cannady 
> > wrote:
> > >
> > > Steps
> > >
> > > Forked, cut a bug fix branch off of release-0.10.
> > > One line change to fix class loader issue.
> > > Ran mvn clean test in flink-runtime. All 894 tests pass.
> > > Ran ./tools/change-scala-version 2.11 in the root.
> > > Ran mvn clean test -Dscala.version=2.11.7 in flink-runtime.
> > > Expected result
> > >
> > > All 894 tests pass without incident.
> > >
> > > Actual result
> > >
> > > All 894 tests actually did pass, but I frequently see the exception
> > below in the log (with context, and also available as a gist):
> > >
> > > Discussion: Not sure if this is an issue, but figured I’d ask before
> > submitting my pull request.
> > >
> > > 6 > 19:06:15,250 INFO  org.apache.flink.runtime.blob.BlobCache
> >  - Downloading e52e6f94e5afa6502299cf982d31105111e1d9a4 from
> > localhost/127.0.0.1:61157 (retry 5)
> > > 6 > 19:06:15,251 DEBUG org.apache.flink.runtime.blob.BlobClient
> > - GET content addressable BLOB
> > e52e6f94e5afa6502299cf982d31105111e1d9a4 from /127.0.0.1:61170
> > > 6 > 19:06:15,251 ERROR org.apache.flink.runtime.blob.BlobCache
> >  - Failed to fetch BLOB
> > e52e6f94e5afa6502299cf982d31105111e1d9a4 from localhost/127.0.0.1:61157
> > and store it under
> >
> /var/folders/68/82g3zl512nj15b82m5f4nhb4gn/T/blobStore-dee0aeaa-8199-4024-ba44-4e39154aa8e8/cache/blob_e52e6f94e5afa6502299cf982d31105111e1d9a4
> > No retries left.
> > > java.io.FileNotFoundException:
> >
> /var/folders/68/82g3zl512nj15b82m5f4nhb4gn/T/blobStore-dee0aeaa-8199-4024-ba44-4e39154aa8e8/cache/blob_e52e6f94e5afa6502299cf982d31105111e1d9a4
> > (Permission denied)
> > > at java.io.FileOutputStream.open0(Native Method)
> > > at java.io.FileOutputStream.open(FileOutputStream.java:270)
> > > at java.io.FileOutputStream.(FileOutputStream.java:213)
> > > at java.io.FileOutputStream.(FileOutputStream.java:162)
> > > at
> > org.apache.flink.runtime.blob.BlobCache.getURL(BlobCache.java:126)
> > > at
> >
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerReferenceToBlobKeyAndGetURL(BlobLibraryCacheManager.java:245)
> > > at
> >
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:114)
> > > at
> >
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManagerTest.testRegisterAndDownload(BlobLibraryCacheManagerTest.java:209)
> > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > at
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > > at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > at java.lang.reflect.Method.invoke(Method.java:497)
> > > at
> >
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
> > > at
> >
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> > > at
> >
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
> > > at
> >
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> > > at
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
> > > at
> >
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
> > > at
> >
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
> > > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
> > > at
> > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
> > > at
> > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
> > > at
> > 
