Re: Machine Learning: Flink and MOA

2018-02-23 Thread Theodore Vasiloudis
Hello Christophe,

That's very interesting. I've been working with MOA/SAMOA recently and was
considering whether we could create an easy integration with Flink.

I have a Master's student this year who could do some work on this;
hopefully we can create something interesting there.
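
For anyone curious, below is a minimal sketch of what such an integration could look like: a MOA learner wrapped in a Flink streaming map, in test-then-train fashion. This is only a sketch under assumptions: the HoeffdingTree learner and the com.yahoo.labs.samoa.instances.Instance import are my choices and depend on the MOA version, and the DataStream[Instance] is assumed to be built elsewhere from your source.

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import com.yahoo.labs.samoa.instances.Instance // package depends on the MOA version
import moa.classifiers.trees.HoeffdingTree

// Test-then-train: first score the incoming Instance, then train on it.
class MoaClassifierMap extends RichMapFunction[Instance, Double] {

  // Created in open() so the learner never has to be shipped with the closure.
  @transient private var learner: HoeffdingTree = _

  override def open(parameters: Configuration): Unit = {
    learner = new HoeffdingTree()
    learner.prepareForUse()
  }

  override def map(instance: Instance): Double = {
    val votes = learner.getVotesForInstance(instance) // predict first...
    learner.trainOnInstance(instance)                 // ...then train
    if (votes.isEmpty) -1.0 else votes.indexOf(votes.max).toDouble
  }
}

// Usage, given a DataStream[Instance] called instanceStream:
//   val predictions: DataStream[Double] = instanceStream.map(new MoaClassifierMap)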

Regards,
Theodore

On Wed, Feb 21, 2018 at 7:38 PM, Christophe Salperwyck <
christophe.salperw...@gmail.com> wrote:

> Hi guys,
>
> I know there is FlinkML for doing some machine learning with Flink, but it
> works on DataSet and not on DataStream. There is also SAMOA, which can run
> on Flink, but I find it a bit too complicated.
>
> I wanted to see if it would be easy to plug MOA directly into Flink, and
> tried to present it at the DataKRK meetup, but I ran out of time at the end
> of the presentation... Nevertheless, I spent a bit of time plugging Flink
> and MOA together, and I thought it might be worth sharing in case it is
> interesting for someone. I'm also taking this opportunity to get some
> feedback on it from people in the Flink community, if they have a bit of
> time to review it.
>
> Here is the code:
> https://github.com/csalperwyck/moa-flink-ozabag-example
> https://github.com/csalperwyck/moa-flink-traintest-example
>
> Many Flink methods were very convenient for plugging these two tools together :-)
>
> Keep up the good work!
>
> Cheers,
> Christophe
> PS: if some people are in bigdatatechwarsaw and interested, we can discuss
> tomorrow :-)
>


Re: [POLL] Who still uses Java 7 with Flink ?

2017-03-23 Thread Theodore Vasiloudis
Hello all,

I'm sure you've considered this already, but what this data does not
include is all the potential future users, i.e. slower-moving organizations
(banks etc.) which could still be on Java 7.

Whether those are relevant is up for debate.

Cheers,
Theo

On Thu, Mar 23, 2017 at 12:14 PM, Robert Metzger 
wrote:

> Yeah, you are right :)
> I'll put something in my calendar for end of May.
>
> On Thu, Mar 23, 2017 at 12:12 PM, Greg Hogan  wrote:
>
>> Robert,
>>
>> Thanks for the report. Shouldn’t we be revisiting this decision at the
>> beginning of the new release cycle rather than near the end? There is
>> currently little cost to staying with Java 7 since no Flink code or pull
>> requests have been written for Java 8.
>>
>> Greg
>>
>>
>>
>> On Mar 23, 2017, at 6:37 AM, Robert Metzger  wrote:
>>
>> Looks like 9% on Twitter and 24% on the mailing list are still using Java
>> 7.
>>
>> I would vote to keep supporting Java 7 for Flink 1.3 and then revisit
>> once we are approaching 1.4 in September.
>>
>> On Thu, Mar 16, 2017 at 8:00 AM, Bowen Li 
>> wrote:
>>
>>> There's always a tradeoff we need to make. I'm in favor of upgrading to
>>> Java 8 to bring in all the new Java features.
>>>
>>> The common way I've seen other software handle major upgrades like this
>>> (and I agree with it) is to 1) upgrade in the next big release without
>>> backward compatibility and notify everyone, and 2) maintain and patch the
>>> current, old-tech-compatible version at a reasonably limited scope.
>>> Building backward compatibility is too much for an open-source project.
>>>
>>>
>>>
>>> On Wed, Mar 15, 2017 at 7:10 AM, Robert Metzger 
>>> wrote:
>>>
 I've put it also on our Twitter account:
 https://twitter.com/ApacheFlink/status/842015062667755521

 On Wed, Mar 15, 2017 at 2:19 PM, Martin Neumann 
 wrote:

 > I think this is easier done in a straw poll than in an email conversation.
 > I created one at: http://www.strawpoll.me/12535073
 > (Note that you have multiple choices.)
 >
 >
 > Though I prefer Java 8, most of the time I have to work on Java 7. A lot
 > of the infrastructure I work on still runs Java 7; one of the companies I
 > built a prototype for a while back only updated to Java 7 two years ago. I
 > doubt we can ditch Java 7 support any time soon if we want to make it easy
 > for companies to use Flink.
 >
 > cheers Martin
 >
 > //PS sorry if this gets sent twice, we just migrated to a new mail
 system
 > and a lot of things are broken
 >
 > 
 > From: Stephan Ewen 
 > Sent: Wednesday, March 15, 2017 12:30:24 PM
 > To: user@flink.apache.org; d...@flink.apache.org
 > Subject: [POLL] Who still uses Java 7 with Flink ?
 >
 > Hi all!
 >
 > I would like to get a feeling for how much Java 7 is still being used
 > among Flink users.
 >
 > At some point, it would be great to drop Java 7 support and make use of
 > Java 8's new features, but first we would need to get a feeling for how
 > much Java 7 is still used.
 >
 > I would be happy if users on Java 7 respond here, or even users that have
 > some insights into how widespread they think Java 7 still is.
 >
 > Thanks,
 > Stephan
 >
 >
 >
 >
 >

>>>
>>>
>>
>>
>


A question about iterations and prioritizing "control" over "data" inputs

2017-03-15 Thread Theodore Vasiloudis
Hello all,

I've started thinking about online learning in Flink and one of the issues
that has come
up in other frameworks is the ability to prioritize "control" over "data"
events in iterations.

To give an example, say we develop an ML model that ingests events in
parallel, performs an aggregation to update the model, and then broadcasts
the updated model back through an iteration/back edge. Using the above
nomenclature, the events being ingested would be "data" events, and the
model update would be a "control" event.

I talked about this scenario a bit with a couple of people (Paris and
Gianmarco), and one thing we would like to have is the ability to prioritize
the ingestion of control events over the data events.

If my understanding is correct, currently there is a buffer/queue of events
waiting to be processed
for each operator, and each incoming event ends up at the end of that queue.

If our data source is fast and the model updates are slow, a lot of data
events might be buffered/scheduled to be processed before each model update,
because of the speed difference between the two streams. But we would like
to update the model that is used to process data events as soon as the
newest version becomes available.

Is it somehow possible to make the control events "jump" the queue, so that
they are processed ahead of the data events as soon as they arrive?

Regards,
Theodore

P.S. This is still very much a theoretical problem; I haven't looked at how
such a pipeline would be implemented in Flink.


Re: FlinkML and DataStream API

2016-12-21 Thread Theodore Vasiloudis
Hello Mäki,

I think what you would like to do is train a model using the batch API, and
use the Flink streaming API as a way to serve your model and make predictions.

While we don't have an integrated way to do that in FlinkML currently, I
definitely think that's possible. I know Marton Balassi has been working on
something like this for the ALS algorithm, but I can't find the code right
now on mobile.
The general idea is to keep your model as state and use it to make
predictions on a stream of incoming data.
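
As a minimal sketch of that idea (assuming MultipleLinearRegression, since it exposes its fitted weights, with toy data and a hand-rolled dot product purely for illustration): the batch job fits the model, the weights are collected, and the streaming job simply captures them as state in a map function.

import org.apache.flink.api.scala._
import org.apache.flink.ml.common.{LabeledVector, WeightVector}
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.regression.MultipleLinearRegression
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object BatchTrainStreamServe {
  def main(args: Array[String]): Unit = {
    // Step 0: fit the model with the batch (DataSet) API, on toy data here.
    val batchEnv = ExecutionEnvironment.getExecutionEnvironment
    val trainingDS: DataSet[LabeledVector] = batchEnv.fromCollection(Seq(
      LabeledVector(3.0, DenseVector(1.0, 1.0)),
      LabeledVector(5.0, DenseVector(2.0, 1.0)),
      LabeledVector(7.0, DenseVector(3.0, 1.0))))

    val mlr = MultipleLinearRegression().setIterations(100).setStepsize(0.1)
    mlr.fit(trainingDS)

    // Pull the fitted weights out of the finished batch job.
    val WeightVector(weights, intercept) = mlr.weightsOption.get.collect().head

    // Serve the model on a stream: the weights are plain state captured by
    // the map function, so no DataStream -> DataSet conversion is needed.
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val newData: DataStream[Array[Double]] =
      streamEnv.fromElements(Array(4.0, 1.0), Array(5.0, 1.0))

    val predictions: DataStream[Double] = newData.map { features =>
      (0 until weights.size).map(i => features(i) * weights(i)).sum + intercept
    }
    predictions.print()
    streamEnv.execute("Serve a batch-trained linear model on a stream")
  }
}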

Model serving is definitely something we'll be working on in the future,
I'll have a master student working on exactly that next semester.

-- 
Sent from a mobile device. May contain autocorrect errors.

On Dec 21, 2016 5:24 PM, "Mäki Hanna"  wrote:

> Hi,
>
>
>
> I’m wondering if there is a way to use FlinkML and make predictions
> continuously for test data coming from a DataStream.
>
>
>
> I know FlinkML only supports the DataSet API (batch) at the moment, but is
> there a way to convert a DataStream into DataSets? I’m thinking of
> something like
>
>
>
> (0. fit model in batch mode)
>
> 1. window the DataStream
>
> 2. convert the windowed stream to DataSets
>
> 3. use the FlinkML methods to make predictions
>
>
>
> BR,
>
> Hanna
>
>
>


Understanding connected streams use without timestamps

2016-11-21 Thread Theodore Vasiloudis
Hello all,

I was playing around with the IncrementalLearningSkeleton example [1] and I
had a couple of questions regarding the behavior of connected streams.

In the example the elements are assigned timestamps, and there is a stream,
model, that produces
Double[] elements by ingesting and processing a stream of training Integer
data points.

DataStream<Double[]> model = trainingData
    .assignTimestampsAndWatermarks(new LinearTimestamp())
    .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
    .apply(new PartialModelBuilder());

The model stream is then connected to a newData stream, which allows us to
use the constantly updated model to make predictions for the incoming stream
of newData, by having a model variable shared between the two map functions
of the CoMap class. The shared model variable is updated every time an
element from the model stream arrives (it starts out as null):

DataStream<Integer> prediction = newData.connect(model).map(new Predictor());
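
As a side note, the shared-state pattern inside that co-map looks roughly like the sketch below (a Scala rendering with assumed names and a placeholder "prediction"; the actual Predictor in the example is Java): map2 refreshes the shared model variable, and map1 has to deal with the initial null.

import org.apache.flink.streaming.api.functions.co.CoMapFunction

class Predictor extends CoMapFunction[Int, Array[Double], Int] {

  private var model: Array[Double] = _    // shared between map1 and map2

  override def map1(dataPoint: Int): Int = {
    if (model == null) 0                  // no model received yet: emit a default
    else model.length                     // placeholder for scoring dataPoint with the model
  }

  override def map2(newModel: Array[Double]): Int = {
    model = newModel                      // refresh the model on every model element
    0                                     // dummy output for model updates
  }
}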

My confusion comes when I tried a slightly different approach [2], without
using timestamps or watermarks. In my example I simply create countWindows of,
say, 100 elements, and I use readTextFile to read in the trainingData and
newData:

DataStream<Double[]> model = trainingData
    .countWindowAll(100)
    .apply(new PartialModelBuilder());

When I then connect the model stream to the newData stream, the map1
function of the CoMap never sees a non-null model, as it seems that the map
functions are executed in order: first map1 is executed for all the newData
elements, then map2 is executed for all the model elements.

So how does having or not having timestamps affect the behavior of the
connected stream?

How would I handle such a case if the notion of timestamps does not apply
for my data?
(i.e. here I'm interested in streaming historical data, I assume their
order does not matter)


[1]
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java

[2] https://gist.github.com/thvasilo/67bcb9370b03971f380ae43c4ae6e2d0


Additional steps needed for the Java quickstart guide

2016-11-16 Thread Theodore Vasiloudis
Hello all,

I was preparing an exercise for some Master's students and went through the
Java quickstart setup [1] again to verify that everything works as expected.

I ran into a problem when running from within IDEA; we've encountered this
in the past during trainings.

While the quickstart guide indicates that you should be able to just run the
examples from the Maven archetype, that was not the case for me: what I got
instead were ClassNotFound exceptions, because the default run configuration
does not pull in the dependencies as it should.

What I needed to do to get the examples to run from within the IDE was:

1) In Project Structure, add a new module, say "mainRunner".
2) In mainRunner's dependencies, add the main module (say "quickstart") as a
module dependency.
3) In mainRunner's dependencies, add the rest of the project library
dependencies.
4) In the run configuration for the example, change "Use classpath of
module" from quickstart to mainRunner.

If this is a problem other people have run into (as evidenced by [2] and [3]),
I think we should either include the instructions in the docs, or if possible
change the Java quickstart archetype to include the extra module.
The Scala quickstart has the mainRunner module already, and the appropriate
instructions
on how to run from within IDEA are in the docs.

I saw there's a PR [4] for the quickstart already; we could include this
there.


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/quickstart/java_api_quickstart.html
[2]
https://stackoverflow.com/questions/40073413/apache-flink-wordcount-example-exception-in-thread-main-java-lang-noclassdef
[3]
https://stackoverflow.com/questions/30571618/executing-sample-flink-program-in-local
[4] https://github.com/apache/flink/pull/2764


Re: Multiclass classification example

2016-10-19 Thread Theodore Vasiloudis
Hello Kursat,

We don't have a multi class classifier in FlinkML currently.

Regards,
Theodore

-- 
Sent from a mobile device. May contain autocorrect errors.

On Oct 19, 2016 12:33 AM, "Kürşat Kurt"  wrote:

> Hi,
>
> I am trying to learn the Flink ML lib.
>
> Where can I find a detailed multiclass classification example?
>


Re: FlinkML - Fail to execute QuickStart example

2016-10-17 Thread Theodore Vasiloudis
That is my bad: I must have been testing against a private branch when
writing the guide. The SVM as it stands only has a predict operation for
Vector, not LabeledVector.

IMHO I would like to have a predict operation for LabeledVector for all
predictors (one that would just call the existing Vector prediction
internally), but IIRC we decided to go with an Evaluate operator instead, as
written in the evaluation PR.
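
In the meantime, a minimal workaround (reusing the svm and astroTest values from the quoted QuickStart code below) is to strip the labels before predicting, or to use evaluate on (vector, label) pairs:

// predict expects plain Vectors, so drop the labels first...
val predictions = svm.predict(astroTest.map(_.vector))

// ...or use evaluate, which takes (Vector, label) pairs and emits
// (trueLabel, predictedLabel) pairs.
val evalPairs = svm.evaluate(astroTest.map(x => (x.vector, x.label)))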

I'll make a PR to fix the guide; any chance you can create a JIRA for this?

Regards,
Theodore

On Mon, Oct 17, 2016 at 6:22 PM, Thomas FOURNIER <
thomasfournier...@gmail.com> wrote:

> Hi,
>
> Executing the following code (see QuickStart):
>
> val env = ExecutionEnvironment.getExecutionEnvironment
> val survival = env.readCsvFile[(String, String, String, 
> String)]("src/main/resources/haberman.data", ",")
>
>
> val survivalLV = survival
>   .map { tuple =>
> val list = tuple.productIterator.toList
> val numList = list.map(_.asInstanceOf[String].toDouble)
> LabeledVector(numList(3), DenseVector(numList.take(3).toArray))
>   }
>
>
>
> val astroTrain = MLUtils.readLibSVM(env, "src/main/resources/svmguide1")
> val astroTest = MLUtils.readLibSVM(env, "src/main/resources/svmguide1.t")
>
>
> val svm = SVM()
>   .setBlocks(env.getParallelism)
>   .setIterations(100)
>   .setRegularization(0.001)
>   .setStepsize(0.1)
>   .setSeed(42)
>
> svm.fit(astroTrain)
> svm.predict(astroTest)
>
>
> I encounter the following error:
>
> Exception in thread "main" java.lang.RuntimeException: There is no 
> PredictOperation defined for org.apache.flink.ml.classification.SVM which 
> takes a DataSet[org.apache.flink.ml.common.LabeledVector] as input.
>
> Any idea ?
>
> Thanks
>
> Thomas
>
>
>
>
>


Re: SVM Multiclass classification

2016-10-14 Thread Theodore Vasiloudis
Hello Kursat,

As noted in the documentation, the SVM implementation is for binary
classification only for the time being.
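
To make that concrete (reusing the variable names from the quoted code below): the binary SVM expects labels in {-1.0, +1.0}, so with three classes you would first have to reduce the problem to one or more binary problems, e.g. one class against the rest. A minimal sketch of that relabeling:

// One-vs-rest sketch: learn "class 2.0 vs. the rest" as a binary problem.
val binaryTraining = trainingDS.map { lv =>
  LabeledVector(if (lv.label == 2.0) 1.0 else -1.0, lv.vector)
}

val binarySvm = SVM().setBlocks(env.getParallelism)
binarySvm.fit(binaryTraining)

// (truth, prediction) pairs, now on the same -1/+1 scale.
val binaryPredictions =
  binarySvm.evaluate(binaryTraining.map(x => (x.vector, x.label)))
binaryPredictions.print()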

Regards,
Theodore

-- 
Sent from a mobile device. May contain autocorrect errors.

On Oct 13, 2016 8:53 PM, "Kürşat Kurt"  wrote:

> Hi;
>
>
>
> I am trying to classify documents.
>
> When I try to predict (on the same data as the training set) there are only
> 1 and -1 predictions.
>
> Accuracy is 0%.
>
>
>
>
>
> Can you help me please?
>
>
>
> val env = ExecutionEnvironment.getExecutionEnvironment
>
> val training = Seq(
>   new LabeledVector(1.0, new SparseVector(10, Array(0, 2, 3), Array(1.0, 1.0, 1.0))),
>   new LabeledVector(1.0, new SparseVector(10, Array(0, 1, 5, 9), Array(1.0, 1.0, 1.0, 1.0))),
>   new LabeledVector(0.0, new SparseVector(10, Array(0, 2), Array(0.0, 1.0))),
>   new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))),
>   new LabeledVector(2.0, new SparseVector(10, Array(0, 2), Array(0.0, 1.0))),
>   new LabeledVector(2.0, new SparseVector(10, Array(0), Array(0.0))),
>   new LabeledVector(1.0, new SparseVector(10, Array(0, 3), Array(1.0, 1.0))),
>   new LabeledVector(0.0, new SparseVector(10, Array(0, 2, 3), Array(0.0, 1.0, 1.0))),
>   new LabeledVector(2.0, new SparseVector(10, Array(0, 7, 9), Array(0.0, 1.0))),
>   new LabeledVector(2.0, new SparseVector(10, Array(2, 3, 4), Array(0.0, 1.0, 1.0))),
>   new LabeledVector(2.0, new SparseVector(10, Array(0, 3), Array(1.0, 1.0))),
>   new LabeledVector(0.0, new SparseVector(10, Array(2, 3, 9), Array(1.0, 0.0, 1.0)))
> );
>
> val trainingDS = env.fromCollection(training)
> val testingDS = env.fromCollection(training)
>
> val svm = new SVM().setBlocks(env.getParallelism)
> svm.fit(trainingDS)
>
> val predictions = svm.evaluate(testingDS.map(x => (x.vector, x.label)))
> predictions.print();
>
>
>
> Sample output:
>
> (1.0,1.0)
> (1.0,1.0)
> (0.0,1.0)
> (0.0,-1.0)
> (2.0,1.0)
> (2.0,-1.0)
> (1.0,1.0)
> (0.0,1.0)
> (2.0,1.0)
> (2.0,1.0)
> (2.0,1.0)
> (0.0,1.0)
>


Re: Flink Iterations vs. While loop

2016-09-05 Thread Theodore Vasiloudis
Hello Dan,

are you broadcasting the 85GB of data then? I don't get why you wouldn't
store that file on HDFS so it's accessible by your workers.


If you have the full code available somewhere we might be able to help
better.

For L-BFGS you should only be broadcasting the model (i.e. the weight
vector), and yes that would happen at each iteration, since you are
updating the model at each iteration.

On Fri, Sep 2, 2016 at 5:30 PM, Dan Drewes 
wrote:

> Hi Greg,
>
> thanks for your response!
>
> I just had a look and realized that it's just about 85 GB of data. Sorry
> about that wrong information.
>
> It's read from a csv file on the master node's local file system. The 8
> nodes have more than 40 GB available memory each and since the data is
> equally distributed I assume there should be no need to spill anything on
> disk.
>
> There are 9 iterations.
>
> Is it possible that the data is repeatedly distributed with Flink Iterations
> as well? Or the other way around: might it be that Flink "remembers" somehow
> that the data is already distributed, even for the while loop?
> -Dan
>
>
>
> Am 02.09.2016 um 16:39 schrieb Greg Hogan:
>
> Hi Dan,
>
> Where are you reading the 200 GB "data" from? How much memory per node? If
> the DataSet is read from a distributed filesystem and if with iterations
> Flink must spill to disk then I wouldn't expect much difference. About how
> many iterations are run in the 30 minutes? I don't know that this is
> reported explicitly, but if your convergence function only has one input
> record per iteration then the reported total is the iteration count.
>
> One other thought, we should soon have support for object reuse with
> arrays (FLINK-3695). This would be implemented as DoubleValueArray or
> ValueArray rather than double[] but it would be interesting to
> test for a change in performance.
>
> Greg
>
> On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes 
> wrote:
>
>> Hi,
>>
>> for my bachelor thesis I'm testing an implementation of the L-BFGS algorithm
>> with Flink Iterations against a version without Flink Iterations, using a
>> plain while loop instead. Both programs use the same Map and Reduce
>> transformations in each iteration. It was expected that the performance of
>> the Flink Iterations would scale better with increasing size of the input
>> data set. However, the measured results on an IBM Power cluster are very
>> similar for both versions, e.g. around 30 minutes for 200 GB of data. The
>> cluster has 8 nodes, was configured with 4 slots per node, and I used a
>> total parallelism of 32.
>> In every iteration of the while loop a new Flink job is started, and I
>> thought that the data would also be distributed over the network again in
>> each iteration, which should consume a significant and measurable amount of
>> time. Is that thought wrong, or what is the computational overhead of the
>> Flink iterations that is equalizing this disadvantage?
>> I include the relevant part of both programs and also attach the
>> generated execution plans.
>> Thank you for any ideas as I could not find much about this issue in the
>> flink docs.
>>
>> Best, Dan
>>
>> *Flink Iterations:*
>>
>> DataSet data = ...
>>
>> State state = initialState(m, initweights,0,new double[initweights.length]);
>> DataSet statedataset = env.fromElements(state);
>> // start of iteration section
>> IterativeDataSet<State> loop = statedataset.iterate(niter);
>>
>>
>> DataSet statewithnewlossgradient = 
>> data.map(difffunction).withBroadcastSet(loop, "state")
>>   .reduce(accumulate)
>>   .map(new NormLossGradient(datasize))
>>   .map(new SetLossGradient()).withBroadcastSet(loop,"state")
>>   .map(new LBFGS());
>>
>>
>> DataSet converged = statewithnewlossgradient.filter(
>>new FilterFunction() {
>>   @Override  public boolean filter(State value) throws Exception {
>>  if(value.getIflag()[0] == 0){
>> return false;
>>  }
>>  return true;
>>   }
>>}
>> );
>>
>> DataSet finalstate = 
>> loop.closeWith(statewithnewlossgradient,converged);
>>
>> *While loop: *
>>
>> DataSet data =...
>> State state = initialState(m, initweights,0,new double[initweights.length]);
>> int cnt=0;do{
>>LBFGS lbfgs = new LBFGS();
>>statedataset=data.map(difffunction).withBroadcastSet(statedataset, 
>> "state")
>>   .reduce(accumulate)
>>   .map(new NormLossGradient(datasize))
>>   .map(new SetLossGradient()).withBroadcastSet(statedataset,"state")
>>   .map(lbfgs);
>>cnt++;
>> }while (cnt>
>>


Re: Having a single copy of an object read in a RichMapFunction

2016-08-08 Thread Theodore Vasiloudis
Thank you for the help Robert!

Regarding the static field alternative you provided, I'm a bit confused
about the difference between slots and instances.

When you say that by using a static field it will be shared by all
instances of the Map on the slot, does that mean that if the TM has
multiple slots, we again get multiple copies of the data?

Or does it mean that with a static field on a TM with multiple slots, we
only get one copy per TM, i.e. the slots share the same data?

As an example, if I have 1 TM with 3 slots, and I run the Map with a
parallelism of 3, I get 3 copies of the data on that TM. Does using a
static field change that?
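
As a minimal sketch of the static-field idea in Scala terms (the loading helper is an assumption, and the Scala equivalent of a static field is a field in a singleton object): the matrix is loaded lazily, at most once per JVM (strictly, per classloader), and every subtask whose open() runs in that JVM gets the same reference.

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

object SharedMatrix {
  @volatile private var matrix: Array[Array[Double]] = _

  def get(path: String): Array[Array[Double]] = synchronized {
    if (matrix == null) {
      matrix = loadMatrix(path)             // assumed helper, e.g. reading from HDFS
    }
    matrix
  }

  private def loadMatrix(path: String): Array[Array[Double]] =
    Array(Array(1.0, 0.0), Array(0.0, 1.0)) // placeholder instead of a real HDFS read
}

class MatrixVectorMultiply(path: String)
    extends RichMapFunction[Array[Double], Array[Double]] {

  @transient private var matrix: Array[Array[Double]] = _

  override def open(parameters: Configuration): Unit = {
    matrix = SharedMatrix.get(path)         // every subtask gets the same instance
  }

  override def map(vector: Array[Double]): Array[Double] =
    matrix.map(row => row.zip(vector).map { case (a, b) => a * b }.sum)
}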

On Mon, Aug 8, 2016 at 7:19 AM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Theo,
>
> I think there are some variants you can try out for the problem. I think
> it depends a bit on the performance characteristics you expect:
> - The simplest variant is to run one TM per machine with one slot only.
> This is probably not feasible because you can't use all the CPU cores
> - ... to solve that problem, you could use the same setup, but a worker
> thread pool, sharing one matrix per machine.
> - If you need higher parallelism in Flink, you could also have multiple
> slots per TM, and use a static field in your RichFlatMap class. The static
> field will be shared by all FlatMap instances of the slot.
>
> Flink doesn't have any built-in tooling for sharing memory between
> multiple TaskManagers on the same machine, but you can try to use anything
> available in Java (memory-mapped files, JNI, ...)
>
>
> On Fri, Aug 5, 2016 at 7:10 PM, Sameer Wadkar <sam...@axiomine.com> wrote:
>
>> You mean "Connected Streams"? I use that for the same requirement. I way
>> it works it looks like it creates multiple copies per co-map operation. I
>> use the keyed version to match side inputs with the data.
>>
>> Sent from my iPhone
>>
>> On Aug 5, 2016, at 12:36 PM, Theodore Vasiloudis <
>> theodoros.vasilou...@gmail.com> wrote:
>>
>> Yes this is a streaming use case, so broadcast is not an option.
>>
>> If I get it correctly with connected streams I would emulate side input
>> by "streaming" the matrix with a key that all incoming vector records match
>> on?
>>
>> Wouldn't that create multiple copies of the matrix in memory?
>>
>> On Thu, Aug 4, 2016 at 6:56 PM, Sameer W <sam...@axiomine.com> wrote:
>>
>>> Theodore,
>>>
>>> Broadcast variables do that when using the DataSet API -
>>> http://data-artisans.com/how-to-factorize-a-700-gb-matrix-
>>> with-apache-flink/
>>>
>>> See the following lines in the article-
>>> To support the above presented algorithm efficiently we had to improve
>>> Flink’s broadcasting mechanism since it easily becomes the bottleneck of
>>> the implementation. The enhanced Flink version can share broadcast
>>> variables among multiple tasks running on the same machine. *Sharing
>>> avoids having to keep for each task an individual copy of the broadcasted
>>> variable on the heap. This increases the memory efficiency significantly,
>>> especially if the broadcasted variables can grow up to several GBs of size.*
>>>
>>> If you are using in the DataStream API then side-inputs (not yet
>>> implemented) would achieve the same as broadcast variables.  (
>>> https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiX
>>> wSBXjv-M5eOv-MKQYN3m4/edit#) . I use keyed Connected Streams in
>>> situation where I need them for one of my use-cases (propagating rule
>>> changes to the data) where I could have used side-inputs.
>>>
>>> Sameer
>>>
>>>
>>>
>>>
>>> On Thu, Aug 4, 2016 at 8:56 PM, Theodore Vasiloudis <
>>> theodoros.vasilou...@gmail.com> wrote:
>>>
>>>> Hello all,
>>>>
>>>> for a prototype we are looking into we would like to read a big matrix
>>>> from HDFS, and for every element that comes in a stream of vectors do on
>>>> multiplication with the matrix. The matrix should fit in the memory of one
>>>> machine.
>>>>
>>>> We can read in the matrix using a RichMapFunction, but that would mean
>>>> that a copy of the matrix is made for each Task Slot AFAIK, if the
>>>> RichMapFunction is instantiated once per Task Slot.
>>>>
>>>> So I'm wondering how should we try address this problem, is it possible
>>>> to have just one copy of the object in memory per TM?
>>>>
>>>> As a follow-up if we have more than one TM per node, is it possible to
>>>> share memory between them? My guess is that we have to look at some
>>>> external store for that.
>>>>
>>>> Cheers,
>>>> Theo
>>>>
>>>
>>>
>>
>


Re: Having a single copy of an object read in a RichMapFunction

2016-08-05 Thread Theodore Vasiloudis
Yes this is a streaming use case, so broadcast is not an option.

If I get it correctly with connected streams I would emulate side input by
"streaming" the matrix with a key that all incoming vector records match on?

Wouldn't that create multiple copies of the matrix in memory?

On Thu, Aug 4, 2016 at 6:56 PM, Sameer W <sam...@axiomine.com> wrote:

> Theodore,
>
> Broadcast variables do that when using the DataSet API -
> http://data-artisans.com/how-to-factorize-a-700-gb-
> matrix-with-apache-flink/
>
> See the following lines in the article-
> To support the above presented algorithm efficiently we had to improve
> Flink’s broadcasting mechanism since it easily becomes the bottleneck of
> the implementation. The enhanced Flink version can share broadcast
> variables among multiple tasks running on the same machine. *Sharing
> avoids having to keep for each task an individual copy of the broadcasted
> variable on the heap. This increases the memory efficiency significantly,
> especially if the broadcasted variables can grow up to several GBs of size.*
>
> If you are using in the DataStream API then side-inputs (not yet
> implemented) would achieve the same as broadcast variables.  (
> https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-
> MKQYN3m4/edit#) . I use keyed Connected Streams in situation where I need
> them for one of my use-cases (propagating rule changes to the data) where I
> could have used side-inputs.
>
> Sameer
>
>
>
>
> On Thu, Aug 4, 2016 at 8:56 PM, Theodore Vasiloudis <
> theodoros.vasilou...@gmail.com> wrote:
>
>> Hello all,
>>
>> for a prototype we are looking into we would like to read a big matrix
>> from HDFS, and for every element that comes in a stream of vectors do on
>> multiplication with the matrix. The matrix should fit in the memory of one
>> machine.
>>
>> We can read in the matrix using a RichMapFunction, but that would mean
>> that a copy of the matrix is made for each Task Slot AFAIK, if the
>> RichMapFunction is instantiated once per Task Slot.
>>
>> So I'm wondering how should we try address this problem, is it possible
>> to have just one copy of the object in memory per TM?
>>
>> As a follow-up if we have more than one TM per node, is it possible to
>> share memory between them? My guess is that we have to look at some
>> external store for that.
>>
>> Cheers,
>> Theo
>>
>
>


Having a single copy of an object read in a RichMapFunction

2016-08-04 Thread Theodore Vasiloudis
Hello all,

for a prototype we are looking into, we would like to read a big matrix from
HDFS, and for every element that comes in on a stream of vectors do a
multiplication with the matrix. The matrix should fit in the memory of one
machine.

We can read in the matrix using a RichMapFunction, but that would mean
that a copy of the matrix is made for each Task Slot AFAIK, if the
RichMapFunction is instantiated once per Task Slot.

So I'm wondering how we should try to address this problem: is it possible to
have just one copy of the object in memory per TM?

As a follow-up if we have more than one TM per node, is it possible to
share memory between them? My guess is that we have to look at some
external store for that.

Cheers,
Theo


Re: Using ML lib SVM with Java

2016-05-09 Thread Theodore Vasiloudis
Hello Malte,

As Simone said, there is currently no Java support for FlinkML, unfortunately.

Regards,
Theodore

On Mon, May 9, 2016 at 3:05 PM, Simone Robutti  wrote:

> To my knowledge FlinkML does not support a unified API, and most things
> must be used exclusively with Scala DataSets.
>
> 2016-05-09 13:31 GMT+02:00 Malte Schwarzer :
>
>> Hi folks,
>>
>> I tried to get the FlinkML SVM running - but it didn't really work. The
>> SVM.fit() method requires a DataSet parameter but there is no
>> such class/interface in Flink Java. Or am I mixing something up with
>> Scala? Also, I couldn't find a Flink ML example for Java (there is only
>> Scala).
>>
>> Is there any guide how to use the Flink ML Lib with Java? Or is Java
>> currently not yet available for the ML lib?
>>
>>
>> Best regards
>> Malte
>>
>>
>


Re: FYI: Updated Slides Section

2016-04-05 Thread Theodore Vasiloudis
Hello all,

you can find my slides on Large-Scale Machine Learning with FlinkML here
(from SICS Data Science day and FOSDEM 2016):
http://www.slideshare.net/TheodorosVasiloudis/flinkml-large-scale-machine-learning-with-apache-flink

Best,
Theodore

On Mon, Apr 4, 2016 at 3:19 PM, Rubén Casado 
wrote:

> Dear community,
>
> Just in case it is useful, please find below the links to the slides from
> the 1st Flink Madrid Meetup talks given by Fabian Hueske [1] and myself [2]
> (in Spanish).
>
> Best
>
> [1]
> http://www.slideshare.net/fhueske/data-stream-processing-with-apache-flink
> [2]
> http://es.slideshare.net/Datadopter/dnde-encaja-apache-flink-en-el-ecosistema-actual-de-tecnologas-big-data
> __
>
> *Dr. Rubén Casado*
> Head of Big Data
> Treelogic
>  
> *ruben.casado.treelogic*
>
> +34 902 286 386 - +34 607 18 28 06
> Parque Tecnológico de Asturias · Parcela 30
> E33428 Llanera · Asturias [Spain]
> www.treelogic.com
> __
>
>
> - Original Message -
> From: "Ufuk Celebi" 
> To: user@flink.apache.org
> CC: d...@flink.apache.org
> Sent: Monday, April 4, 2016 11:33:59 GMT +01:00 Amsterdam / Berlin /
> Bern / Rome / Stockholm / Vienna
> Subject: FYI: Updated Slides Section
>
> Dear Flink community,
>
> I have updated the Material section on the Flink project page and
> moved the slides section to a separate page.
>
> You can find links to slides and talks here now:
> http://flink.apache.org/slides.html
>
> I've added slides for talks from this year by Till Rohrmann, Vasia
> Kalavri, Robert Metzger, Jamie Grier and Kostas Tzoumas. If you think
> that something is missing, feel free to ping in this thread.
>
> – Ufuk
>


Re: Unexpected out of bounds error in UnilateralSortMerger

2016-01-21 Thread Theodore Vasiloudis
> org.apache.flink.ml.math.Vector
> at
> org.apache.flink.ml.classification.SVM$$anon$25$$anon$11$$anon$1.createInstance(SVM.scala:353)
> at
> org.apache.flink.ml.classification.SVM$$anon$25$$anon$11$$anon$1.createInstance(SVM.scala:353)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:114)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:28)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:111)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:104)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:28)
> at
> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> at
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> at
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> at
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>

On Thu, Jan 21, 2016 at 10:51 AM, Theodore Vasiloudis <
theodoros.vasilou...@gmail.com> wrote:

> This is the stack trace from running with the patched branch:
>
>  The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program
>> execution failed: Job execution failed.
>> at
>> org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>> at
>> org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>> at
>> org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>> at
>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>> at
>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:803)
>> at
>> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:591)
>> at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
>> at fosdem.SVMClassification$.main(SVMClassification.scala:114)
>> at fosdem.SVMClassification.main(SVMClassification.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> at
>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>> at
>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:796)
>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:323)
>> at
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1112)
>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1160)
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>> execution failed.
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:659)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
>> at
>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>> at
>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>> at
>

Re: Unexpected out of bounds error in UnilateralSortMerger

2016-01-21 Thread Theodore Vasiloudis
 at
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at
> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
> at
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.lang.ArrayIndexOutOfBoundsException
> at
> org.apache.flink.core.memory.HeapMemorySegment.put(HeapMemorySegment.java:128)
> at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:195)
> at
> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
> at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
> at com.esotericsoftware.kryo.io.Output.writeVarInt(Output.java:266)
> at com.esotericsoftware.kryo.io.Output.writeInt(Output.java:251)
> at com.esotericsoftware.kryo.io.Output.writeInts(Output.java:669)
> at
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$IntArraySerializer.write(DefaultArraySerializers.java:63)
> at
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$IntArraySerializer.write(DefaultArraySerializers.java:52)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
> at
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
> ... 9 more
>

On Wed, Jan 20, 2016 at 9:45 PM, Stephan Ewen <se...@apache.org> wrote:

> Can you again post the stack trace? With the patched branch, the reference
> mapper should not be used any more (which is where the original exception
> occurred).
>
> On Wed, Jan 20, 2016 at 7:38 PM, Theodore Vasiloudis <
> theodoros.vasilou...@gmail.com> wrote:
>
>> Alright I will try to do that.
>>
>> I've tried running the job with a CSV file as input, and using
>> DenseVectors to represent the features, still the same IndexOutOfBounds
>> error.
>>
>> On Wed, Jan 20, 2016 at 6:05 PM, Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> You could change the version of Stephan’s branch via mvn versions:set
>>> -DnewVersion=MyCustomBuildVersion and then mvn versions:commit. Now
>>> after you install the Flink binaries you can reference them in your project
>>> by setting the version of your Flink dependencies to
>>> MyCustomBuildVersion. That way, you are sure that the right
>>> dependencies are used.
>>>
>>> Alternatively, you could compile an example program with example input
>>> data which can reproduce the problem. Then I could also take a look at it.
>>>
>>> Cheers,
>>> Till
>>> ​
>>>
>>> On Wed, Jan 20, 2016 at 5:58 PM, Theodore Vasiloudis <
>>> theodoros.vasilou...@gmail.com> wrote:
>>>
>>>> OK here's what I tried:
>>>>
>>>> * Build Flink (mvn clean install) from the branch you linked (kryo)
>>>> * Build my uber-jar, I use SBT with 1.0-SNAPSHOT as the Flink version,
>>>> added local maven repo to resolvers so that it picks up the previously
>>>> installed version (I hope)
>>>> * Launch local cluster from newly built Flink, try to run job
>>>>
>>>> Still getting the same error.
>>>>
>>>> Is there a way to ensure that SBT is picking up the local version of
>>>> Flink to build the uber-jar?
>>>> Does it matter in this case, or is it enough that I'm sure the launched
>>>> Flink instance comes from the branch you linked?
>>>>
>>>>
>>>> On Wed, Jan 20, 2016 at 4:30 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>>> The bug looks to be in the serialization via Kryo while spilling
>>>>

Re: Unexpected out of bounds error in UnilateralSortMerger

2016-01-20 Thread Theodore Vasiloudis
Alright I will try to do that.

I've tried running the job with a CSV file as input, and using DenseVectors
to represent the features; still the same IndexOutOfBounds error.

On Wed, Jan 20, 2016 at 6:05 PM, Till Rohrmann <trohrm...@apache.org> wrote:

> You could change the version of Stephan’s branch via mvn versions:set
> -DnewVersion=MyCustomBuildVersion and then mvn versions:commit. Now after
> you install the Flink binaries you can reference them in your project by
> setting the version of your Flink dependencies to MyCustomBuildVersion.
> That way, you are sure that the right dependencies are used.
>
> Alternatively, you could compile an example program with example input
> data which can reproduce the problem. Then I could also take a look at it.
>
> Cheers,
> Till
> ​
>
> On Wed, Jan 20, 2016 at 5:58 PM, Theodore Vasiloudis <
> theodoros.vasilou...@gmail.com> wrote:
>
>> OK here's what I tried:
>>
>> * Build Flink (mvn clean install) from the branch you linked (kryo)
>> * Build my uber-jar, I use SBT with 1.0-SNAPSHOT as the Flink version,
>> added local maven repo to resolvers so that it picks up the previously
>> installed version (I hope)
>> * Launch local cluster from newly built Flink, try to run job
>>
>> Still getting the same error.
>>
>> Is there a way to ensure that SBT is picking up the local version of
>> Flink to build the uber-jar?
>> Does it matter in this case, or is it enough that I'm sure the launched
>> Flink instance comes from the branch you linked?
>>
>>
>> On Wed, Jan 20, 2016 at 4:30 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> The bug looks to be in the serialization via Kryo while spilling
>>> windows. Note that Kryo is used here as a fallback serializer, since
>>> SparseVector is not a transparent type to Flink.
>>>
>>> I think there are two possible reasons:
>>>   1) Kryo, or our Kryo setup has an issue here
>>>   2) Kryo is inconsistently configured. There are multiple Kryo
>>> instances used across the serializers in the sorter. There may be a bug
>>> that they are not initialized in sync.
>>>
>>>
>>> To check this, can you build Flink with this pull request (
>>> https://github.com/apache/flink/pull/1528) or from this branch (
>>> https://github.com/StephanEwen/incubator-flink kryo) and see if that
>>> fixes it?
>>>
>>>
>>> Thanks,
>>> Stephan
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Jan 20, 2016 at 3:33 PM, Theodore Vasiloudis <
>>> theodoros.vasilou...@gmail.com> wrote:
>>>
>>>> I haven't been able to reproduce this with other datasets. Taking a
>>>> smaller sample from the large dataset I'm using (link to data
>>>> <http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#epsilon>)
>>>> causes the same problem however.
>>>>
>>>> I'm wondering if the implementation of readLibSVM is what's wrong here.
>>>> I've tried the new version commited recently by Chiwan, but I still get the
>>>> same error.
>>>>
>>>> I'll see if I can spot a bug in readLibSVM.
>>>>
>>>> On Wed, Jan 20, 2016 at 1:43 PM, Theodore Vasiloudis <
>>>> theodoros.vasilou...@gmail.com> wrote:
>>>>
>>>>> It's on 0.10.
>>>>>
>>>>> I've tried explicitly registering SparseVector (which is done anyway
>>>>> by registerFlinkMLTypes
>>>>> <https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala#L49>
>>>>> which is called when the SVM predict or evaluate functions are called
>>>>> <https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala#L58>)
>>>>> in my job but I still get the same. I will try a couple different datasets
>>>>> and try to see if it's the number of features that is causing this or
>>>>> something else.
>>>>>
>>>>> So far it works fine for a dataset with 8 features, but the large one
>>>>> has 2000 and I get the above error there. I will try large datasets with a
>>>>> few features and small datasets with many features as well.
>>>>>
>>>>> On Wed, Jan 20, 2016 at 11:39 AM, Stephan Ewen <se...@apache.org

Re: Unexpected out of bounds error in UnilateralSortMerger

2016-01-20 Thread Theodore Vasiloudis
It's on 0.10.

I've tried explicitly registering SparseVector (which is done anyway by
registerFlinkMLTypes
<https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala#L49>
which is called when the SVM predict or evaluate functions are called
<https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala#L58>)
in my job, but I still get the same error. I will try a couple of different
datasets and see whether it's the number of features that is causing this, or
something else.
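
For reference, the explicit registration I mean looks like this (on top of what FlinkMLTools.registerFlinkMLTypes already registers):

import org.apache.flink.api.scala._
import org.apache.flink.ml.math.{DenseVector, SparseVector}

val env = ExecutionEnvironment.getExecutionEnvironment
// Explicit Kryo registration of the FlinkML vector types:
env.registerType(classOf[SparseVector])
env.registerType(classOf[DenseVector])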

So far it works fine for a dataset with 8 features, but the large one has
2000 and I get the above error there. I will try large datasets with a few
features and small datasets with many features as well.

On Wed, Jan 20, 2016 at 11:39 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> Does this error occur in 0.10 or im 1.0-SNAPSHOT?
>
> It is probably an incorrectly configured Kryo instance (not a problem of
> the sorter).
> What is strange is that it occurs in the "MapReferenceResolver" - there
> should be no reference resolution during serialization / deserialization.
>
> Can you try what happens when you explicitly register the type
> SparseVector at the ExecutionEnvironment?
>
> Stephan
>
>
> On Wed, Jan 20, 2016 at 11:24 AM, Theodore Vasiloudis <
> theodoros.vasilou...@gmail.com> wrote:
>
>> Hello all,
>>
>> I'm trying to run a job using FlinkML and I'm confused about the source
>> of an error.
>>
>> The job reads a libSVM formatted file and trains an SVM classifier on it.
>>
>> I've tried this with small datasets and everything works out fine.
>>
>> When trying to run the same job on a large dataset (~11GB uncompressed)
>> however, I get the following error:
>>
>>
>>> java.lang.RuntimeException: Error obtaining the sorted input: Thread
>>> 'SortMerger spilling thread' terminated due to an exception:
>>> java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>> Serialization trace:
>>> indices (org.apache.flink.ml.math.SparseVector)
>>> at
>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>> at
>>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>>> at
>>> org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:78)
>>> at
>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
>>> at
>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>>> terminated due to an exception: java.lang.IndexOutOfBoundsException: Index:
>>> 14, Size: 2
>>> Serialization trace:
>>> indices (org.apache.flink.ml.math.SparseVector)
>>> at
>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>> Caused by: com.esotericsoftware.kryo.KryoException:
>>> java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>> Serialization trace:
>>> indices (org.apache.flink.ml.math.SparseVector)
>>> at
>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>> at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>> at
>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:246)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>> at
>>> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>> at
>>> org.apache.flink.ru

Re: Scala Breeze Dependencies not resolving when adding flink-ml on build.sbt

2015-10-28 Thread Theodore Vasiloudis
This sounds similar to this problem:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-ML-as-Dependency-td1582.html

The reason is (quoting Till, replace gradle with sbt here):

the flink-ml pom contains as a dependency an artifact with artifactId
> breeze_${scala.binary.version}. The variable scala.binary.version is
> defined in the parent pom and not substituted when flink-ml is installed.
> Therefore gradle tries to find a dependency with the name
> breeze_${scala.binary.version}


Anwar's solution should work; I just tested it on a basic Flink build, but
I haven't tried running anything yet.
The resolution error does go away, though. So your sbt should include
something like:

libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-scala" % flinkVersion,
  "org.apache.flink" % "flink-clients" % flinkVersion,
  ("org.apache.flink" % "flink-ml" % flinkVersion)
.exclude("org.scalanlp", "breeze_${scala.binary.version}"),
  "org.scalanlp" %% "breeze" % "0.11.2")



On Wed, Oct 28, 2015 at 10:34 AM, Frederick Ayala 
wrote:

> Hi,
>
> I am getting an error when adding flink-ml to the libraryDependencies on
> my build.sbt file:
>
> [error] (*:update) sbt.ResolveException: unresolved dependency:
> org.scalanlp#breeze_${scala.binary.version};0.11.2: not found
>
> My libraryDependencies is:
>
> libraryDependencies ++= Seq("org.apache.flink" % "flink-scala" % "0.9.1",
> "org.apache.flink" % "flink-streaming-scala" % "0.9.1", "org.apache.flink"
> % "flink-clients" % "0.9.1",
> "org.apache.flink" % "flink-ml" % "0.9.1")
>
> I am using scalaVersion := "2.10.6"
>
> If I remove flink-ml all the other dependencies are resolved.
>
> Could you help me to figure out a solution for this?
>
> Thanks!
>
> Frederick Ayala
>


Re: compile flink-gelly-scala using sbt

2015-10-28 Thread Theodore Vasiloudis
Your build.sbt seems correct.
It might be that you are missing some basic imports.

In your code have you imported

import org.apache.flink.api.scala._

?


On Tue, Oct 27, 2015 at 8:45 PM, Vasiliki Kalavri  wrote:

> Hi Do,
>
> I don't really have experience with sbt, but one thing that might cause
> problems is that your dependencies point to Flink 0.9.1 and gelly-scala
> wasn't part of that release. You can either try to use the 0.10-SNAPSHOT or
> wait a few days for the 0.10 release.
>
> Cheers,
> -Vasia.
>
> On 27 October 2015 at 18:05, Le Quoc Do  wrote:
>
>> Hi,
>>
>> I try to compile flink-gelly-scala using sbt. However, I got the
>> following error:
>>
>> [error]
>> /home/ubuntu/git/flink-learning/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala:42:
>> value getJavaEnv is not a member of
>> org.apache.flink.api.scala.ExecutionEnvironment
>> [error] wrapGraph(jg.Graph.fromDataSet[K, VV, EV](vertices.javaSet,
>> edges.javaSet, env.getJavaEnv))
>> [error]   ^
>> [error]
>> /home/ubuntu/git/flink-learning/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala:51:
>> value getJavaEnv is not a member of
>> org.apache.flink.api.scala.ExecutionEnvironment
>> [error] wrapGraph(jg.Graph.fromDataSet[K, EV](edges.javaSet,
>> env.getJavaEnv))
>>
>> The content of the build.sbt file:
>>
>> name := "flink-graph-metrics"
>>
>> version := "1.0"
>>
>> scalaVersion := "2.11.6"
>>
>> libraryDependencies ++= Seq("org.apache.flink" % "flink-scala" %
>> "0.9.1", "org.apache.flink" % "flink-clients" % "0.9.1", "org.apache.flink"
>> % "flink-gelly" % "0.9.1")
>>
>> fork in run := true
>>
>>
>> Do you know how to fix this problem?
>>
>> Thanks,
>> Do
>>
>> ==
>> Le Quoc Do
>> Dresden University of Technology
>> Faculty of Computer Science
>> Institute for System Architecture
>> Systems Engineering Group
>> 01062 Dresden
>> E-Mail: d...@se.inf.tu-dresden.de
>>
>
>


Re: compile flink-gelly-scala using sbt

2015-10-28 Thread Theodore Vasiloudis
Could you share a minimal code example where this happens?

On Wed, Oct 28, 2015 at 4:22 PM, Le Quoc Do <lequo...@gmail.com> wrote:

> Hi Theodore and Vasia,
>
> Thanks for your reply.
>
> I can compile my code by adding dependency jars manually.
>
> Yes, in my code I already import flink-scala (import
> org.apache.flink.api.scala._).
> However, when I run my code,
> I get the following error:
>
> java.lang.NoSuchMethodError:
> org.apache.flink.api.scala.ExecutionEnvironment.getJavaEnv()Lorg/apache/flink/api/java/ExecutionEnvironment;
> at org.apache.flink.graph.scala.Graph$.fromDataSet(Graph.scala:53)
>
> any suggestions?
>
> Thanks,
> Do
>
> ==
> Le Quoc Do
> Dresden University of Technology
> Faculty of Computer Science
> Institute for System Architecture
> Systems Engineering Group
> 01062 Dresden
> E-Mail: d...@se.inf.tu-dresden.de
>
> On Wed, Oct 28, 2015 at 3:50 PM, Theodore Vasiloudis <
> theodoros.vasilou...@gmail.com> wrote:
>
>> Your build.sbt seems correct.
>> It might be that you are missing some basic imports.
>>
>> In your code have you imported
>>
>> import org.apache.flink.api.scala._
>>
>> ?
>>
>>
>> On Tue, Oct 27, 2015 at 8:45 PM, Vasiliki Kalavri <
>> vasilikikala...@gmail.com> wrote:
>>
>>> Hi Do,
>>>
>>> I don't really have experience with sbt, but one thing that might cause
>>> problems is that your dependencies point to Flink 0.9.1 and gelly-scala
>>> wasn't part of that release. You can either try to use the 0.10-SNAPSHOT or
>>> wait a few days for the 0.10 release.
>>>
>>> Cheers,
>>> -Vasia.
>>>
>>> On 27 October 2015 at 18:05, Le Quoc Do <lequo...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I try to compile flink-gelly-scala using sbt. However, I got the
>>>> following error:
>>>>
>>>> *error]
>>>> /home/ubuntu/git/flink-learning/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala:42:
>>>> value getJavaEnv is not a member of
>>>> org.apache.flink.api.scala.ExecutionEnvironment*
>>>> *[error] wrapGraph(jg.Graph.fromDataSet[K, VV,
>>>> EV](vertices.javaSet, edges.javaSet, env.getJavaEnv))*
>>>> *[error]
>>>>  ^*
>>>> *[error]
>>>> /home/ubuntu/git/flink-learning/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala:51:
>>>> value getJavaEnv is not a member of
>>>> org.apache.flink.api.scala.ExecutionEnvironment*
>>>> *[error] wrapGraph(jg.Graph.fromDataSet[K, EV](edges.javaSet,
>>>> env.getJavaEnv))*
>>>>
>>>> The content of built.sbt file:
>>>>
>>>> *name := "flink-graph-metrics"*
>>>>
>>>> *version := "1.0"*
>>>>
>>>> *scalaVersion := "2.11.6"*
>>>>
>>>> *libraryDependencies ++= Seq("org.apache.flink" % "flink-scala" %
>>>> "0.9.1", "org.apache.flink" % "flink-clients" % "0.9.1", "org.apache.flink"
>>>> % "flink-gelly"  % "0.9.1")*
>>>>
>>>> *fork in run := true*
>>>>
>>>>
>>>> Do you know how to fix this problem?
>>>>
>>>> Thanks,
>>>> Do
>>>>
>>>> ==
>>>> Le Quoc Do
>>>> Dresden University of Technology
>>>> Faculty of Computer Science
>>>> Institute for System Architecture
>>>> Systems Engineering Group
>>>> 01062 Dresden
>>>> E-Mail: d...@se.inf.tu-dresden.de
>>>>
>>>
>>>
>>
>


Re: Extracting weights from linear regression model

2015-10-08 Thread Theodore Vasiloudis
Hello Trevor,

I assume you are using the MultipleLinearRegression class in a manner similar
to our examples, i.e.:

// Create multiple linear regression learner
val mlr = MultipleLinearRegression()
  .setIterations(10)
  .setStepsize(0.5)
  .setConvergenceThreshold(0.001)

// Obtain training and testing data sets
val trainingDS: DataSet[LabeledVector] = ...
val testingDS: DataSet[Vector] = ...

// Fit the linear model to the provided data
mlr.fit(trainingDS)

After you've run the fit method, you can get the weights by calling:

val WeightVector(weights, intercept) = mlr.weightsOption.get.collect().head

weights should be a flink.ml.math.DenseVector object and the intercept a Double.

Regards,

Theodore




On Wed, Oct 7, 2015 at 10:52 PM, Trevor Grant 
wrote:

> Sorry if this is a novice question, but I can't figure out how to extract
> the weights vector from a multiple linear regression model.  I can
> fit/predict, but I can't get the weight vector.
>
> Any advice would be appreciated (even snide "go read the docs" comments, so
> long as they point me to applicable docs, because I've been struggling with
> this all day).
>
> Thanks!
> tg
>
>
> Trevor Grant
> Data Scientist
> https://github.com/rawkintrevo
> http://stackexchange.com/users/3002022/rawkintrevo
>
> *"Fortunate is he, who is able to know the causes of things."  -Virgil*
>
>