[jira] [Created] (FLINK-4856) Add MapState for keyed streams
Xiaogang Shi created FLINK-4856:
-----------------------------------

Summary: Add MapState for keyed streams
Key: FLINK-4856
URL: https://issues.apache.org/jira/browse/FLINK-4856
Project: Flink
Issue Type: New Feature
Components: State Backends, Checkpointing
Reporter: Xiaogang Shi

Many states in keyed streams are organized as key-value pairs. Currently, these states are implemented by storing the entire map in a ValueState or a ListState. This implementation is very costly because all entries have to be serialized/deserialized whenever a single entry is updated. To improve the efficiency of these states, MapState is urgently needed.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
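The cost difference the issue describes can be sketched with a small in-memory model. MapStateSketch below is purely illustrative (it is not the proposed Flink API): each put re-serializes only the touched entry, whereas a map held in a single ValueState must be rewritten wholesale on every update.

```java
import java.io.*;
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-entry map state: each value is serialized on its own,
// so updating one entry never touches the serialized form of the others.
public class MapStateSketch<K, V extends Serializable> {
    private final Map<K, byte[]> entries = new HashMap<>();

    public void put(K key, V value) {
        entries.put(key, serialize(value)); // cost: one entry, not the whole map
    }

    public V get(K key) {
        byte[] bytes = entries.get(key);
        return bytes == null ? null : deserialize(bytes);
    }

    private byte[] serialize(V value) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(value);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @SuppressWarnings("unchecked")
    private V deserialize(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (V) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        MapStateSketch<String, Integer> state = new MapStateSketch<>();
        state.put("a", 1);
        state.put("b", 2);
        state.put("a", 3); // re-serializes only the "a" entry
        System.out.println(state.get("a")); // prints 3
    }
}
```

With a map stored in a ValueState, the same three puts would each serialize and deserialize the entire two-entry map.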
[jira] [Created] (FLINK-4855) Add partitionedKeyBy to DataStream
Xiaowei Jiang created FLINK-4855:
------------------------------------

Summary: Add partitionedKeyBy to DataStream
Key: FLINK-4855
URL: https://issues.apache.org/jira/browse/FLINK-4855
Project: Flink
Issue Type: Improvement
Components: DataStream API
Reporter: Xiaowei Jiang
Assignee: MaGuowei

After we do any interesting operation (e.g. reduce) on a KeyedStream, the result becomes a DataStream. In a lot of cases, the output still has the same or compatible keys as the KeyedStream (logically). But to do further operations on these keys, we are forced to use keyBy again. This works semantically, but is costly in two aspects. First, it destroys the possibility of chaining, which is one of the most important optimization techniques. Second, keyBy will greatly expand the connected components of tasks, which has implications for failover optimization.

To address this shortcoming, we propose a new operator partitionedKeyBy:

class DataStream<T> {
    public <K> KeyedStream<T, K> partitionedKeyBy(KeySelector<T, K> key);
}

Semantically, DataStream.partitionedKeyBy(key) is equivalent to DataStream.keyBy(partitionedKey), where partitionedKey is key plus the task id as an extra field. This guarantees that records from different tasks will never produce the same keys. With this, it's possible to do

ds.keyBy(key1).reduce(func1)
  .partitionedKeyBy(key1).reduce(func2)
  .partitionedKeyBy(key2).reduce(func3);

Most importantly, in certain cases we will be able to chain these into a single vertex.
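The stated semantics — partitionedKeyBy(key) behaving like keyBy(key plus task id) — can be sketched as follows. PartitionedKeySketch, the partitionedKey helper, and the use of java.util.function.Function in place of Flink's KeySelector are all illustrative assumptions, not the proposed implementation.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.function.Function;

// Sketch of the proposed semantics: the composite key (taskId, userKey)
// guarantees that records held by different tasks never collide on a key,
// so the downstream operator can stay local and remain chainable.
public class PartitionedKeySketch {
    // Hypothetical helper: wrap a user key selector so the producing task's id
    // becomes part of the key.
    static <T, K> Function<T, Map.Entry<Integer, K>> partitionedKey(
            int taskId, Function<T, K> userKey) {
        return record -> new SimpleEntry<>(taskId, userKey.apply(record));
    }

    public static void main(String[] args) {
        Function<String, Map.Entry<Integer, String>> keyOnTask0 =
                partitionedKey(0, word -> word);
        Function<String, Map.Entry<Integer, String>> keyOnTask1 =
                partitionedKey(1, word -> word);
        // Same logical key, different tasks -> different composite keys.
        System.out.println(keyOnTask0.apply("flink")); // prints 0=flink
        System.out.println(keyOnTask0.apply("flink")
                .equals(keyOnTask1.apply("flink"))); // prints false
    }
}
```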
[jira] [Created] (FLINK-4854) Efficient Batch Operator in Streaming
Xiaowei Jiang created FLINK-4854:
------------------------------------

Summary: Efficient Batch Operator in Streaming
Key: FLINK-4854
URL: https://issues.apache.org/jira/browse/FLINK-4854
Project: Flink
Issue Type: Improvement
Reporter: Xiaowei Jiang
Assignee: MaGuowei

Very often, it's more efficient to process a batch of records at once instead of processing them one by one. We can use windows to achieve this functionality. However, a window stores all records in state, which can be costly. It's desirable to have an efficient implementation of a batch operator. The batch operator works per task and behaves similarly to aligned windows. Here is an example of how the interface looks to a user:

interface BatchFunction<T> {
    // adds the record to the buffer,
    // returns whether the batch is ready to be flushed
    boolean addRecord(T record);

    // processes all pending records in the buffer
    void flush(Collector<T> collector);
}

DataStream<T> ds = ...
BatchFunction<T> func = ...
ds.batch(func);

The operator calls addRecord for each record. The batch function saves the record in its own buffer. addRecord returns whether the pending buffer should be flushed. In that case, the operator invokes flush.
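A minimal, self-contained sketch of the proposed contract. CountBatch and the run driver are hypothetical, and Collector here is a local stand-in for Flink's interface; the point is only to show the addRecord/flush handshake described above.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the proposed interface: addRecord buffers a record and
// reports whether the batch is full; flush drains the buffer into a collector.
public class BatchSketch {
    interface Collector<T> { void collect(T value); }

    interface BatchFunction<T> {
        boolean addRecord(T record);        // true -> batch ready to flush
        void flush(Collector<T> collector); // emit and clear pending records
    }

    // Example implementation: flush every `size` records.
    static class CountBatch<T> implements BatchFunction<T> {
        private final int size;
        private final List<T> buffer = new ArrayList<>();
        CountBatch(int size) { this.size = size; }
        public boolean addRecord(T record) {
            buffer.add(record);
            return buffer.size() >= size;
        }
        public void flush(Collector<T> out) {
            buffer.forEach(out::collect);
            buffer.clear();
        }
    }

    // Driver mimicking the operator: call addRecord per element and invoke
    // flush whenever the function signals that the batch is ready.
    static <T> List<List<T>> run(BatchFunction<T> func, List<T> input) {
        List<List<T>> batches = new ArrayList<>();
        for (T record : input) {
            if (func.addRecord(record)) {
                List<T> batch = new ArrayList<>();
                func.flush(batch::add);
                batches.add(batch);
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(run(new CountBatch<>(2), List.of(1, 2, 3, 4, 5)));
        // prints [[1, 2], [3, 4]]  (5 stays buffered until the next flush)
    }
}
```

Note that only the pending batch lives in the function's buffer, unlike a window, which would keep every record in state.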
[jira] [Created] (FLINK-4853) Clean up JobManager registration at the ResourceManager
Till Rohrmann created FLINK-4853:
------------------------------------

Summary: Clean up JobManager registration at the ResourceManager
Key: FLINK-4853
URL: https://issues.apache.org/jira/browse/FLINK-4853
Project: Flink
Issue Type: Sub-task
Components: ResourceManager
Reporter: Till Rohrmann
Assignee: Till Rohrmann

The current {{JobManager}} registration at the {{ResourceManager}} blocks threads in the {{RpcService.execute}} pool. This is not ideal and can be avoided by not waiting on a {{Future}} in this call.

I propose to encapsulate the leader id retrieval operation in a distinct service so that it can be separated from the {{ResourceManager}}. This will reduce the complexity of the {{ResourceManager}} and make the individual components easier to test.
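The non-blocking shape this proposes might look like the following sketch. JobMasterLeaderIdService and registerJobManager are hypothetical names, and CompletableFuture stands in for Flink's Future: the registration composes on the leader-id future rather than blocking a pool thread on it.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Sketch: leader-id retrieval lives in its own service that the
// ResourceManager queries asynchronously, so no RpcService.execute
// thread ever waits on the result.
public class LeaderIdSketch {
    interface JobMasterLeaderIdService {
        CompletableFuture<UUID> getLeaderId(String jobId); // hypothetical
    }

    static CompletableFuture<String> registerJobManager(
            String jobId, UUID claimedLeaderId, JobMasterLeaderIdService service) {
        // Compose on the future instead of calling get(): the registration
        // completes when the leader id arrives, without blocking a thread.
        return service.getLeaderId(jobId).thenApply(actual ->
                actual.equals(claimedLeaderId) ? "REGISTERED" : "DECLINED");
    }

    public static void main(String[] args) {
        UUID leaderId = UUID.randomUUID();
        JobMasterLeaderIdService service =
                jobId -> CompletableFuture.completedFuture(leaderId);
        System.out.println(registerJobManager("job-1", leaderId, service).join());
        // prints REGISTERED
    }
}
```

Pulling the service behind an interface also gives the testability benefit the issue mentions: tests can hand in a stub service, as main does here.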
[DISCUSS] Defining the Semantics of StreamingSQL
Hi everybody,

at Flink Forward we had a BOF session about StreamSQL. After the conference, some folks and I sat down and drafted a proposal for Flink's StreamSQL semantics.

--> https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_f4konQPW4tnl8THw6rzGUdaqU

The proposal includes:
- A definition for dynamic tables
- How to convert streams into dynamic tables
- How to convert dynamic tables into streams
- How to query dynamic tables
- Which types of queries to support
- How to specify early results, update rate, and late refinements
- How to control the size of the query state
- How to write query results to external systems (Kafka, files, Cassandra, HBase, ...)
- How to make a query result accessible via Flink's queryable kv-state
- A few examples of how StreamSQL queries can be defined

The proposal does not include a work plan or task breakdown yet. This is something I'll work on in the next days.

Please share your thoughts and opinions about the proposal on the mailing list.

Thanks,
Fabian
[jira] [Created] (FLINK-4852) ClassCastException when assigning Watermarks with TimeCharacteristic.ProcessingTime
Maximilian Michels created FLINK-4852:
-----------------------------------------

Summary: ClassCastException when assigning Watermarks with TimeCharacteristic.ProcessingTime
Key: FLINK-4852
URL: https://issues.apache.org/jira/browse/FLINK-4852
Project: Flink
Issue Type: Bug
Components: Streaming
Affects Versions: 1.2.0, 1.1.3
Reporter: Maximilian Michels
Fix For: 1.2.0, 1.1.4

As per FLINK-3688 and FLINK-2936, this should already have been resolved. Still, when emitting Watermarks and using processing time, you get the following ClassCastException:

{noformat}
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
	at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:161)
	at org.apache.flink.streaming.api.functions.source.StatefulSequenceSource.run(StatefulSequenceSource.java:68)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:343)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:340)
	at org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator.processElement(TimestampsAndPunctuatedWatermarksOperator.java:58)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
	... 11 more
Caused by: java.lang.RuntimeException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:93)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:340)
	at org.apache.flink.streaming.api.operators.StreamMap.processWatermark(StreamMap.java:44)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:340)
	... 14 more
Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
	at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42)
{noformat}
[jira] [Created] (FLINK-4851) Add FatalErrorHandler and MetricRegistry to ResourceManager
Till Rohrmann created FLINK-4851:
------------------------------------

Summary: Add FatalErrorHandler and MetricRegistry to ResourceManager
Key: FLINK-4851
URL: https://issues.apache.org/jira/browse/FLINK-4851
Project: Flink
Issue Type: Sub-task
Components: ResourceManager
Reporter: Till Rohrmann
Assignee: Till Rohrmann

The {{ResourceManager}} currently does not contain a {{FatalErrorHandler}}. In order to harmonize the fatal error handling across all components, we should introduce a {{FatalErrorHandler}} which handles fatal errors.

Additionally, we should also give a {{MetricRegistry}} to the {{ResourceManager}} so that it can report metrics.
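A minimal sketch of the harmonized pattern (all names below are illustrative, not Flink's actual classes): the component receives an injected FatalErrorHandler and escalates fatal errors to it instead of deciding locally how to react.

```java
// Sketch: components don't decide how to die; they hand fatal errors
// to one injected handler so the reaction is uniform across components.
public class FatalErrorSketch {
    interface FatalErrorHandler { void onFatalError(Throwable error); }

    static class ResourceManagerStub {
        private final FatalErrorHandler errorHandler;
        ResourceManagerStub(FatalErrorHandler errorHandler) {
            this.errorHandler = errorHandler;
        }
        void doWork() {
            try {
                throw new IllegalStateException("lost connection to HA store");
            } catch (Exception e) {
                errorHandler.onFatalError(e); // escalate, don't swallow
            }
        }
    }

    public static void main(String[] args) {
        ResourceManagerStub rm = new ResourceManagerStub(
                error -> System.out.println("fatal: " + error.getMessage()));
        rm.doWork(); // prints: fatal: lost connection to HA store
    }
}
```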
[jira] [Created] (FLINK-4850) FlinkML - SVM predict Operation for Vector and not LabeledVector
Thomas FOURNIER created FLINK-4850:
--------------------------------------

Summary: FlinkML - SVM predict Operation for Vector and not LabeledVector
Key: FLINK-4850
URL: https://issues.apache.org/jira/browse/FLINK-4850
Project: Flink
Issue Type: Bug
Reporter: Thomas FOURNIER

It seems that SVM has a predict operation for Vector and not for LabeledVector.
Re: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink
Hi,
yes, I think it's fine if we keep it in the same package as the Evictor. StreamRecord is more of an internal class that should not really be user-facing; that's my motivation for replacing it.

Cheers,
Aljoscha

On Mon, 17 Oct 2016 at 19:23 Vishnu Viswanath wrote:

> Hi Aljoscha,
>
> Thanks for the response.
>
> I did think about creating a new class similar to TimestampedValue as you
> suggested, but that class looked almost the same as the current StreamRecord
> class (both have a timestamp field and a value field).
>
> Do you think it is fine to have another class for holding a
> (timestamp, value) tuple?
>
> Regards,
> Vishnu
>
> On Mon, Oct 17, 2016 at 4:19 AM, Aljoscha Krettek wrote:
>
> > Hi Vishnu,
> > what you suggested is spot on! Please go forward with it like this.
> >
> > One small suggestion would be to change Tuple2 to something like
> > TimestampedValue to not rely on tuples, because they can be confusing for
> > people who write Scala code, since they are not Scala tuples. That's not
> > strictly necessary, though; you can spin it however you like.
> >
> > Cheers,
> > Aljoscha
> >
> > On Fri, 7 Oct 2016 at 18:46 Vishnu Viswanath <
> > vishnu.viswanat...@gmail.com> wrote:
> >
> > > Hi Radu,
> > >
> > > Yes, we can remove elements randomly using iterator.remove().
> > >
> > > Regards,
> > > Vishnu
> > >
> > > On Fri, Oct 7, 2016 at 2:57 AM, Radu Tudoran wrote:
> > >
> > > > Hi,
> > > >
> > > > I must apologize that I missed some of the email exchanges on this thread,
> > > > and thus my remark/question might have already been settled.
> > > >
> > > > Does the interface you propose enable removing elements out of
> > > > order, e.g., assuming I have elements 1,2,3,4,5 in the window buffer, to be
> > > > able to evict 2 and 4?
> > > > We discussed this some email exchanges ago, but as I said I am not
> > > > sure if this functionality is captured in this interface.
> > > > Basically, will the typical remove() method from Iterators be available?
> > > >
> > > > Best regards,
> > > >
> > > > -----Original Message-----
> > > > From: Vishnu Viswanath [mailto:vishnu.viswanat...@gmail.com]
> > > > Sent: Friday, October 07, 2016 8:29 AM
> > > > To: Dev
> > > > Subject: Re: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink
> > > >
> > > > Hi Aljoscha,
> > > >
> > > > To pass the time information to the Evictor and at the same time not expose the
> > > > StreamRecord, I suppose we can change the signature of evictBefore and
> > > > evictAfter to take Iterable<Tuple2<Long, T>> instead of
> > > > Iterable<StreamRecord<T>>:
> > > >
> > > > void evictBefore(Iterable<Tuple2<Long, T>> elements, int size, W window,
> > > > EvictorContext evictorContext);
> > > >
> > > > The fire() method of EvictingWindowOperator can transform the
> > > > Iterable<StreamRecord<T>> to FluentIterable<Tuple2<Long, T>> and pass it
> > > > on to the evictor (where f0 will be the timestamp and f1 will be the value).
> > > > That way the TimeEvictor will work for EventTime or IngestionTime as long
> > > > as the timestamp is set in the StreamRecord. In case the timestamp is not set,
> > > > the TimeEvictor can detect this by checking Tuple2.f0 (which will be
> > > > Long.MIN_VALUE) and skip the eviction.
> > > >
> > > > If you think this is fine, I will make the changes and also edit the FLIP.
> > > >
> > > > Regards,
> > > > Vishnu
> > > >
> > > > On Wed, Oct 5, 2016 at 9:49 PM, Vishnu Viswanath <
> > > > vishnu.viswanat...@gmail.com> wrote:
> > > >
> > > > > Thank you Aljoscha,
> > > > >
> > > > > Yes, I agree we don't need a ProcessingTimeEvictor.
> > > > > I will change the current TimeEvictors to use EventTimeEvictor as
> > > > > suggested.
> > > > >
> > > > > Also, I will figure out a way to pass the timestamp to the Evictor
> > > > > interface so that we can avoid exposing StreamRecords.
> > > > > Regards,
> > > > > Vishnu
> > > > >
> > > > > On Tue, Sep 20, 2016 at 4:33 AM, Aljoscha Krettek <
> > > > > aljos...@apache.org> wrote:
> > > > >
> > > > >> Hi,
> > > > >> now you again see what I mentioned a while back: eviction based on
> > > > >> processing time is not really well defined. I think we can completely get
> > > > >> rid of "processing time eviction" because it can be replaced by something
> > > > >> like this:
> > > > >>
> > > > >> DataStream<T> input = ...
> > > > >> DataStream<T> withTimestamps = input.assignTimestampsAndWatermarks(new
> > > > >> IngestionTimeExtractor<T>()) // this will assign the current processing time
> > > > >> as timestamp
> > > > >> withTimestamps
> > > > >>   .keyBy(...)
> > > > >>   .window(...)
> > > > >>   .evictor(new EventTimeEvictor())
> > > > >>   .apply(...)
> > > > >>
> > > > >> With this, we would just have to find a good way of passing the timestamps
> > > > >> in the
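The transformation Vishnu proposes in the thread above — hand the evictor (timestamp, value) pairs, and skip eviction when the timestamp carries the Long.MIN_VALUE "not set" sentinel — can be sketched like this. A Java record stands in for Flink's Tuple2, and TimeEvictorSketch and its method names are illustrative, not the FLIP-4 API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch of the proposed evictor contract: elements arrive as
// (timestamp, value) pairs; a Long.MIN_VALUE timestamp means "no timestamp
// set", in which case the time-based evictor leaves the pane untouched.
public class TimeEvictorSketch {
    record Timestamped<T>(long timestamp, T value) {}

    // Evict everything at or before (maxTimestamp - windowSize),
    // roughly what a time evictor does over the pane's contents.
    static <T> void evictBefore(List<Timestamped<T>> elements, long windowSize) {
        long max = Long.MIN_VALUE;
        for (Timestamped<T> e : elements) {
            if (e.timestamp() == Long.MIN_VALUE) return; // sentinel: skip eviction
            max = Math.max(max, e.timestamp());
        }
        long cutoff = max - windowSize;
        // Iterator.remove() also allows out-of-order removal, per the thread.
        for (Iterator<Timestamped<T>> it = elements.iterator(); it.hasNext();) {
            if (it.next().timestamp() <= cutoff) it.remove();
        }
    }

    public static void main(String[] args) {
        List<Timestamped<String>> pane = new ArrayList<>(List.of(
                new Timestamped<>(1L, "a"),
                new Timestamped<>(5L, "b"),
                new Timestamped<>(9L, "c")));
        evictBefore(pane, 5L);
        System.out.println(pane); // "a" (timestamp 1) is evicted; "b" and "c" remain
    }
}
```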
Re: Flink Metrics
https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html

Or this, if you prefer Flink 1.1:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/metrics.html

On Mon, 17 Oct 2016 at 19:16 amir bahmanyari wrote:

> Hi colleagues,
> Is there a link that describes Flink Metrics & provides an example of how to
> utilize them, please?
> I really appreciate it...
> Cheers
>
> ------------------------------
> *From:* Till Rohrmann
> *To:* u...@flink.apache.org
> *Cc:* dev@flink.apache.org
> *Sent:* Monday, October 17, 2016 12:52 AM
> *Subject:* Re: Flink Metrics
>
> Hi Govind,
>
> I think the DropwizardMeterWrapper implementation is just a reference
> implementation where it was decided to report the one-minute rate. You can
> define your own meter class which allows you to configure the rate interval
> accordingly.
>
> Concerning Timers, I think nobody has requested this metric so far. If you
> want, you can open a JIRA issue and contribute it. The community would
> really appreciate that.
>
> Cheers,
> Till
>
> On Mon, Oct 17, 2016 at 5:26 AM, Govindarajan Srinivasaraghavan <
> govindragh...@gmail.com> wrote:
>
> > Hi,
> >
> > I am currently using the Flink 1.2 snapshot and instrumenting my pipeline with
> > Flink metrics. One small suggestion I have: currently the Meter interface
> > only supports getRate(), which is always the one-minute rate.
> >
> > It would be great if all the rates (1 min, 5 min & 15 min) were exposed, to get
> > a better picture in terms of performance.
> >
> > Also, is there any reason why timers are not part of flink metrics core?
> >
> > Regards,
> > Govind
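Till's suggestion in the thread above — a meter class whose reported rate interval is configurable rather than hard-wired to one minute — might look like this sketch. MultiRateSource is a local stand-in for a Dropwizard-style meter (which exposes one-, five-, and fifteen-minute rates), and all names here are illustrative.

```java
// Sketch: wrap the underlying multi-rate meter and pick which rate the
// single getRate() reports at construction time, instead of always
// forwarding the one-minute rate.
public class ConfigurableMeterSketch {
    enum RateInterval { ONE_MINUTE, FIVE_MINUTES, FIFTEEN_MINUTES }

    // Stand-in for a Dropwizard-style meter with three EWMA rates.
    interface MultiRateSource {
        double oneMinuteRate();
        double fiveMinuteRate();
        double fifteenMinuteRate();
    }

    static class ConfigurableMeter {
        private final MultiRateSource source;
        private final RateInterval interval;
        ConfigurableMeter(MultiRateSource source, RateInterval interval) {
            this.source = source;
            this.interval = interval;
        }
        double getRate() {
            switch (interval) {
                case FIVE_MINUTES:    return source.fiveMinuteRate();
                case FIFTEEN_MINUTES: return source.fifteenMinuteRate();
                default:              return source.oneMinuteRate();
            }
        }
    }

    public static void main(String[] args) {
        MultiRateSource fake = new MultiRateSource() {
            public double oneMinuteRate() { return 1.0; }
            public double fiveMinuteRate() { return 5.0; }
            public double fifteenMinuteRate() { return 15.0; }
        };
        System.out.println(
                new ConfigurableMeter(fake, RateInterval.FIVE_MINUTES).getRate());
        // prints 5.0
    }
}
```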