[jira] [Created] (FLINK-4856) Add MapState for keyed streams

2016-10-18 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-4856:
---

 Summary: Add MapState for keyed streams
 Key: FLINK-4856
 URL: https://issues.apache.org/jira/browse/FLINK-4856
 Project: Flink
  Issue Type: New Feature
  Components: State Backends, Checkpointing
Reporter: Xiaogang Shi


Many states in keyed streams are organized as key-value pairs. Currently, these 
states are implemented by storing the entire map in a ValueState or a 
ListState. This implementation, however, is very costly because all entries 
have to be serialized/deserialized whenever a single entry is updated. To 
improve the efficiency of these states, MapStates are urgently needed. 
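To illustrate the proposal, here is a minimal sketch of what a MapState-style interface could look like, with a trivial heap-backed implementation. The names and signatures are illustrative assumptions, not Flink's actual API; a real state backend (e.g. RocksDB) would serialize each entry separately instead of the whole map.

```java
import java.util.HashMap;
import java.util.Map;

public class MapStateSketch {
    // Hypothetical per-entry state interface: updating one key does not
    // require serializing the other entries.
    interface MapState<K, V> {
        V get(K key);
        void put(K key, V value);
        void remove(K key);
        Iterable<Map.Entry<K, V>> entries();
    }

    // Trivial heap-backed implementation for illustration only.
    static class HeapMapState<K, V> implements MapState<K, V> {
        private final Map<K, V> map = new HashMap<>();

        public V get(K key) { return map.get(key); }
        public void put(K key, V value) { map.put(key, value); }
        public void remove(K key) { map.remove(key); }
        public Iterable<Map.Entry<K, V>> entries() { return map.entrySet(); }
    }

    public static void main(String[] args) {
        HeapMapState<String, Long> counts = new HeapMapState<>();
        counts.put("a", 1L);
        counts.put("b", 2L);
        counts.put("a", 3L); // only this entry is rewritten; "b" is untouched
        System.out.println(counts.get("a")); // prints 3
    }
}
```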



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4855) Add partitionedKeyBy to DataStream

2016-10-18 Thread Xiaowei Jiang (JIRA)
Xiaowei Jiang created FLINK-4855:


 Summary: Add partitionedKeyBy to DataStream
 Key: FLINK-4855
 URL: https://issues.apache.org/jira/browse/FLINK-4855
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Xiaowei Jiang
Assignee: MaGuowei


After we do any interesting operations (e.g. reduce) on a KeyedStream, the 
result becomes a DataStream. In a lot of cases, the output still has the same 
or compatible keys as the KeyedStream (logically). But to do further operations 
on these keys, we are forced to use keyBy again. This works semantically, but 
is costly in two aspects. First, it destroys the possibility of chaining, which 
is one of the most important optimization techniques. Second, keyBy will 
greatly expand the connected components of tasks, which has implications for 
failover optimization.

To address this shortcoming, we propose a new operator partitionedKeyBy.

DataStream<T> {
public <K> KeyedStream<T, K> partitionedKeyBy(KeySelector<T, K> key)
}

Semantically, DataStream.partitionedKeyBy(key) is equivalent to 
DataStream.keyBy(partitionedKey), where partitionedKey is key plus the task id 
as an extra field. This guarantees that records from different tasks will never 
produce the same keys.

With this, it's possible to do

ds.keyBy(key1).reduce(func1)
.partitionedKeyBy(key1).reduce(func2)
.partitionedKeyBy(key2).reduce(func3);

Most importantly, in certain cases, we will be able to chain these into a 
single vertex.
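The composite-key idea can be sketched in isolation as follows. This is an illustrative assumption about how "key plus task id" could be modeled, not Flink's actual implementation; the plumbing that supplies the task id is assumed.

```java
import java.util.Objects;

public class PartitionedKeySketch {
    // Illustrative composite key: the user's key plus the producing task's id.
    // Records from different tasks can never collide on this key, so the
    // re-keying does not require a network shuffle.
    static final class PartitionedKey<K> {
        final K userKey;
        final int taskId;

        PartitionedKey(K userKey, int taskId) {
            this.userKey = userKey;
            this.taskId = taskId;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof PartitionedKey)) return false;
            PartitionedKey<?> other = (PartitionedKey<?>) o;
            return taskId == other.taskId && Objects.equals(userKey, other.userKey);
        }

        @Override public int hashCode() { return Objects.hash(userKey, taskId); }
    }

    public static void main(String[] args) {
        // The same user key produced by two different tasks yields two
        // distinct composite keys.
        PartitionedKey<String> fromTask0 = new PartitionedKey<>("user-42", 0);
        PartitionedKey<String> fromTask1 = new PartitionedKey<>("user-42", 1);
        System.out.println(fromTask0.equals(fromTask1)); // prints false
    }
}
```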






[jira] [Created] (FLINK-4854) Efficient Batch Operator in Streaming

2016-10-18 Thread Xiaowei Jiang (JIRA)
Xiaowei Jiang created FLINK-4854:


 Summary: Efficient Batch Operator in Streaming
 Key: FLINK-4854
 URL: https://issues.apache.org/jira/browse/FLINK-4854
 Project: Flink
  Issue Type: Improvement
Reporter: Xiaowei Jiang
Assignee: MaGuowei


Very often, it's more efficient to process a batch of records at once instead 
of processing them one by one. We can use windows to achieve this 
functionality. However, a window will store all records in state, which can be 
costly. It's desirable to have an efficient implementation of a batch operator. 
The batch operator works per task and behaves similarly to aligned windows. 
Here is an example of what the interface looks like to a user.

interface BatchFunction<T> {
// add the record to the buffer
// returns whether the batch is ready to be flushed
boolean addRecord(T record);

// process all pending records in the buffer
void flush(Collector<T> collector);
}

DataStream<T> ds = ...
BatchFunction<T> func = ...
ds.batch(func);

The operator calls addRecord for each record. The batch function saves the 
record in its own buffer. addRecord returns whether the pending buffer should 
be flushed. In that case, the operator invokes flush.
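The contract described above can be exercised with a self-contained sketch. The interface names follow the proposal, but this is an illustrative stand-in, not an actual Flink interface; Collector is simplified to a plain Consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BatchOperatorSketch {
    // Sketch of the proposed BatchFunction contract (names assumed from the
    // proposal).
    interface BatchFunction<T> {
        boolean addRecord(T record);       // true -> buffer should be flushed
        void flush(Consumer<T> collector); // emit and clear pending records
    }

    // A size-bounded batch: request a flush every `batchSize` records.
    static class FixedSizeBatch<T> implements BatchFunction<T> {
        private final int batchSize;
        private final List<T> buffer = new ArrayList<>();

        FixedSizeBatch(int batchSize) { this.batchSize = batchSize; }

        public boolean addRecord(T record) {
            buffer.add(record);
            return buffer.size() >= batchSize;
        }

        public void flush(Consumer<T> collector) {
            buffer.forEach(collector);
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        // The "operator" loop: addRecord per record, flush when asked to.
        FixedSizeBatch<Integer> batch = new FixedSizeBatch<>(3);
        List<Integer> out = new ArrayList<>();
        for (int i = 1; i <= 7; i++) {
            if (batch.addRecord(i)) {
                batch.flush(out::add);
            }
        }
        System.out.println(out); // prints [1, 2, 3, 4, 5, 6] -- 7 is still buffered
    }
}
```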





[jira] [Created] (FLINK-4853) Clean up JobManager registration at the ResourceManager

2016-10-18 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4853:


 Summary: Clean up JobManager registration at the ResourceManager
 Key: FLINK-4853
 URL: https://issues.apache.org/jira/browse/FLINK-4853
 Project: Flink
  Issue Type: Sub-task
  Components: ResourceManager
Reporter: Till Rohrmann
Assignee: Till Rohrmann


The current {{JobManager}} registration at the {{ResourceManager}} blocks 
threads in the {{RpcService.execute}} pool. This is not ideal and can be 
avoided by not waiting on a {{Future}} in this call.

I propose to encapsulate the leader id retrieval operation in a distinct 
service so that it can be separated from the {{ResourceManager}}. This will 
reduce the complexity of the {{ResourceManager}} and make the individual 
components easier to test.
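A possible shape for such a distinct service, using a callback-style Future so no pool thread blocks while waiting, might look like this. The service name and signature are assumptions for illustration, not Flink's actual API.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

public class LeaderIdServiceSketch {
    // Hypothetical leader-id retrieval service: callers compose on the
    // returned future instead of blocking a thread of the RPC pool.
    interface LeaderIdRetrievalService {
        CompletableFuture<UUID> getLeaderId(String jobId);
    }

    public static void main(String[] args) {
        // Stub implementation that completes immediately; a real one would
        // complete asynchronously from a leader-election listener.
        UUID leaderId = UUID.randomUUID();
        LeaderIdRetrievalService service =
                jobId -> CompletableFuture.completedFuture(leaderId);

        // Non-blocking composition: the registration step runs when the
        // leader id arrives, without waiting on the future.
        service.getLeaderId("job-1")
               .thenAccept(id -> System.out.println("registering with leader " + id));
    }
}
```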





[DISCUSS] Defining the Semantics of StreamingSQL

2016-10-18 Thread Fabian Hueske
Hi everybody,

at Flink Forward we had a BOF session about StreamSQL.
After the conference, some folks and I sat down and drafted a proposal for
Flink's StreamSQL semantics.

-->
https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_f4konQPW4tnl8THw6rzGUdaqU

The proposal includes:
- A definition for dynamic tables
- How to convert streams into dynamic tables
- How to convert dynamic tables into streams
- How to query dynamic tables
- Which types of queries to support
- How to specify early results, update rate, and late refinements
- How to control the size of the query state
- How to write query results to external systems (Kafka, files, Cassandra,
HBase, ...)
- How to make a query result accessible via Flink's queryable kv-state
- A few examples how StreamSQL queries can be defined

The proposal does not include a workplan or task breakdown yet.
This is something I'll work on in the next days.

Please share your thoughts and opinions about the proposal on the mailing
list.

Thanks,
Fabian


[jira] [Created] (FLINK-4852) ClassCastException when assigning Watermarks with TimeCharacteristic.ProcessingTime

2016-10-18 Thread Maximilian Michels (JIRA)
Maximilian Michels created FLINK-4852:
-

 Summary: ClassCastException when assigning Watermarks with 
TimeCharacteristic.ProcessingTime
 Key: FLINK-4852
 URL: https://issues.apache.org/jira/browse/FLINK-4852
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.2.0, 1.1.3
Reporter: Maximilian Michels
 Fix For: 1.2.0, 1.1.4



As per FLINK-3688 and FLINK-2936, this should already have been resolved. 
Still, when emitting watermarks and using processing time, you get the 
following ClassCastException:

{noformat}

Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Could not forward element to next 
operator
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
at 
org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:161)
at 
org.apache.flink.streaming.api.functions.source.StatefulSequenceSource.run(StatefulSequenceSource.java:68)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
at java.lang.Thread.run(Thread.java:745)
Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:343)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:340)
at 
org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator.processElement(TimestampsAndPunctuatedWatermarksOperator.java:58)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
... 11 more
Caused by: java.lang.RuntimeException: 
org.apache.flink.streaming.api.watermark.Watermark cannot be cast to 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:93)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:340)
at 
org.apache.flink.streaming.api.operators.StreamMap.processWatermark(StreamMap.java:44)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:340)
... 14 more
Caused by: java.lang.ClassCastException: 
org.apache.flink.streaming.api.watermark.Watermark cannot be cast to 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord
at 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42)

[jira] [Created] (FLINK-4851) Add FatalErrorHandler and MetricRegistry to ResourceManager

2016-10-18 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4851:


 Summary: Add FatalErrorHandler and MetricRegistry to 
ResourceManager
 Key: FLINK-4851
 URL: https://issues.apache.org/jira/browse/FLINK-4851
 Project: Flink
  Issue Type: Sub-task
  Components: ResourceManager
Reporter: Till Rohrmann
Assignee: Till Rohrmann


The {{ResourceManager}} currently does not contain a {{FatalErrorHandler}}. In 
order to harmonize fatal error handling across all components, we should 
introduce one there as well. Additionally, we should give a {{MetricRegistry}} 
to the {{ResourceManager}} so that it can report metrics.
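The harmonization idea is that every component receives the same single-method handler and escalates fatal errors through it instead of swallowing them. A minimal sketch, with the interface name and signature assumed for illustration:

```java
import java.util.concurrent.atomic.AtomicReference;

public class FatalErrorHandlerSketch {
    // Single-method handler through which all components report fatal errors
    // (signature assumed for illustration).
    interface FatalErrorHandler {
        void onFatalError(Throwable exception);
    }

    // A component takes the handler at construction time, so fatal errors
    // flow through one uniform channel and tests can inject a recording stub.
    static class SomeComponent {
        private final FatalErrorHandler errorHandler;

        SomeComponent(FatalErrorHandler errorHandler) {
            this.errorHandler = errorHandler;
        }

        void doWork() {
            try {
                throw new IllegalStateException("lost connection to HA store");
            } catch (Throwable t) {
                errorHandler.onFatalError(t); // escalate instead of swallowing
            }
        }
    }

    public static void main(String[] args) {
        AtomicReference<Throwable> reported = new AtomicReference<>();
        SomeComponent component = new SomeComponent(reported::set);
        component.doWork();
        System.out.println(reported.get().getMessage()); // prints "lost connection to HA store"
    }
}
```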





[jira] [Created] (FLINK-4850) FlinkML - SVM predict Operation for Vector and not LaveledVector

2016-10-18 Thread Thomas FOURNIER (JIRA)
Thomas FOURNIER created FLINK-4850:
--

 Summary: FlinkML - SVM predict Operation for Vector and not 
LaveledVector
 Key: FLINK-4850
 URL: https://issues.apache.org/jira/browse/FLINK-4850
 Project: Flink
  Issue Type: Bug
Reporter: Thomas FOURNIER


It seems that SVM has a predict operation for Vector and not LabeledVector.







Re: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink

2016-10-18 Thread Aljoscha Krettek
Hi,
yes, I think it's fine if we keep it in the same package as the Evictor.
StreamRecord is more of an internal class that should not really be
user-facing; that's my motivation for replacing it.

Cheers,
Aljoscha
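For concreteness, a minimal (timestamp, value) holder along the lines discussed might look like the sketch below. Field and method names are assumptions for illustration, not a committed API; Long.MIN_VALUE marks the "no timestamp" case mentioned later in the thread.

```java
public class TimestampedValueSketch {
    // Minimal (timestamp, value) holder discussed as a user-facing
    // replacement for exposing the internal StreamRecord to the Evictor.
    static final class TimestampedValue<T> {
        static final long NO_TIMESTAMP = Long.MIN_VALUE;

        private final T value;
        private final long timestamp;

        TimestampedValue(T value) { this(value, NO_TIMESTAMP); }

        TimestampedValue(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        T getValue() { return value; }

        boolean hasTimestamp() { return timestamp != NO_TIMESTAMP; }

        long getTimestamp() {
            if (!hasTimestamp()) {
                throw new IllegalStateException("record has no timestamp");
            }
            return timestamp;
        }
    }

    public static void main(String[] args) {
        TimestampedValue<String> tv = new TimestampedValue<>("a", 1000L);
        System.out.println(tv.getTimestamp()); // prints 1000
    }
}
```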

On Mon, 17 Oct 2016 at 19:23 Vishnu Viswanath 
wrote:

> Hi Aljoscha,
>
> Thanks for the response.
>
> I did think about creating a new class similar to TimestampedValue as you
> suggested, but that class looked almost the same as the current StreamRecord
> class. (Both have a timestamp field and a value field.)
>
> Do you think it is fine to have another class for holding a (timestamp,
> value) tuple?
>
> Regards,
> Vishnu
>
> On Mon, Oct 17, 2016 at 4:19 AM, Aljoscha Krettek 
> wrote:
>
> > Hi Vishnu,
> > what you suggested is spot on! Please go forward with it like this.
> >
> > One small suggestion would be to change Tuple2 to something like
> > TimestampedValue to not rely on tuples, which can be confusing for
> > people who write Scala code because they are not Scala tuples. That's not
> > strictly necessary, though; you can spin it however you like.
> >
> > Cheers,
> > Aljoscha
> >
> > On Fri, 7 Oct 2016 at 18:46 Vishnu Viswanath <
> vishnu.viswanat...@gmail.com
> > >
> > wrote:
> >
> > > Hi Radu,
> > >
> > > Yes we can remove elements randomly using iterator.remove()
> > >
> > > Regards,
> > > Vishnu
> > >
> > > On Fri, Oct 7, 2016 at 2:57 AM, Radu Tudoran 
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I must apologize that I missed some of the email exchanges on this
> > > > thread, and thus my remark/question might have been already settled.
> > > >
> > > > Does the interface you propose also enable removing elements out of
> > > > order? E.g., assuming I have elements 1,2,3,4,5 in the window buffer,
> > > > would I be able to evict 2 and 4?
> > > > We discussed this some email exchanges ago but as I said I am not
> > > > sure if this functionality is captured in this interface. Basically,
> > > > will the typical remove() method from iterators be available?
> > > >
> > > > Best regards,
> > > >
> > > >
> > > > -Original Message-
> > > > From: Vishnu Viswanath [mailto:vishnu.viswanat...@gmail.com]
> > > > Sent: Friday, October 07, 2016 8:29 AM
> > > > To: Dev
> > > > Subject: Re: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink
> > > >
> > > > Hi Aljoscha,
> > > >
> > > > To pass the time information to the Evictor and at the same time not
> > > > expose the StreamRecord, I suppose we can change the signature of
> > > > evictBefore and evictAfter to take Iterable<Tuple2<Long, T>> instead of
> > > > Iterable<StreamRecord<T>>:
> > > >
> > > > void evictBefore(Iterable<Tuple2<Long, T>> elements, int size, W window,
> > > > EvictorContext evictorContext);
> > > >
> > > > The fire() method of EvictingWindowOperator can transform the
> > > > Iterable<StreamRecord<T>> to FluentIterable<Tuple2<Long, T>> and pass
> > > > it on to the evictor (where f0 will be the timestamp and f1 will be
> > > > the value). That way the TimeEvictor will work for EventTime or
> > > > IngestionTime as long as a timestamp is set in the StreamRecord. In
> > > > case the timestamp is not set, TimeEvictor can detect this by checking
> > > > Tuple2.f0 (which will be Long.MIN_VALUE) and skip the eviction.
> > > >
> > > > If you think this is fine, I will make the changes and also edit the
> > > FLIP.
> > > >
> > > > Regards,
> > > > Vishnu
> > > >
> > > >
> > > > On Wed, Oct 5, 2016 at 9:49 PM, Vishnu Viswanath <
> > > > vishnu.viswanat...@gmail.com> wrote:
> > > >
> > > > > Thank you Aljoscha,
> > > > >
> > > > > Yes, I agree we don't need ProcessingTimeEvictor.
> > > > > I will change the current TimeEvictors to use EventTimeEvictor as
> > > > > suggested.
> > > > >
> > > > > Also, I will figure out a way to pass the timestamp to the Evictor
> > > > > interface so that we can avoid exposing StreamRecords.
> > > > >
> > > > > Regards,
> > > > > Vishnu
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Sep 20, 2016 at 4:33 AM, Aljoscha Krettek <
> > aljos...@apache.org
> > > >
> > > > > wrote:
> > > > >
> > > > >> Hi,
> > > > >> now you again see what I mentioned a while back: eviction based on
> > > > >> processing time is not really well defined. I think we can
> > completely
> > > > get
> > > > >> rid of "processing time eviction" because it can be replaced by
> > > > something
> > > > >> like this:
> > > > >>
> > > > >> DataStream<T> input = ...
> > > > >> DataStream<T> withTimestamps = input.assignTimestampsAndWatermarks(
> > > > >> new IngestionTimeExtractor<T>()) // this will assign the current
> > > > >> processing time as timestamp
> > > > >> withTimestamps
> > > > >>   .keyBy(...)
> > > > >>   .window(...)
> > > > >>   .evictor(new EventTimeEvictor())
> > > > >>   .apply(...)
> > > > >>
> > > > >> With this, we would just have to find a good way of passing the
> > > > timestamps
> > > > >> in the 

Re: Flink Metrics

2016-10-18 Thread Aljoscha Krettek
https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html
Or this:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/metrics.html
if
you prefer Flink 1.1
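Regarding the suggestion further down this thread of defining a meter with a configurable rate interval: a self-contained sketch of that idea is below. This is not Flink's Meter API or the DropwizardMeterWrapper; the class and methods are illustrative, and a manual clock keeps the example deterministic.

```java
public class RateMeterSketch {
    // A meter that reports the event rate over a configurable interval,
    // rather than a fixed one-minute rate. The caller supplies the clock.
    static class IntervalMeter {
        private final long intervalMillis;
        private long windowStart;
        private long windowCount;
        private double lastRate; // events/second over the last full interval

        IntervalMeter(long intervalMillis, long nowMillis) {
            this.intervalMillis = intervalMillis;
            this.windowStart = nowMillis;
        }

        void markEvent(long nowMillis) {
            maybeRoll(nowMillis);
            windowCount++;
        }

        double getRate(long nowMillis) {
            maybeRoll(nowMillis);
            return lastRate;
        }

        // Close the window once the interval has elapsed and freeze its rate.
        private void maybeRoll(long nowMillis) {
            if (nowMillis - windowStart >= intervalMillis) {
                lastRate = windowCount * 1000.0 / intervalMillis;
                windowCount = 0;
                windowStart = nowMillis;
            }
        }
    }

    public static void main(String[] args) {
        IntervalMeter meter = new IntervalMeter(1000, 0); // 1-second interval
        for (int i = 0; i < 5; i++) {
            meter.markEvent(100 * i); // 5 events between 0 and 400 ms
        }
        System.out.println(meter.getRate(1000)); // prints 5.0 (events/second)
    }
}
```

Several such meters with different intervals (1, 5, and 15 minutes) could be kept side by side to expose all three rates.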

On Mon, 17 Oct 2016 at 19:16 amir bahmanyari  wrote:

> Hi colleagues,
> Is there a link that describes Flink Metrics & provides examples on how to
> utilize it pls?
> I really appreciate it...
> Cheers
>
> --
> *From:* Till Rohrmann 
> *To:* u...@flink.apache.org
> *Cc:* dev@flink.apache.org
> *Sent:* Monday, October 17, 2016 12:52 AM
> *Subject:* Re: Flink Metrics
>
> Hi Govind,
>
> I think the DropwizardMeterWrapper implementation is just a reference
> implementation where it was decided to report the minute rate. You can
> define your own meter class which allows you to configure the rate interval
> accordingly.
>
> Concerning Timers, I think nobody requested this metric so far. If you
> want, then you can open a JIRA issue and contribute it. The community would
> really appreciate that.
>
> Cheers,
> Till
> ​
>
> On Mon, Oct 17, 2016 at 5:26 AM, Govindarajan Srinivasaraghavan <
> govindragh...@gmail.com> wrote:
>
> > Hi,
> >
> > I am currently using flink 1.2 snapshot and instrumenting my pipeline
> with
> > flink metrics. One small suggestion I have is currently the Meter
> interface
> > only supports getRate() which is always the one minute rate.
> >
> > It would be great if all the rates (1 min, 5 min & 15 min) are exposed to
> > get a better picture in terms of performance.
> >
> > Also is there any reason why timers are not part of flink metrics core?
> >
> > Regards,
> > Govind
> >
>
>
>