[jira] [Created] (FLINK-4874) Enabling Flink web interface in local execution

2016-10-20 Thread Krishna Prasad Anna Ramesh Kumar (JIRA)
Krishna Prasad Anna Ramesh Kumar created FLINK-4874:
---

 Summary: Enabling Flink web interface in local execution
 Key: FLINK-4874
 URL: https://issues.apache.org/jira/browse/FLINK-4874
 Project: Flink
  Issue Type: Improvement
  Components: Webfrontend
Affects Versions: 1.1.2
Reporter: Krishna Prasad Anna Ramesh Kumar
Priority: Trivial
 Fix For: 1.1.2


The local environment section in the local execution web page 
(https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/local_execution.html)
 indicates that the web interface cannot be started while running in a local 
environment. As Till has pointed out in one of the mailing list topics, this 
can be done by including the following in the program code.

"Configuration config = new Configuration();
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(getP, config);"

And adding this dependency:
"
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime-web_2.10</artifactId>
  <version>${flink.version}</version>
</dependency>
"

This should be added to the documentation as it is very helpful for developers 
like me who are trying to learn the framework.

Thanks!




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4873) Add config option to specify "home directory" for YARN client resource sharing

2016-10-20 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-4873:
-

 Summary: Add config option to specify "home directory" for YARN 
client resource sharing
 Key: FLINK-4873
 URL: https://issues.apache.org/jira/browse/FLINK-4873
 Project: Flink
  Issue Type: Improvement
  Components: YARN Client
Affects Versions: 1.2.0, 1.1.3
Reporter: Gyula Fora


The YARN client currently uses FileSystem.getHomeDirectory() to store the jar 
files that need to be shared on the cluster. This pretty much forces users to 
run HDFS or something compatible with the Hadoop FS API on the cluster.

In some production environments, file systems (distributed or simply shared) are 
mounted under the same path and do not require the use of the Hadoop API. 
If we want to run Flink on YARN in these cases, we would need 
to be able to define the "home directory" where Flink should copy the files for 
sharing.

It could be something like:
yarn.resource.upload.dir in the flink-conf.yaml



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] Defining the Semantics of StreamingSQL

2016-10-20 Thread Tyler Akidau
On Thu, Oct 20, 2016 at 5:55 AM Fabian Hueske  wrote:

> Hi everybody,
>
> I cross posted the proposal also to the Apache Calcite dev mailing list to
> collect some feedback from the community.
> Tyler Akidau (Apache Beam committer) responded and commented on the
> proposal.
>
> I am moving our conversation from Google Doc comments to the mailing list
> with Tyler's consent to continue here.
>
> Tyler commented on this sentence:
>
> > "Result refinement does not affect the semantics of a query and should
> therefore not be part of the query. Instead it is a property of the query
> evaluation."
>
> Tyler:
> ---
>
> I don't think I agree with this statement. For streams, the nature of
> refinement is critical to defining their shape.
> And for tables, as soon as you provide a parameter to configure when late
> data are dropped, you've affected the semantics of the query.
> As such, I would argue refinement is essential to defining query semantics
> any time streams are involved, and thus a reasonable justification for
> syntax extension, e.g. the EMIT WHEN proposal:
> https://docs.google.com/document/d/1tSey4CeTrbb4VjWvtSA78OcU6BERXXDZ3t0HzSLij9Q/edit
>
> That said, extracting refinement semantics outside of the query seems like
> a fair compromise for the case where you're trying to support streaming
> within existing standard SQL syntax.
>
> Fabian:
> ---
>
> Yes, I know. We discussed this issue when drafting this proposal.
> When the query starts to drop late data, the computed result will only be
> an approximation of the defined query result. That should be clearly
> pointed out.
>
> Something that just came to my mind: shouldn't the watermark generation
> mode also be part of the query definition?
> Given the same query and same refinement and state cleanup configuration,
> different watermark generation modes could lead to different results
> because it essentially defines when data is late, right?
>
> Tyler:
> ---
>
> Re watermark generation: that's a very good question. Watermarks are going
> to be a characteristic of each table/stream, but I think there are two
> places where the user might have a voice in what the watermarks look like:
>
> 1. Sources: Watermarks are typically established at ingress time into the
> system.
> It's very reasonable that you might want to observe a given table/stream
> with different watermark strategies (e.g. a 100th %ile watermark when you
> want accuracy at the cost of latency, & a 99th %ile watermark when you want
> better latency at the cost of correctness).
> But it's not clear to me that this choice should be part of the query,
> aside from choosing the appropriate table/stream to query. Not all sources
> are going to support all watermark modes.
> So I'd tend to think it would make more sense for the watermark mode to be
> a property of the source, defined outside of the query (as other source
> metadata are, e.g. HBase table name for an HBase connector).
>
> 2. Grouping: Any time a grouping operation is performed (i.e. any time a
> table is created from a stream, if using the semantics proposed in my doc
> above), you can define how the grouping operation affects watermark
> progress. In general, when grouping within a window, the range of valid
> event times is [min timestamp of non-late data in the pane, ∞). In Beam, I
> believe we're going to provide the choice of { MIN_NON_LATE_TIMESTAMP,
> MAX_NON_LATE_TIMESTAMP, END_OF_WINDOW }, since those are the most relevant
> ones (& you can always safely move the timestamp beyond the end of the
> window later on). This choice I do believe should be a part of query, as it
> directly relates to the way conversion from a stream to a table is
> happening, which will affect the shape of any stream derived from that
> table later on.
>
> I think the existing (largely implicit) proposal here is actually that the
> event time for a given window during grouping operations is defined by the
> user when they specify the value of the rowtime column in the result. I
> think the optimizer can figure out from that expression whether or not
> rowtime will evaluate to something >=
> MIN_TIMESTAMP_OF_ALL_NON_LATE_DATA_IN_THE_PANE. It might require a special
> aggregator that knows how to ignore late records. But being able to have
> the timestamp of an aggregate defined via the user-provided rowtime value
> would probably be preferable to a syntax extension.
>
> Thoughts?
>
> ---
>
>
> For Flink's Stream SQL our plans were to require the input stream to
> provide watermarks, i.e. treat watermarks as a property of the source, just
> as you described.
> It definitely makes sense to have the same stream with different watermark
> accuracy. In addition, the "Complete Result Offset" (as proposed in our
> document) can also be used to tune the result accuracy by allowing to defer
> the evaluation of a window. This would be similar to having watermarks with
> higher accuracy.
>

Interesting. That makes me wonder if 

Re: Removing flink-contrib/flink-operator-stats

2016-10-20 Thread Stephan Ewen
+1 for removing it

  - It seems quite unstable (is responsible for almost all build failures
right now)
  - It is not integrated with the metric system. Having more metrics is
desirable, but is a separate effort and needs a different approach.

On Wed, Oct 19, 2016 at 4:23 PM, Greg Hogan  wrote:

> Based on a cursory reading of FLINK-1297 I would lean toward dropping the
> code rather than moving to Apache Bahir. This looks to only be appropriate
> for batch and this module was not integrated into the runtime.
>
> If there is a way forward to make use of this code in core Flink then that
> would be even better.
>
> On Wed, Oct 19, 2016 at 9:54 AM, Maximilian Michels 
> wrote:
>
> > +1 for removing it in case it is not widely used. Apache Bahir would
> > be a more appropriate place for this module then.
> >
> > -Max
> >
> >
> > On Wed, Oct 19, 2016 at 3:52 PM, Robert Metzger 
> > wrote:
> > > If there are no users and no contributors of the module, I'm +1 to
> remove
> > > it.
> > >
> > > If we decide to remove it from flink-contrib, but there are some
> > > contributors interested in it, I can offer to assist the contributors
> to
> > > add the extension to Apache Bahir.
> > >
> > > On Wed, Oct 19, 2016 at 2:00 PM, Ufuk Celebi  wrote:
> > >
> > >> Hey devs,
> > >>
> > >> I would like to propose the removal of the
> > >> flink-contrib/flink-operator-stats module.
> > >>
> > >> It is currently causing some build stability issues
> > >> (https://issues.apache.org/jira/browse/FLINK-4833) and there is no
> > >> active maintainer for it as far as I can tell.
> > >>
> > >> Are there any objections to this?
> > >>
> > >> I know it always feels bad to remove contributed code, but given the
> > >> size and scope of the project I think that it is fair to be
> > >> conservative about these issues. I think we always planned the contrib
> > >> module to be a staging area where we monitor the usefulness of
> > >> non-core additions.
> > >>
> > >> Best,
> > >>
> > >> Ufuk
> > >>
> >
>


[jira] [Created] (FLINK-4872) Type erasure problem exclusively on cluster execution

2016-10-20 Thread Martin Junghanns (JIRA)
Martin Junghanns created FLINK-4872:
---

 Summary: Type erasure problem exclusively on cluster execution
 Key: FLINK-4872
 URL: https://issues.apache.org/jira/browse/FLINK-4872
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.1.2
Reporter: Martin Junghanns


The following code runs fine in the local and collection execution environments but 
fails when executed on a cluster.

{code:title=Problem.java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;

import java.lang.reflect.Array;

public class Problem {

  public static class Pojo {
  }

  public static class Foo<T> extends Tuple1<T> {
  }

  public static class Bar<T> extends Tuple1<T[]> {
  }

  public static class UDF<T> implements MapFunction<Foo<T>, Bar<T>> {

    private final Class<T> clazz;

    public UDF(Class<T> clazz) {
      this.clazz = clazz;
    }

    @Override
    public Bar<T> map(Foo<T> value) throws Exception {
      Bar<T> bar = new Bar<>();
      //noinspection unchecked
      bar.f0 = (T[]) Array.newInstance(clazz, 10);
      return bar;
    }
  }

  public static void main(String[] args) throws Exception {
    // runs in local, collection and cluster execution
    withLong();
    // runs in local and collection execution, fails on cluster execution
    withPojo();
  }

  public static void withLong() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    Foo<Long> foo = new Foo<>();
    foo.f0 = 42L;
    DataSet<Foo<Long>> barDataSource = env.fromElements(foo);
    DataSet<Bar<Long>> map = barDataSource.map(new UDF<>(Long.class));

    map.print();
  }

  public static void withPojo() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    Foo<Pojo> foo = new Foo<>();
    foo.f0 = new Pojo();
    DataSet<Foo<Pojo>> barDataSource = env.fromElements(foo);
    DataSet<Bar<Pojo>> map = barDataSource.map(new UDF<>(Pojo.class));

    map.print();
  }
}
{code}

{code:title=ProblemTest.java}
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class ProblemTest extends MultipleProgramsTestBase {

  public ProblemTest(TestExecutionMode mode) {
super(mode);
  }

  @Test
  public void testWithLong() throws Exception {
Problem.withLong();
  }

  @Test
  public void testWithPOJO() throws Exception {
Problem.withPojo();
  }
}
{code}

Exception:
{code}
The return type of function 'withPojo(Problem.java:58)' could not be determined 
automatically, due to type erasure. You can give type information hints by 
using the returns(...) method on the result of the transformation call, or by 
letting your function implement the 'ResultTypeQueryable' interface.
org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
org.apache.flink.api.java.DataSet.collect(DataSet.java:407)
org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
Problem.withPojo(Problem.java:60)
Problem.main(Problem.java:38) 
{code}
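
As the exception text suggests, a user-side workaround is to give an explicit type hint on the transformation result. A rough sketch (it assumes the returns(TypeHint) overload of the Java DataSet API; whether this actually resolves the cluster-only failure is not established in this report):

{code}
// import org.apache.flink.api.common.typeinfo.TypeHint;
DataSet<Bar<Pojo>> map = barDataSource
    .map(new UDF<>(Pojo.class))
    .returns(new TypeHint<Bar<Pojo>>() {});
{code}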



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4871) Add memory calculation for TaskManagers and forward MetricRegistry

2016-10-20 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4871:


 Summary: Add memory calculation for TaskManagers and forward 
MetricRegistry
 Key: FLINK-4871
 URL: https://issues.apache.org/jira/browse/FLINK-4871
 Project: Flink
  Issue Type: Sub-task
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Priority: Minor


Add automatic memory calculation for {{TaskManagers}} executed by the 
{{MiniCluster}}. 

Additionally, change the {{TaskManagerRunner}} to accept a given 
{{MetricRegistry}} so that the one instantiated by the {{MiniCluster}} is used.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Implicit class RichExecutionEnvironment - Can't use MlUtils.readLibSVM(path) in QUickStart guide

2016-10-20 Thread Thomas FOURNIER
Yep I've done it: import org.apache.flink.api.scala._

I had reported this issue but still have the same problem.

My code is the following (with imports)

import org.apache.flink.api.scala._
import org.apache.flink.ml._

import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.math.Vector

object App {

  def main(args: Array[String]) {

val env = ExecutionEnvironment.getExecutionEnvironment
val survival = env.readCsvFile[(String, String, String,
String)]("src/main/resources/haberman.data", ",")


val survivalLV = survival
  .map { tuple =>
val list = tuple.productIterator.toList
val numList = list.map(_.asInstanceOf[String].toDouble)
LabeledVector(numList(3), DenseVector(numList.take(3).toArray))
  }



val astroTrain: DataSet[LabeledVector] =
MLUtils.readLibSVM(env,"src/main/resources/svmguide1")

val astroTest: DataSet[(Vector, Double)] = MLUtils
  .readLibSVM(env, "src/main/resources/svmguide1.t")
  .map(l => (l.vector, l.label))

val svm = SVM()
  .setBlocks(env.getParallelism)
  .setIterations(100)
  .setRegularization(0.001)
  .setStepsize(0.1)
  .setSeed(42)

svm.fit(astroTrain)
println(svm.toString)


val predictionPairs = svm.evaluate(astroTest)
predictionPairs.print()

  }
}



And I can't write:

MLUtils.readLibSVM("src/main/resources/svmguide1")







2016-10-20 16:26 GMT+02:00 Theodore Vasiloudis <
theodoros.vasilou...@gmail.com>:

> This has to do with not doing a wildcard import of the Scala api, it was
> reported and already fixed on master [1]
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.
> nabble.com/jira-Created-FLINK-4792-Update-documentation-
> QuickStart-FlinkML-td13936.html
>
> --
> Sent from a mobile device. May contain autocorrect errors.
>
> On Oct 20, 2016 2:06 PM, "Thomas FOURNIER" 
> wrote:
>
> > Hello,
> >
> > Following QuickStart guide in FlinkML, I have to do the following:
> >
> > val astroTrain:DataSet[LabeledVector] = MLUtils.readLibSVM(env,
> > "src/main/resources/svmguide1")
> >
> > Instead of:
> >
> > val astroTrain:DataSet[LabeledVector] = MLUtils.readLibSVM(
> > "src/main/resources/svmguide1")
> >
> >
> > Nonetheless, this implicit class in ml/packages
> >
> > implicit class RichExecutionEnvironment(executionEnvironment:
> > ExecutionEnvironment) {
> >   def readLibSVM(path: String): DataSet[LabeledVector] = {
> > MLUtils.readLibSVM(executionEnvironment, path)
> >   }
> > }
> >
> >
> > is supposed to pimp MLUtils in the way we want.
> >
> > Does it mean that RichExecutionEnvironment is not imported in the scope ?
> > What can be done to solve this ?
> >
> >
> > Thanks
> >
> > Regards
> > Thomas
> >
>


TopSpeedWindowing - in error: Could not forward element to next operator

2016-10-20 Thread Ovidiu Cristian Marcu
Could you check the following issue on master?

When running this example org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
with the default configuration I have no errors.

When I change the state backend to RocksDB I receive this error:

java.lang.RuntimeException: Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:388)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:1)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:393)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:1)
at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at 
org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction.apply(PassThroughWindowFunction.java:32)
at 
org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction.apply(ReduceApplyWindowFunction.java:56)
at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:50)
at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:1)
at 
org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.fire(EvictingWindowOperator.java:334)
at 
org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.processElement(EvictingWindowOperator.java:199)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:177)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:270)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:609)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:103)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:383)
... 15 more
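
For anyone reproducing this, switching the state backend in code looks roughly like this (a sketch only - the reporter may have changed flink-conf.yaml instead; it assumes flink-statebackend-rocksdb is on the classpath and uses a made-up checkpoint URI):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBBackendExample {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // hypothetical checkpoint location; any supported file system URI works
    env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints"));
    env.enableCheckpointing(5000);

    // stands in for the TopSpeedWindowing topology of the bundled example
    env.fromElements(1L, 2L, 3L).print();
    env.execute("Example with RocksDB state backend");
  }
}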


[jira] [Created] (FLINK-4870) ContinuousFileMonitoringFunction does not properly handle absolute Windows paths

2016-10-20 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-4870:
---

 Summary: ContinuousFileMonitoringFunction does not properly handle 
absolute Windows paths
 Key: FLINK-4870
 URL: https://issues.apache.org/jira/browse/FLINK-4870
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.1.2
Reporter: Chesnay Schepler
Priority: Minor
 Fix For: 1.2.0


The ContinuousFileMonitoringFunction fails for absolute Windows paths without 
a dedicated scheme (e.g. "C:\\tmp\\test.csv"), since the String path is directly 
fed into the URI constructor (which does not handle it properly) instead of 
first creating a Flink Path and converting that into a URI.
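
A minimal sketch of the difference described above (purely illustrative; class and variable names are made up):

{code}
import java.net.URI;
import org.apache.flink.core.fs.Path;

public class WindowsPathExample {

  public static void main(String[] args) {
    String file = "C:\\tmp\\test.csv";

    // What happens today: the raw string is fed to the URI constructor,
    // which rejects the backslashes and the drive letter.
    try {
      URI direct = new URI(file);
      System.out.println("direct URI: " + direct);
    } catch (Exception e) {
      System.out.println("new URI(...) fails: " + e);
    }

    // Suggested approach: build a Flink Path first and convert that to a URI.
    URI viaPath = new Path(file).toUri();
    System.out.println("via Path: " + viaPath);
  }
}
{code}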



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4869) Store record pointer after record keys

2016-10-20 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-4869:
-

 Summary: Store record pointer after record keys
 Key: FLINK-4869
 URL: https://issues.apache.org/jira/browse/FLINK-4869
 Project: Flink
  Issue Type: Sub-task
  Components: Core
Affects Versions: 1.2.0
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Minor


{{NormalizedKeySorter}} serializes records into a {{RandomAccessInputView}} 
separate from the memory segments used for the sort keys. By storing the 
pointer after the sort keys, the addition of the offset is moved from 
{{NormalizedKeySorter.compare}}, which is O(n log n), to other methods which are 
O\(n).

I will run a performance comparison before submitting a PR to see how significant a 
performance improvement this would yield.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4868) Insertion sort could avoid the swaps

2016-10-20 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-4868:
--

 Summary: Insertion sort could avoid the swaps
 Key: FLINK-4868
 URL: https://issues.apache.org/jira/browse/FLINK-4868
 Project: Flink
  Issue Type: Sub-task
  Components: Local Runtime
Reporter: Gabor Gevay
Priority: Minor


This is about the fallback to insertion sort at the beginning of 
{{QuickSort.sortInternal}}. This is quite hot code, as it runs every time 
we reach the bottom of the quick sort recursion tree.

The inner loop does a series of swaps on adjacent elements for moving a block 
of several elements one slot to the right and inserting the ith element at the 
hole. However, it would be faster to first copy the ith element to a temp 
location, and then move the block of elements to the right without swaps, and 
then copy the original ith element to the hole.

Moving the block of elements without swaps could be achieved by calling 
{{UNSAFE.copyMemory}} only once for every element (as opposed to the three 
calls in {{MemorySegment.swap}} currently being done).

(Note that I'm not sure whether {{UNSAFE.copyMemory}} is like memmove or like 
memcpy, so I'm not sure if we can do the entire block of elements with maybe 
even one {{UNSAFE.copyMemory}}.)
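
To make the idea concrete, here is a minimal sketch on a plain int array (not on {{MemorySegment}}); the only point is the shift-instead-of-swap pattern:

{code}
static void insertionSort(int[] a, int lo, int hi) {
  for (int i = lo + 1; i < hi; i++) {
    int tmp = a[i];              // copy the i-th element to a temp location once
    int j = i;
    while (j > lo && a[j - 1] > tmp) {
      a[j] = a[j - 1];           // shift the block one slot to the right, no swaps
      j--;
    }
    a[j] = tmp;                  // write the saved element into the hole
  }
}
{code}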



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4867) Investigate code generation for improving sort performance

2016-10-20 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-4867:
--

 Summary: Investigate code generation for improving sort performance
 Key: FLINK-4867
 URL: https://issues.apache.org/jira/browse/FLINK-4867
 Project: Flink
  Issue Type: Sub-task
  Components: Local Runtime
Reporter: Gabor Gevay
Priority: Minor


This issue is for investigating whether code generation could speed up sorting. 
We should make some performance measurements on hand-written code that is 
similar to what we could generate, to see whether investing more time into this 
is worth it. If we find that it is worth it, we can open a second Jira for the 
actual implementation of the code generation.

I think we could generate one class at places where we currently instantiate 
{{QuickSort}}. This generated class would include the functionality of 
{{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, 
{{MemorySegment.compare}}, and {{MemorySegment.swap}}.

Btw. I'm planning to give this as a student project at a TU Berlin course in 
the next few months.

Some concrete ideas about how a generated sorter could be faster than the 
current sorting code:
* {{MemorySegment.compare}} could be specialized for
** Length: for small records, the loop could be unrolled
** Endianness (currently it is optimized for big endian; in the little 
endian case (e.g. x86) it does a Long.reverseBytes for each long read)
* {{MemorySegment.swapBytes}}
** In case of small records, using three {{UNSAFE.copyMemory}} calls is probably not 
as efficient as a specialized swap (see the sketch after the references below), because
*** We could use total loop unrolling in generated code (because we know the 
exact record size)
*** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\]
*** We will only need 2/3 the memory bandwidth, because the temporary storage 
could be a register if we swap one byte (or one {{long}}) at a time
** several checks might be eliminated
* Better inlining behaviour could be achieved 
** Virtual function calls to the methods of {{InMemorySorter}} could be 
eliminated. (Note that these are problematic for the JVM to devirtualize if 
there are different derived classes used in a single Flink job (see \[8,7\]).)
** {{MemorySegment.swapBytes}} is probably not inlined currently, because the 
excessive checks make it too large
** {{MemorySegment.compare}} is probably also not inlined currently, because 
those two while loops are too large

\[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, 
long, Object, long, long)
\[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/
\[8\] 
http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
\[9\] 
http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409
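
As a rough illustration of the specialized-swap idea above, this is the kind of straight-line code a generator could emit for a (hypothetical) fixed 16-byte record, sketched on java.nio.ByteBuffer instead of {{MemorySegment}}/Unsafe:

{code}
import java.nio.ByteBuffer;

// Record size is known to be 16 bytes, so the swap is fully unrolled into
// long-sized loads/stores through temporaries the JIT can keep in registers;
// no generic length or alignment checks as in UNSAFE.copyMemory.
final class GeneratedSwap16 {

  static void swap(ByteBuffer seg1, int off1, ByteBuffer seg2, int off2) {
    long a0 = seg1.getLong(off1);
    long a1 = seg1.getLong(off1 + 8);
    seg1.putLong(off1, seg2.getLong(off2));
    seg1.putLong(off1 + 8, seg2.getLong(off2 + 8));
    seg2.putLong(off2, a0);
    seg2.putLong(off2 + 8, a1);
  }
}
{code}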



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Implicit class RichExecutionEnvironment - Can't use MlUtils.readLibSVM(path) in QUickStart guide

2016-10-20 Thread Theodore Vasiloudis
This has to do with not doing a wildcard import of the Scala api, it was
reported and already fixed on master [1]

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/jira-Created-FLINK-4792-Update-documentation-QuickStart-FlinkML-td13936.html

-- 
Sent from a mobile device. May contain autocorrect errors.

On Oct 20, 2016 2:06 PM, "Thomas FOURNIER" 
wrote:

> Hello,
>
> Following QuickStart guide in FlinkML, I have to do the following:
>
> val astroTrain:DataSet[LabeledVector] = MLUtils.readLibSVM(env,
> "src/main/resources/svmguide1")
>
> Instead of:
>
> val astroTrain:DataSet[LabeledVector] = MLUtils.readLibSVM(
> "src/main/resources/svmguide1")
>
>
> Nonetheless, this implicit class in ml/packages
>
> implicit class RichExecutionEnvironment(executionEnvironment:
> ExecutionEnvironment) {
>   def readLibSVM(path: String): DataSet[LabeledVector] = {
> MLUtils.readLibSVM(executionEnvironment, path)
>   }
> }
>
>
> is supposed to pimp MLUtils in the way we want.
>
> Does it mean that RichExecutionEnvironment is not imported in the scope ?
> What can be done to solve this ?
>
>
> Thanks
>
> Regards
> Thomas
>


[jira] [Created] (FLINK-4866) Make Trigger.clear() Abstract to Enforce Implementation

2016-10-20 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-4866:
---

 Summary: Make Trigger.clear() Abstract to Enforce Implementation
 Key: FLINK-4866
 URL: https://issues.apache.org/jira/browse/FLINK-4866
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Aljoscha Krettek


If the method is not abstract, implementors of custom triggers will not realise 
that implementing it could be necessary, and they will likely not clean up their 
state/timers properly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: FlinkML - Evaluate function should manage LabeledVector

2016-10-20 Thread Thomas FOURNIER
Done here: FLINK-4865 

2016-10-20 14:07 GMT+02:00 Thomas FOURNIER :

> Ok thanks.
>
> I'm going to create a specific JIRA on this. Ok ?
>
> 2016-10-20 12:54 GMT+02:00 Theodore Vasiloudis <
> theodoros.vasilou...@gmail.com>:
>
>> I think this might be problematic with the current way we define the
>> predict operations because they require that both the Testing and
>> PredictionValue types are available.
>>
>> Here's what I had to do to get it to work (in
>> ml/pipeline/Predictor.scala):
>>
>> import org.apache.flink.ml.math.{Vector => FlinkVector}
>> implicit def labeledVectorEvaluateDataSetOperation[
>> Instance <: Estimator[Instance],
>> Model,
>> FlinkVector,
>> Double](
>> implicit predictOperation: PredictOperation[Instance, Model,
>> FlinkVector, Double],
>>   testingTypeInformation: TypeInformation[FlinkVector],
>>   predictionValueTypeInformation: TypeInformation[Double])
>> : EvaluateDataSetOperation[Instance, LabeledVector, Double] = {
>>   new EvaluateDataSetOperation[Instance, LabeledVector, Double] {
>> override def evaluateDataSet(
>>   instance: Instance,
>>   evaluateParameters: ParameterMap,
>>   testing: DataSet[LabeledVector])
>> : DataSet[(Double,  Double)] = {
>>   val resultingParameters = instance.parameters ++ evaluateParameters
>>   val model = predictOperation.getModel(instance,
>> resultingParameters)
>>
>>   implicit val resultTypeInformation =
>> createTypeInformation[(FlinkVector, Double)]
>>
>>   testing.mapWithBcVariable(model){
>> (element, model) => {
>>   (element.label.asInstanceOf[Double],
>> predictOperation.predict(element.vector.asInstanceOf[FlinkVector],
>> model))
>> }
>>   }
>> }
>>   }
>> }
>>
>> I'm not a fan of casting objects, but the compiler complains here
>> otherwise.
>>
>> Maybe someone has some input as to why the casting is necessary here,
>> given
>> that the underlying types are correct? Probably has to do with some type
>> erasure I'm not seeing here.
>>
>> --Theo
>>
>> On Wed, Oct 19, 2016 at 10:30 PM, Thomas FOURNIER <
>> thomasfournier...@gmail.com> wrote:
>>
>> > Hi,
>> >
>> > Two questions:
>> >
>> > 1- I was thinking of doing this:
>> >
>> > implicit def evaluateLabeledVector[T <: LabeledVector] = {
>> >
>> >   new EvaluateDataSetOperation[SVM,T,Double]() {
>> >
>> > override def evaluateDataSet(instance: SVM, evaluateParameters:
>> > ParameterMap, testing: DataSet[T]): DataSet[(Double, Double)] = {
>> >   val predictor = ...
>> >   testing.map(l => (l.label, predictor.predict(l.vector)))
>> >
>> > }
>> >   }
>> > }
>> >
>> > How can I access to my predictor object (predictor has type
>> > PredictOperation[SVM, DenseVector, T, Double]) ?
>> >
>> > 2- My first idea was to develop a predictOperation[T <: LabeledVector]
>> > so that I could use implicit def defaultEvaluateDatasetOperation
>> >
>> > to get an EvaluateDataSetOperationObject. Is it also valid or not ?
>> >
>> > Thanks
>> > Regards
>> >
>> > Thomas
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > 2016-10-19 16:26 GMT+02:00 Theodore Vasiloudis <
>> > theodoros.vasilou...@gmail.com>:
>> >
>> > > Hello Thomas,
>> > >
>> > > since you are calling evaluate here, you should be creating an
>> > > EvaluateDataSet operation that works with LabeledVector, I see you are
>> > > creating a new PredictOperation.
>> > >
>> > > On Wed, Oct 19, 2016 at 3:05 PM, Thomas FOURNIER <
>> > > thomasfournier...@gmail.com> wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > I'd like to improve SVM evaluate function so that it can use
>> > > LabeledVector
>> > > > (and not only Vector).
>> > > > Indeed, what is done in test is the following (data is a
>> > > > DataSet[LabeledVector]):
>> > > >
>> > > > val test = data.map(l => (l.vector, l.label))
>> > > > svm.evaluate(test)
>> > > >
>> > > > We would like to do:
>> > > > sm.evaluate(data)
>> > > >
>> > > >
>> > > > Adding this "new" code:
>> > > >
>> > > > implicit def predictLabeledPoint[T <: LabeledVector] = {
>> > > >  new PredictOperation  ...
>> > > > }
>> > > >
>> > > > gives me a predictOperation that should be used with
>> > > > defaultEvaluateDataSetOperation
>> > > > with the correct signature (ie with T <: LabeledVector and not T<:
>> > > Vector).
>> > > >
>> > > > Nonetheless, tests are failing:
>> > > >
>> > > >
>> > > > it should "predict with LabeledDataPoint" in {
>> > > >
>> > > >   val env = ExecutionEnvironment.getExecutionEnvironment
>> > > >
>> > > >   val svm = SVM().
>> > > > setBlocks(env.getParallelism).
>> > > > setIterations(100).
>> > > > setLocalIterations(100).
>> > > > setRegularization(0.002).
>> > > > setStepsize(0.1).
>> > > > setSeed(0)
>> > > >
>> > > >   val trainingDS = env.fromCollection(Classification.trainingData)
>> > > >   svm.fit(trainingDS)
>> > > >   val 

[jira] [Created] (FLINK-4865) FlinkML - Add EvaluateDataSet operation for LabeledVector

2016-10-20 Thread Thomas FOURNIER (JIRA)
Thomas FOURNIER created FLINK-4865:
--

 Summary: FlinkML - Add EvaluateDataSet operation for LabeledVector
 Key: FLINK-4865
 URL: https://issues.apache.org/jira/browse/FLINK-4865
 Project: Flink
  Issue Type: New Feature
Reporter: Thomas FOURNIER
Priority: Minor


We can only call the "evaluate" method on a DataSet[(Double, Vector)].

E.g.: svm.evaluate(test) where test: DataSet[(Double, Vector)]

We also want to be able to call this method on a DataSet[LabeledVector].

 




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4864) Shade Calcite dependency in flink-table

2016-10-20 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-4864:


 Summary: Shade Calcite dependency in flink-table
 Key: FLINK-4864
 URL: https://issues.apache.org/jira/browse/FLINK-4864
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Affects Versions: 1.2.0
Reporter: Fabian Hueske


The Table API has a dependency on Apache Calcite.
A user reported version conflicts when having their own Calcite dependency 
on the classpath.

The solution would be to shade away the Calcite dependency (Calcite's 
transitive dependencies are already shaded).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Implicit class RichExecutionEnvironment - Can't use MlUtils.readLibSVM(path) in QUickStart guide

2016-10-20 Thread Thomas FOURNIER
Hello,

Following QuickStart guide in FlinkML, I have to do the following:

val astroTrain:DataSet[LabeledVector] = MLUtils.readLibSVM(env,
"src/main/resources/svmguide1")

Instead of:

val astroTrain:DataSet[LabeledVector] = MLUtils.readLibSVM(
"src/main/resources/svmguide1")


Nonetheless, this implicit class in ml/packages

implicit class RichExecutionEnvironment(executionEnvironment:
ExecutionEnvironment) {
  def readLibSVM(path: String): DataSet[LabeledVector] = {
MLUtils.readLibSVM(executionEnvironment, path)
  }
}


is supposed to pimp MLUtils in the way we want.

Does it mean that RichExecutionEnvironment is not imported in the scope ?
What can be done to solve this ?


Thanks

Regards
Thomas


Re: FlinkML - Evaluate function should manage LabeledVector

2016-10-20 Thread Theodore Vasiloudis
I think this might be problematic with the current way we define the
predict operations because they require that both the Testing and
PredictionValue types are available.

Here's what I had to do to get it to work (in ml/pipeline/Predictor.scala):

import org.apache.flink.ml.math.{Vector => FlinkVector}
implicit def labeledVectorEvaluateDataSetOperation[
Instance <: Estimator[Instance],
Model,
FlinkVector,
Double](
implicit predictOperation: PredictOperation[Instance, Model,
FlinkVector, Double],
  testingTypeInformation: TypeInformation[FlinkVector],
  predictionValueTypeInformation: TypeInformation[Double])
: EvaluateDataSetOperation[Instance, LabeledVector, Double] = {
  new EvaluateDataSetOperation[Instance, LabeledVector, Double] {
override def evaluateDataSet(
  instance: Instance,
  evaluateParameters: ParameterMap,
  testing: DataSet[LabeledVector])
: DataSet[(Double,  Double)] = {
  val resultingParameters = instance.parameters ++ evaluateParameters
  val model = predictOperation.getModel(instance, resultingParameters)

  implicit val resultTypeInformation =
createTypeInformation[(FlinkVector, Double)]

  testing.mapWithBcVariable(model){
(element, model) => {
  (element.label.asInstanceOf[Double],
predictOperation.predict(element.vector.asInstanceOf[FlinkVector],
model))
}
  }
}
  }
}

I'm not a fan of casting objects, but the compiler complains here otherwise.

Maybe someone has some input as to why the casting is necessary here, given
that the underlying types are correct? Probably has to do with some type
erasure I'm not seeing here.

--Theo

On Wed, Oct 19, 2016 at 10:30 PM, Thomas FOURNIER <
thomasfournier...@gmail.com> wrote:

> Hi,
>
> Two questions:
>
> 1- I was thinking of doing this:
>
> implicit def evaluateLabeledVector[T <: LabeledVector] = {
>
>   new EvaluateDataSetOperation[SVM,T,Double]() {
>
> override def evaluateDataSet(instance: SVM, evaluateParameters:
> ParameterMap, testing: DataSet[T]): DataSet[(Double, Double)] = {
>   val predictor = ...
>   testing.map(l => (l.label, predictor.predict(l.vector)))
>
> }
>   }
> }
>
> How can I access to my predictor object (predictor has type
> PredictOperation[SVM, DenseVector, T, Double]) ?
>
> 2- My first idea was to develop a predictOperation[T <: LabeledVector]
> so that I could use implicit def defaultEvaluateDatasetOperation
>
> to get an EvaluateDataSetOperationObject. Is it also valid or not ?
>
> Thanks
> Regards
>
> Thomas
>
>
>
>
>
>
>
> 2016-10-19 16:26 GMT+02:00 Theodore Vasiloudis <
> theodoros.vasilou...@gmail.com>:
>
> > Hello Thomas,
> >
> > since you are calling evaluate here, you should be creating an
> > EvaluateDataSet operation that works with LabeledVector, I see you are
> > creating a new PredictOperation.
> >
> > On Wed, Oct 19, 2016 at 3:05 PM, Thomas FOURNIER <
> > thomasfournier...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > I'd like to improve SVM evaluate function so that it can use
> > LabeledVector
> > > (and not only Vector).
> > > Indeed, what is done in test is the following (data is a
> > > DataSet[LabeledVector]):
> > >
> > > val test = data.map(l => (l.vector, l.label))
> > > svm.evaluate(test)
> > >
> > > We would like to do:
> > > sm.evaluate(data)
> > >
> > >
> > > Adding this "new" code:
> > >
> > > implicit def predictLabeledPoint[T <: LabeledVector] = {
> > >  new PredictOperation  ...
> > > }
> > >
> > > gives me a predictOperation that should be used with
> > > defaultEvaluateDataSetOperation
> > > with the correct signature (ie with T <: LabeledVector and not T<:
> > Vector).
> > >
> > > Nonetheless, tests are failing:
> > >
> > >
> > > it should "predict with LabeledDataPoint" in {
> > >
> > >   val env = ExecutionEnvironment.getExecutionEnvironment
> > >
> > >   val svm = SVM().
> > > setBlocks(env.getParallelism).
> > > setIterations(100).
> > > setLocalIterations(100).
> > > setRegularization(0.002).
> > > setStepsize(0.1).
> > > setSeed(0)
> > >
> > >   val trainingDS = env.fromCollection(Classification.trainingData)
> > >   svm.fit(trainingDS)
> > >   val predictionPairs = svm.evaluate(trainingDS)
> > >
> > >   
> > > }
> > >
> > > There is no PredictOperation defined for
> > > org.apache.flink.ml.classification.SVM which takes a
> > > DataSet[org.apache.flink.ml.common.LabeledVector] as input.
> > > java.lang.RuntimeException: There is no PredictOperation defined for
> > > org.apache.flink.ml.classification.SVM which takes a
> > > DataSet[org.apache.flink.ml.common.LabeledVector] as input.
> > >
> > >
> > >
> > > Thanks
> > >
> > > Regards
> > > Thomas
> > >
> >
>


Re: Add partitionedKeyBy to DataStream

2016-10-20 Thread Till Rohrmann
Hi Xiaowei,

I like the idea of reusing a partitioning and thus saving a shuffle
operation. It would be great if we could fail at runtime in case the
partitioning changed somehow. That way a logical user failure won't go
unnoticed.

Would it make sense to name the method partitionedByKey(...) because the
data is already partitioned?

Cheers,
Till

On Thu, Oct 20, 2016 at 9:53 AM, Xiaowei Jiang  wrote:

> After we do any interesting operations (e.g. reduce) on KeyedStream, the
> result becomes DataStream. In a lot of cases, the output still has the same
> or compatible keys with the KeyedStream (logically). But to do further
> operations on these keys, we are forced to use keyby again. This works
> semantically, but is costly in two aspects. First, it destroys the
> possibility of chaining, which is one of the most important optimization
> techniques. Second, keyBy will greatly expand the connected components of
> tasks, which has implications in failover optimization.
>
> To address this shortcoming, we propose a new operator partitionedKeyBy.
>
> DataStream<T> {
> public <K> KeyedStream<T, K> partitionedKeyBy(KeySelector<T, K> key)
> }
>
> Semantically, DataStream.partitionedKeyBy(key) is equivalent to
> DataStream.keyBy(partitionedKey) where partitionedKey is key plus the
> taskid as an extra field. This guarantees that records from different tasks
> will never produce the same keys.
>
> With this, it's possible to do
>
> ds.keyBy(key1).reduce(func1)
> .partitionedKeyBy(key1).reduce(func2)
> .partitionedKeyBy(key2).reduce(func3);
>
> Most importantly, in certain cases, we will be able to chain these into a
> single vertex.
>
> Please share your thoughts. The JIRA is at https://issues.apache.org/j
> ira/browse/FLINK-4855
>
> Xiaowei
>


Re: Efficient Batch Operator in Streaming

2016-10-20 Thread Till Rohrmann
Hi Xiaowei,

thanks for sharing this proposal. How would fault tolerance work with the
BatchFunction? Since the batch function seems to manage its own buffer,
users would also have to make sure that in-flight elements which are
buffered but not yet processed are checkpointed, wouldn't they?

Cheers,
Till

On Thu, Oct 20, 2016 at 9:50 AM, Xiaowei Jiang  wrote:

> Very often, it's more efficient to process a batch of records at once
> instead of processing them one by one. We can use window to achieve this
> functionality. However, window will store all records in states, which can
> be costly. It's desirable to have an efficient implementation of batch
> operator. The batch operator works per task and behaves similarly to aligned
> windows. Here is an example of what the interface looks like to a user.
>
> interface BatchFunction {
> // add the record to the buffer
> // returns if the batch is ready to be flushed
> boolean addRecord(T record);
>
> // process all pending records in the buffer
> void flush(Collector collector) ;
> }
>
> DataStream ds = ...
> BatchFunction func = ...
> ds.batch(func);
>
> The operator calls addRecord for each record. The batch function saves the
> record in its own buffer. The addRecord call returns whether the pending buffer
> should be flushed. In that case, the operator invokes flush.
>
> Please share your thoughts. The corresponding JIRA is
> https://issues.apache.org/jira/browse/FLINK-4854
>
> Xiaowei
>


Re: Efficient Batch Operator in Streaming

2016-10-20 Thread Chesnay Schepler
Could you not do the same thing today with a FlatMap function that
stores incoming elements and only computes and collects a result when a
certain threshold is reached?
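
A rough sketch of that suggestion (it assumes the Java DataStream API; the batch size and types are arbitrary, and note that the buffer is plain heap state that is not checkpointed - exactly the fault-tolerance question raised elsewhere in this thread):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

// Buffers records and emits the whole batch once a threshold is reached.
public class BufferingFlatMap<T> implements FlatMapFunction<T, List<T>> {

  private final int batchSize;
  private final List<T> buffer = new ArrayList<>();

  public BufferingFlatMap(int batchSize) {
    this.batchSize = batchSize;
  }

  @Override
  public void flatMap(T value, Collector<List<T>> out) {
    buffer.add(value);
    if (buffer.size() >= batchSize) {
      out.collect(new ArrayList<>(buffer));
      buffer.clear();
    }
  }
}

Used as ds.flatMap(new BufferingFlatMap<>(100)), this emits a List of up to 100 records per subtask.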

On 20.10.2016 09:50, Xiaowei Jiang wrote:

Very often, it's more efficient to process a batch of records at once
instead of processing them one by one. We can use window to achieve this
functionality. However, window will store all records in states, which can
be costly. It's desirable to have an efficient implementation of batch
operator. The batch operator works per task and behaves similarly to aligned
windows. Here is an example of what the interface looks like to a user.

interface BatchFunction {
 // add the record to the buffer
 // returns if the batch is ready to be flushed
 boolean addRecord(T record);

 // process all pending records in the buffer
 void flush(Collector collector) ;
}

DataStream ds = ...
BatchFunction func = ...
ds.batch(func);

The operator calls addRecord for each record. The batch function saves the
record in its own buffer. The addRecord call returns whether the pending buffer
should be flushed. In that case, the operator invokes flush.

Please share your thoughts. The corresponding JIRA is
https://issues.apache.org/jira/browse/FLINK-4854

Xiaowei





FLIP-6 and running many "small" jobs

2016-10-20 Thread Maciek Próchniak

Hi,

we're looking at FLIP-6 and while it looks really great we started to 
wonder how it fits in our use case.


We currently have around 20 processes but the idea is to have many more 
of them. Many of them are pretty "small" - they don't have large sources, are 
stateless, and mainly filter data.


As I understand it, FLIP-6 makes a job an even more heavyweight thing than today 
- e.g. each job will have its own jobmanager process etc.


Our concern is that each job will now require more resources - e.g. the 
number of threads, memory and so on. We are thinking about a way to make 
some jobs share these resources - of course that means they won't be 
really isolated from each other.


So far the only idea we see is deploying these small jobs together, as 
one job - but this leads to some problems, like how to track which 
version is really deployed (we are talking about stateless processes, so the 
only problem is maintaining source Kafka offsets).


Unfortunately our jobs can have many different sources and outcomes, so 
we don't think doing something similar to King would work for us...


Do you have any views/ideas about such a use case? Or is the common view that 
we should deploy our stuff to Mesos and let it handle resource 
allocation? But still - for some jobs we'd need something like a "1/4" slot :)


thanks,

maciek



Add partitionedKeyBy to DataStream

2016-10-20 Thread Xiaowei Jiang
After we do any interesting operations (e.g. reduce) on KeyedStream, the
result becomes DataStream. In a lot of cases, the output still has the same
or compatible keys with the KeyedStream (logically). But to do further
operations on these keys, we are forced to use keyby again. This works
semantically, but is costly in two aspects. First, it destroys the
possibility of chaining, which is one of the most important optimization
techniques. Second, keyBy will greatly expand the connected components of
tasks, which has implications in failover optimization.

To address this shortcoming, we propose a new operator partitionedKeyBy.

DataStream<T> {
public <K> KeyedStream<T, K> partitionedKeyBy(KeySelector<T, K> key)
}

Semantically, DataStream.partitionedKeyBy(key) is equivalent to
DataStream.keyBy(partitionedKey) where partitionedKey is key plus the
taskid as an extra field. This guarantees that records from different tasks
will never produce the same keys.

With this, it's possible to do

ds.keyBy(key1).reduce(func1)
.partitionedKeyBy(key1).reduce(func2)
.partitionedKeyBy(key2).reduce(func3);

Most importantly, in certain cases, we will be able to chain these into a
single vertex.

Please share your thoughts. The JIRA is at https://issues.apache.org/j
ira/browse/FLINK-4855

Xiaowei


Efficient Batch Operator in Streaming

2016-10-20 Thread Xiaowei Jiang
Very often, it's more efficient to process a batch of records at once
instead of processing them one by one. We can use window to achieve this
functionality. However, window will store all records in states, which can
be costly. It's desirable to have an efficient implementation of batch
operator. The batch operator works per task and behaves similarly to aligned
windows. Here is an example of what the interface looks like to a user.

interface BatchFunction {
// add the record to the buffer
// returns if the batch is ready to be flushed
boolean addRecord(T record);

// process all pending records in the buffer
void flush(Collector collector) ;
}

DataStream ds = ...
BatchFunction func = ...
ds.batch(func);

The operator calls addRecord for each record. The batch function saves the
record in its own buffer. The addRecord call returns whether the pending buffer
should be flushed. In that case, the operator invokes flush.

Please share your thoughts. The corresponding JIRA is
https://issues.apache.org/jira/browse/FLINK-4854

Xiaowei


[jira] [Created] (FLINK-4863) states of merging window and trigger are set to different TimeWindows on merge

2016-10-20 Thread Manu Zhang (JIRA)
Manu Zhang created FLINK-4863:
-

 Summary: states of merging window and trigger are set to different 
TimeWindows on merge
 Key: FLINK-4863
 URL: https://issues.apache.org/jira/browse/FLINK-4863
 Project: Flink
  Issue Type: Bug
  Components: Streaming, Windowing Operators
Reporter: Manu Zhang


While the window state is set to the mergeResult's stateWindow (one of the original 
windows), the trigger state is set to the mergeResult itself. This will cause the 
{{Timer}} of {{ContinuousEventTimeTrigger}} to fail, since its window cannot be found 
in the window state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)