Re: Need the way to create custom metrics

2018-12-18 Thread Chirag Dewan
 I have a similar issue. I raised a JIRA:
 https://issues.apache.org/jira/browse/FLINK-11198
Thanks,
Chirag
   On Wednesday, 19 December 2018, 11:35:02 AM IST, Fabian Hueske wrote:
 
 Hi,
AFAIK it is not possible to collect metrics for an AggregateFunction. You can
open a feature request by creating a Jira issue.

Thanks, 
Fabian


On Mon, 17 Dec 2018 at 21:29, Gaurav Luthra wrote:

Hi,
I need to know how to implement custom metrics in my Flink program. Currently,
I know we can create custom metrics with the help of the RuntimeContext, but in
my aggregate() I do not have a RuntimeContext. I am using a window operator,
applying the aggregate() method on it, and passing an AggregateFunction to
aggregate().
So, kindly guide me: how can I create custom metrics in my code?
Note: as we know, we cannot use a RichAggregateFunction with the aggregate() method.
Thanks & Regards
Gaurav Luthra
Mob: +91-9901945206
  

Re: Need the way to create custom metrics

2018-12-18 Thread Fabian Hueske
Hi,

AFAIK it is not possible to collect metrics for an AggregateFunction.
You can open a feature request by creating a Jira issue.

Thanks,
Fabian


On Mon, 17 Dec 2018 at 21:29, Gaurav Luthra <gauravluthra6...@gmail.com> wrote:

> Hi,
>
> I need to know how to implement custom metrics in my Flink program.
> Currently, I know we can create custom metrics with the help of
> RuntimeContext.
> But in my aggregate() I do not have RuntimeContext. I am using window
> operator and applying aggregate() method on it. And I am passing
> AggregateFunction in aggregate() method.
>
> So, kindly guide me: how can I create custom metrics in my code?
>
> Note: as we know, we cannot use a RichAggregateFunction with the aggregate()
> method.
>
> Thanks & Regards
> Gaurav Luthra
> Mob:- +91-9901945206
>


Custom Metrics in Windowed AggregateFunction

2018-12-18 Thread Chirag Dewan
Hi,
I am writing a Flink job for aggregating events in a window. 
I am trying to use the AggregateFunction implementation for this. 
Now, since WindowedStream does not allow a RichAggregateFunction for 
aggregation, I can't use the RuntimeContext to get the metric group. 
I don't see any other way of accessing the metric group in a non-rich 
function implementation either.
Is there any way around this? 
Any help appreciated.
Thanks,
Chirag
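
One possible workaround sketch, assuming a per-window metric is good enough: the
two-argument aggregate(AggregateFunction, ProcessWindowFunction) overload accepts a
ProcessWindowFunction, which is a rich function, so it can register custom metrics in
open() via getRuntimeContext().getMetricGroup() and update them with the pre-aggregated
result each time a window fires. The names MetricsWindowFunction, MyAggregateFunction
and windowsFired below are illustrative only, not from the thread.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class MetricsWindowFunction
        extends ProcessWindowFunction<Long, Long, String, TimeWindow> {

    private transient Counter windowsFired;

    @Override
    public void open(Configuration parameters) {
        // RuntimeContext is available here because ProcessWindowFunction is rich
        windowsFired = getRuntimeContext().getMetricGroup().counter("windowsFired");
    }

    @Override
    public void process(String key, Context ctx, Iterable<Long> aggregated,
                        Collector<Long> out) {
        windowsFired.inc();
        // the Iterable holds the single pre-aggregated value from the AggregateFunction
        out.collect(aggregated.iterator().next());
    }
}

// usage (MyAggregateFunction is the existing non-rich AggregateFunction):
// stream.keyBy(...)
//       .timeWindow(Time.seconds(5))
//       .aggregate(new MyAggregateFunction(), new MetricsWindowFunction());

Note this only gives per-fired-window granularity; metrics inside the AggregateFunction
itself are still not possible, as Fabian says above.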

Re: Custom S3 endpoint

2018-12-18 Thread Paul Lam
Hi Nick,

What version of Hadoop are you using? AFAIK, you must use Hadoop 2.7+ to support a
custom S3 endpoint; otherwise the `fs.s3a.endpoint` property in core-site.xml is
ignored.

Best,
Paul Lam
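
For illustration, the property Paul mentions looks like this in core-site.xml
(storage.example.com is a placeholder endpoint; path-style access is often needed
for non-AWS providers and requires a newer Hadoop). With Flink's shaded
flink-s3-fs-hadoop filesystem, the same settings can reportedly go into
flink-conf.yaml with the fs.s3a. prefix replaced by s3. (e.g. s3.endpoint), but
that is an assumption, not verified here on 1.7.

<!-- core-site.xml: point the S3A client at a non-AWS, S3-compatible endpoint -->
<property>
  <name>fs.s3a.endpoint</name>
  <value>https://storage.example.com</value>
</property>
<property>
  <!-- many non-AWS providers need path-style access instead of virtual-hosted buckets -->
  <name>fs.s3a.path.style.access</name>
  <value>true</value>
</property>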

> On 19 Dec 2018, at 06:40, Martin, Nick wrote:
> 
> I'm working on Flink 1.7.0 and I'm trying to use the built-in S3 libraries 
> like readFile('s3://bucket/object') or 
> StreamingFileSink. My storage provider is not AWS, but they implement the 
> same API. So I need to point the S3 client to a different address. The Hadoop 
> documentation shows that there are options in core-site to set that up. The 
> problem is, I can't seem to get the right dependencies together to use the S3 
> filesystem. As far as I can tell, the pre-built Hadoop/Presto jars don't use 
> core-site.xml, and the instructions for manual setup given here 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/aws.html#hadoop-provided-s3-file-systems---manual-setup)
> list a set of dependencies that seems to be completely wrong.
>  
> How can I use the S3 sources/sinks with a custom HTTP endpoint?
>  
>  
>  
> Nick Martin
>  



Re: Watermark not firing to push data

2018-12-18 Thread Vijay Balakrishnan
Hi,
After looking at the code in EventTimeTrigger, I changed the watermark to
be System.currentTimeMillis() + bound (5 secs) so that the window's maxTimestamp
was <= the watermark. With that I was able to consume from Kinesis when I had only
50 records.

For a tumbling window of 5 secs, the window maxTimestamp was usually around
currentTime + 5 secs, so I set the watermark to System.currentTimeMillis() + 5 secs.
This way the trigger fired and AggregateFunction.getResult() was called.

@Override
public TriggerResult onElement(Object element, long timestamp,
        TimeWindow window, TriggerContext ctx) throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { // <== this check had to be met
        // if the watermark is already past the window, fire immediately
        return TriggerResult.FIRE;
    } else {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }
}
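
For comparison, here is a minimal sketch using Flink's built-in
BoundedOutOfOrdernessTimestampExtractor (periodic watermarks) instead of the
punctuated assigner; Monitoring and the Utils helpers are assumed from the quoted
code below. Note that with either assigner the watermark only advances as far as the
event timestamps actually seen, so with only a handful of records nothing ever passes
the window end and the window does not fire until later data (or a processing-time
workaround) arrives; the wall-clock-based watermark above works around exactly that.

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Minimal sketch: periodic watermarks trailing the max seen timestamp by 3 seconds.
public class MonitoringBoundedAssigner
        extends BoundedOutOfOrdernessTimestampExtractor<Monitoring> {

    public MonitoringBoundedAssigner() {
        super(Time.seconds(3)); // out-of-orderness bound
    }

    @Override
    public long extractTimestamp(Monitoring monitoring) {
        // same interval-start timestamp as in the quoted MonitoringAssigner below
        return Utils.getLongFromLocalDateTime(
                Utils.getLocalDateTime(monitoring.getIntervalStart()));
    }
}

// usage: kinesisStream.assignTimestampsAndWatermarks(new MonitoringBoundedAssigner());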


On Mon, Dec 17, 2018 at 10:00 AM Vijay Balakrishnan wrote:

> Hi,
> Thx for your reply and the pointers on the currentLowWatermark. Looks like the
> Flink UI has a Watermarks tab for each operator.
>
> I dump 5 records into the Kinesis Data Stream and am trying to read the
> same records from the FlinkKinesisConsumer, but am not able to.
> I am using the same monitoring.getIntervalStart() in the watermark
> generation (intervalStart - bound) in the MonitoringAssigner class that I
> used to generate data on the producer side. I generate intervalStart on the
> producer side, and it increments on each record by 3-10 millisecs. The
> watermark is generated as intervalStart - bound (3 secs), so every
> watermark generated is > the previous one. So why does it not push
> data out? It gets into the MGroupingWindowAggregate.add(..) method but
> never gets into the MGroupingWindowAggregate.getResult(..) method. It
> works when I produce 1000 records or so into the Kinesis data stream.
>
> Here is a gist of my code-
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> //FlinkConsumer
> Properties kinesisConsumerConfig = new Properties();
> ..
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
> "1");
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
> "2000");//2000
> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
> ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name());
> FlinkKinesisConsumer kinesisConsumer = new FlinkKinesisConsumer<>(
> kinesisTopicRead, new MonitoringMapKinesisSchema(),
> kinesisConsumerConfig);
> final DataStreamSource monitoringDataStreamSource =
> env.addSource(kinesisConsumer);
> DataStream kinesisStream =
> monitoringDataStreamSource.assignTimestampsAndWatermarks(new
> MonitoringAssigner(3000)); // code at bottom
>
> org.apache.flink.streaming.api.windowing.time.Time timeWindow =
> Time.seconds(5);
> final WindowedStream windowStream =
> kinesisStream.timeWindow(timeWindow);
> DataStream enrichedComponentInstanceStream1 =
> windowStream.aggregate(
> new MGroupingWindowAggregate(), // AggregateFunction impl
> new MGroupingAggregateWindowProcessing(...));
>
> public class MonitoringAssigner implements
> AssignerWithPunctuatedWatermarks<Monitoring> {
> private long bound = 3 * 1000; // 3 secs out-of-order bound in millisecs
> public MonitoringAssigner(long bound) {
> this.bound = bound;
> }
> public Watermark checkAndGetNextWatermark(Monitoring monitoring, long
> extractedTimestamp) {
> long nextWatermark = extractedTimestamp - bound;
> return new Watermark(nextWatermark);
> }
> public long extractTimestamp(Monitoring monitoring, long previousTS) {
> LocalDateTime intervalStart = Utils.getLocalDateTime(
> monitoring.getIntervalStart()); // e.g. 2012-07-12 02:21:06.057
> long extractedTS = Utils.getLongFromLocalDateTime(intervalStart);
> return extractedTS;
> //return System.currentTimeMillis(); //this works fine.
> }
> }
>
> TIA,
> Vijay
>
> On Sat, Dec 15, 2018 at 5:42 AM Hequn Cheng  wrote:
>
>> Hi Vijay,
>>
>> Could you provide more information about your problem? For example
>> - Which kind of window do you use?
>> - What's the window size?
>> - A relatively complete code is better :-)
>>
>> As for the problem, it is probably that the event time has not reached the end
>> of the window. You can monitor the watermark in the web dashboard [1].
>> Also, changing event time to processing time is another way to verify if
>> it is a watermark problem.
>>
>> Best, Hequn
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html
>>
>>
>> On Sat, Dec 15, 2018 at 12:59 AM Vijay Balakrishnan 
>> wrote:
>>
>>> Hi,
>>> Observations on Watermarks:
>>> Read this great article:
>>> 

Custom S3 endpoint

2018-12-18 Thread Martin, Nick
I'm working on Flink 1.7.0 and I'm trying to use the built-in S3 libraries like 
readFile('s3://bucket/object') or StreamingFileSink. My storage provider is not 
AWS, but they implement the same API. So I need to point the S3 client to a 
different address. The Hadoop documentation shows that there are options in 
core-site to set that up. The problem is, I can't seem to get the right 
dependencies together to use the S3 filesystem. As far as I can tell, the 
pre-built Hadoop/Presto jars don't use core-site.xml, and the instructions for 
manual setup given here 
(https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/aws.html#hadoop-provided-s3-file-systems---manual-setup)
 list a set of dependencies that seems to be completely wrong.

How can I use the S3 sources/sinks with a custom HTTP endpoint?



Nick Martin





Kafka consumer, is there a way to filter out messages using key only?

2018-12-18 Thread Hao Sun
Hi, I am using 1.7 on K8S.

I have a huge amount of data in Kafka, but I only need a tiny portion of it.
It is a keyed stream, and the value is JSON-encoded. I want to avoid
deserialization of the value, since it is very expensive. Can I filter
based on the key only?
I know there is a KeyedDeserializationSchema, but can I use it to filter
data?

Hao Sun
Team Lead
1019 Market St. 7F
San Francisco, CA 94103
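
A minimal sketch of the key-based filtering idea with a KeyedDeserializationSchema
(the "orders:" key prefix and the String output type are placeholders, and this is a
sketch rather than an official recommendation): inspect the raw key bytes, return null
without parsing the value for records you do not want, and drop the nulls with a filter
chained right after the source. The value bytes are still fetched from Kafka; you only
skip the deserialization cost. If nulls are awkward for your type's serializer, emit a
cheap sentinel value instead.

import java.nio.charset.StandardCharsets;
import java.util.Objects;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class KeyFilteringSchema implements KeyedDeserializationSchema<String> {

    private static final byte[] WANTED_PREFIX = "orders:".getBytes(StandardCharsets.UTF_8);

    @Override
    public String deserialize(byte[] messageKey, byte[] message,
                              String topic, int partition, long offset) {
        if (messageKey == null || !startsWith(messageKey, WANTED_PREFIX)) {
            return null; // unwanted key: skip the expensive JSON parsing entirely
        }
        // only now pay for value deserialization (plain UTF-8 here; plug in your JSON parser)
        return new String(message, StandardCharsets.UTF_8);
    }

    private static boolean startsWith(byte[] bytes, byte[] prefix) {
        if (bytes.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (bytes[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}

// usage: env.addSource(new FlinkKafkaConsumer<>("topic", new KeyFilteringSchema(), props))
//           .filter(Objects::nonNull);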


Re: TimerService Troubleshooting/Metrics

2018-12-18 Thread Andrey Zagrebin
Hi Sayat,

As far as I know, there are no timer service metrics exposed at the moment.
I'm pulling Stefan into the thread; maybe he can add more.

In case of RocksDB, you can try enabling RocksDB internal metrics [1].
The timer service uses the RocksDB state backend to queue timers and has a dedicated 
column family in RocksDB.
The metrics of this column family might help, like 
`state.backend.rocksdb.metrics.estimate-num-keys`.

Best,
Andrey

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#rocksdb-native-metrics
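
For illustration, enabling the metric Andrey mentions is a single flink-conf.yaml
entry (the option name is taken from [1]); the metric is then reported per registered
column family, which should include the one backing the timer service:

# flink-conf.yaml: expose RocksDB's estimated key count as a Flink metric
state.backend.rocksdb.metrics.estimate-num-keys: true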

> On 14 Dec 2018, at 16:01, sayat  wrote:
> 
> Dear Flink Community,
> 
> Is there a way of troubleshooting the timer service? The docs say that 
> the service might degrade job performance significantly. Is there a way to 
> expose and see timer service metrics, i.e. the length of the priority queue, 
> how many times the service fires, etc.?



How to migrate Kafka Producer ?

2018-12-18 Thread Edward Rojas
Hi,

I'm planning to migrate from the Kafka connector 0.11 to the new universal Kafka
connector (1.0.0+), but I'm having some trouble.

The Kafka consumer seems to be compatible, but when trying to migrate the
Kafka producer I get an incompatibility error for the state migration.
It looks like the producer uses a list state of type
"NextTransactionalIdHint", but this class is specific to each producer
(FlinkKafkaProducer011.NextTransactionalIdHint vs
FlinkKafkaProducer.NextTransactionalIdHint), and therefore the states are not
compatible.


I would like to know the recommended way to perform this kind of
migration without losing the state.

Thanks in advance,
Edward



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink metrics in kubernetes deployment

2018-12-18 Thread Chesnay Schepler
If you're working with 1.7/master you're probably running into 
https://issues.apache.org/jira/browse/FLINK-11127 .


On 17.12.2018 18:12, eric hoffmann wrote:

Hi,
In a Kubernetes deployment, I'm not able to display metrics in the dashboard. I 
tried to expose and fix the metrics.internal.query-service.port variable,
but nothing changed. Do you have any ideas?
Thx
Eric






Re: Dataset column statistics

2018-12-18 Thread Flavio Pompermaier
Great, thanks!

On Tue, Dec 18, 2018 at 3:26 AM Kurt Young  wrote:

> Hi,
>
> We have implemented ANALYZE TABLE in our internal version of Flink, and we
> will try to contribute back to the community.
>
> Best,
> Kurt
>
>
> On Thu, Nov 29, 2018 at 9:23 PM Fabian Hueske  wrote:
>
>> I'd try to tune it in a single query.
>> If that does not work, go for as few queries as possible, splitting by
>> column for better projection push-down.
>>
>> This is the first time I hear somebody requesting ANALYZE TABLE.
>> I don't see a reason why it shouldn't be added in the future.
>>
>>
>>
>> On Thu, 29 Nov 2018 at 12:08, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> What do you advice to compute column stats?
>>> Should I run multiple job (one per column) or try to compute all at once?
>>>
>>> Are you ever going to consider supporting ANALYZE TABLE (like in Hive or
>>> Spark) in Flink Table API?
>>>
>>> Best,
>>> Flavio
>>>
>>> On Thu, Nov 29, 2018 at 9:45 AM Fabian Hueske  wrote:
>>>
 Hi,

 You could try to enable object reuse.
 Alternatively you can give more heap memory or fine tune the GC
 parameters.

 I would not consider it a bug in Flink, but it might be something that
 could be improved.

 Fabian


 On Wed, 28 Nov 2018 at 18:19, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> Hi to all,
> I have a batch dataset  and I want to get some standard info about its
> columns (like min, max, avg etc).
> In order to achieve this I wrote a simple program that uses SQL on the
> Table API, like the following:
>
> SELECT
> MAX(col1), MIN(col1), AVG(col1),
> MAX(col2), MIN(col2), AVG(col2),
> MAX(col3), MIN(col3), AVG(col3)
> FROM MYTABLE
>
> In my dataset I have about 50 fields and the query becomes quite big
> (and the job plan too).
> It seems that this kind of job causes the cluster to crash (too much
> garbage collection).
> Is there any smarter way to achieve this goal (apart from running a
> job per column)?
> Is this "normal" or is this a bug in Flink?
>
> Best,
> Flavio
>

>>>
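
For reference, "enable object reuse" from Fabian's reply above is a one-line change on
the execution environment; a minimal DataSet API sketch (the surrounding table setup
and the MIN/MAX/AVG query are assumed to stay as in the quoted code):

import org.apache.flink.api.java.ExecutionEnvironment;

public class ColumnStatsJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // reuse objects between UDF calls to reduce allocation and GC pressure
        env.getConfig().enableObjectReuse();
        // ... register the table, run the MIN/MAX/AVG query as before, then execute the job
    }
}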