Re: Detect late data in processing time

2018-07-30 Thread vino yang
Hi Averell, I personally don't recommend this. In fact, processing time uses the local physical clock of the node where the specific task is located, rather than being set upstream in advance. This is a bit like another time concept provided by Flink - Ingestion Time. So, if you do not specify

Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread vino yang
Hi Averell, Actually, performing a key partition inside the source function is the same as DataStream[Source].keyBy(custom partitioner), because keyBy is not a real operator, but a virtual node in a DAG, which does not correspond to a physical operator. Thanks, vino. 2018-07-31 10:52 GMT+08:00

Re: Detect late data in processing time

2018-07-30 Thread Averell
Hi Soheil, Why don't you just use the processing time as event time? Simply override extractTimestamp to return your processing time. Regards, Averell -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
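
A minimal sketch of this suggestion, assuming the Scala DataStream API and a hypothetical Price event type; the extractor simply returns the local wall-clock time, so event time effectively degenerates to processing time:

import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor

// Hypothetical event type; replace with your own.
case class Price(symbol: String, value: Double)

// Stamps every element with the local wall-clock time, so downstream
// event-time windows behave like processing-time windows.
class ProcessingTimeAsEventTime extends AscendingTimestampExtractor[Price] {
  override def extractAscendingTimestamp(element: Price): Long =
    System.currentTimeMillis()
}

// usage: stream.assignTimestampsAndWatermarks(new ProcessingTimeAsEventTime)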

Re: flink_taskmanager_job_task_operator_records_lag_max == -Inf on Flink 1.4.2

2018-07-30 Thread Tony Wei
Hi Julio, As Gordon said, the `records_lag_max` metric is a Kafka-shipped metric [1]. And I also found this thread [2] in the Kafka mailing list. It seems that this is by design inside Kafka. So I think there is nothing we can do in the Flink Kafka connector. BTW, the Kafka documentation [1] said

Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread Averell
Hi Vino, I'm a little bit confused. If I can do the partitioning from within the source function, using the same hash function on the key to identify the partition, would that be sufficient to avoid shuffling in the next keyBy call? Thanks. Averell -- Sent from:

Re: splitting DataStream throws error

2018-07-30 Thread vino yang
Answered Mich privately, copying here: Hi Mich, Using split directly on the stream object is wrong. It is meant to split the stream into multiple streams of elements, not to split the format of the data inside the stream. In this scenario, if you want to parse the data, use the map function only after the

Re: scala IT

2018-07-30 Thread vino yang
Hi Nicos, The thrown exception has given you a clear solution hint: The return type of function 'apply(MultiplyByTwoTest.scala:43)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the

Re: scala IT

2018-07-30 Thread Hequn Cheng
Hi Nicos, It is weird. Have you updated your code? I checked your code and the function has implemented ResultTypeQueryable. The code should work well. Best, Hequn On Tue, Jul 31, 2018 at 6:30 AM, Nicos Maris wrote: > Hi all, > > > the integration test in scala documented at the testing

Re: Flink on Mesos: containers question

2018-07-30 Thread Renjie Liu
Hi, As I said, the Docker process and the job manager process are the same one. To start task managers in Docker, you need to set "mesos.resourcemanager.tasks.container.type" to "docker" in the job master config; otherwise Flink will just start the task managers as plain processes. I don't understand what do

Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread vino yang
Hi Averell, The keyBy transformation triggers a key partition, one of the various partition types supported by Flink, and it causes the data to be shuffled. It routes keys with the same hash value to the same node, based on the hash of the key you passed (or generated by the custom

Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread Averell
Oh, Thank you Vino. I was not aware of that reshuffling after every custom partitioning. Why would that be needed, though? Thanks and regards, Averell -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Counting elements that appear "behind" the watermark

2018-07-30 Thread Elias Levy
You can create a ProcessFunction. That gives you access to getRuntimeContext to register metrics, and to the element timestamp and the current watermark. Keep in mind that operators first process a record and then process any watermark that was the result of that record, so that when you get the
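
A minimal sketch of such a ProcessFunction (the class and counter names are made up for illustration): it registers a counter via the runtime context and increments it whenever an element's timestamp is at or behind the current watermark.

import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Counts elements whose timestamp is already at or behind the current watermark.
class BehindWatermarkCounter[T] extends ProcessFunction[T, T] {

  @transient private var lateCounter: Counter = _

  override def open(parameters: Configuration): Unit = {
    // register a metric through the runtime context
    lateCounter = getRuntimeContext.getMetricGroup.counter("elementsBehindWatermark")
  }

  override def processElement(value: T, ctx: ProcessFunction[T, T]#Context, out: Collector[T]): Unit = {
    val ts = ctx.timestamp() // null if no timestamps were assigned upstream
    if (ts != null && ts.longValue() <= ctx.timerService().currentWatermark()) {
      lateCounter.inc()
    }
    out.collect(value)
  }
}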

Re: Counting elements that appear "behind" the watermark

2018-07-30 Thread Hequn Cheng
Hi Julio, If I understand correctly, you want to adjust your watermarks automatically? It is true that there are no direct ways to get metrics from the AssignerWithPeriodicWatermarks. Adding a ProcessFunction before assignTimestampsAndWatermarks seems like a solution. In the ProcessFunction, you can

Re: Description of Flink event time processing

2018-07-30 Thread Elias Levy
Fabian, Have you had any time to review the changes? On Thu, Jul 19, 2018 at 2:19 AM Fabian Hueske wrote: > Hi Elias, > > Thanks for the update! > I'll try to have another look soon. > > Best, Fabian > > 2018-07-11 1:30 GMT+02:00 Elias Levy : > >> Thanks for all the comments. I've updated the

scala IT

2018-07-30 Thread Nicos Maris
Hi all, the integration test in scala documented in the testing section fails: https://travis-ci.org/nicosmaris/HelloDockerScalaSbt/builds/410075764 In previous commits of my demo repo, I tried TypeExtractor, BasicTypeInfo and ResultTypeQueryable with no success. I am new to Flink and to Scala

Re: splitting DataStream throws error

2018-07-30 Thread Mich Talebzadeh
Thanks. So the assumption is that one cannot perform split on DataStream[String] directly? Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

Re: splitting DataStream throws error

2018-07-30 Thread Chesnay Schepler
You define a flatMap function that takes a string, calls String#split on it and collects the array. On 30.07.2018 22:04, Mich Talebzadeh wrote: Hi, I have Kafka streaming feeds where a row looks like below where fields are separated by "," I can split them easily with split function
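
A minimal sketch in Scala, assuming a hypothetical lines: DataStream[String] carrying the comma-separated rows from Mich's example:

import org.apache.flink.streaming.api.scala._

// Split every comma-separated row into its individual fields.
val fields: DataStream[String] = lines.flatMap(_.split(","))

// Or keep the fields of one row together as a typed tuple instead:
val prices = lines.map { line =>
  val Array(id, ticker, timestamp, price) = line.split(",")
  (id, ticker, timestamp, price.toDouble)
}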

Re: AM Delegation Token Regeneration

2018-07-30 Thread Shuyi Chen
Hi Paul, currently, Flink intentionally disables DT (delegation tokens) and only uses keytabs. I am not aware that DT regeneration is part of FLIP-6 (@till, correct me if I am wrong). I've created a security improvement design

splitting DataStream throws error

2018-07-30 Thread Mich Talebzadeh
Hi, I have Kafka streaming feeds where a row looks like the one below, with fields separated by ",". I can split them easily with the split function scala> val oneline = "05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48" oneline: String =

RE: Flink on Mesos: containers question

2018-07-30 Thread NEKRASSOV, ALEXEI
Renjie, In my observation Task Managers don’t run in Docker containers – they run as JVM processes directly on the VM. The only Docker container is the one that runs Job Manager. What am I missing? Thanks, Alex From: Renjie Liu [mailto:liurenjie2...@gmail.com] Sent: Friday, July 20, 2018 8:56

Delay in REST/UI readiness during JM recovery

2018-07-30 Thread Joey Echeverria
I’m running Flink 1.5.0 in Kubernetes with HA enabled, but only a single Job Manager running. I’m using Zookeeper to store the fencing/leader information and S3 to store the job manager state. We’ve been running around 250 or so streaming jobs and we’ve noticed that if the job manager pod is

Re: flink_taskmanager_job_task_operator_records_lag_max == -Inf on Flink 1.4.2

2018-07-30 Thread Julio Biason
Hey Gordon, (Reviving this long thread) I think I found part of the problem: It seems the metric is capturing the lag from time to time and resetting the value in-between. I managed to replicate this by attaching a SQL Sink (JDBCOutputFormat) connecting to an outside database -- something that took

Counting elements that appear "behind" the watermark

2018-07-30 Thread Julio Biason
Hello, Our current watermark model is "some time behind the most recent seen element" (very close to what the docs have in "Periodic Watermark" https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html#with-periodic-watermarks). It fits our current

Re: flink command line with save point job is not being submitted

2018-07-30 Thread vino yang
Hi Darshan, This is a known problem with Flink, and no specific exception information is given, making diagnosis more difficult. I personally guess that you are using a local file system, which may be the cause of the problem. Can you specify an HDFS path with access permissions for the savepoint? Thanks,

Re: Committing Kafka Transactions during Savepoint

2018-07-30 Thread vino yang
Hi Scott, For EXACTLY_ONCE on the sink end with the Kafka 0.11+ producer, the answer is yes. There is official documentation that gives a good overview of this topic [1]. [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011 Thanks, vino. 2018-07-27
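
A minimal sketch of enabling EXACTLY_ONCE with the Kafka 0.11 producer (broker address and topic name are placeholders); records are written inside Kafka transactions that are committed when a checkpoint completes, and the linked documentation also covers tuning transaction.timeout.ms:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092") // placeholder brokers

val producer = new FlinkKafkaProducer011[String](
  "output-topic", // placeholder topic
  new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),
  props,
  FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)

// stream.addSink(producer)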

flink command line with save point job is not being submitted

2018-07-30 Thread Darshan Singh
I am trying to submit a job with the savepoint/checkpoint and it is failing with the error below. Without the -s flag it works fine. Am I missing something here? Thanks >bin/flink run -d -c st -s file:///tmp/db/checkpoint/ ./target/poc-1.0-SNAPSHOT-jar-with-dependencies.jar Starting execution of

Re: Detect late data in processing time

2018-07-30 Thread vino yang
Hi Soheil, The watermark indicates the progress of event time. It exists because there is a time skew between event time and processing time. Hequn is correct: watermarks cannot be used with processing time. Processing time is based on the TM's local system clock. Usually,

Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread vino yang
Hi Averell, As far as I know, the custom partitioner will inevitably lead to a shuffle of the data. Even if it is bundled into the logic of the source function, would the behavior be any different? Thanks, vino. 2018-07-30 20:32 GMT+08:00 Averell : > Thanks Vino. > > Yes, I can do that after the source

Re: Detect late data in processing time

2018-07-30 Thread Hequn Cheng
Hi Soheil, No, we can't set watermarks in processing time, and there is no late data with processing-time windows. So the question is: what counts as bad data when you use processing time? Maybe there are other ways to solve your problem. Best, Hequn On Mon, Jul 30, 2018 at 8:22 PM,

Re: Order of events in a Keyed Stream

2018-07-30 Thread Harshvardhan Agrawal
Thanks for the response guys. Based on Niels' response, it seems like a keyBy immediately after reading from the source should map all messages with the same account number to the same slot. On Sun, Jul 29, 2018 at 05:33 Renjie Liu wrote: > Hi, > Another way to ensure order is by adding a logical

Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread Averell
Thanks Vino. Yes, I can do that after the source function. But that means data would be shuffled - sent from every source to the right partition. I think that doing the partitioning from within the file source would help avoid that shuffling. Thanks. Averell. -- Sent from:

Detect late data in processing time

2018-07-30 Thread Soheil Pourbafrani
In event time, we can gather bad data using an OutputTag, because in event time we have watermarks and can detect late data. But in processing-time mode we don't have any watermark to detect bad data. I want to know: can we set a watermark (for example, according to the taskmanager's timestamp) and use
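
For reference, a minimal sketch of the event-time mechanism described above, assuming a hypothetical Event type and a keyed stream that already has timestamps and watermarks assigned; elements arriving behind the watermark are routed to a side output:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

case class Event(key: String, ts: Long, value: Double) // hypothetical type

val lateTag = OutputTag[Event]("late-events")

val windowed = events // hypothetical DataStream[Event]
  .keyBy(_.key)
  .timeWindow(Time.seconds(10))
  .sideOutputLateData(lateTag) // elements behind the watermark go to the side output
  .reduce((a, b) => if (a.value > b.value) a else b)

// the late ("bad") data:
val lateStream: DataStream[Event] = windowed.getSideOutput(lateTag)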

Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread vino yang
Hi Averell, Yes, you cannot do it in the source function. I think you can call keyBy with a partitioner (based on NodeID) after the source. Why do you have to use a custom partitioner in the source function? Thanks, vino. 2018-07-30 19:56 GMT+08:00 Averell : > Thank you Vino. > > Yes, I

AM Delegation Token Regeneration

2018-07-30 Thread Paul Lam
Hi, At present, Flink distributes keytabs via YARN to the nodes that are running a Flink job, and this might be a potential security problem. I've read FLINK-3670 and the corresponding mailing list discussions, and I think a more appropriate implementation would be like Spark's: regenerate

Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread Averell
Thank you Vino. Yes, I went through that official guide before posting this question. The problem was that I could not see any call to one of the mentioned partitioning methods (partitionCustom, shuffle, rebalance, rescale, or broadcast) in the original readFile function. I'm still trying to look

Re: Logs are not easy to read through webUI

2018-07-30 Thread vino yang
Hi Xinyu, Thanks for your suggestion. I will CC this suggestion to some PMC members of Flink. Thanks, vino. 2018-07-30 18:03 GMT+08:00 Xinyu Zhang : > Hi vino > > Yes, it's only from the perspective of performance of reading log or > metrics. If the logs with timestamps(e.g.

Re: Questions on Unbounded number of keys

2018-07-30 Thread Fabian Hueske
Hi Chang, The state handle objects are not created per key but just once per function instance. Instead they route state accesses to the backend (JVM heap or RocksDB) for the currently active key. Best, Fabian 2018-07-30 12:19 GMT+02:00 Chang Liu : > Hi Andrey, > > Thanks for your reply. My
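
A sketch of what this means in practice, based on the session-keyed example from the question (the Click type is hypothetical): the handle is declared once per instance, but every read or update operates on the state of the currently active key.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

case class Click(sessionId: String, userId: String) // hypothetical event type

class MyFunction extends KeyedProcessFunction[String, Click, Click] { // keyed by session ID

  // One handle per parallel function instance, not one per key ...
  lazy val userId: ValueState[String] = getRuntimeContext.getState(
    new ValueStateDescriptor[String]("userId", classOf[String]))

  override def processElement(click: Click,
                              ctx: KeyedProcessFunction[String, Click, Click]#Context,
                              out: Collector[Click]): Unit = {
    // ... but reads and writes are scoped to the key (session ID) of the
    // element currently being processed.
    userId.update(click.userId)
    out.collect(click)
  }
}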

Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread vino yang
Hi Averell, Did you know Flink allows you to customize a partitioner? Some resources: official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/#physical-partitioning discussion in the mailing list:
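
For illustration, a minimal sketch of a custom partitioner keyed on the node ID (the (nodeId, logLine) tuple layout is hypothetical); note that, as discussed elsewhere in this thread, the data is still shuffled:

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

// Routes all records with the same node ID to the same downstream channel.
class NodeIdPartitioner extends Partitioner[String] {
  override def partition(nodeId: String, numPartitions: Int): Int =
    Math.floorMod(nodeId.hashCode, numPartitions)
}

// usage, assuming fileStream: DataStream[(String, String)] of (nodeId, logLine):
// val partitioned = fileStream.partitionCustom(new NodeIdPartitioner, _._1)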

Small-files source - partitioning based on prefix of file

2018-07-30 Thread Averell
Hi everyone, We are collecting log files from tens of thousands of network nodes, and we need to derive some data insights from them. The files come with the corresponding node ID in the file name, and I want to do custom partitioning using that node ID. Right now (with Flink 1.5) I think that

Re: S3 file source - continuous monitoring - many files missed

2018-07-30 Thread Averell
Here is my implementation of the change: https://github.com/lvhuyen/flink. Three files were updated: StreamExecutionEnvironment.java, StreamExecutionEnvironment.scala, and ContinuousFileMonitoringFunction.java. All thanks to Fabian. -- Sent from:

Re: Running a Python streaming job with Java dependencies

2018-07-30 Thread Chesnay Schepler
To use Java classes not bundled with Flink you will have to place a jar containing said classes into the /lib directory of the distribution. On 25.07.2018 23:32, Joe Malt wrote: Hi, I'm trying to run a job with Flink's new Python streaming API but I'm running into issues with Java imports.

Flink job is not reading from certain kafka topic partitions

2018-07-30 Thread vivekyadav01
We have a Flink stream job that uses FlinkKafkaConsumer010. We keep the Kafka consumer operator's parallelism the same as the number of partitions in the Kafka topic, so that each partition is attached to one subtask. Quite frequently the job stops reading from certain partitions. On investigating under job

Re: Questions on Unbounded number of keys

2018-07-30 Thread Chang Liu
Hi Andrey, Thanks for your reply. My question might be silly, but there is still one part I would like to fully understand. For instance, in the following example: class MyFunction extends KeyedProcessFunction[String, Click, Click] { // keyed by Session ID lazy val userId: ValueState[String]

Re: Logs are not easy to read through webUI

2018-07-30 Thread Xinyu Zhang
Hi vino, Yes, it's only from the perspective of the performance of reading logs or metrics. If the logs with timestamps (e.g. jobmanager.log.2018-07-29) will never change, maybe the blob store can cache some of them to improve performance. BTW, please consider developing an API for reading logs. I

Re: Logs are not easy to read through webUI

2018-07-30 Thread vino yang
Hi Xinyu, Thank you for your clarification on "periodic reading". If Flink considers developing an API for reading logs, I think this is a good idea. Regarding the problem of the TM reading logs, your idea is good from a performance perspective. But Flink doesn't provide any web services for the TM

Re: watermark VS window trigger

2018-07-30 Thread vino yang
Hi Soheil, I feel that some of your understanding is a bit problematic. "After that, according to the current watermark, data with a timestamp between the last watermark and the current watermark will be released and go to the next steps": the main role of the watermark here is to define the progress

Logs are not easy to read through webUI

2018-07-30 Thread Xinyu Zhang
Thanks for your reply. "Periodic reading" means reading all logs in a given time interval. For example, my logs are divided daily, so I can get all logs of yesterday through a parameter like '2018-07-29/2018-07-30'. A TM that provides a web service to display information will lessen the burden of

Re: RuntimeException with valve output watermark when using CoGroup

2018-07-30 Thread Taneli Saastamoinen
On 27 July 2018 at 19:21, Chesnay Schepler wrote: > At first glance this looks like a bug. Is there nothing in the stack trace after the NullPointerException? Hmm, there is actually, sorry about that: Caused by: java.lang.NullPointerException at

Re: Working out through individual messages in Flink

2018-07-30 Thread Mich Talebzadeh
Thanks again. The HBase connector works fine in Flink // Start HBase table stuff val tableName = "MARKETDATAHBASESPEEDFLINK" val hbaseConf = HBaseConfiguration.create() // Connecting to remote HBase hbaseConf.set("hbase.master", hbaseHost)

Re: Logs are not easy to read through webUI

2018-07-30 Thread vino yang
Hi Xinyu, This is indeed a problem. Especially when the amount of logs is large, it may even cause the UI to stall for a long time. The same is true for YARN, and there is really no good way to do it at the moment. Thank you for your suggestion; do you mean "periodic reading" refers to full or

watermark VS window trigger

2018-07-30 Thread Soheil Pourbafrani
Suppose we have a time window of 10 milliseconds and we use event time. First, we determine how Flink can extract timestamps and watermarks from incoming messages; after that, we set a key for the stream and set a time window. aggregatedTuple .assignTimestampsAndWatermarks(new
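
A minimal sketch of that pipeline, assuming aggregatedTuple is a DataStream[(String, Long, Double)] of (key, timestampMillis, value) and event time is enabled on the environment:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val result = aggregatedTuple
  .assignTimestampsAndWatermarks(
    // the watermark trails the maximum seen timestamp by 5 ms; a window fires
    // once the watermark passes the window's end
    new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Double)](Time.milliseconds(5)) {
      override def extractTimestamp(element: (String, Long, Double)): Long = element._2
    })
  .keyBy(_._1)
  .timeWindow(Time.milliseconds(10))
  .sum(2)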

Re: Working out through individual messages in Flink

2018-07-30 Thread Fabian Hueske
A TableSource [1] is a special input connector for Flink's relational APIs (Table API and SQL) [2]. You can transform and filter with these APIs as well (it's probably even easier). In SQL this would be the SELECT and WHERE clauses of a query. However, there is no TableSink for HBase and you
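
A rough sketch of that flow, assuming the Flink 1.5 Table API and the Kafka011JsonTableSource built in the next message; the table and column names are placeholders:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// kafkaTableSource: the Kafka011JsonTableSource built via its builder
tableEnv.registerTableSource("prices", kafkaTableSource)

// SELECT / WHERE do the transformation and filtering
val valuable = tableEnv.sqlQuery("SELECT ticker, price FROM prices WHERE price > 100.0")

// there is no TableSink for HBase, so convert back to a DataStream and
// attach a regular sink function there
val valuableStream: DataStream[Row] = valuable.toAppendStream[Row]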

Re: Working out through individual messages in Flink

2018-07-30 Thread Mich Talebzadeh
Thanks Fabian. That was very useful. How about an operation like below? // create builder val KafkaTableSource = Kafka011JsonTableSource.builder() // set Kafka topic .forTopic(topicsValue) // set Kafka consumer properties

Logs are not easy to read through webUI

2018-07-30 Thread Xinyu Zhang
Hi all, We use Flink on YARN and the Flink version is 1.4. When a streaming job runs for a long time, the webUI cannot show logs. This may be because the log size is too large. However, if we use the DailyRollingAppender to divide logs (granularity is `day`) in log4j.properties, we will never see the

Re: Working out through individual messages in Flink

2018-07-30 Thread Renjie Liu
Hi Mich, You can write a sink function for that. On Mon, Jul 30, 2018 at 2:58 PM Mich Talebzadeh wrote: > > Hi, > > I have a Kafka topic that transmits 100 security prices every 2 seconds. > > In Spark streaming I go through the RDD and walk through rows one by one > and check prices > In
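
A bare-bones sketch of such a sink; the HBase client calls are left as comments because they depend on the client API, and the (ticker, price) record layout is hypothetical:

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

class HBasePriceSink extends RichSinkFunction[(String, Double)] {

  override def open(parameters: Configuration): Unit = {
    // open the HBase connection / table here
  }

  override def invoke(value: (String, Double)): Unit = {
    // build a Put from `value` and write it to the table here
  }

  override def close(): Unit = {
    // close the connection here
  }
}

// usage: priceStream.filter(_._2 > 100.0).addSink(new HBasePriceSink)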

Re: Flink wrong Watermark in Periodic watermark

2018-07-30 Thread Xingcan Cui
Hi Soheil, That may relate to your parallelism since each extractor instance computes its own watermarks. Try to print the max timestamps with the current thread's name and you will notice this. Best, Xingcan > On Jul 30, 2018, at 3:05 PM, Soheil Pourbafrani wrote: > > Using Flink EventTime
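
A sketch of that debugging step, loosely mirroring the assigner from the question below (the tuple layout and delay are placeholders); each parallel instance keeps its own maximum timestamp, which is what the thread names reveal:

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical element type: (key, timestampMillis).
class DebugTimestampExtractor(delayMs: Long)
    extends AssignerWithPeriodicWatermarks[(String, Long)] {

  private var maxTimestamp: Long = Long.MinValue + delayMs

  override def extractTimestamp(element: (String, Long), previousTimestamp: Long): Long = {
    maxTimestamp = math.max(maxTimestamp, element._2)
    element._2
  }

  override def getCurrentWatermark: Watermark = {
    // each parallel instance tracks its own maximum timestamp
    println(s"${Thread.currentThread().getName}: maxTimestamp = $maxTimestamp")
    new Watermark(maxTimestamp - delayMs)
  }
}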

Flink wrong Watermark in Periodic watermark

2018-07-30 Thread Soheil Pourbafrani
Using Flink's event time feature, I implemented the class AssignerWithPeriodicWatermarks as follows: public static class SampleTimestampExtractor implements AssignerWithPeriodicWatermarks> { private static final long serialVersionUID = 1L; private long MAX_TIMESTAMP; private final long DELEY

Working out through individual messages in Flink

2018-07-30 Thread Mich Talebzadeh
Hi, I have a Kafka topic that transmits 100 security prices every 2 seconds. In Spark streaming I go through the RDD and walk through rows one by one and check prices. If prices are valuable I store them into an HBase table val dstream = KafkaUtils.createDirectStream[String, String,