Re: Are there pipeline API's for ETL?

2020-01-10 Thread vino yang
Hi kant,

Can you provide more context about your question? What do you mean by
"pipeline API"?

IMO, you can build an ETL pipeline by composing several Flink transformation
APIs. Which transformations to choose depends on your business logic.

Here is the list of the generic operator APIs.[1]

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/index.html
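For illustration, here is a minimal sketch of an ETL-style pipeline composed
from these operators. The Kafka topics, connection properties and the
transformation logic are assumptions for the example, not something from your
setup:

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class SimpleEtlJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumption

        // Extract: read raw records from a (hypothetical) source topic.
        DataStream<String> raw = env.addSource(
                new FlinkKafkaConsumer<>("raw-events", new SimpleStringSchema(), props));

        // Transform: compose filter/map operators carrying your business logic.
        DataStream<String> cleaned = raw
                .filter(line -> !line.isEmpty())
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String line) {
                        return line.trim();
                    }
                });

        // Load: write the transformed records to a (hypothetical) sink topic.
        cleaned.addSink(new FlinkKafkaProducer<>("clean-events", new SimpleStringSchema(), props));

        env.execute("simple-etl-pipeline");
    }
}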

kant kodali wrote on Sat, Jan 11, 2020 at 9:06 AM:

> Hi All,
>
> I am wondering if there are pipeline API's for ETL?
>
> Thanks!
>
>
>


Re: Checkpoints issue and job failing

2020-01-05 Thread vino yang
Hi Navneeth,

Since the file still exists, this exception is very strange.

I want to ask: does it happen occasionally or frequently?

Another concern is that since version 1.4 is quite old, maintenance and
responses for it are not as timely as for recent versions. I personally
recommend upgrading as soon as possible.

I can ping @Piotr Nowojski to see if he can explain the cause of this
problem.

Best,
Vino

Navneeth Krishnan wrote on Sat, Jan 4, 2020 at 1:03 AM:

> Thanks Congxian & Vino.
>
> Yes, the file does exist and I don't see any problem in accessing it.
>
> Regarding Flink 1.9, we haven't migrated yet but we are planning to.
> Since we have to test it, this might take some time.
>
> Thanks
>
> On Fri, Jan 3, 2020 at 2:14 AM Congxian Qiu 
> wrote:
>
>> Hi
>>
>> Have you ever checked whether this problem also exists on Flink 1.9?
>>
>> Best,
>> Congxian
>>
>>
>> vino yang wrote on Fri, Jan 3, 2020 at 3:54 PM:
>>
>>> Hi Navneeth,
>>>
>>> Did you check whether the path contained in the exception really cannot be
>>> found?
>>>
>>> Best,
>>> Vino
>>>
>>> Navneeth Krishnan wrote on Fri, Jan 3, 2020 at 8:23 AM:
>>>
>>>> Hi All,
>>>>
>>>> We are running into checkpoint timeout issues more frequently in
>>>> production, and we also see the below exception. We are running Flink 1.4.0
>>>> and the checkpoints are saved on NFS. Can someone suggest how to overcome
>>>> this?
>>>>
>>>>
>>>> java.lang.IllegalStateException: Could not initialize operator state 
>>>> backend.
>>>>at 
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:302)
>>>>at 
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:249)
>>>>at 
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>>>>at 
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>>>>at 
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>>>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>>at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.io.FileNotFoundException: 
>>>> /mnt/checkpoints/02c4f8d5c11921f363b98c5959cc4f06/chk-101/e71d8eaf-ff4a-4783-92bd-77e3d8978e01
>>>>  (No such file or directory)
>>>>at java.io.FileInputStream.open0(Native Method)
>>>>at java.io.FileInputStream.open(FileInputStream.java:195)
>>>>at java.io.FileInputStream.<init>(FileInputStream.java:138)
>>>>at 
>>>> org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
>>>>
>>>>
>>>> Thanks
>>>>
>>>>


Re: Flink logging issue with logback

2020-01-05 Thread vino yang
Hi Bajaj,

>> Logs from the main method (outside of the job graph) do not show up in the
jobmanager logs.

IMO, this is normal behavior.

Otherwise, please check the JVM options mentioned by Yang.

Best,
Vino


Yang Wang wrote on Mon, Jan 6, 2020 at 11:18 AM:

> Hi Bajaj, Abhinav,
>
> Could you share the start command of the jobmanager and taskmanager? If they are
> started correctly, we will have the following JVM options.
>
> -Dlog.file=/path/of/taskmanager.log
> -Dlogback.configurationFile=file:///path/of/logback.xml
>
>
>
> Best,
> Yang
>
> Bajaj, Abhinav wrote on Sat, Jan 4, 2020 at 7:23 AM:
>
>> Hi,
>>
>>
>>
>> I am investigating a logging issue with Flink.
>>
>>
>>
>> *Setup*
>>
>>- Using Flink 1.7.1 with logback, as suggested in the Flink documentation here
>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/best_practices.html#use-logback-when-running-flink-on-a-cluster>.
>>- Submitting the Flink job from the Flink dashboard.
>>
>>
>>
>> *Observations*
>>
>>- Logs from main method(outside of job graph) do not show up in
>>jobmanager logs.
>>- Logs from the operators like map or custom operators do show up in
>>the taskmanager logs.
>>- Logs from main method do show up in jobmanager logs when using
>>log4j in place of logback.
>>
>>
>>
>> Has anyone else noticed similar behavior or is this a known issue with
>> logback integration in Flink?
>>
>> Any suggestions on potential workaround or fix?
>>
>>
>>
>> Appreciate your time and help.
>>
>>
>>
>> ~ Abhinav Bajaj
>>
>>
>>
>


Re: Checkpoints issue and job failing

2020-01-02 Thread vino yang
Hi Navneeth,

Did you check whether the path contained in the exception really cannot be
found?

Best,
Vino

Navneeth Krishnan wrote on Fri, Jan 3, 2020 at 8:23 AM:

> Hi All,
>
> We are running into checkpoint timeout issues more frequently in production,
> and we also see the below exception. We are running Flink 1.4.0 and the
> checkpoints are saved on NFS. Can someone suggest how to overcome this?
>
>
> java.lang.IllegalStateException: Could not initialize operator state backend.
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:302)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:249)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: 
> /mnt/checkpoints/02c4f8d5c11921f363b98c5959cc4f06/chk-101/e71d8eaf-ff4a-4783-92bd-77e3d8978e01
>  (No such file or directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.<init>(FileInputStream.java:138)
>   at 
> org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
>
>
> Thanks
>
>


Re: Session Window with dynamic gap

2020-01-02 Thread vino yang
Hi KristoffSC,

>> Are there any plans to add support of Flink State into
SessionWindowTimeGapExtractor?

As I said, `SessionWindowTimeGapExtractor` is neither a general UDF nor an
operator.

But I cannot give a definitive answer. Let me ping @Aljoscha Krettek to
answer this.

Best,
Vino

KristoffSC wrote on Fri, Jan 3, 2020 at 6:17 AM:

> Ok,
> I did some more tests and yep, it seems that there is no way to use Flink's
> state in a class that implements SessionWindowTimeGapExtractor.
>
> Even if I implement this interface on a class that is an operator, whenever
> the extract method is called it does not have any access to Flink's state.
> Even calling getRuntimeContext() from it throws an exception.
>
> Are there any plans to add support of Flink State into
> SessionWindowTimeGapExtractor?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Session Window with dynamic gap

2020-01-02 Thread vino yang
Hi KristoffSC,

Firstly, IMO, you can implement this feature by customizing the
`SessionWindowTimeGapExtractor`.

Additionally, let me clarify a concept: a component that implements the
`SessionWindowTimeGapExtractor` interface is not an operator in Flink.

In Flink's concepts, the window is an operator; it contains several
components: assigner, trigger, evictor and so on.[1]

From Flink's codebase, I did not find a specific implementation of this
interface, and it may not be able to access Flink's state. However, you can
still implement this interface and obtain the new dynamic gap value by
accessing a third-party system (Kafka, Redis, ZooKeeper...). For each element,
when assigning a session window for it, Flink always invokes the
`SessionWindowTimeGapExtractor#extract()` method. So this approach makes sense.

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html
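To make that concrete, below is a minimal sketch (my own illustration, not
something from the Flink codebase) of an extractor that re-reads the gap from
an external system inside extract(), caching it for a refresh interval. The
class name and the fetchGapMillis() hook are assumptions; you would back the
hook with Kafka, Redis, ZooKeeper, etc. yourself:

import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;

public abstract class ExternalConfigGapExtractor<T> implements SessionWindowTimeGapExtractor<T> {

    private final long refreshIntervalMs;
    private final long fallbackGapMs;

    // Cached on each parallel instance; re-read lazily inside extract().
    private transient volatile long cachedGapMs;
    private transient volatile long lastRefreshMs;

    protected ExternalConfigGapExtractor(long refreshIntervalMs, long fallbackGapMs) {
        this.refreshIntervalMs = refreshIntervalMs;
        this.fallbackGapMs = fallbackGapMs;
    }

    /** Reads the current gap from the external system; implemented by the user. */
    protected abstract long fetchGapMillis() throws Exception;

    @Override
    public long extract(T element) {
        long now = System.currentTimeMillis();
        if (cachedGapMs <= 0 || now - lastRefreshMs > refreshIntervalMs) {
            try {
                cachedGapMs = fetchGapMillis();
            } catch (Exception e) {
                // Keep the window assigner working even if the lookup fails.
                cachedGapMs = fallbackGapMs;
            }
            lastRefreshMs = now;
        }
        return cachedGapMs;
    }
}

A concrete subclass can then be passed to
ProcessingTimeSessionWindows.withDynamicGap(...).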

KristoffSC wrote on Thu, Jan 2, 2020 at 7:37 PM:

> Hi all,
> I'm exploring Flink for our new project.
>
> Currently I'm playing with Session Windows with dynamic Gap. In short, I
> would like to be able to change the value of the gap on demand, for example
> on config update.
>
> So I'm having this code:
>
>
> messageStream
> .keyBy(tradeKeySelector)
> .window(ProcessingTimeSessionWindows.withDynamicGap(new
>   SessionWindowTimeGapExtractor() {
> @Override
> public long extract(EnrichedMessage element) {
> // Try to dynamically change the gap here
> // milliseconds.
> return 5000;
> }
> }))
> .process(new CumulativeTransactionOperator())
> .name("Aggregate Transaction Builder");
>
> I would assume something like the "broadcast pattern" here, although that is
> related to operators and we are interested in
> SessionWindowTimeGapExtractor here.
>
> Probably we will keep the gap size in Flink state, not sure if it has to
> be keyed state or "operator state". Updates will come from an external
> system.
>
> So I guess what I need here is actually an operator that implements the
> SessionWindowTimeGapExtractor interface. An instance of this operator will
> keep/update the state based on config updates and return the gap size via
> SessionWindowTimeGapExtractor.
>
> Would it be a valid approach for this use case? Is there any other way to
> have such a config in Flink state?
>
>
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Sub-user

2020-01-02 Thread vino yang
Hi Jary,

All of Flink's mailing list information can be found here.[1]

[1]: https://flink.apache.org/community.html#mailing-lists

Best,
Vino

Benchao Li wrote on Thu, Jan 2, 2020 at 4:56 PM:

> Hi Jary,
>
> You need to send an email to user-subscr...@flink.apache.org to subscribe,
> not user@flink.apache.org.
>
> Jary Zhen wrote on Thu, Jan 2, 2020 at 4:53 PM:
>
>>
>>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


Re: An issue with low-throughput on Flink 1.8.3 running Yahoo streaming benchmarks

2019-12-30 Thread vino yang
Hi Shinhyung,

Can you compare the performance of the different Flink versions in the same
environment (or at least with the same node and framework configuration)?

I see there are some differences in the configuration of both the clusters and
the frameworks. It would be better to compare them in the same environment so
that we can figure out why there is a more than 4x performance difference.

WDYT?

Best,
Vino

Shinhyung Yang wrote on Mon, Dec 30, 2019 at 1:45 PM:

> Dear Flink Users,
>
> I'm running the Yahoo streaming benchmarks (the original version) [1]
> on Flink 1.8.3 and got 60K tuples per second. Because I got 282K
> tuples per second with Flink 1.1.3, I would like to ask your opinions on
> where I should look.
>
> I have been using one node for the JobManager and 10 nodes with one
> TaskManager each.
>
> Below is my current setting for the benchmark and Flink 1.8.3:
>
> * 16 vCPUs and 24 GiB for the JobManager node
> * 32 vCPUs and 32 GiB for each TaskManager node
>
> # localConf.yaml
> kafka.partitions: 5
> process.hosts: 1
> process.cores: 32
>
> # flink-conf.yaml
> jobmanager.heap.size: 5120m
> taskmanager.heap.size: 20480m
> taskmanager.numberOfTaskSlots: 16
> parallelism.default: 1
>
> And the following is the previous settings for the benchmark and Flink
> 1.1.3:
>
> * 16 vCPUs and 24 GiB for the JobManager node and 10 TaskManager nodes
>
> #localConf.yaml
> kafka.partitions: 5
> process.hosts: 1
> process.cores: 16
>
> # flink-conf.yaml
> jobmanager.heap.mb: 1024
> taskmanager.heap.mb: 15360
> taskmanager.numberOfTaskSlots: 16
> taskmanager.memory.preallocate: false
> parallelism.default: 1
> taskmanager.network.numberOfBuffers: 6432
>
>
> Thank you and with best regards,
> Shinhyung Yang
>
> [1]: https://github.com/yahoo/streaming-benchmarks
>


Re: Flink TaskManager Memory

2019-12-26 Thread vino yang
Hi Tim,

Referencing a blog post from Ververica:

"When you choose RocksDB as your state backend, your state lives as a
serialized byte-string in either the off-heap memory or the local disk."

The post also covers many tuning config options you can consider.[1]

Best,
Vino

[1]: https://www.ververica.com/blog/manage-rocksdb-memory-size-apache-flink

Timothy Victor wrote on Fri, Dec 27, 2019 at 7:11 AM:

> For streaming jobs that use RocksDB, my understanding is that state is
> allocated off-heap via RocksDB.
>
> If this is true, then does it still make sense to leave 70% (the default
> taskmanager.memory.fraction) of the heap for Flink managed memory, given that
> it is likely not being used for state? Or am I missing something, and it
> is in fact still used?
>
> Thanks
>
> Tim
>


Re: Flink Dataset to ParquetOutputFormat

2019-12-26 Thread vino yang
Hi Anuj,

Actually, I am not familiar with how to partition by timestamp. Flink's
streaming BucketingSink provides this feature.[1] You may refer to this
link and customize your sink.

I can ping a committer who knows more details of the FS connector
than me; @kklou...@gmail.com may be able to help.

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/filesystem_sink.html#bucketing-file-sink
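If you can use the newer StreamingFileSink instead of BucketingSink, a rough
sketch of date-partitioned Parquet output could look like the following. The
wiring around your job is an assumption, and note that DateTimeBucketAssigner
buckets by processing time, so bucketing on a date field inside the record
would need a custom BucketAssigner:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

public class ParquetDatePartitionedSink {

    // events: a stream of Avro GenericRecord with a schema known up front.
    public static void attachSink(DataStream<GenericRecord> events, Schema schema, String outputDirectory) {
        StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(new Path(outputDirectory), ParquetAvroWriters.forGenericRecord(schema))
                // One sub-directory per day, e.g. <outputDirectory>/2019-12-27/...
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd"))
                .build();

        events.addSink(sink);
    }
}

Note that the StreamingFileSink only finalizes files on checkpoints, so
checkpointing needs to be enabled.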

aj wrote on Fri, Dec 27, 2019 at 1:51 AM:

> Thanks Vino.
>
> I am able to write data in Parquet now. But now the issue is how to write
> a dataset to multiple output paths partitioned by timestamp.
> I want to partition the data by date.
>
> Currently I am writing like this, which writes to a single output path.
>
> DataSet<Tuple2<Void, GenericRecord>> df = allEvents.flatMap(new
> EventMapProcessor(schema.toString())).withParameters(configuration);
>
> Job job = Job.getInstance();
> AvroParquetOutputFormat.setSchema(job, book_bike.getClassSchema());
> HadoopOutputFormat<Void, GenericRecord> parquetFormat = new HadoopOutputFormat<Void, GenericRecord>(new AvroParquetOutputFormat(), job);
> FileOutputFormat.setOutputPath(job, new Path(outputDirectory));
>
> df.output(parquetFormat);
> env.execute();
>
>
> Please suggest.
>
> Thanks,
> Anuj
>
> On Mon, Dec 23, 2019 at 12:59 PM vino yang  wrote:
>
>> Hi Anuj,
>>
>> After searching on GitHub, I found a demo repository about how to use
>> Parquet in Flink.[1]
>>
>> You can have a look. I cannot guarantee whether it is helpful or not.
>>
>> [1]: https://github.com/FelixNeutatz/parquet-flinktacular
>>
>> Best,
>> Vino
>>
>> aj wrote on Sat, Dec 21, 2019 at 7:03 PM:
>>
>>> Hello All,
>>>
>>> I am getting a set of events in JSON that I am dumping in the hourly
>>> bucket in S3.
>>> I am reading this hourly bucket and created a DataSet.
>>>
>>> I want to write this dataset as a parquet but I am not able to figure
>>> out. Can somebody help me with this?
>>>
>>>
>>> Thanks,
>>> Anuj
>>>
>>>
>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>
>>
>
> --
> Thanks & Regards,
> Anuj Jain
> Mob. : +91- 8588817877
> Skype : anuj.jain07
> <http://www.oracle.com/>
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>


Re: Rewind offset to a previous position and ensure certainty.

2019-12-25 Thread vino yang
Hi Ruibin,

Are you asking how to generate watermarks per Kafka partition?
Flink provides Kafka-partition-aware watermark generation. [1]

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
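For reference, a minimal sketch of what Kafka-partition-aware watermark
assignment looks like; the topic name and the timestamp parsing are
placeholders, not something from your setup:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class PartitionAwareWatermarks {

    public static FlinkKafkaConsumer<String> buildConsumer(Properties kafkaProps) {
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), kafkaProps);

        // Assigning the extractor on the consumer itself (instead of on the resulting
        // DataStream) makes Flink generate and merge watermarks per Kafka partition,
        // so one fast partition does not push the watermark ahead of slower partitions.
        consumer.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(String element) {
                        // Hypothetical: the event time is the first comma-separated field.
                        return Long.parseLong(element.split(",")[0]);
                    }
                });
        return consumer;
    }
}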

邢瑞斌 wrote on Wed, Dec 25, 2019 at 8:27 PM:

> Hi,
>
> I'm trying to use Kafka as an event store and I want to create several
> partitions to improve read/write throughput. Occasionally I need to rewind
> the offset to a previous position for recomputing. Since order isn't guaranteed
> among partitions in Kafka, does this mean that Flink won't produce the same
> results as before when rewinding, even if it uses event time? For example, the
> consumer for one partition progresses extremely fast and raises the watermark,
> so events from other partitions are discarded. Is there any way to prevent
> this from happening?
>
> Thanks in advance!
>
> Ruibin
>


Re: Apache Flink - Flink Metrics collection using Prometheus on EMR from streaming mode

2019-12-24 Thread vino yang
Hi Mans,

IMO, the metrics reporter mechanism does not depend on the deployment mode.

>> is there any Prometheus configuration or service discovery option
available that will dynamically pick up the metrics from the Flink job and
task managers running in the cluster?

Can you share more information about your scene?

>> I believe for a batch job I can configure Flink to use the Prometheus
gateway configuration, but I think this is not recommended for a streaming
job.

What does this mean? Why is the Prometheus gateway configuration for a Flink
batch job not recommended for a streaming job?

Best,
Vino

M Singh wrote on Tue, Dec 24, 2019 at 4:02 PM:

> Hi:
>
> I wanted to find out what's the best way of collecting Flink metrics using
> Prometheus in a streaming application on EMR/Hadoop.
>
> Since the Flink streaming jobs could be running on any node - is there any
> Prometheus configuration or service discovery option available that will
> dynamically pick up the metrics from the Flink job and task managers
> running in the cluster?
>
> I believe for a batch job I can configure flink config to use Prometheus
> gateway configuration but I think this is not recommended for a streaming
> job.
>
> Please let me know if you have any advice.
>
> Thanks
>
> Mans
>


Re: Flink Dataset to ParquetOutputFormat

2019-12-22 Thread vino yang
Hi Anuj,

After searching on GitHub, I found a demo repository about how to use
Parquet in Flink.[1]

You can have a look. I cannot guarantee whether it is helpful or not.

[1]: https://github.com/FelixNeutatz/parquet-flinktacular

Best,
Vino

aj wrote on Sat, Dec 21, 2019 at 7:03 PM:

> Hello All,
>
> I am getting a set of events in JSON that I am dumping into an hourly
> bucket in S3.
> I am reading this hourly bucket and have created a DataSet.
>
> I want to write this dataset as Parquet, but I am not able to figure it out.
> Can somebody help me with this?
>
>
> Thanks,
> Anuj
>
>
> 
>


Re: Flink On K8s, build docker image very slowly, is there some way to make it faster?

2019-12-22 Thread vino yang
Hi Lake,

Can you clearly count or identify which steps are taking a long time?

Best,
Vino

LakeShen wrote on Mon, Dec 23, 2019 at 2:46 PM:

> Hi community, when I run the Flink task on K8s, the first thing is to build
> the Flink task jar into a Docker image. I find that it takes a lot of time to
> build the Docker image. Is there some way to make it faster?
> Thanks for your reply.
>


Re: Flink Prometheus metric doubt

2019-12-19 Thread vino yang
Hi Jesus,

IMHO, maybe @Chesnay Schepler can provide more information.

Best,
Vino

Jesús Vásquez wrote on Thu, Dec 19, 2019 at 6:57 PM:

> Hi all, I'm monitoring Flink jobs using Prometheus.
> I have been trying to use the metrics flink_jobmanager_job_uptime/downtime
> in order to create an alert that fires when one of these values emits -1,
> since the doc says this is the behavior of the metric when the job gets to
> a completed state.
> The thing is that I have tested the behavior when one of my jobs fails, and
> the mentioned metrics never emit anything different from zero. Finally the
> metric disappears after the job has failed.
> Am I missing something or is this the expected behavior?
>


Re: Unit testing filter function in flink

2019-12-19 Thread vino yang
Hi Vishwas,

Apache Flink provides test harnesses to test your application code at
multiple levels of the testing pyramid.

You can use them to test your UDFs. Please see the examples offered by the
official documentation.[1]

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
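As a minimal sketch of the lowest level of that pyramid (plain JUnit, no Flink
runtime needed): the MessageFilter class below is a hypothetical stand-in for
the predicate inside your filterStream method, factored into its own
FilterFunction so it can be instantiated and called directly:

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.flink.api.common.functions.FilterFunction;
import org.junit.Test;

public class MessageFilterTest {

    // Hypothetical filter: keep only non-null, non-empty messages.
    static class MessageFilter implements FilterFunction<String> {
        @Override
        public boolean filter(String value) {
            return value != null && !value.isEmpty();
        }
    }

    @Test
    public void rejectsNullAndEmptyMessages() throws Exception {
        MessageFilter filter = new MessageFilter();
        assertFalse(filter.filter(null));
        assertFalse(filter.filter(""));
        assertTrue(filter.filter("0100"));
    }
}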

Vishwas Siravara wrote on Fri, Dec 20, 2019 at 6:27 AM:

> Hi guys,
> I want to test a function like :
>
> private[flink] def filterStream(dataStream: DataStream[GenericRecord]): 
> DataStream[GenericRecord] = {
>   dataStream.filter(new FilterFunction[GenericRecord] {
> override def filter(value: GenericRecord): Boolean = {
>   if (value == null || value.get(StipFields.requestMessageType) == null) {
> return false;
>   } else {
> 
> ExecutionEnv.messageTypeList.contains(value.get(StipFields.requestMessageType)
>   .toString) && 
> ExecutionEnv.pcrList.contains(value.get(StipFields.pcrList).toString) && 
> (value.get(StipFields
>   .rejectCode).asInstanceOf[Int] == 0) && 
> !(value.get(StipFields.processingCode).toString.equals("33"))
>   }
> }
>   })
> }
>
> How can I do this ?
>
> Best,
> Vishwas
>
>


Re: Can trigger fire early brefore specific element get into ProcessingTimeSessionWindow

2019-12-19 Thread vino yang
Hi Utopia,

Flink provides a highly extensible window mechanism.[1]

For your scenario, you can customize your window assigner and trigger.

[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html

Best,
Vino
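As a rough illustration of the trigger part only (a sketch added here, not
something from the Flink codebase): a trigger can fire early when a marker
element arrives. Note that the marker is already inside the window when the
trigger fires, so excluding it from the evaluation and carrying it over to the
next window would still need extra handling, e.g. in your process window
function.

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public abstract class FireOnMarkerTrigger<T> extends Trigger<T, TimeWindow> {

    /** Decides whether this element should force an early evaluation; user-defined. */
    protected abstract boolean isMarker(T element);

    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) {
        if (isMarker(element)) {
            return TriggerResult.FIRE; // evaluate the window contents now
        }
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }
}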

Utopia wrote on Thu, Dec 19, 2019 at 5:56 PM:

> Hi,
>
> I want to fire and evaluate the ProcessingTimeSessionWindow when a
> specific element comes into the current window. But I want to exclude that
> specific element when processing the window function and keep it for the
> next evaluation.
>
> Thanks
>
> Best  regards
> Utopia
>


Re: DataStream API min max aggregation on other fields

2019-12-19 Thread vino yang
Hi weizheng,

IMHO, I am not sure which part is unclear to you. Is the result not correct?
Can you share the expected result based on your understanding?

The "keyBy" specifies the grouping field, and min/max do the aggregation on
the other field at the position you specified.

Best,
Vino
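If it helps, here is a small Java sketch (my own illustration, mirroring your
Scala example) of the difference: max(2) only guarantees the maximum of field 2,
and the other non-key fields are not guaranteed to come from the maximum element
(in practice they keep the values of the first element seen for that key), while
maxBy(2) emits the whole element that carries the maximum.

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxVsMaxBy {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(
                Tuple3.of(0, 0, 0), Tuple3.of(0, 1, 1), Tuple3.of(0, 2, 2),
                Tuple3.of(1, 0, 6), Tuple3.of(1, 1, 7), Tuple3.of(1, 2, 8))
            .keyBy(0)
            .max(2)    // for key 0 this eventually prints (0,0,2): field 1 is not from the max element
            .print();

        env.execute("max vs maxBy");
    }
}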

Lu Weizheng wrote on Thu, Dec 19, 2019 at 5:00 PM:

> Hi all,
>
> On a KeyedStream, when I use maxBy or minBy, I will get the max or min
> element, meaning the other fields will be those of the max or min element. This
> is quite clear. However, when I use max or min, what does Flink do with the
> other fields?
>
> val tupleStream = senv.fromElements(
>   (0, 0, 0), (0, 1, 1), (0, 2, 2),
>   (1, 0, 6), (1, 1, 7), (1, 2, 8)
> )
> //  (0,0,0)
> //  (0,0,1)
> //  (0,0,2)
> //  (1,0,6)
> //  (1,0,7)
> //  (1,0,8)
> val maxByStream = tupleStream.keyBy(0).max(2).print()
>
> In this case, the second field uses the first element's 0.
>
> class IntTupleSource extends RichSourceFunction[(Int, Int, Int)]{
>
>   var isRunning: Boolean = true
>   var i = 0
>
>   val rand = new Random()
>
>   override def run(srcCtx: SourceContext[(Int, Int, Int)]): Unit = {
>
> while (isRunning) {
>
>   // Collect the generated data into the SourceContext
>   srcCtx.collect((0, i, i))
>   i += 1
>   Thread.sleep(1000)
> }
>   }
>
>   override def cancel(): Unit = {
> isRunning = false
>   }
> }
>
> //(0,0,0)
> //(0,1,2)
> //(0,3,4)
> //(0,5,6)
> //(0,7,8)
> //(0,9,10)
>
> val maxWindowStream = senv.addSource(new IntTupleSource)
>   .keyBy(0)
>   .timeWindow(Time.milliseconds(2000))
>   .max(2).print()
>
>
>
> In this case, the result is not so clear...
>
> So, for max and min, these two operators cannot guarantee the result of the
> other fields?
>
> Thank you so much if anyone can reply.
>
> Weizheng
>


Re: Apache Flink - Flink Metrics - How to distinguish b/w metrics for two job manager on the same host

2019-12-18 Thread vino yang
Hi Mans,

IMO, one job manager represents one Flink cluster, and each Flink cluster has
its own suite of Flink configuration, e.g. the metrics reporter.

Some metrics reporters support a tags feature; you can specify it to
distinguish different Flink clusters.[1]

[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#datadog-orgapacheflinkmetricsdatadogdatadoghttpreporter

Best,
Vino

M Singh wrote on Thu, Dec 19, 2019 at 2:54 AM:

> Hi:
>
> I am using AWS EMR with a Flink application, and two of the job managers are
> running on the same host. I am looking at the metrics documentation (Apache
> Flink 1.9 Documentation: Metrics)
> and see the following:
>
>- metrics.scope.jm
>   - Default: <host>.jobmanager
>   - Applied to all metrics that were scoped to a job manager.
>
> ...
> List of all Variables
> 
>
>- JobManager: <host>
>- TaskManager: <host>, <tm_id>
>- Job: <job_id>, <job_name>
>- Task: <task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>,
><subtask_index>
>- Operator: <operator_id>, <operator_name>, <subtask_index>
>
>
>
> My question: is there a way to distinguish between the two job managers? I see
> only the <host> variable for JobManager, and since the two are running on
> the same host, the value is the same. Is there any other variable that I
> can use to distinguish the two?
>
> For taskmanager I have taskmanager id but am not sure about the job
> manager.
>
> Thanks
>
> Mans
>
>


Re: Cannot get value from ValueState in ProcessingTimeTrigger

2019-12-18 Thread vino yang
Hi Utopia,

IMO, your analysis is correct.

Best,
Vino

Utopia wrote on Thu, Dec 19, 2019 at 12:44 AM:

> Hi Vino,
>
> Maybe it is due to the type of window. What I used is
> ProcessingTimeSessionWindows, while keyed state is scoped to *window and
> key*. The window changes, so the ValueState is different.
>
> Best  regards
> Utopia
> On Dec 18, 2019, 22:30 +0800, Utopia wrote:
>
> Hi Vino,
>
> Thanks for your reply !
>
> The key of my input data is always the same value, so I think there is only
> one partition.
>
> And why can I sometimes get the value stored in the ValueState before the
> update?
>
> before update value : 3
>>
>> after update value: 4
>>
>>
> What's more, how can I store the previous value so that I can get the
> value when the next element comes in and invokes the onElement method?
>
>
>
> Best  regards
> Utopia
> On Dec 18, 2019, 21:57 +0800, vino yang wrote:
>
> Hi Utopia,
>
> The behavior may be correct.
>
> First, the default value is null. It's the correct value.
> `ValueStateDescriptor` has multiple constructors, some of them can let you
> specify a default value. However, these constructors are deprecated. And
> the doc does not recommend them.[1] For the other constructors which can
> not specify default values, it would be null.
>
> Second, before the window, there is a `keyBy` operation. it will partition
> your data. For each partition, the default value state is null.
>
> Best,
> Vino
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/index.html?org/apache/flink/api/common/state/ValueStateDescriptor.html
>
> Utopia wrote on Wed, Dec 18, 2019 at 7:20 PM:
>
>> Hi,
>>
>> I want to get the last value stored in ValueState when processing element
>> in Trigger.
>>
>> But as the log shows that sometimes I can get the value, sometimes not.
>>
>> Only one key in my data(SensorReading).
>>
>> ValueState:
>>
>> class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] {
>>
>>   private lazy val descriptor = new ValueStateDescriptor[Long]("desc", 
>> classOf[Long])
>>
>>   var value = 1
>>
>>   override def onElement( r: SensorReading, timestamp: Long, window: 
>> TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
>>
>> println("before update value : " + 
>> ctx.getPartitionedState(descriptor).value())
>>
>> ctx.getPartitionedState(descriptor).update(value)
>>
>> value += 1
>>
>> println("after update value: " + 
>> ctx.getPartitionedState(descriptor).value())
>>
>> ctx.registerProcessingTimeTimer(window.maxTimestamp)
>> TriggerResult.CONTINUE
>>   }
>>
>>   override def onEventTime(time: Long, window: TimeWindow, ctx: 
>> Trigger.TriggerContext) = TriggerResult.CONTINUE
>>
>>   override def onProcessingTime(time: Long, window: TimeWindow, ctx: 
>> Trigger.TriggerContext) = TriggerResult.FIRE
>>
>>   override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit 
>> = {
>> ctx.deleteProcessingTimeTimer(window.maxTimestamp)
>>   }
>>
>>   override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): 
>> Unit = {
>> val windowMaxTimestamp = window.maxTimestamp
>> if (windowMaxTimestamp > ctx.getCurrentProcessingTime) 
>> ctx.registerProcessingTimeTimer(windowMaxTimestamp)
>>   }
>>
>>   override def canMerge: Boolean = true
>>
>> }
>>
>>
>> Main process:
>>
>> object MyCustomWindows {
>>
>>   def main(args: Array[String]): Unit = {
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> env.getConfig.setAutoWatermarkInterval(1000L)
>>
>> val sensorData: DataStream[SensorReading] = env
>>   .addSource(new SensorSource)
>>   .assignTimestampsAndWatermarks(new SensorTimeAssigner)
>>
>> val countsPerThirtySecs = sensorData
>>   .keyBy(_.id)
>>   .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000)))
>>   .trigger(new ProcessingTimeTrigger)
>>   .process(new CountFunction)
>>
>> env.execute()
>>   }
>> }
>>
>>
>> Log results:
>>
>> before update value : null
>> after update value: 1
>> before update value : null
>> after update value: 2
>> before update value : null
>> after update value: 3
>> before update value : 3
>> after update value: 4
>> before update value : null
>> after update value: 5
>> before update value : null
>> after update value: 6
>> before update value : null
>> after update value: 7
>> before update value : null
>> after update value: 8
>> before update value : null
>> after update value: 9
>> before update value : 9
>> after update value: 10
>>
>>
>>
>> Best  regards
>> Utopia
>>
>


Re: Cannot get value from ValueState in ProcessingTimeTrigger

2019-12-18 Thread vino yang
Hi Utopia,

The behavior may be correct.

First, the default value is null; that is the expected value.
`ValueStateDescriptor` has multiple constructors, and some of them let you
specify a default value. However, these constructors are deprecated and
the doc does not recommend them.[1] For the other constructors, which cannot
specify default values, the initial value is null.

Second, before the window there is a `keyBy` operation; it will partition
your data. For each partition, the default state value is null.

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/index.html?org/apache/flink/api/common/state/ValueStateDescriptor.html
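For illustration, here is a minimal Java sketch of a trigger that copes with the
null initial value. The counting logic is only an example added here; the point
is the null check (instead of the deprecated default-value constructor) and the
fact that the partitioned trigger state is scoped to the key *and* the window:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class CountingTrigger<T> extends Trigger<T, TimeWindow> {

    private final ValueStateDescriptor<Long> descriptor =
            new ValueStateDescriptor<>("seen-count", Long.class);

    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ValueState<Long> seen = ctx.getPartitionedState(descriptor);
        Long previous = seen.value();   // null for the first element of this key *and* window
        long count = (previous == null) ? 1L : previous + 1L;
        seen.update(count);

        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }
}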

Utopia wrote on Wed, Dec 18, 2019 at 7:20 PM:

> Hi,
>
> I want to get the last value stored in the ValueState when processing an
> element in the Trigger.
>
> But as the log shows, sometimes I can get the value and sometimes not.
>
> There is only one key in my data (SensorReading).
>
> ValueState:
>
> class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] {
>
>   private lazy val descriptor = new ValueStateDescriptor[Long]("desc", 
> classOf[Long])
>
>   var value = 1
>
>   override def onElement( r: SensorReading, timestamp: Long, window: 
> TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
>
> println("before update value : " + 
> ctx.getPartitionedState(descriptor).value())
>
> ctx.getPartitionedState(descriptor).update(value)
>
> value += 1
>
> println("after update value: " + 
> ctx.getPartitionedState(descriptor).value())
>
> ctx.registerProcessingTimeTimer(window.maxTimestamp)
> TriggerResult.CONTINUE
>   }
>
>   override def onEventTime(time: Long, window: TimeWindow, ctx: 
> Trigger.TriggerContext) = TriggerResult.CONTINUE
>
>   override def onProcessingTime(time: Long, window: TimeWindow, ctx: 
> Trigger.TriggerContext) = TriggerResult.FIRE
>
>   override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = 
> {
> ctx.deleteProcessingTimeTimer(window.maxTimestamp)
>   }
>
>   override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit 
> = {
> val windowMaxTimestamp = window.maxTimestamp
> if (windowMaxTimestamp > ctx.getCurrentProcessingTime) 
> ctx.registerProcessingTimeTimer(windowMaxTimestamp)
>   }
>
>   override def canMerge: Boolean = true
>
> }
>
>
> Main process:
>
> object MyCustomWindows {
>
>   def main(args: Array[String]): Unit = {
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.getConfig.setAutoWatermarkInterval(1000L)
>
> val sensorData: DataStream[SensorReading] = env
>   .addSource(new SensorSource)
>   .assignTimestampsAndWatermarks(new SensorTimeAssigner)
>
> val countsPerThirtySecs = sensorData
>   .keyBy(_.id)
>   .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000)))
>   .trigger(new ProcessingTimeTrigger)
>   .process(new CountFunction)
>
> env.execute()
>   }
> }
>
>
> Log results:
>
> before update value : null
> after update value: 1
> before update value : null
> after update value: 2
> before update value : null
> after update value: 3
> before update value : 3
> after update value: 4
> before update value : null
> after update value: 5
> before update value : null
> after update value: 6
> before update value : null
> after update value: 7
> before update value : null
> after update value: 8
> before update value : null
> after update value: 9
> before update value : 9
> after update value: 10
>
>
>
> Best  regards
> Utopia
>


Re: [Question] How to use different filesystem between checkpoint data and user data sink

2019-12-18 Thread vino yang
Hi ouywl,

>> Thread.currentThread().getContextClassLoader();

What does this statement mean in your program?

In addition, can you share your implementation of the customized file
system plugin and the related exception?

Best,
Vino

ouywl wrote on Wed, Dec 18, 2019 at 4:59 PM:

> Hi all,
> We have implemented a filesystem plugin for sink data to hdfs1, and
> the YARN cluster that Flink runs on uses hdfs2. So when the job runs, the
> jobmanager uses the conf of hdfs1 to create the filesystem, and the filesystem
> plugin conflicts with the Flink components.
> Our implementation steps:
>   1. ‘FileSystemEnhance’ is implemented from “FileSystem”
>   2. ‘FileSystemFactoryEnhance’ is implemented from “FileSystemFactory”, adding
> Kerberos auth in “FileSystemFactoryEnhance”
>   3. Add a service entry: create a file
> META-INF/services/org.apache.flink.core.fs.FileSystemFactory which
> contains the class name of “FileSystemFactoryEnhance.class”
>
> And the job main class is:
>
>     public static void main(String[] args) throws Exception {
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         env.enableCheckpointing(60 * 1000);
>         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>         env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>         env.getConfig().enableSysoutLogging();
>
>         Properties props = new Properties();
>         props.put("bootstrap.servers", SERVERS);
>         props.put("group.id", GROUPID);
>         props.put("enable.auto.commit", "true");
>         // props.put("auto.commit.interval.ms", "1000");
>         props.put("session.timeout.ms", "3");
>         props.put("auto.offset.reset", "latest");
>         props.put("key.deserializer", org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StringDeserializer.class.getName());
>         props.put("value.deserializer", StringDeserializer.class.getName());
>
>         FlinkKafkaConsumer010 consumer011 = new FlinkKafkaConsumer010("zyf_test_2", new SimpleStringSchema(), props);
>         DataStream source = env.addSource(consumer011).setParallelism(1);
>         source.print();
>
>         Thread.currentThread().getContextClassLoader();
>
>         StreamingFileSink sink = StreamingFileSink.forRowFormat(new Path("hdfs://bdms-test/user/sloth/zyf"), new SimpleStringEncoder<>("UTF-8"))
>             .build();
>         source.addSink(sink);
>         env.execute();
>     }
>
> When we start the job, the jobmanager filesystem errors out; the log shows that
> the jobmanager uses the “FileSystemFactoryEnhance” filesystem and conflicts.
> Given
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems
> how can we avoid using “Thread.currentThread().getContextClassLoader()”?
>
>
> ouywl
> ou...@139.com
>
> 
>
>


Re: RichAsyncFunction Timeout

2019-12-17 Thread vino yang
Hi  Polarisary,

IMO, firstly, it would be better to monitor the OS and Flink/HBase metrics.
For example:


   - Flink and HBase cluster Network I/O metrics;
   - Flink TM CPU/Memory/Backpressure metrics and so on;

You can view these metrics to find some potential reasons. If you can not
figure it out, you can share these metrics with the community.

Best,
Vino

Polarisary wrote on Wed, Dec 18, 2019 at 11:09 AM:

> Hi ALL,
> When I use RichAsyncFunction to read data from HBase, it always times out after
> a few minutes, but the HBase connection is not closed, and it can also get data
> in the overridden timeout method.
>
> The following is the code; does somebody know why the timeout is triggered?
>
> 
>
> AsyncDataStream.unorderedWait(uidDs, new AsyncHBaseRequest(hTableName,
> 
> HBaseConfigurationUtil.serializeConfiguration(hbaseClientConf), hbaseSchema)
> , 5, TimeUnit.MINUTES, 10)
>
>
>
> @Override
> public void timeout(Tuple1 input, ResultFuture Short, Short, Long, Integer, Long>> resultFuture) throws Exception {
>
> Row r = 
> readHelper.parseToRow(table.get(readHelper.createGet("13491261515587439bf2f217")));
> logger.error("Timeout Error, input [{}], conn {}, row [{}]", input.f0, 
> hConnection.isClosed(), r.toString());
> }
>
> @Override
> public void asyncInvoke(Tuple1 input, ResultFuture Short, Short, Long, Integer, Long>> resultFuture) throws Exception {
> FamilyFilter filter = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new 
> BinaryComparator(Bytes.toBytes("f1")));
> String rkStart = 
> UserInstallAppLookupTableSource.getLeftRowkeyByUid(input.f0, 0);
> String rkEnd = 
> UserInstallAppLookupTableSource.getLeftRowkeyByUid(input.f0, 9L);
>
>
>
>
> polaris...@gmail.com
>
>
>
>
>


Re: Questions about taskmanager.memory.off-heap and taskmanager.memory.preallocate

2019-12-17 Thread vino yang
Hi Ethan,

Two things to share:

   - I have found that the "taskmanager.memory.preallocate" config option has
   been removed in the master codebase.
   - After researching the git history, I found that the description of
   "taskmanager.memory.preallocate" was written by @Chesnay Schepler (in the
   1.8 branch). So maybe he can give more context or information. Correct me
   if I am wrong.

Best,
Vino.

Ethan Li wrote on Wed, Dec 18, 2019 at 10:07 AM:

> I didn't realize we were not chatting in the mailing list :)
>
> I think it's wrong because it kind of says a full GC is triggered by
> reaching MaxDirectMemorySize.
>
>
> On Dec 16, 2019, at 11:03 PM, Xintong Song  wrote:
>
> Glad that helped. I'm also posting this conversation to the public mailing
> list, in case other people have similar questions.
>
> And regarding the GC statement, I think the document is correct.
> - Flink Memory Manager guarantees that the amount of allocated managed
> memory never exceed the configured capacity, thus managed memory allocation
> should not trigger OOM.
> - When preallocation is enabled, managed memory segments are allocated and
> pooled by Flink Memory Manager, no matter there are tasks requesting them
> or not. The segments will not be deallocated until the cluster is shutdown.
> - When preallocation is disabled, managed memory segments are allocated
> only when tasks requesting them, and destroyed immediately when tasks
> return them to the Memory Manager. However, what this statement trying to
> say is that, the memory is not deallocated directly when the memory segment
> is destroyed, but will have to wait until the GC to be truly released.
>
> Thank you~
> Xintong Song
>
>
>
> On Tue, Dec 17, 2019 at 12:30 PM Ethan Li 
> wrote:
>
>> Thank you very much Xintong! It’s much clear to me now.
>>
>> I am still on standalone cluster setup.  Before I was using 350GB on-heap
>> memory on a 378GB box. I saw a lot of swap activities. Now I understand
>> that it’s because RocksDB didn’t have enough memory to use, so OS forces
>> JVM to swap. It can explain why the cluster was not stable and kept
>> crashing.
>>
>> Now that I put 150GB off-heap and 150GB on-heap, the cluster is more
>> stable than before. I thought it was because GC was reduced because now we
>> have less heap memory. Now I understand that it’s because I have 78GB
>> memory available for rocksDB to use, 50GB more than before. And it explains
>> why I don’t see swaps anymore.
>>
>> This makes sense to me now. I just have to set preallocation to false to
>> use the other 150 GB off-heap memory for rocksDB and do some tuning on
>> these memory configs.
>>
>>
>> One thing I noticed is that in
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html#taskmanager-memory-preallocate
>>
>>  If this configuration is set to false cleaning up of the allocated
>> off-heap memory happens only when the configured JVM parameter
>> MaxDirectMemorySize is reached by triggering a full GC
>>
>> I think this statement is not correct. GC is not trigged by reaching
>> MaxDirectMemorySize. It will throw "java.lang.OutOfMemoryError: Direct
>> buffer memory” if MaxDirectMemorySize is reached.
>>
>> Thank you again for your help!
>>
>> Best,
>> Ethan
>>
>>
>> On Dec 16, 2019, at 9:44 PM, Xintong Song  wrote:
>>
>> Hi Ethan,
>>
>> When you say "it's doing better than before", what is your setups before?
>> Is it on-heap managed memory? With preallocation enabled or disabled? Also,
>> what deployment (standalone, yarn, or local executor) do you run Flink on?
>> It's hard to tell why the performance becomes better without knowing the
>> information above.
>>
>> Since you are using RocksDB, and configure managed memory to off-heap,
>> you should set pre-allocation to false. Steaming job with RocksDB state
>> backend does not use managed memory at all. Setting managed memory to
>> off-heap only makes Flink to launch JVM with smaller heap space, leaving
>> more space outside JVM. Setting pre-allocation to false makes Flink
>> allocate those managed memory on-demand, and since there's no demand the
>> managed memory will not be allocated. Therefore, the memory space left
>> outside JVM can be fully leveraged by RocksDB.
>>
>> Regarding related source codes, I would recommend the following:
>> - MemoryManager - For how managed memory is allocated / used. Related to
>> pre-allocation.
>> - ContaineredTaskManagerParameters - For how the JVM memory parameters
>> are decided. Related to on-heap / off-heap managed memory.
>> - TaskManagerServices#fromConfiguration - For how different components
>> are created, as well as how their memory sizes are decided. Also related to
>> on-heap / off-heap managed memory.
>>
>> Thank you~
>> Xintong Song
>>
>>
>>
>> On Tue, Dec 17, 2019 at 11:00 AM Ethan Li 
>> wrote:
>>
>>> Thank you Xintong, Vino for taking your time answering my question. I
>>> didn’t know managed memory is only for batch jobs.
>>>
>>>
>>>
>>> I tried to set to use off-heap Flink managed memory 

Re: Fw: Metrics based on data filtered from DataStreamSource

2019-12-16 Thread vino yang
Hi Sideny,

>> I'd prefer not to consume messages I don't plan on actually handling.

It depends on your design. If you produce different types into different
partitions, then it is easy to filter the types of interest in the Kafka
consumer (only consume some of the partitions).

If you do not distinguish different types across the partitions of the Kafka
topic, you can filter messages based on type in the Flink job.

>> I MUST consume the messages, count those I want to filter out and then
simply not handle them?

I did not say "you MUST", I said "you can".

Actually, there are serval solutions.

e.g.
1) I described in the last mail;
2) filter in flink source;
3) filter via flink filter transform function
4) side output/split, selet

Choosing one solution that suite your scene.

The key thing in my last mail is to describe the problem of your reflection
problem.

Best,
Vino
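For option 3), here is a minimal sketch (the wrapper class and metric names are
hypothetical, not something in your code) of how the filtered-out counter can
live on an operator whose open() is actually called:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingFilter<T> extends RichFilterFunction<T> {

    private final FilterFunction<T> delegate;   // the handler's plain predicate
    private final String groupName;
    private transient Counter filteredCounter;

    public CountingFilter(FilterFunction<T> delegate, String groupName) {
        this.delegate = delegate;
        this.groupName = groupName;
    }

    @Override
    public void open(Configuration parameters) {
        // Registered here, so the counter exists on the task that runs the filter.
        filteredCounter = getRuntimeContext()
                .getMetricGroup()
                .addGroup(groupName)
                .counter("filtered");
    }

    @Override
    public boolean filter(T value) throws Exception {
        boolean keep = delegate.filter(value);
        if (!keep) {
            filteredCounter.inc();
        }
        return keep;
    }
}

The wrapped FilterFunction only needs the raw predicate; the metric no longer
has to be initialized inside the handler itself.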

Sidney Feiner wrote on Mon, Dec 16, 2019 at 9:31 PM:

> You are right about everything you say!
> The solution you propose is actually what I'm trying to avoid. I'd prefer
> not to consume messages I don't plan on actually handling.
> But from what you say, it sounds like I have no other choice. Am I right? I MUST
> consume the messages, count those I want to filter out and then simply not
> handle them?
> Which means I must filter them in the task itself and I have no way of
> filtering them directly from the data source?
>
>
> *Sidney Feiner* */* Data Platform Developer
> M: +972.528197720 */* Skype: sidney.feiner.startapp
>
>
> --
> *From:* vino yang 
> *Sent:* Monday, December 16, 2019 7:56 AM
> *To:* Sidney Feiner 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Fw: Metrics based on data filtered from DataStreamSource
>
> Hi Sidney,
>
> Firstly, the `open` method of UDF's instance is always invoked when the
> task thread starts to run.
>
> From the second code snippet image that you provided, I guess you are
> trying to get a dynamic handler with reflection technology, is
> that correct? I also guess that you want to get a dynamic instance of a
> handler in the runtime, correct me if I am wrong.
>
> IMO, you may misunderstand the program you write and the runtime of Task,
> the purpose of your program is used to build the job graph. The business
> logic in UDF is used to describe the user's business logic.
>
> For your scene, if many types of events exist in one topic, you can
> consume them and group by the type then count them?
>
> Best,
> Vino
>
> Sidney Feiner wrote on Mon, Dec 16, 2019 at 12:54 AM:
>
> Hey,
> I have a question about using metrics based on filtered data.
> Basically, I have handlers for many types of events I get from my data
> source (in my case, Kafka), and every handler has it's own filter function.
> That given handler also has a Counter, incrementing every time it filters
> out an event (as part of the FilterFunction).
>
> The problem arises when I use that FilterFunction on the DataStreamSource -
> the handler's open() function hasn't been called and thus the metrics have
> never been initiated.
> Do I have a way of making this work? Or any other way of counting events
> that have been filtered out from the DataStreamSource?
>
> Handler:
>
> public abstract class Handler extends RichMapFunction {
> private transient Counter filteredCounter;
> private boolean isInit = false;
>
> @Override
> public void open(Configuration parameters) throws Exception {
> if (!isInit) {
> MetricGroup metricGroup = 
> getRuntimeContext().getMetricGroup().addGroup(getClass().getSimpleName());
> filteredCounter = 
> metricGroup.counter(CustomMetricsManager.getFilteredSuffix());
> isInit = true;
> }
> }
>
> public final FilterFunction getFilter() {
> return (FilterFunction) event -> {
> boolean res = filter(event);
> if (!res) {
> filteredCounter.inc();
> }
> return res;
> };
> }
>
> abstract protected boolean filter(Event event);
> }
>
>
> And when I init the DataStreamSource:
>
> Handler handler = (Handler) 
> Class.forName(handlerName).getConstructor().newInstance();
> dataStream = dataStreamSource.filter(handler.getFilter()).map(handler);
>
>
> Any help would be much appreciated!
>
> Thanks 
>
>
>
>


Re: Documentation tasks for release-1.10

2019-12-16 Thread vino yang
+1 for centralizing all the documentation issues so that the community can
fix them more effectively.

Best,
Vino

Xintong Song wrote on Mon, Dec 16, 2019 at 6:02 PM:

> Thank you Kostas.
> Big +1 for keeping all the documentation related issues at one place.
>
> I've added the documentation task for resource management.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Dec 16, 2019 at 5:29 PM Kostas Kloudas 
> wrote:
>
>> Hi all,
>>
>> With the feature-freeze for the release-1.10 already past us, it is
>> time to focus a little bit on documenting the new features that the
>> community added to this release, and improving the already existing
>> documentation based on questions that we see in Flink's mailing lists.
>>
>> To this end, I have create an umbrella issue
>> https://issues.apache.org/jira/browse/FLINK-15273 to monitor the
>> pending documentation tasks. This is by no means an exhaustive list of
>> the tasks, so feel free to add more.
>>
>> Having a central place with all these tasks will also allow more
>> easily other members of the community, not necessarily people who
>> implemented the features, to get involved with the project. This is a
>> really helpful way to get in touch with the community and help the
>> project reach even greater audiences, as a feature that is not
>> documented is non-existent to users.
>>
>> Thanks a lot,
>> Kostas
>>
>


Re: Questions about taskmanager.memory.off-heap and taskmanager.memory.preallocate

2019-12-15 Thread vino yang
Hi Ethan,

For now, my suggestion is that you can set "preallocate" to false. The
description (in the link you provided) of "taskmanager.memory.preallocate"
says:

"When taskmanager.memory.off-heap is set to true, then it is advised that
this configuration is also set to true."

Best,
Vino

Ethan Li wrote on Sat, Dec 14, 2019 at 5:20 AM:

> Hi Community,
>
> I have a question about the taskmanager.memory.preallocate config in the
> doc
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html#taskmanager-memory-preallocate
>
> We have a large-memory box, so as suggested we should use off-heap memory
> for Flink managed memory. And the doc then suggests
> setting taskmanager.memory.preallocate to true. However,
>
>  "For streaming setups is is highly recommended to set this value to false
> as the core state backends currently do not use the managed memory."
>
>
> Our Flink setup is mainly for streaming jobs, so I think the above applies
> to our case. So should I use off-heap with "preallocate" set to false?
> What would be the impact of these configs?
>
>
> Thank you very much!
>
>
> Best,
> Ethan
>


Re: Fw: Metrics based on data filtered from DataStreamSource

2019-12-15 Thread vino yang
Hi Sidney,

Firstly, the `open` method of a UDF instance is always invoked when the
task thread starts to run.

From the second code snippet image that you provided, I guess you are
trying to get a dynamic handler with reflection, is
that correct? I also guess that you want to get a dynamic instance of a
handler at runtime; correct me if I am wrong.

IMO, you may be conflating the program you write with the runtime of the Task:
the purpose of your main program is to build the job graph, while the
business logic belongs in the UDFs.

For your scene, if many types of events exist in one topic, can you consume
them, group by the type and then count them?

Best,
Vino

Sidney Feiner wrote on Mon, Dec 16, 2019 at 12:54 AM:

> Hey,
> I have a question about using metrics based on filtered data.
> Basically, I have handlers for many types of events I get from my data
> source (in my case, Kafka), and every handler has it's own filter function.
> That given handler also has a Counter, incrementing every time it filters
> out an event (as part of the FilterFunction).
>
> The problem arises when I use that FilterFunction on the DataStreamSource -
> the handler's open() function hasn't been called and thus the metrics have
> never been initiated.
> Do I have a way of making this work? Or any other way of counting events
> that have been filtered out from the DataStreamSource?
>
> Handler:
>
> public abstract class Handler extends RichMapFunction {
> private transient Counter filteredCounter;
> private boolean isInit = false;
>
> @Override
> public void open(Configuration parameters) throws Exception {
> if (!isInit) {
> MetricGroup metricGroup = 
> getRuntimeContext().getMetricGroup().addGroup(getClass().getSimpleName());
> filteredCounter = 
> metricGroup.counter(CustomMetricsManager.getFilteredSuffix());
> isInit = true;
> }
> }
>
> public final FilterFunction getFilter() {
> return (FilterFunction) event -> {
> boolean res = filter(event);
> if (!res) {
> filteredCounter.inc();
> }
> return res;
> };
> }
>
> abstract protected boolean filter(Event event);
> }
>
>
> And when I init the DataStreamSource:
>
> Handler handler = (Handler) 
> Class.forName(handlerName).getConstructor().newInstance();
> dataStream = dataStreamSource.filter(handler.getFilter()).map(handler);
>
>
> Any help would be much appreciated!
>
> Thanks 
>
>
>
>


Re: TypeInformation problem

2019-12-15 Thread vino yang
Hi Nick,

From StackOverflow, I see a similar issue which was answered by @Till
Rohrmann. [1] FYI.

Best,
Vino

[1]:
https://stackoverflow.com/questions/38214958/flink-error-specifying-keys-via-field-positions-is-only-valid-for-tuple-data-ty

Nicholas Walton wrote on Sat, Dec 14, 2019 at 12:01 AM:

> I was refactoring some Flink code to use IndexedSeq rather than Array. When I
> compiled the code I had failures that, according to the URL below, required the
> following to be inserted:
>
> /*
>  * Type information (see 
> https://stackoverflow.com/questions/37920023/could-not-find-implicit-value-for-evidence-parameter-of-type-org-apache-flink-ap)
>  *
>  * Code when ported to use IndexedSeq rather than Array
>  * and similar refuses to build without this information
>  */
> implicit val typeInfo1 = TypeInformation.of(classOf[(Int, Long, Double, Int)])
> implicit val typeInfo2 = TypeInformation.of(classOf[(Int, Long, Double, 
> Double)])
> implicit val typeInfo3 = TypeInformation.of(classOf[(Int, Long, Double, 
> IndexedSeq[Long])])
> implicit val typeInfo4 = TypeInformation.of(classOf[(Int, Long, Double, 
> IndexedSeq[BigInt])])
> implicit val typeInfo5 = TypeInformation.of(classOf[(Int, Long, Double, 
> IndexedSeq[String])])
> implicit val typeInfo6 = TypeInformation.of(classOf[(String, Int, Long, 
> Double)])
> implicit val typeInfo7 = TypeInformation.of(classOf[(Int, Long, Double, 
> IndexedSeq[String], Int)])
> implicit val typeInfo8 = TypeInformation.of(classOf[(Int, Long, Double, 
> String, Int)])
>
>
> The code now compiles fine, but I now have a problem with the code below,
> which was working perfectly fine before I added the above and made the
> IndexedSeq refactor
>
> val readings: DataStream[(Int, Long, Double, Int)] = stream
>   .flatMap(new splitReadings())
>   .setParallelism(1)
>   .assignTimestampsAndWatermarks(new readingTimstamps)
>   .setParallelism(1)
>
>
> val maxChannelScaled: DataStream[(Int, Long, Double, Double)] = readings
>   .keyBy(0)
>   .countWindow(runmaxWinLen, 1)
>   .process(new runningMax())
>   .setParallelism(2 * env.getParallelism)
>
>
> When I submit the job I find the following in the log
>
> 2019-12-13 15:37:35,600 INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - class
> scala.Tuple4 does not contain a setter for field _1
> 2019-12-13 15:37:35,601 INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - Class class
> scala.Tuple4 cannot be used as a POJO type because not all fields are valid
> POJO fields, and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> 2019-12-13 15:37:35,602 ERROR
> org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler   - Unhandled
> exception.
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Specifying keys via field positions is only valid
> for tuple data types. Type: GenericType
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> at
> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
> at
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
> at
> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
> at
> org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleRequest$1(JarPlanHandler.java:100)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.api.common.InvalidProgramException: Specifying
> keys via field positions is only valid for tuple data types. Type:
> GenericType
> at
> org.apache.flink.api.common.operators.Keys$ExpressionKeys.(Keys.java:232)
> at
> org.apache.flink.api.common.operators.Keys$ExpressionKeys.(Keys.java:223)
> at
> org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:321)
> at
> org.apache.flink.streaming.api.scala.DataStream.keyBy(DataStream.scala:392)
> at org.example.Job$.main(Job.scala:99)
> at org.example.Job.main(Job.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
> ... 9 more
>
> What is happening, and more importantly how can I fix the problem?

Re: How to understand create watermark for Kafka partitions

2019-12-13 Thread vino yang
Hi Alex,

>> But why also say created watermark for each Kafka topic partitions ?

IMO, the official documentation has explained the reason. Just copied here:

When using Apache Kafka as a data source, each Kafka partition may have a
simple event time pattern
(ascending timestamps or bounded out-of-orderness). However, when consuming
streams from Kafka, multiple partitions often get consumed in parallel,
interleaving the events from the partitions and destroying the
per-partition patterns (this is inherent in how Kafka’s consumer clients
work).

In that case, you can use Flink’s Kafka-partition-aware watermark
generation. Using that feature, watermarks are generated inside the Kafka
consumer, per Kafka partition, and the per-partition watermarks are merged
in the same way as watermarks are merged on stream shuffles.


>> As I tested, watermarks also created by global, even I run my job with
parallels. And assign watermarks on Kafka consumer .


Did you follow the official example? Can you share your program?
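For reference, a minimal sketch of the Kafka-partition-aware pattern, assuming
a simple `MyEvent` type with an event timestamp, a matching `MyEventSchema`,
and an existing `env` (all placeholder names):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "watermark-demo");

FlinkKafkaConsumer<MyEvent> consumer =
        new FlinkKafkaConsumer<>("my-topic", new MyEventSchema(), props);

// Assigning the extractor on the consumer (not on the resulting DataStream)
// makes Flink generate watermarks per Kafka partition and merge them, instead
// of generating one global watermark after the partitions are already
// interleaved.
consumer.assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<MyEvent>() {
            @Override
            public long extractAscendingTimestamp(MyEvent event) {
                return event.getEventTimestamp();
            }
        });

DataStream<MyEvent> stream = env.addSource(consumer);

If the timestamps within a partition are only roughly ordered, a
BoundedOutOfOrdernessTimestampExtractor can be used instead of the ascending
one.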


Best,

Vino

qq <471237...@qq.com> 于2019年12月13日周五 上午9:57写道:

> Hi all,
>
>   I confused with watermark for each Kafka partitions.  As I know
> watermark  created by data stream level. But why also say created watermark
> for each Kafka topic partitions ? As I tested, watermarks also created by
> global, even I run my job with parallels. And assign watermarks on Kafka
> consumer . Thanks .
>
> Below text copied from flink web.
>
>
> you can use Flink’s Kafka-partition-aware watermark generation. Using that
> feature, watermarks are generated inside the Kafka consumer, per Kafka
> partition, and the per-partition watermarks are merged in the same way as
> watermarks are merged on stream shuffles.
>
> For example, if event timestamps are strictly ascending per Kafka
> partition, generating per-partition watermarks with the ascending
> timestamps watermark generator will
> result in perfect overall watermarks.
>
> The illustrations below show how to use the per-Kafka-partition watermark
> generation, and how watermarks propagate through the streaming dataflow in
> that case.
>
>
>
> Thanks
> Alex Fu
>


Re: State Processor API: StateMigrationException for keyed state

2019-12-12 Thread vino yang
Hi pwestermann,

Can you share the relevant detailed exception message?

Best,
Vino

pwestermann  于2019年12月13日周五 上午2:00写道:

> I am trying to get the new State Processor API but I am having trouble with
> keyed state (this is for Flink 1.9.1 with RocksDB on S3 as the backend).
> I can read keyed state for simple key type such as Strings but whenever I
> tried to read state with a more complex key type - such as a named tuple
> type (for example ), I get a StateMigrationException:
>
>
>
> Any idea what could be wrong?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink client trying to submit jobs to old session cluster (which was killed)

2019-12-12 Thread vino yang
Hi Pankaj,

Can you tell us which Flink version you use? And can you share the Flink
client and job manager logs with us?

This information would help us to locate your problem.

Best,
Vino

Pankaj Chand  于2019年12月12日周四 下午7:08写道:

> Hello,
>
> When using Flink on YARN in session mode, each Flink job client would
> automatically know the YARN cluster to connect to. It says this somewhere
> in the documentation.
>
> So, I killed the Flink session cluster by simply killing the YARN
> application using the "yarn kill" command. However, when starting a new
> Flink session cluster and trying to submit Flink jobs to yarn-session,
> Flink complains that the old cluster (it gave the port number and YARN
> application ID) is not available.
>
> It seems like the details of the old cluster were still stored somewhere
> in Flink. So, I had to completely replace the Flink folder with a new one.
>
> Does anyone know the proper way to kill a Flink+YARN session cluster to
> completely remove it so that jobs will get submitted to a new Flink session
> cluster?
>
> Thanks,
>
> Pankaj
>


Re: Processing Events by custom rules kept in Broadcast State

2019-12-10 Thread vino yang
Hi KristoffSC,

It seems the main differences are when to parse your rules and what could
be put into the broadcast state.

IMO, all of these solutions can work. I prefer option 3: I would parse the
rules as early as possible so that the source already emits a real rule event
stream (not a raw rule-set stream), and then do the real parsing in
processBroadcastElement.
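A rough sketch of what I mean by option 3 is below. It assumes the broadcast
stream already delivers parsed rule events with a name and a value; `Rule`,
`Transaction`, their getters and `RuleEngineService` are illustrative names
based on your description, not tested code:

public class DriveEngineRuleOperator
        extends BroadcastProcessFunction<Transaction, Rule, String> {

    // One constant descriptor, usable from both process methods.
    private static final MapStateDescriptor<String, String> RULES_DESCRIPTOR =
            new MapStateDescriptor<>("RulesBroadcastState", Types.STRING, Types.STRING);

    @Override
    public void processBroadcastElement(Rule rule, Context ctx, Collector<String> out)
            throws Exception {
        // Each incoming rule event updates the broadcast state individually.
        ctx.getBroadcastState(RULES_DESCRIPTOR).put(rule.getName(), rule.getValue());
    }

    @Override
    public void processElement(Transaction tx, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        // Hand the event plus the current rules to the rule engine.
        ReadOnlyBroadcastState<String, String> rules =
                ctx.getBroadcastState(RULES_DESCRIPTOR);
        out.collect(RuleEngineService.evaluate(tx, rules));
    }
}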

In short, it's my personal opinion.

Best,
Vino

KristoffSC  于2019年12月11日周三 上午6:26写道:

> Hi,
> I think this would be the very basic use case for Broadcast State Pattern
> but I would like to know what are the best approaches to solve this
> problem.
>
> I have an operator that extends BroadcastProcessFunction. The
> brodcastElement is an element sent as Json format message by Kafka. It
> describes a processing rules like key/value mapping, like so: ruleName -
> ruleValue (both strings).
>
> In processElement method I'm delegating to my custom RuleEngineService. It
> is a class that has the "rule engine" logic and accepts received event and
> "set of processing rules" in some form.
>
> What would be the best approaches:
> 1. Keep original Json String in broadcast state. Whenever there is a new
> set
> of rules streamed by Kafka, then in processBroadcastElement method parse
> this Json, map to some RuleParams abstraction and keep it as transient
> field
> in my BroadcastProcessFunction operator. Save Json in broadcast state. Pass
> RuleParams to rule engine service.
>
> 2. Same as 1 but instead keeping Raw Json String in broadcast state, keep
> already parsed JsonObject, somethign like ObjectNode from KafkaConnector
> lib.
>
> 3. Keep each pair of ruleName - ruleValue (both strings) separate in
> broadcast state. In processBrodcastElement method parse the received Json
> and update the state. In processElement method take all rules, build
> RulePArams object (basically a map) and pass them to rule engine
>
> 4. Parse Json in processBroadcastElement method, map it to RuleParams
> abstraction method, keeping rules in a hashMap and keep this RulePrams in
> broadcast state
>
> 5. any other...
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Thread access and broadcast state initialization in BroadcastProcessFunction

2019-12-10 Thread vino yang
Hi kristoffSC,

>> I've noticed that all methods are called by the same thread. Would it be
always the case, or could those methods be called by different threads?

No, they will not be called by different threads. The open/processXXX/close
methods are called at different stages of the same task thread's life cycle,
and the framework keeps the call order.

>> The second thing I've noticed is that "open" method was executed only
before
the first "fast stream" element was received (before execution of
processElement method). That means that if I received the control stream
element first (the broadcast stream element) then method open would not be
called and I will not initialize my processing rule descriptor - I will
loose the event.

There is a similar question I joined that you can consider.[1]
There is also another similar question that comes from StackOverflow.[2]

Best,
Vino

[1]:
http://mail-archives.apache.org/mod_mbox/flink-user/201911.mbox/%3CCAArWwf4jmbaFeizO_YBZVBAMyiuvV95DetoVCkj4rJi4PYorpQ%40mail.gmail.com%3E
[2]:
https://stackoverflow.com/questions/54748158/how-could-flink-broadcast-state-be-initialized


KristoffSC  于2019年12月11日周三 上午5:56写道:

> Hi,
> I was playing around with BroadcastProcessFunction and I've observe a
> specific behavior.
>
> My setup:
>
> MapStateDescriptor ruleStateDescriptor = new
> MapStateDescriptor<>(
> "RulesBroadcastState",
> Types.VOID,
> TypeInformation.of(new TypeHint() {
> }));
>
> BroadcastStream processingRulesBroadcastStream =
> processingRulesStream
>.broadcast(ruleStateDescriptor);
>
>
> SingleOutputStreamOperator evaluatedTrades =
> enrichedTransactionStream
> .connect(processingRulesBroadcastStream)
> .process(new DriveEngineRuleOperator())
> .name("Drive Rule Evaluation");
>
> Where DriveEngineRuleOperator extends BroadcastProcessFunction and
> implements open, processElement and processBroadcastElement methods.
>
> I was following Flink's tutorials about broadcast state pattern and my
> "open" method looks like this:
>
> @Override
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> processingRulesDesc = new MapStateDescriptor<>(
> "RulesBroadcastState",
> Types.VOID,
> TypeInformation.of(new TypeHint() {
> }));
>
>
> }
>
>
> I've noticed that all methods are called by the same thread. Would it be
> always the case, or could those methods be called by different threads?
>
> The second thing I've noticed is that "open" method was executed only
> before
> the first "fast stream" element was received (before execution of
> processElement method). That means that if I received the control stream
> element first (the broadcast stream element) then method open would not be
> called and I will not initialize my processing rule descriptor - I will
> loose the event.
>
> What are the good practices in this case?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink ML feature

2019-12-10 Thread vino yang
Hi Benoit,

I can only try to ping @Till Rohrmann  @Kurt Young
  who may know more information to answer this question.

Best,
Vino

Benoît Paris  于2019年12月10日周二 下午7:06写道:

> Is there any information as to whether Alink is going to be contributed to
> Apache Flink as the official ML Lib?
>
>
> On Tue, Dec 10, 2019 at 7:11 AM vino yang  wrote:
>
>> Hi Chandu,
>>
>> AFAIK, there is a project named Alink[1] which is the Machine Learning
>> algorithm platform based on Flink, developed by the PAI team of Alibaba
>> computing platform. FYI
>>
>> Best,
>> Vino
>>
>> [1]: https://github.com/alibaba/Alink
>>
>> Tom Blackwood  于2019年12月10日周二 下午2:07写道:
>>
>>> You may try Spark ML, which is a production ready library for ML stuff.
>>>
>>> regards.
>>>
>>> On Tue, Dec 10, 2019 at 1:04 PM chandu soa  wrote:
>>>
>>>> Hello Community,
>>>>
>>>> Can you please give me some pointers for implementing Machine Learning
>>>> using Flink.
>>>>
>>>> I see Flink ML libraries were dropped in v1.9. It looks like ML feature
>>>> in Flink going to be enhanced.
>>>>
>>>> What is the recommended approach for implementing production grade ML
>>>> based apps using Flink? v1.9 is ok?or should wait for 1.10?
>>>>
>>>> Thanks,
>>>> Chandu
>>>>
>>>
>
> --
> Benoît Paris
> Ingénieur Machine Learning Explicable
> Tél : +33 6 60 74 23 00
> http://benoit.paris
> http://explicable.ml
>


Re: Flink ML feature

2019-12-09 Thread vino yang
Hi Chandu,

AFAIK, there is a project named Alink[1], a machine learning algorithm
platform based on Flink, developed by the PAI team of the Alibaba computing
platform. FYI

Best,
Vino

[1]: https://github.com/alibaba/Alink

Tom Blackwood  于2019年12月10日周二 下午2:07写道:

> You may try Spark ML, which is a production ready library for ML stuff.
>
> regards.
>
> On Tue, Dec 10, 2019 at 1:04 PM chandu soa  wrote:
>
>> Hello Community,
>>
>> Can you please give me some pointers for implementing Machine Learning
>> using Flink.
>>
>> I see Flink ML libraries were dropped in v1.9. It looks like ML feature
>> in Flink going to be enhanced.
>>
>> What is the recommended approach for implementing production grade ML
>> based apps using Flink? v1.9 is ok?or should wait for 1.10?
>>
>> Thanks,
>> Chandu
>>
>


Re: KeyBy/Rebalance overhead?

2019-12-09 Thread vino yang
Hi Komal,

Actually, the main factor in choosing the type of partitioning is your
business logic. If you want to do some aggregation logic based on a group,
you must choose keyBy to guarantee correct semantics.
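As a small illustration with placeholder types and values (not your actual
job): a per-key aggregation needs keyBy so that all records with the same key
meet in the same parallel instance, while a stateless per-record filter does
not, and without keyBy the operators can stay chained and avoid the network
shuffle. A sketch, assuming an existing `env`:

DataStream<Tuple2<String, Double>> points = env.fromElements(
        Tuple2.of("obj-1", 1.0), Tuple2.of("obj-2", 2.0));

// Correctness requires keyBy here: summing per object id.
points.keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
      .print();

// A pure per-record filter has no grouping semantics, so keyBy only adds a shuffle.
points.filter(t -> t.f1 > 0)
      .print();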

Best,
Vino

Komal Mariam  于2019年12月9日周一 下午5:07写道:

> Thank you @vino yang   for the reply. I suspect
> keyBy will beneficial in those cases where my subsequent operators are
> computationally intensive. Their computation time being > than network
> reshuffling cost.
>
> Regards,
> Komal
>
> On Mon, 9 Dec 2019 at 15:23, vino yang  wrote:
>
>> Hi Komal,
>>
>> KeyBy(Hash Partition, logically partition) and rebalance(physical
>> partition) are both one of the partitions been supported by Flink.[1]
>>
>> Generally speaking, partitioning may cause network communication(network
>> shuffles) costs which may cause more time cost. The example provided by you
>> may be benefit from operator chain[2] if you remove the keyBy operation.
>>
>> Best,
>> Vino
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#datastream-transformations
>> [2]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#tasks-and-operator-chains
>>
>> Komal Mariam  于2019年12月9日周一 上午9:11写道:
>>
>>> Anyone?
>>>
>>> On Fri, 6 Dec 2019 at 19:07, Komal Mariam 
>>> wrote:
>>>
>>>> Hello everyone,
>>>>
>>>> I want to get some insights on the KeyBy (and Rebalance) operations as
>>>> according to my understanding they partition our tasks over the defined
>>>> parallelism and thus should make our pipeline faster.
>>>>
>>>> I am reading a topic which contains 170,000,000 pre-stored records with
>>>> 11 Kafka partitions and replication factor of 1.   Hence I use
>>>> .setStartFromEarliest() to read the stream.
>>>> My Flink is a 4 node cluster with 3 taskmanagers, each having 10 cores
>>>> and 1 job manager with 6 cores. (10 task slots per TM hence I set
>>>> environment parallelism to 30).
>>>>
>>>> There are about 10,000 object IDs hence 10,000 keys.  Right now I'm
>>>> keeping the number of records fixed to get a handle on how fast they're
>>>> being processed.
>>>>
>>>> When I remove keyBy, I get the same results in 39 secs as opposed to 52
>>>> secs with KeyBy. Infact, even when I vary the parallelism down to 10 or
>>>> below I still get the same extra overhead of 9 to 13secs. My data is mostly
>>>> uniformly distributed on it's key so I can rule out skew.  Rebalance
>>>> likewise has the same latency as keyBy.
>>>>
>>>>  What I want to know is what may be causing this overhead? And is there
>>>> any way to decrease it?
>>>>
>>>> Here's the script I'm running for testing purposes:
>>>> --
>>>> DataStream JSONStream  = env.addSource(new FlinkKafkaConsumer<>("data",
>>>> new
>>>> JSONKeyValueDeserializationSchema(false),properties).setStartFromEarliest())
>>>>
>>>> DataStream myPoints = JSONStream.map(new jsonToPoint());
>>>>
>>>> mypoints.keyBy("oID").filter(new findDistancefromPOI());
>>>>
>>>> public class findDistancefromPOI extends RichFilterFunction {
>>>> public boolean filter(Point input) throws Exception {
>>>> Double distance = computeEuclideanDist(
>>>> 16.4199  , 89.974  ,input.X(),input.Y);
>>>>  return distance > 0;
>>>> }
>>>> }
>>>>
>>>> Best Regards,
>>>> Komal
>>>>
>>>


Re: Sample Code for querying Flink's default metrics

2019-12-09 Thread vino yang
Hi Pankaj,

> Is there any sample code for how to read such default metrics?  Is there
any way to query the default metrics, such as CPU usage and Memory, without
using REST API or Reporters?

What's your real requirement? Can you use code to call the REST API? Why does
it not match your requirements?
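If calling the REST API from code is acceptable, a minimal sketch in Java
could look like this (host, port and the TaskManager id are placeholders; the
metric names are standard `Status.JVM.*` system metrics):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

public class TaskManagerMetricsClient {
    public static void main(String[] args) throws Exception {
        String taskManagerId = args[0]; // obtained from GET /taskmanagers
        URL url = new URL("http://localhost:8081/taskmanagers/" + taskManagerId
                + "/metrics?get=Status.JVM.CPU.Load,Status.JVM.Memory.Heap.Used");
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(url.openStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                // Prints a JSON array of {"id": "...", "value": "..."} entries.
                System.out.println(line);
            }
        }
    }
}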

> Additionally, how do I query Backpressure using code, or is it still only
visually available via the dashboard UI? Consequently, is there any way to
infer Backpressure by querying one (or more) of the Memory metrics of the
TaskManager?

Backpressure is related not only to memory metrics but also to IO and network
metrics. For more details about measuring backpressure, please see these blog
posts.[1][2]

[1]: https://flink.apache.org/2019/06/05/flink-network-stack.html
[2]: https://flink.apache.org/2019/07/23/flink-network-stack-2.html

Best,
Vino

Pankaj Chand  于2019年12月9日周一 下午12:07写道:

> Hello,
>
> Using Flink on Yarn, I could not understand the documentation for how to
> read the default metrics via code. In particular, I want to read
> throughput, i.e. CPU usage, Task/Operator's numRecordsOutPerSecond, and
> Memory.
>
> Is there any sample code for how to read such default metrics?  Is there
> any way to query the default metrics, such as CPU usage and Memory, without
> using REST API or Reporters?
>
> Additionally, how do I query Backpressure using code, or is it still only
> visually available via the dashboard UI? Consequently, is there any way to
> infer Backpressure by querying one (or more) of the Memory metrics of the
> TaskManager?
>
> Thank you,
>
> Pankaj
>


Re: Need help using AggregateFunction instead of FoldFunction

2019-12-08 Thread vino yang
Hi dev,

The time parameter of a window can have different semantics.
In a session window, it is only an inactivity gap; the size of the window is
driven by the activity of the events.
In a tumbling or sliding window, it is the size of the window itself.
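To make this concrete, here is a small sketch based on your pipeline, reusing
the keyed stream (called `stream` here) and the JsonConcatenator from your
code. Also note that event-time windows, including session windows, only fire
once the watermark passes the end of the window, so the job needs a
timestamp/watermark assigner that actually advances event time:

// Session window: the 5 seconds are an inactivity gap per key; the window
// keeps growing as long as elements arrive within 5 seconds of each other.
stream.keyBy(0)
      .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
      .aggregate(new JsonConcatenator());

// Tumbling window: the 5 seconds are the fixed size of every window.
stream.keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .aggregate(new JsonConcatenator());

(`.timeWindow(Time.seconds(5))` is shorthand for a tumbling time window based
on the configured time characteristic, which is why it behaves differently
from the session window.)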

For more details, please see the official documentation.[1]

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#session-windows



devinbost  于2019年12月6日周五 下午10:39写道:

> I think there might be a bug in
> `.window(EventTimeSessionWindows.withGap(Time.seconds(5)))`
>  (unless I'm just not using it correctly) because I'm able to get output
> when I use the simpler window
> `.timeWindow(Time.seconds(5))`
> However, I don't get any output when I used the session-based window.
>
>
> devinbost wrote
> > I added logging statements everywhere in my code, and I'm able to see my
> > message reach the `add` method in the AggregateFunction that I
> > implemented,
> > but the getResult method is never called.
> >
> > In the code below, I also never see the:
> >  "Ran dataStream. Adding sink next"
> > line appear in my log, and the only log statements from the
> > JsonConcatenator
> > class come from the `add` method, as shown below.
> >
> >
> > DataStream<String> combinedEnvelopes = dataStream
> >     .map(new MapFunction<String, Tuple2<String, String>>() {
> >         @Override
> >         public Tuple2<String, String> map(String incomingMessage) throws Exception {
> >             return mapToTuple(incomingMessage);
> >         }
> >     })
> >     .keyBy(0)
> >     .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
> >     .aggregate(new JsonConcatenator());
> >
> > Logger logger = LoggerFactory.getLogger(StreamJob.class);
> > logger.info("Ran dataStream. Adding sink next")
> >
> > -
> >
> > private static class JsonConcatenator
> >         implements AggregateFunction<Tuple2<String, String>,
> >         Tuple2<String, String>, String> {
> >     Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
> >
> >     @Override
> >     public Tuple2<String, String> createAccumulator() {
> >         return new Tuple2<String, String>("", "");
> >     }
> >
> >     @Override
> >     public Tuple2<String, String> add(Tuple2<String, String> value,
> >             Tuple2<String, String> accumulator) {
> >         logger.info("Running Add on value.f0: " + value.f0 + " and value.f1: " + value.f1);
> >         return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
> >     }
> >
> >     @Override
> >     public String getResult(Tuple2<String, String> accumulator) {
> >         logger.info("Running getResult on accumulator.f1: " + accumulator.f1);
> >         return "[" + accumulator.f1 + "]";
> >     }
> >
> >     @Override
> >     public Tuple2<String, String> merge(Tuple2<String, String> a,
> >             Tuple2<String, String> b) {
> >         logger.info("Running merge on (a.f0: " + a.f0 + " and a.f1: " + a.f1 + " and b.f1: " + b.f1);
> >         return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
> >     }
> > }
> >
> >
> >
> >
> > Any ideas?
> >
> >
> > Chris Miller-2 wrote
> >> I hit the same problem, as far as I can tell it should be fixed in
> >> Pulsar 2.4.2. The release of this has already passed voting so I hope
> it
> >> should be available in a day or two.
> >>
> >> https://github.com/apache/pulsar/pull/5068
> >
> >
> >
> >
> >
> > --
> > Sent from:
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: KeyBy/Rebalance overhead?

2019-12-08 Thread vino yang
Hi Komal,

keyBy (hash partitioning, a logical partitioning) and rebalance (a physical
partitioning) are both partitioning strategies supported by Flink.[1]

Generally speaking, partitioning may cause network communication (network
shuffles), which adds extra time. The example you provided may benefit from
operator chaining[2] if you remove the keyBy operation.

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#datastream-transformations
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#tasks-and-operator-chains

Komal Mariam  于2019年12月9日周一 上午9:11写道:

> Anyone?
>
> On Fri, 6 Dec 2019 at 19:07, Komal Mariam  wrote:
>
>> Hello everyone,
>>
>> I want to get some insights on the KeyBy (and Rebalance) operations as
>> according to my understanding they partition our tasks over the defined
>> parallelism and thus should make our pipeline faster.
>>
>> I am reading a topic which contains 170,000,000 pre-stored records with
>> 11 Kafka partitions and replication factor of 1.   Hence I use
>> .setStartFromEarliest() to read the stream.
>> My Flink is a 4 node cluster with 3 taskmanagers, each having 10 cores
>> and 1 job manager with 6 cores. (10 task slots per TM hence I set
>> environment parallelism to 30).
>>
>> There are about 10,000 object IDs hence 10,000 keys.  Right now I'm
>> keeping the number of records fixed to get a handle on how fast they're
>> being processed.
>>
>> When I remove keyBy, I get the same results in 39 secs as opposed to 52
>> secs with KeyBy. Infact, even when I vary the parallelism down to 10 or
>> below I still get the same extra overhead of 9 to 13secs. My data is mostly
>> uniformly distributed on it's key so I can rule out skew.  Rebalance
>> likewise has the same latency as keyBy.
>>
>>  What I want to know is what may be causing this overhead? And is there
>> any way to decrease it?
>>
>> Here's the script I'm running for testing purposes:
>> --
>> DataStream JSONStream  = env.addSource(new FlinkKafkaConsumer<>("data",
>> new
>> JSONKeyValueDeserializationSchema(false),properties).setStartFromEarliest())
>>
>> DataStream myPoints = JSONStream.map(new jsonToPoint());
>>
>> mypoints.keyBy("oID").filter(new findDistancefromPOI());
>>
>> public class findDistancefromPOI extends RichFilterFunction {
>> public boolean filter(Point input) throws Exception {
>> Double distance = computeEuclideanDist(
>> 16.4199  , 89.974  ,input.X(),input.Y);
>>  return distance > 0;
>> }
>> }
>>
>> Best Regards,
>> Komal
>>
>


Re: Need help using AggregateFunction instead of FoldFunction

2019-12-04 Thread vino yang
Hi devinbost,

Sharing two example links with you:

   - the example code in the official documentation[1];
   - a StackOverflow answer to a similar question[2];

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#aggregatefunction
[2]:
https://stackoverflow.com/questions/47123785/flink-how-to-convert-the-deprecated-fold-to-aggregrate

I hope these resources are helpful to you.
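For your concrete case, a rough (untested) sketch of the conversion could look
like this, using the partially concatenated string itself as the accumulator:

private static class JsonConcatenator
        implements AggregateFunction<Tuple2<String, String>, String, String> {

    @Override
    public String createAccumulator() {
        return "";
    }

    @Override
    public String add(Tuple2<String, String> value, String accumulator) {
        // Partial aggregation: append this element's JSON payload.
        return accumulator.isEmpty() ? value.f1 : accumulator + ", " + value.f1;
    }

    @Override
    public String getResult(String accumulator) {
        return "[" + accumulator + "]";
    }

    @Override
    public String merge(String a, String b) {
        if (a.isEmpty()) {
            return b;
        }
        return b.isEmpty() ? a : a + ", " + b;
    }
}

It plugs into the same place as the fold:
.window(EventTimeSessionWindows.withGap(Time.seconds(20))).aggregate(new JsonConcatenator())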

Best,
Vino

devinbost  于2019年12月5日周四 上午9:38写道:

> Hi,
>
> In my use case, I am attempting to create a keyedStream (on a string) and
> then window that stream (which represents keyed JSON objects) with
> EventTimeSessionWindows (so that I have a separate window for each set of
> JSON messages, according to the key), and then concatenate the JSON objects
> by their keys. (e.g. If message1, message2, and message3 all have the same
> key, they should be concatenated to a JSON array like: [message1,message2,
> message3].)
>
> I think my code expresses my intent conceptually, but I learned that Fold
> is
> deprecated because it can't perform partial aggregations. Instead, I need
> to
> use the AggregateFunction, but I'm having trouble understanding the API
> documentation. How do I convert this code to an implementation that uses
> the
> AggregateFunction instead?
>
> DataStream<String> combinedEnvelopes = dataStream
>     .map(new MapFunction<String, Tuple2<String, String>>() {
>         @Override
>         public Tuple2<String, String> map(String incomingMessage) throws Exception {
>             return mapToTuple(incomingMessage);
>         }
>     })
>     .keyBy(0)
>     .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
>     .fold("[", new FoldFunction<Tuple2<String, String>, String>() {
>         @Override
>         public String fold(String concatenatedJsonArray, Tuple2<String, String> incomingMessage) {
>             return concatenatedJsonArray + ", " + incomingMessage.f1.toString();
>         }
>     })
>     .map(new MapFunction<String, String>() {
>         @Override
>         public String map(String jsonPartialArray) throws Exception {
>             return jsonPartialArray + "]";
>         }
>     })
>     .returns(String.class);
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: [DISCUSS] Drop Kafka 0.8/0.9

2019-12-04 Thread vino yang
+1

jincheng sun  于2019年12月5日周四 上午10:26写道:

> +1  for drop it, and Thanks for bring up this discussion Chesnay!
>
> Best,
> Jincheng
>
> Jark Wu  于2019年12月5日周四 上午10:19写道:
>
>> +1 for dropping, also cc'ed user mailing list.
>>
>>
>> Best,
>> Jark
>>
>> On Thu, 5 Dec 2019 at 03:39, Konstantin Knauf 
>> wrote:
>>
>> > Hi Chesnay,
>> >
>> > +1 for dropping. I have not heard from any user using 0.8 or 0.9 for a
>> long
>> > while.
>> >
>> > Cheers,
>> >
>> > Konstantin
>> >
>> > On Wed, Dec 4, 2019 at 1:57 PM Chesnay Schepler 
>> > wrote:
>> >
>> > > Hello,
>> > >
>> > > What's everyone's take on dropping the Kafka 0.8/0.9 connectors from
>> the
>> > > Flink codebase?
>> > >
>> > > We haven't touched either of them for the 1.10 release, and it seems
>> > > quite unlikely that we will do so in the future.
>> > >
>> > > We could finally close a number of test stability tickets that have
>> been
>> > > lingering for quite a while.
>> > >
>> > >
>> > > Regards,
>> > >
>> > > Chesnay
>> > >
>> > >
>> >
>> > --
>> >
>> > Konstantin Knauf | Solutions Architect
>> >
>> > +49 160 91394525
>> >
>> >
>> > Follow us @VervericaData Ververica 
>> >
>> >
>> > --
>> >
>> > Join Flink Forward  - The Apache Flink
>> > Conference
>> >
>> > Stream Processing | Event Driven | Real Time
>> >
>> > --
>> >
>> > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>> >
>> > --
>> > Ververica GmbH
>> > Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> > (Tony) Cheng
>> >
>>
>


Re: Building with Hadoop 3

2019-12-04 Thread vino yang
Hi Marton,

Thanks for your explanation. Personally, I look forward to your
contribution!

Best,
Vino

Márton Balassi  于2019年12月4日周三 下午5:15写道:

> Wearing my Cloudera hat I can tell you that we have done this exercise for
> our distros of the  3.0 and 3.1 Hadoop versions. We have not contributed
> these back just yet, but we are open to do so. If the community is
> interested we can contribute those changes back to flink-shaded and suggest
> the necessay changes to flink too. The task was not overly complex, but it
> certainly involved a bit of dependency hell. :-)
>
> Right now we are focused on internal timelines, but we could invest into
> contributing this back in the end of January timeframe if the community
> deems this a worthwhile effort.
>
> Best,
> Marton
>
> On Wed, Dec 4, 2019 at 10:00 AM Chesnay Schepler 
> wrote:
>
>> There's no JIRA and no one actively working on it. I'm not aware of any
>> investigations on the matter; hence the first step would be to just try it
>> out.
>>
>> A flink-shaded artifact isn't a hard requirement; Flink will work with
>> any 2.X hadoop distribution (provided that there aren't any dependency
>> clashes).
>>
>> On 03/12/2019 18:22, Foster, Craig wrote:
>>
>> Hi:
>>
>> I don’t see a JIRA for Hadoop 3 support. I see a comment on a JIRA here
>> from a year ago that no one is looking into Hadoop 3 support [1]. Is there
>> a document or JIRA that now exists which would point to what needs to be
>> done to support Hadoop 3? Right now builds with Hadoop 3 don’t work
>> obviously because there’s no flink-shaded-hadoop-3 artifacts.
>>
>>
>>
>> Thanks!
>>
>> Craig
>>
>>
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-11086
>>
>>
>>
>>
>>


Re: Auto Scaling in Flink

2019-12-03 Thread vino yang
Hi Akash,

The key difference between Pravega and Kafka is that Kafka is a messaging
system, while Pravega is a streaming system.[1] The official documentation
also states their differences on its FAQ page.[2]

[1]:
https://siliconangle.com/2017/04/17/dell-emc-takes-on-streaming-storage-with-open-source-solution-pravega-ffsf17/
[2]: http://www.pravega.io/faq.html

Best,
Vino

Akash Goel  于2019年12月4日周三 下午12:00写道:

> Hi,
>
> If my application is already running on Kafka, then do I need to replace
> with Pravega or can Pravega read directly from Kafka?
>
> I have also reached out to to Pravega Community but just checking here.
>
> Thanks,
> Akash Goel
>
> On Fri, Nov 29, 2019 at 11:14 AM Caizhi Weng  wrote:
>
>> Hi Akash,
>>
>> Flink doesn't support auto scaling in core currently, it may be supported
>> in the future, when the new scheduling architecture is implemented
>> https://issues.apache.org/jira/browse/FLINK-10407 .
>>
>> You can do it externally by cancel the job with a savepoint, update the
>> parallelism, and restart the job, according to the rate of data. like what
>> pravega suggests in the doc:
>> http://pravega.io/docs/latest/key-features/#auto-scaling.
>>
>> vino yang  于2019年11月29日周五 上午11:12写道:
>>
>>> Hi Akash,
>>>
>>> You can use Pravega connector to integrate with Flink, the source code
>>> is here[1].
>>>
>>> In short, relying on its rescalable state feature[2] flink supports
>>> scalable streaming jobs.
>>>
>>> Currently, the mainstream solution about auto-scaling is Flink + K8S, I
>>> can share some resources with you[3].
>>>
>>> Best,
>>> Vino
>>>
>>> [1]: https://github.com/pravega/flink-connectors
>>> [2]:
>>> https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
>>> [3]:
>>> https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2019-scaling-a-realtime-streaming-warehouse-with-apache-flink-parquet-and-kubernetes-aditi-verma-ramesh-shanmugam
>>>
>>> Akash Goel  于2019年11月29日周五 上午9:52写道:
>>>
>>>> Hi,
>>>>
>>>> Does Flunk support auto scaling. I read that it is supported using
>>>> pravega? Is it incorporated in any version.
>>>>
>>>> Thanks,
>>>> Akash Goel
>>>>
>>>


Re: Building with Hadoop 3

2019-12-03 Thread vino yang
cc @Chesnay Schepler  to answer this question.

Foster, Craig  于2019年12月4日周三 上午1:22写道:

> Hi:
>
> I don’t see a JIRA for Hadoop 3 support. I see a comment on a JIRA here
> from a year ago that no one is looking into Hadoop 3 support [1]. Is there
> a document or JIRA that now exists which would point to what needs to be
> done to support Hadoop 3? Right now builds with Hadoop 3 don’t work
> obviously because there’s no flink-shaded-hadoop-3 artifacts.
>
>
>
> Thanks!
>
> Craig
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-11086
>
>
>


Re: [DISCUSS] Drop RequiredParameters and OptionType

2019-12-03 Thread vino yang
+1,

One concern: these two classes are marked with the `@PublicEvolving`
annotation.
Shall we mark them with the `@Deprecated` annotation first?

Best,
Vino

Dian Fu  于2019年12月3日周二 下午8:56写道:

> +1 to remove them. It seems that we should also drop the class Option as
> it's currently only used in RequiredParameters.
>
> 在 2019年12月3日,下午8:34,Robert Metzger  写道:
>
> +1 on removing it.
>
> On Tue, Dec 3, 2019 at 12:31 PM Stephan Ewen  wrote:
>
>> I just stumbled across these classes recently and was looking for sample
>> uses.
>> No examples and other tests in the code base seem to
>> use RequiredParameters and OptionType.
>>
>> They also seem quite redundant with how ParameterTool itself works
>> (tool.getRequired()).
>>
>> Should we drop them, in an attempt to reduce unnecessary code and
>> confusion for users (multiple ways to do the same thing)? There are also
>> many better command line parsing libraries out there, this seems like
>> something we don't need to solve in Flink.
>>
>> Best,
>> Stephan
>>
>
>


Re: Access to CheckpointStatsCounts

2019-12-02 Thread vino yang
Hi min,

If it is only for monitoring purposes, you can just use the checkpoint REST
API[1] to do this work.

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html#jobs-jobid-checkpoints

Best,
Vino

 于2019年12月2日周一 下午5:01写道:

> Hi,
>
>
>
> Just wonder how to access the CheckpointStatsCounts from the main method
> of a job?
>
>
>
> We need to detect if a job recovers from a checkpoint or starts from an
> empty status.
>
>
>
> Regards,
>
>
>
> Min
>


Re: Firing timers on ProcessWindowFunction

2019-12-02 Thread vino yang
Hi Avi,

First, let's clarify: is the "timer" you mentioned the timer of the window,
or a timer you want to register yourself to trigger some action?

Best,
Vino


Avi Levi  于2019年12月2日周一 下午4:11写道:

> Hi,
> Is there a way to fire timer in a ProcessWindowFunction ? I would like to
> mutate the global state on a timely basis.
>
>


Re: Read multiline JSON/XML

2019-12-01 Thread vino yang
Also, I would like to say sorry to Flavio!

Best,
Vino

vino yang  于2019年12月2日周一 上午10:29写道:

> Hi Chesnay,
>
> Sorry, yes, I lost the "like" keyword. I mistakenly thought he wanted to
> ask how to use Spark to accomplish this job.
>
> Best,
> Vino
>
> Chesnay Schepler  于2019年11月29日周五 下午10:01写道:
>
>> Why vino?
>>
>> He's specifically asking whether Flink offers something _like_ spark.
>>
>> On 29/11/2019 14:39, vino yang wrote:
>>
>> Hi Flavio,
>>
>> IMO, it would take more effect to ask this question in the Spark user
>> mailing list.
>>
>> WDYT?
>>
>> Best,
>> Vino
>>
>> Flavio Pompermaier  于2019年11月29日周五 下午7:09写道:
>>
>>> Hi to all,
>>> is there any out-of-the-box option to read multiline JSON or XML like in
>>> Spark?
>>> It would be awesome to have something like
>>>
>>> spark.read .option("multiline", true) .json("/path/to/user.json")
>>>
>>> Best,
>>> Flavio
>>>
>>
>>


Re: Read multiline JSON/XML

2019-11-29 Thread vino yang
Hi Flavio,

IMO, it would be more effective to ask this question on the Spark user
mailing list.

WDYT?

Best,
Vino

Flavio Pompermaier  于2019年11月29日周五 下午7:09写道:

> Hi to all,
> is there any out-of-the-box option to read multiline JSON or XML like in
> Spark?
> It would be awesome to have something like
>
> spark.read .option("multiline", true) .json("/path/to/user.json")
>
> Best,
> Flavio
>


Re: Auto Scaling in Flink

2019-11-28 Thread vino yang
Hi Akash,

You can use the Pravega connector to integrate with Flink; the source code is
here[1].

In short, relying on its rescalable state feature[2], Flink supports scalable
streaming jobs.

Currently, the mainstream solution for auto-scaling is Flink + K8s; I can
share some resources with you[3].

Best,
Vino

[1]: https://github.com/pravega/flink-connectors
[2]:
https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
[3]:
https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2019-scaling-a-realtime-streaming-warehouse-with-apache-flink-parquet-and-kubernetes-aditi-verma-ramesh-shanmugam

Akash Goel  于2019年11月29日周五 上午9:52写道:

> Hi,
>
> Does Flunk support auto scaling. I read that it is supported using
> pravega? Is it incorporated in any version.
>
> Thanks,
> Akash Goel
>


Re: JobGraphs not cleaned up in HA mode

2019-11-28 Thread vino yang
Hi,

Why do you not use HDFS directly?

Best,
Vino

曾祥才  于2019年11月28日周四 下午6:48写道:

>
> anyone have the same problem? pls help, thks
>
>
>
> -- 原始邮件 --
> *发件人:* "曾祥才";
> *发送时间:* 2019年11月28日(星期四) 下午2:46
> *收件人:* "Vijay Bhaskar";
> *抄送:* "User-Flink";
> *主题:* 回复: JobGraphs not cleaned up in HA mode
>
> the config  (/flink is the NASdirectory ):
>
> jobmanager.rpc.address: flink-jobmanager
> taskmanager.numberOfTaskSlots: 16
> web.upload.dir: /flink/webUpload
> blob.server.port: 6124
> jobmanager.rpc.port: 6123
> taskmanager.rpc.port: 6122
> jobmanager.heap.size: 1024m
> taskmanager.heap.size: 1024m
> high-availability: zookeeper
> high-availability.cluster-id: /cluster-test
> high-availability.storageDir: /flink/ha
> high-availability.zookeeper.quorum: :2181
> high-availability.jobmanager.port: 6123
> high-availability.zookeeper.path.root: /flink/risk-insight
> high-availability.zookeeper.path.checkpoints: /flink/zk-checkpoints
> state.backend: filesystem
> state.checkpoints.dir: file:///flink/checkpoints
> state.savepoints.dir: file:///flink/savepoints
> state.checkpoints.num-retained: 2
> jobmanager.execution.failover-strategy: region
> jobmanager.archive.fs.dir: file:///flink/archive/history
>
>
>
> -- 原始邮件 --
> *发件人:* "Vijay Bhaskar";
> *发送时间:* 2019年11月28日(星期四) 下午3:12
> *收件人:* "曾祥才";
> *抄送:* "User-Flink";
> *主题:* Re: JobGraphs not cleaned up in HA mode
>
> Can you share the flink configuration once?
>
> Regards
> Bhaskar
>
> On Thu, Nov 28, 2019 at 12:09 PM 曾祥才  wrote:
>
>> if i clean the zookeeper data , it runs fine .  but next time when the
>> jobmanager failed and redeploy the error occurs again
>>
>>
>>
>>
>> -- 原始邮件 --
>> *发件人:* "Vijay Bhaskar";
>> *发送时间:* 2019年11月28日(星期四) 下午3:05
>> *收件人:* "曾祥才";
>> *主题:* Re: JobGraphs not cleaned up in HA mode
>>
>> Again it could not find the state store file: "Caused by:
>> java.io.FileNotFoundException: /flink/ha/submittedJobGraph0c6bcff01199 "
>>  Check why its unable to find.
>> Better thing is: Clean up zookeeper state and check your configurations,
>> correct them and restart cluster.
>> Otherwise it always picks up corrupted state from zookeeper and it will
>> never restart
>>
>> Regards
>> Bhaskar
>>
>> On Thu, Nov 28, 2019 at 11:51 AM 曾祥才  wrote:
>>
>>> i've made a misstake( the log before is another cluster) . the full
>>> exception log is :
>>>
>>>
>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>>> Recovering all persisted jobs.
>>> 2019-11-28 02:33:12,726 INFO
>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
>>> Starting the SlotManager.
>>> 2019-11-28 02:33:12,743 INFO
>>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>>> Released locks of job graph 639170a9d710bacfd113ca66b2aacefa from
>>> ZooKeeper.
>>> 2019-11-28 02:33:12,744 ERROR
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error
>>> occurred in the cluster entrypoint.
>>> org.apache.flink.runtime.dispatcher.DispatcherException: Failed to take
>>> leadership with session id 607e1fe0-f1b4-4af0-af16-4137d779defb.
>>> at
>>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$30(Dispatcher.java:915)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
>>> at
>>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:594)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>>>
>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>> at
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>>>
>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at
>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>
>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at
>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> Caused by: java.lang.RuntimeException:
>>> org.apache.flink.util.FlinkException: Could not retrieve submitted JobGraph
>>> from state handle under /639170a9d710bacfd113ca66b2aacefa. This indicates
>>> that the retrieved state handle is broken. Try cleaning the state handle
>>> store.
>>> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
>>> at
>>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>>> at
>>> 

Re: Flink 'Job Cluster' mode Ui Access

2019-11-28 Thread vino yang
Hi Jatin,

Which version do you use?

Best,
Vino

Jatin Banger  于2019年11月28日周四 下午5:03写道:

> Hi,
>
> I checked the log file there is no error.
> And I checked the pods internal ports by using rest api.
>
> # curl : 4081
> {"errors":["Not found."]}
> 4081 is the Ui port
>
> # curl :4081/config
> {"refresh-interval":3000,"timezone-name":"Coordinated Universal
> Time","timezone-offset":0,"flink-version":"","flink-revision":"ceba8af
> @ 11.02.2019 @ 22:17:09 CST"}
>
> # curl :4081/jobs
> {"jobs":[{"id":"___job_Id_____","status":"RUNNING"}]}
>
> Which shows the state of the job as running.
>
> What else can we do ?
>
> Best regards,
> Jatin
>
> On Thu, Nov 28, 2019 at 1:28 PM vino yang  wrote:
>
>> Hi Jatin,
>>
>> Flink web UI does not depend on any deployment mode.
>>
>> You should check if there are error logs in the log file and the job
>> status is running state.
>>
>> Best,
>> Vino
>>
>> Jatin Banger  于2019年11月28日周四 下午3:43写道:
>>
>>> Hi,
>>>
>>> It seems there is Web Ui for Flink Session cluster, But for Flink Job
>>> Cluster it is Showing
>>>
>>> {"errors":["Not found."]}
>>>
>>> Is it the expected behavior for Flink Job Cluster Mode ?
>>>
>>> Best Regards,
>>> Jatin
>>>
>>


Re: Flink 'Job Cluster' mode Ui Access

2019-11-27 Thread vino yang
Hi Jatin,

Flink web UI does not depend on any deployment mode.

You should check whether there are error logs in the log file and whether the
job is in the RUNNING state.

Best,
Vino

Jatin Banger  于2019年11月28日周四 下午3:43写道:

> Hi,
>
> It seems there is Web Ui for Flink Session cluster, But for Flink Job
> Cluster it is Showing
>
> {"errors":["Not found."]}
>
> Is it the expected behavior for Flink Job Cluster Mode ?
>
> Best Regards,
> Jatin
>


Re: SQL Performance

2019-11-26 Thread vino yang
Hi Nick,

Can you provide more details? Are you using JDBCOutputFormat? If yes, can
`JDBCOutputFormatBuilder#setBatchInterval` help you?
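For reference, a rough sketch of building the output format with a batch
interval (driver, URL, query and the interval value are only illustrative):

JDBCOutputFormat outputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
        .setDBUrl("jdbc:derby:/path/to/db")
        .setQuery("INSERT INTO readings (id, ts, reading) VALUES (?, ?, ?)")
        .setBatchInterval(500)   // buffer 500 rows before executing the JDBC batch
        .finish();

On a DataStream of Rows the format can then be attached via
writeUsingOutputFormat(outputFormat).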

Best,
Vino

Nicholas Walton  于2019年11月26日周二 下午9:20写道:

> I’m streaming records down to an Embedded Derby database, at a rate of
> around 200 records per second. I’m certain Derby can sustain a higher
> throughput than that, if I could buffer the records but it seems that I’m
> writing each record as soon as it arrives and as a single transaction which
> is inefficient.
>
> Is there a way to to improve my throughput rate?
>
> Nick Walton


Re: Pre-process data before it hits the Source

2019-11-26 Thread vino yang
Hi Felipe,

>> But you said, "before it hits the Source".

I did not say this; Vijay said it. Regarding this question, he may not have
thought about customizing the source connector.

If he were not trying to find a solution within the Flink domain, why would he
ask Flink questions and paste a Flink program?

IMO, it's just a matter of expression. WDYT?

Best,
Vino

Felipe Gutierrez  于2019年11月26日周二 下午5:16写道:

> Hi Vino,
>
> yes, in the source function it is possible. But you said, "before it hits
> the Source". So, IMO I think it is outside of the flink workflow.
> Best,
> Felipe
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>
>
> On Tue, Nov 26, 2019 at 10:09 AM vino yang  wrote:
>
>> Hi Felipe,
>>
>> Why do you think it's not possible.
>>
>> My thought is we can do the data pre-procession in the source function.
>> If so, source function would contain consume upstream events then do
>> pre-processing then emits to the downstream.
>>
>> Best,
>> Vino
>>
>>
>> Felipe Gutierrez  于2019年11月26日周二 下午4:56写道:
>>
>>> I am afraid that this is not possible in FLink, since the entry point of
>>> all transformation is the source function. Everything that we can
>>> pre-process is in the source function or on the downstream operators.
>>> If you want to pre-process something before the data hits the source you
>>> will have to rely on the broker/storage/queue that the source consumes your
>>> data, not in FLink.
>>>
>>> Best,
>>> Felipe
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez*
>>> *--* *https://felipeogutierrez.blogspot.com
>>> <https://felipeogutierrez.blogspot.com>*
>>>
>>>
>>> On Tue, Nov 26, 2019 at 2:57 AM vino yang  wrote:
>>>
>>>> Hi Vijay,
>>>>
>>>> IMO, the semantics of the source is not changeless. It can contain
>>>> integrate with third-party systems and consume events. However, it can also
>>>> contain more business logic about your data pre-process after consuming
>>>> events.
>>>>
>>>> Maybe it needs some customization. WDYT?
>>>>
>>>> Best,
>>>> Vino
>>>>
>>>> Vijay Balakrishnan  于2019年11月26日周二 上午6:45写道:
>>>>
>>>>> Hi,
>>>>> Need to pre-process data(transform incoming data to a different
>>>>> format) before it hits the Source I have defined. How can I do that ?
>>>>>
>>>>> I tried to use a .map on the DataStream but that is too late as the
>>>>> data has already hit the Source I defined.
>>>>> FlinkKinesisConsumer> kinesisConsumer =
>>>>> getMonitoringFlinkKinesisConsumer(local, localKinesis, kinesisTopicRead,
>>>>> region, getRecsMax, getRecsIntervalMs, connectionTimeout, maxConnections,
>>>>> socketTimeout);
>>>>> DataStreamSource> monitoringDataStreamSource =
>>>>> env.addSource(kinesisConsumer);
>>>>>
>>>>> DataStream> kinesisStream1 = kinesisStream.map(new
>>>>> TransformFunction(...));//too late here
>>>>>
>>>>> TIA,
>>>>>
>>>>


Re: Pre-process data before it hits the Source

2019-11-26 Thread vino yang
Hi Felipe,

Why do you think it's not possible?

My thought is that we can do the data pre-processing in the source function.
If so, the source function would consume the upstream events, do the
pre-processing, and then emit the results downstream.
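For example, with the Kinesis consumer from your snippet, one natural place
for this is the deserialization schema that the source uses, so records are
already transformed when they enter the stream. A rough sketch (the target
type and the conversion logic are placeholders, and `kinesisTopicRead`,
`kinesisProps` and `env` stand in for the values from your code):

public class TransformingSchema
        extends AbstractDeserializationSchema<Map<String, Object>> {

    @Override
    public Map<String, Object> deserialize(byte[] message) throws IOException {
        String raw = new String(message, StandardCharsets.UTF_8);
        Map<String, Object> transformed = new HashMap<>();
        // ... convert the incoming format to whatever the job expects ...
        transformed.put("raw", raw);
        return transformed;
    }
}

FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer =
        new FlinkKinesisConsumer<>(kinesisTopicRead, new TransformingSchema(), kinesisProps);
DataStream<Map<String, Object>> monitoringDataStream = env.addSource(kinesisConsumer);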

Best,
Vino


Felipe Gutierrez  于2019年11月26日周二 下午4:56写道:

> I am afraid that this is not possible in FLink, since the entry point of
> all transformation is the source function. Everything that we can
> pre-process is in the source function or on the downstream operators.
> If you want to pre-process something before the data hits the source you
> will have to rely on the broker/storage/queue that the source consumes your
> data, not in FLink.
>
> Best,
> Felipe
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>
>
> On Tue, Nov 26, 2019 at 2:57 AM vino yang  wrote:
>
>> Hi Vijay,
>>
>> IMO, the semantics of the source is not changeless. It can contain
>> integrate with third-party systems and consume events. However, it can also
>> contain more business logic about your data pre-process after consuming
>> events.
>>
>> Maybe it needs some customization. WDYT?
>>
>> Best,
>> Vino
>>
>> Vijay Balakrishnan  于2019年11月26日周二 上午6:45写道:
>>
>>> Hi,
>>> Need to pre-process data(transform incoming data to a different format)
>>> before it hits the Source I have defined. How can I do that ?
>>>
>>> I tried to use a .map on the DataStream but that is too late as the data
>>> has already hit the Source I defined.
>>> FlinkKinesisConsumer> kinesisConsumer =
>>> getMonitoringFlinkKinesisConsumer(local, localKinesis, kinesisTopicRead,
>>> region, getRecsMax, getRecsIntervalMs, connectionTimeout, maxConnections,
>>> socketTimeout);
>>> DataStreamSource> monitoringDataStreamSource =
>>> env.addSource(kinesisConsumer);
>>>
>>> DataStream> kinesisStream1 = kinesisStream.map(new
>>> TransformFunction(...));//too late here
>>>
>>> TIA,
>>>
>>


Re: Flink behavior as a slow consumer - out of Heap MEM

2019-11-25 Thread vino yang
Hi Hanan,

Sometimes, the behavior depends on your implementation.

Since it's not a built-in connector, it would be better to share your
customized source with the community, so that the community can better help
you figure out where the problem is.
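One general note while you prepare that: whether backpressure reaches the
source depends on how the source emits records. If the ZMQ source pulls and
emits synchronously inside run(), a slow downstream blocks the collect() call
and the source simply stops pulling, instead of buffering messages on the
heap. A rough sketch of that shape (the ZMQ receive call is only a
placeholder):

public class SimpleZmqSource extends RichSourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            String message = fetchNextMessage();   // blocking ZMQ receive (placeholder)
            if (message != null) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(message);          // blocks when downstream is saturated
                }
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private String fetchNextMessage() {
        // ... receive a message from ZMQ here ...
        return null;
    }
}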

WDYT?

Best,
Vino

Hanan Yehudai  于2019年11月26日周二 下午12:27写道:

> HI ,  I am trying to do some performance test to my flink deployment.
>
> I am implementing an extremely simplistic use case
>
> I built a ZMQ Source
>
>
>
> The topology is ZMQ Source - > Mapper- > DIscardingSInk ( a sink that does
> nothing )
>
>
>
> Data is pushed via ZMQ at a very high rate.
>
> When the incoming  rate from ZMQ is higher then the rate flink can keep up
> with,  I can see that the JVM Heap is filling up  ( using Flinks metrics ) .
> when the heap is fullt consumes – TM chokes , a HeartBeat is missed  and
> the job fails.
>
>
>
> I was expecting Flink to handle this type of backpressure gracefully and
> not
>
>
>
> Note :  The mapper has not state to persist
>
> See below the Grafana charts,  on the left  is the TM HHEAP  Used.
>
> On the right – the ZMQ – out of flink. ( yellow ) Vs Flink consuming rate
> from reported by ZMQSOurce
>
> 1GB is the configured heap size
>
>
>
>


Re: Question about to modify operator state on Flink1.7 with State Processor API

2019-11-25 Thread vino yang
Hi Kaihao,

Ping @Aljoscha Krettek  @Tzu-Li (Gordon) Tai
 to give more professional suggestions.

What's more, we may need to give a statement about whether the State Processor
API can process snapshots generated by jobs on older versions. WDYT?

Best,
Vino

Kaihao Zhao  于2019年11月25日周一 下午11:39写道:

> Hi,
>
> We are running Flink 1.7 and recently due to Kafka cluster migration, we
> need to find a way to modify kafka offset in FlinkKafkaConnector's state,
> and we found Flink 1.9's State Processor API is the exactly tool we need,
> we are able to modify the operator state via State Processor API, but when
> trying to resume App from the modified savepoint, we found it failed with
> ClassNotFoundException: *TupleSerializerSnapshot*, these
> *TypeSerializerSnapshots* are new in Flink 1.9 but not in 1.7, so I
> wonder if there has any suggestion or workaround to modify 1.7's state?
>
> --
> Thanks & Regards
> Zhao Kaihao
>


Re: Pre-process data before it hits the Source

2019-11-25 Thread vino yang
Hi Vijay,

IMO, the semantics of a source are not fixed. It can integrate with
third-party systems and consume events. However, it can also contain more
business logic for pre-processing your data after consuming the events.

Maybe it needs some customization. WDYT?

Best,
Vino

Vijay Balakrishnan  于2019年11月26日周二 上午6:45写道:

> Hi,
> Need to pre-process data(transform incoming data to a different format)
> before it hits the Source I have defined. How can I do that ?
>
> I tried to use a .map on the DataStream but that is too late as the data
> has already hit the Source I defined.
> FlinkKinesisConsumer> kinesisConsumer =
> getMonitoringFlinkKinesisConsumer(local, localKinesis, kinesisTopicRead,
> region, getRecsMax, getRecsIntervalMs, connectionTimeout, maxConnections,
> socketTimeout);
> DataStreamSource> monitoringDataStreamSource =
> env.addSource(kinesisConsumer);
>
> DataStream> kinesisStream1 = kinesisStream.map(new
> TransformFunction(...));//too late here
>
> TIA,
>


Re: Flink Kudu Connector

2019-11-25 Thread vino yang
Hi Rahul,

I only found some resources on the Internet that you can consider.[1][2]

Best,
Vino

[1]: https://bahir.apache.org/docs/flink/current/flink-streaming-kudu/
[2]:
https://www.slideshare.net/0xnacho/apache-flink-kudu-a-connector-to-develop-kappa-architectures

Rahul Jain  于2019年11月25日周一 下午6:32写道:

> Hi,
>
> We are trying to use the Flink Kudu connector. Is there any documentation
> available that we can read to understand how to use it ?
>
> We found some sample code but that was not very helpful.
>
> Thanks,
> -rahul
>


Re: Idiomatic way to split pipeline

2019-11-25 Thread vino yang
Hi Avi,

Side outputs provide a superset of split's functionality, so anything that can
be implemented via split can also be implemented via side outputs.[1]
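A rough sketch of the pattern with placeholder types and tag names:

final OutputTag<Event> secondPipelineTag = new OutputTag<Event>("second-pipeline") {};

SingleOutputStreamOperator<Event> firstPipeline = input
        .process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event value, Context ctx, Collector<Event> out) {
                out.collect(value);                   // main output -> pipeline 1
                ctx.output(secondPipelineTag, value); // side output -> pipeline 2
            }
        });

DataStream<Event> secondPipeline = firstPipeline.getSideOutput(secondPipelineTag);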

Best,
Vino

[1]:
https://stackoverflow.com/questions/51440677/apache-flink-whats-the-difference-between-side-outputs-and-split-in-the-data

Avi Levi  于2019年11月25日周一 下午5:32写道:

> Thank you, for your quick reply. I appreciate that.  but this it not
> exactly "side output" per se. it is simple splitting. IIUC The side output
> is more for splitting the records buy something the differentiate them
> (latnes , value etc' ) . I thought there is more idiomatic but if this is
> it, than I will go with that.
>
> On Mon, Nov 25, 2019 at 10:42 AM vino yang  wrote:
>
>> *This Message originated outside your organization.*
>> --
>> Hi Avi,
>>
>> As the doc of DataStream#split said, you can use the "side output"
>> feature to replace it.[1]
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>>
>> Best,
>> Vino
>>
>> Avi Levi  于2019年11月25日周一 下午4:12写道:
>>
>>> Hi,
>>> I want to split the output of one of the operators to two pipelines.
>>> Since the *split* method is deprecated, what is the idiomatic way to do
>>> that without duplicating the operator ?
>>>
>>> [image: Screen Shot 2019-11-25 at 10.05.38.png]
>>>
>>>
>>>


Re: Idiomatic way to split pipeline

2019-11-25 Thread vino yang
Hi Avi,

As the documentation of DataStream#split says, you can use the "side output"
feature to replace it.[1]

[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html

Best,
Vino

Avi Levi  于2019年11月25日周一 下午4:12写道:

> Hi,
> I want to split the output of one of the operators to two pipelines. Since
> the *split* method is deprecated, what is the idiomatic way to do that
> without duplicating the operator ?
>
> [image: Screen Shot 2019-11-25 at 10.05.38.png]
>
>
>


Re: How to submit two jobs sequentially and view their outputs in .out file?

2019-11-25 Thread vino yang
Hi Komal,

> Thank you! That's exactly what's happening. Is there any way to force it
write to a specific .out of a TaskManager?

No. I am curious why the two jobs depend on stdout. Can we introduce another
coordinator other than stdout? IMO, this mechanism is not always available.

Best,
Vino

Komal Mariam  于2019年11月25日周一 上午10:46写道:

> Hi Theo,
>
> I want to interrupt/cancel my current job as it has produced the desired
> results even though it runs infinitely,  and the next one requires full
> resources.
>
> Due to some technical issue we cannot access the web UI so just working
> with the CLI, for now.
>
> I found a less crude way by running the command ./bin/flink cancel  id>  specified by the commands listed here:
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html
>
> 
>
> Hello Vino,
>
> Thank you! That's exactly what's happening. Is there any way to force it
> write to a specific .out of a TaskManager?
>
>
> Best Regards,
> Komal
>
>
>
> On Mon, 25 Nov 2019 at 11:10, vino yang  wrote:
>
>> Hi Komal,
>>
>> Since you use the Flink standalone deployment mode, the tasks of the jobs
>> which print information to the STDOUT may randomly deploy in any task
>> manager of the cluster. Did you check other Task Managers out file?
>>
>> Best,
>> Vino
>>
>> Komal Mariam  于2019年11月22日周五 下午6:59写道:
>>
>>> Dear all,
>>>
>>> Thank you for your help regarding my previous queries. Unfortunately,
>>> I'm stuck with another one and will really appreciate your input.
>>>
>>> I can't seem to produce any outputs in "flink-taskexecutor-0.out" from
>>> my second job after submitting the first one in my 3-node-flink standalone
>>> cluster.
>>>
>>> Say I want to test out two jobs sequentially. (I do not want to run them
>>> concurrently/in parallel).
>>>
>>> After submitting "job1.jar " via command line, I press "Ctrl + C" to
>>> exit from it (as it runs infinitely). After that I
>>> try to submit a second jar file having the same properties (group-id,
>>> topic, etc) with the only difference being the query written in main
>>> function.
>>>
>>> The first job produces relevant outputs in "flink-taskexecutor-0.out"
>>> but the second one doesn't.
>>>
>>> The only way I can see the output produced is if I restart the cluster
>>> after job1 and then submit job2 as it produces another .out file.
>>>
>>> But I want to submit 2 jobs sequentially and see their outputs without
>>> having to restart my cluster. Is there any way to do this?
>>>
>>> Additional info:
>>> For both jobs I'm using DataStream API and I have set:
>>>  StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> Best Regards,
>>> Komal
>>>
>>


Re: Side output from Flink Sink

2019-11-24 Thread vino yang
Hi Victor,

Currently, it seems the "side output" feature is not supported by the
streaming sink.

IMO, you can customize your sink to select different types of events and
output them to different places.

WDYT?
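
To make that concrete, here is a rough sketch of such a customized sink. It is
not an existing Flink sink; the class name, the dead-letter topic and the
primary-store call are hypothetical placeholders, and the Kafka properties are
assumed to already contain bootstrap.servers:

import java.util.Properties;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class DeadLetteringSink extends RichSinkFunction<String> {

    private final Properties kafkaProps;      // assumed to contain bootstrap.servers
    private final String deadLetterTopic;
    private transient KafkaProducer<String, String> deadLetterProducer;

    public DeadLetteringSink(Properties kafkaProps, String deadLetterTopic) {
        this.kafkaProps = kafkaProps;
        this.deadLetterTopic = deadLetterTopic;
    }

    @Override
    public void open(Configuration parameters) {
        Properties props = new Properties();
        props.putAll(kafkaProps);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        deadLetterProducer = new KafkaProducer<>(props);
    }

    @Override
    public void invoke(String value, Context context) {
        try {
            writeToPrimaryStore(value);   // placeholder for the real storage write + retries
        } catch (Exception e) {
            // The record could not be sinked: forward it to the dead-letter topic instead.
            deadLetterProducer.send(new ProducerRecord<>(deadLetterTopic, value));
        }
    }

    private void writeToPrimaryStore(String value) throws Exception {
        // ... the actual storage logic goes here ...
    }

    @Override
    public void close() {
        if (deadLetterProducer != null) {
            deadLetterProducer.close();
        }
    }
}

A plain Kafka producer is used for the dead-letter path just to keep the sketch
short; it is not integrated with checkpointing, so you may prefer to wire in a
second Flink Kafka producer or any other client with the guarantees you need.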

Best,
Vino

Victor Villa Dev  于2019年11月25日周一 下午1:37写道:

> Hi Vino,
>
> Thanks a lot for your reply!
> However I'm not quite sure my question was clear enough.
> I'm aware I can create/get side outputs using output tags from within
> operators (Process Functions) as documentation also states.
>
> The main point in my question is wether creating a sideo output is even
> possible from within a Sink?
> if so, would you mind pointing to an examples on how to correctly get the
> context necessary to add the "output" from within the "invoke()" method.
> In case it isn't what are the usual/suggested strategies?
>
> I know the Sink is usually the "last" portion of a data stream as its name
> indicates, but I was wondering if for some reason something can't be sinked
> (after retries, etc), what is the usual way to deal with such cases?
>
> Thanks again for your kind support.
>
> On 2019/11/25 02:23:15, vino yang  wrote:
> > Hi Victor,
> >
> > Firstly, you can get your side output stream via OutputTag. Please refer
> to
> > the official documentation[1].
> > Then, specify a sink for your side output stream. Of course, you can
> > specify a Kafka sink.
> >
> > Best,
> > Vino
> >
> > [1]:
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
> >
> > Victor Villa Dev  于2019年11月25日周一 上午2:27写道:
> >
> > > I'd like know if there's a way to generate a side output and/or sink
> to an
> > > alternate kafka topic from within a Sink?
> > > The use case is the datastream sinks to a storage and on particular
> failed
> > > attempts I'd like to deadletter to a kafka topic.
> > > Any suggestions?
> > >
> > > Thanks
> > >
> >
>


Re: Side output from Flink Sink

2019-11-24 Thread vino yang
Hi Victor,

Firstly, you can get your side output stream via OutputTag. Please refer to
the official documentation[1].
Then, specify a sink for your side output stream. Of course, you can
specify a Kafka sink.

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html

Victor Villa Dev  于2019年11月25日周一 上午2:27写道:

> I'd like know if there's a way to generate a side output and/or sink to an
> alternate kafka topic from within a Sink?
> The use case is the datastream sinks to a storage and on particular failed
> attempts I'd like to deadletter to a kafka topic.
> Any suggestions?
>
> Thanks
>


Re: flink session cluster ha on k8s

2019-11-24 Thread vino yang
Hi 祥才,

You can refer to the reply of this old thread[1].

Best,
Vino

[1]:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Re-how-to-setup-a-ha-flink-cluster-on-k8s-td31089.html

曾祥才  于2019年11月25日周一 上午9:28写道:

> hi, is there any example about HA on k8s for a Flink session
> cluster? I've checked the docs on flink.apache.org, and there seems to be
> no info about this
>


Re: How to submit two jobs sequentially and view their outputs in .out file?

2019-11-24 Thread vino yang
Hi Komal,

Since you use the Flink standalone deployment mode, the tasks of the jobs
which print information to STDOUT may be deployed to any task manager of the
cluster. Did you check the other TaskManagers' .out files?

Best,
Vino

Komal Mariam  于2019年11月22日周五 下午6:59写道:

> Dear all,
>
> Thank you for your help regarding my previous queries. Unfortunately, I'm
> stuck with another one and will really appreciate your input.
>
> I can't seem to produce any outputs in "flink-taskexecutor-0.out" from my
> second job after submitting the first one in my 3-node-flink standalone
> cluster.
>
> Say I want to test out two jobs sequentially. (I do not want to run them
> concurrently/in parallel).
>
> After submitting "job1.jar " via command line, I press "Ctrl + C" to exit
> from it (as it runs infinitely). After that I
> try to submit a second jar file having the same properties (group-id,
> topic, etc) with the only difference being the query written in main
> function.
>
> The first job produces relevant outputs in "flink-taskexecutor-0.out" but
> the second one doesn't.
>
> The only way I can see the output produced is if I restart the cluster
> after job1 and then submit job2 as it produces another .out file.
>
> But I want to submit 2 jobs sequentially and see their outputs without
> having to restart my cluster. Is there any way to do this?
>
> Additional info:
> For both jobs I'm using DataStream API and I have set:
>  StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> Best Regards,
> Komal
>


Re: Flink on YARN: frequent "container released on a *lost* node"

2019-11-21 Thread vino yang
Hi Amran,

Did you monitor or have a look at the memory metrics (e.g. full GC) of your
TM?

There is a similar thread where a user reported the same problem caused by
full GC; the link is here[1].

Best,
Vino

[1]:
http://mail-archives.apache.org/mod_mbox/flink-user/201709.mbox/%3cfa4068d3-2696-4f29-857d-4a7741c3e...@greghogan.com%3E

amran dean  于2019年11月22日周五 上午7:15写道:

> Hello,
> I am frequently seeing this error in my jobmanager logs:
>
> 2019-11-18 09:07:08,863 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
> kdi_g2 -> (Sink: s_id_metadata, Sink: t_id_metadata) (23/24)
> (5e10e88814fe4ab0be5f554ec59bd93d) switched from RUNNING to FAILED.
> java.lang.Exception: Container released on a *lost* node
> at
> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:370)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Yarn nodemanager logs don't show anything out of the ordinary when such
> exceptions occur, so I am suspecting it is a timeout of some sort due to
> network problems. (YARN is not killing the container ). It's difficult to
> diagnose because Flink doesn't give any reason for losing the node. Is it a
> timeout? OOM?
>
>  Is there a corresponding configuration I should tune? What is the
> recommended course of action?
>


Re: Retrieving Flink job ID/YARN Id programmatically

2019-11-21 Thread vino yang
Hi Lei,

It would be better to use Flink's RESTful API to fetch the information of
the running jobs[1].

[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-1
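
For example, a minimal sketch that lists the running jobs via the REST API; it
assumes the JobManager REST endpoint is reachable on localhost:8081, so adjust
the address for your deployment:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class ListRunningJobs {

    public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:8081/jobs");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");

        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
            // The response looks like: {"jobs":[{"id":"...","status":"RUNNING"}, ...]}
            System.out.println(body);
        } finally {
            conn.disconnect();
        }
    }
}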

Best,
Vino

Lei Nie  于2019年11月22日周五 上午4:14写道:

> I looked at the code, and
> StreamExecutionEnvironment#getStreamGraph#getJobGraph#getJobID is
> generating a random ID unrelated to the actual ID used.
>
> Is there any way to fetch the real ID at runtime?
> Use case: fetch most recent checkpoint from stable storage for
> automated restarts. Most recent checkpoint has form
> ".../checkpoints/flink_app_id/chk-123"
>
> On Thu, Nov 21, 2019 at 11:28 AM Lei Nie  wrote:
> >
> > This does not get the correct id:
> > StreamExecutionEnvironment#getStreamGraph#getJobGraph#getJobID =
> > eea5abc21dd8743a4090f4a3a660f9e8
> > Actual job ID (from webUI): 1357d21be640b6a3b8a86a063f4bba8a
> >
> >
> >
> > On Thu, Nov 7, 2019 at 6:56 PM vino yang  wrote:
> > >
> > > Hi Lei Nie,
> > >
> > > You can use
> `StreamExecutionEnvironment#getStreamGraph#getJobGraph#getJobID` to get the
> job id.
> > >
> > > Best,
> > > Vino
> > >
> > > Lei Nie  于2019年11月8日周五 上午8:38写道:
> > >>
> > >> Hello,
> > >> I am currently executing streaming jobs via
> StreamExecutionEnvironment. Is it possible to retrieve the Flink job
> ID/YARN ID within the context of a job? I'd like to be able to
> automatically register the job such that monitoring jobs can run (REST api
> requires for example job id).
> > >>
> > >> Thanks
>


Re: Dynamically creating new Task Managers in YARN

2019-11-20 Thread vino yang
Hi Jingsong,

Thanks for the explanation of the mechanism of the new Flink session
cluster mode.

Because I mostly use job cluster mode, I did not have good knowledge of
the new Flink session cluster mode.

Best,
Vino

Jingsong Li  于2019年11月21日周四 下午2:46写道:

> Hi Piper and Vino:
>
> In the current Flink version, the resources of a Flink session cluster
> are unrestricted, which means that if the requested resources exceed the
> resources owned by the current session, it will apply to the YARN RM for
> new resources.
> And if a TaskManager is idle for too long, the JM will release it back to
> YARN. This behavior is controlled by resourcemanager.taskmanager-timeout.
> You can set a suitable value for it to enjoy the benefits of process reuse
> and dynamic resources.
>
> From this point of view, I think session mode is a good choice.
> Is this what you want? Piper.
>
> Best,
> Jingsong Lee
>
>
>
> On Thu, Nov 21, 2019 at 2:25 PM vino yang  wrote:
>
>> Hi Piper,
>>
>> The understanding of two deploy modes For Flink on Yarn is right.
>>
>> AFAIK, The single job (job cluster) mode is more popular than Session
>> mode.
>>
>> Because job cluster mode, Flink let YARN manage resources as far as
>> possible. And this mode can keep isolation from other jobs.
>>
>> IMO, we do not need to combine their advantages. Let YARN do the things
>> that it is good at. What do you think?
>>
>> Best,
>> Vino
>>
>>
>> Piper Piper  于2019年11月21日周四 上午11:55写道:
>>
>>> Hi Vino,
>>>
>>> I want to implement Resource Elasticity. In doing so, I have read that
>>> Flink with YARN has two modes: Job and Session.
>>>
>>> In Job mode, Flink’s Resource Manager requests YARN for containers with
>>> TMs, and then gives the containers back to YARN upon job completion.
>>>
>>> In Session mode, Flink already has the TMs that are persistent.
>>>
>>> I want to combine the advantages of Job and Session mode, i.e. Flink
>>> will have persistent TMs/containers and request YARN for more
>>> TMs/containers when needed (or release TMs/containers back to YARN).
>>>
>>> Thank you,
>>>
>>> Piper
>>>
>>> On Wed, Nov 20, 2019 at 9:39 PM vino yang  wrote:
>>>
>>>> Hi Piper,
>>>>
>>>> Can you share more reason and details of your requirements.
>>>>
>>>> Best,
>>>> Vino
>>>>
>>>> Piper Piper  于2019年11月21日周四 上午5:48写道:
>>>>
>>>>> Hi,
>>>>>
>>>>> How can I make Flink's Resource Manager request YARN to spin up new
>>>>> (or destroy/reclaim existing) TaskManagers in YARN containers?
>>>>>
>>>>> Preferably at runtime (i.e. dynamically).
>>>>>
>>>>> Thank you
>>>>>
>>>>> Piper
>>>>>
>>>>
>
> --
> Best, Jingsong Lee
>


Re: Dynamically creating new Task Managers in YARN

2019-11-20 Thread vino yang
Hi Piper,

Your understanding of the two deployment modes for Flink on YARN is right.

AFAIK, the single-job (job cluster) mode is more popular than session mode.

In job cluster mode, Flink lets YARN manage resources as far as possible,
and this mode keeps the job isolated from other jobs.

IMO, we do not need to combine their advantages. Let YARN do the things
that it is good at. What do you think?

Best,
Vino


Piper Piper  于2019年11月21日周四 上午11:55写道:

> Hi Vino,
>
> I want to implement Resource Elasticity. In doing so, I have read that
> Flink with YARN has two modes: Job and Session.
>
> In Job mode, Flink’s Resource Manager requests YARN for containers with
> TMs, and then gives the containers back to YARN upon job completion.
>
> In Session mode, Flink already has the TMs that are persistent.
>
> I want to combine the advantages of Job and Session mode, i.e. Flink will
> have persistent TMs/containers and request YARN for more TMs/containers
> when needed (or release TMs/containers back to YARN).
>
> Thank you,
>
> Piper
>
> On Wed, Nov 20, 2019 at 9:39 PM vino yang  wrote:
>
>> Hi Piper,
>>
>> Can you share more reason and details of your requirements.
>>
>> Best,
>> Vino
>>
>> Piper Piper  于2019年11月21日周四 上午5:48写道:
>>
>>> Hi,
>>>
>>> How can I make Flink's Resource Manager request YARN to spin up new (or
>>> destroy/reclaim existing) TaskManagers in YARN containers?
>>>
>>> Preferably at runtime (i.e. dynamically).
>>>
>>> Thank you
>>>
>>> Piper
>>>
>>


Re: Completed job wasn't saved to archive

2019-11-20 Thread vino yang
If everything is OK (i.e. your config options for the archive dir and the
history server are correct), Flink should archive the completed job.

You said you did not find any exceptions in the log about failing to
archive, but are there any other exceptions? Can you share the logs for your
scenario?

Best,
Vino

Pavel Potseluev  于2019年11月21日周四 上午2:25写道:

> Hi all,
>
> We see occasionally that flink doesn't save information about canceled job
> to archive directory (configured by jobmanager.archive.fs.dir property). And
> there are no exceptions in the log about failing archiving. It's a problem
> in our use case because our script for deploying jobs relies on flink
> history server to find latest checkpoint for some job. Does flink guarantee
> saving data to archive? If so, any ideas why it doesn't work
> sometimes? Flink version is 1.8.0.
>
> --
> Best regards,
> Pavel Potseluev
> Software developer, Yandex.Classifieds LLC
>
>


Re: Dynamically creating new Task Managers in YARN

2019-11-20 Thread vino yang
Hi Piper,

Can you share more reasons and details about your requirements?

Best,
Vino

Piper Piper  于2019年11月21日周四 上午5:48写道:

> Hi,
>
> How can I make Flink's Resource Manager request YARN to spin up new (or
> destroy/reclaim existing) TaskManagers in YARN containers?
>
> Preferably at runtime (i.e. dynamically).
>
> Thank you
>
> Piper
>


Re: [ANNOUNCE] Launch of flink-packages.org: A website to foster the Flink Ecosystem

2019-11-19 Thread vino yang
Hi Robert,

Just added it under the "Tools" category[1].

[1]: https://flink-packages.org/packages/kylin-flink-cube-engine

Best,
Vino

Robert Metzger  于2019年11月19日周二 下午4:33写道:

> Thanks.
> You can add Kylin whenever you think it is ready.
>
> On Tue, Nov 19, 2019 at 9:07 AM vino yang  wrote:
>
> > Thanks Robert. Great job! The web site looks great.
> >
> > In the future, we can also add my Kylin Flink cube engine[1] to the
> > ecosystem projects list.
> >
> > [1]: https://github.com/apache/kylin/tree/engine-flink
> >
> > Best,
> > Vino
> >
> > Oytun Tez  于2019年11月19日周二 上午12:09写道:
> >
> >> Congratulations! This is exciting.
> >>
> >>
> >>  --
> >>
> >> [image: MotaWord]
> >> Oytun Tez
> >> M O T A W O R D | CTO & Co-Founder
> >> oy...@motaword.com
> >>
> >>   <https://www.motaword.com/blog>
> >>
> >>
> >> On Mon, Nov 18, 2019 at 11:07 AM Robert Metzger 
> >> wrote:
> >>
> >>> Hi all,
> >>>
> >>> I would like to announce that Ververica, with the permission of the
> >>> Flink PMC, is launching a website called flink-packages.org. This goes
> >>> back to an effort proposed earlier in 2019 [1]
> >>> The idea of the site is to help developers building extensions /
> >>> connectors / API etc. for Flink to get attention for their project.
> >>> At the same time, we want to help Flink users to find those ecosystem
> >>> projects, so that they can benefit from the work. A voting and
> commenting
> >>> functionality allows users to rate and and discuss about individual
> >>> packages.
> >>>
> >>> You can find the website here: https://flink-packages.org/
> >>>
> >>> The full announcement is available here:
> >>> https://www.ververica.com/blog/announcing-flink-community-packages
> >>>
> >>> I'm happy to hear any feedback about the site.
> >>>
> >>> Best,
> >>> Robert
> >>>
> >>>
> >>> [1]
> >>>
> https://lists.apache.org/thread.html/c306b8b6d5d2ca071071b634d647f47769760e1e91cd758f52a62c93@%3Cdev.flink.apache.org%3E
> >>>
> >>
>


Re: [ANNOUNCE] Launch of flink-packages.org: A website to foster the Flink Ecosystem

2019-11-19 Thread vino yang
Thanks Robert. Great job! The web site looks great.

In the future, we can also add my Kylin Flink cube engine[1] to the
ecosystem projects list.

[1]: https://github.com/apache/kylin/tree/engine-flink

Best,
Vino

Oytun Tez  于2019年11月19日周二 上午12:09写道:

> Congratulations! This is exciting.
>
>
>  --
>
> [image: MotaWord]
> Oytun Tez
> M O T A W O R D | CTO & Co-Founder
> oy...@motaword.com
>
>   
>
>
> On Mon, Nov 18, 2019 at 11:07 AM Robert Metzger 
> wrote:
>
>> Hi all,
>>
>> I would like to announce that Ververica, with the permission of the Flink
>> PMC, is launching a website called flink-packages.org. This goes back to
>> an effort proposed earlier in 2019 [1]
>> The idea of the site is to help developers building extensions /
>> connectors / API etc. for Flink to get attention for their project.
>> At the same time, we want to help Flink users to find those ecosystem
>> projects, so that they can benefit from the work. A voting and commenting
>> functionality allows users to rate and and discuss about individual
>> packages.
>>
>> You can find the website here: https://flink-packages.org/
>>
>> The full announcement is available here:
>> https://www.ververica.com/blog/announcing-flink-community-packages
>>
>> I'm happy to hear any feedback about the site.
>>
>> Best,
>> Robert
>>
>>
>> [1]
>> https://lists.apache.org/thread.html/c306b8b6d5d2ca071071b634d647f47769760e1e91cd758f52a62c93@%3Cdev.flink.apache.org%3E
>>
>


Re: Flink configuration at runtime

2019-11-18 Thread vino yang
Hi Amran,

Change the config option at runtime? No, Flink does not support this
feature currently.

However, for Flink on YARN job cluster mode, you can specify different
config options for different jobs via the program or via flink-conf.yaml
(copy a new Flink binary package, then change its config file).

Best,
Vino

amran dean  于2019年11月19日周二 上午5:53写道:

> Is it possible to configure certain settings at runtime, on a per-job
> basis rather than globally within flink-conf.yaml?
>
> For example, I have a job where it's desirable to retain a large number of
> checkpoints via
> state.checkpoints.num-retained.
>
> The checkpoints are cheap, and it's low cost. For other jobs, I don't want
> such a large number.
>
>
>


Re: how to setup a ha flink cluster on k8s?

2019-11-16 Thread vino yang
Hi Rock,

I searched on Google and found a blog post[1] that talks about how to
configure JM HA for Flink on k8s. I do not know whether it suits your case
or not; please feel free to refer to it.

Best,
Vino

[1]:
http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/

Rock  于2019年11月16日周六 上午11:02写道:

> I'm trying to set up a Flink cluster on k8s for production use. But the
> setup here
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/kubernetes.html
> is not HA: when the job manager goes down and is rescheduled,
>
> the metadata for the running job is lost.
>
>
>
> I tried to use ha setup for zk
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/jobmanager_high_availability.html
>  on
> k8s , but can't get it right.
>
>
>
> Storing a job's metadata on k8s using a PVC or another external file
> system should be very easy. Is there a way to achieve it?
>


Re: slow checkpoints

2019-11-15 Thread vino yang
Hi Yubraj,

So the frequent job failures are the root cause; you need to fix that. Yes,
when too many messages pile up in the message system and cannot be consumed
normally, there will be catch-up consumption, which puts more pressure on
your streaming system than usual.

Best,
Vino

yuvraj singh <19yuvrajsing...@gmail.com> 于2019年11月15日周五 下午5:25写道:

> @Congxian , back pressure is due to job failure , some times job can fail
> and we need to catch up .
>
> Thanks
> Yubraj Singh
>
> [image: Mailtrack]
> 
>  Sender
> notified by
> Mailtrack
> 
>  11/15/19,
> 02:53:23 PM
>
> On Fri, Nov 15, 2019 at 2:39 PM Congxian Qiu 
> wrote:
>
>> Hi
>>
>> Currently, checkpoint may be faile in high back pressure scenario,
>> because the barrier alignment can't be done in expected time, you should
>> fix the back pressure problem first. There is a FLIP[1] that wants to fix
>> this issue.
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
>> Best,
>> Congxian
>>
>>
>> yuvraj singh <19yuvrajsing...@gmail.com> 于2019年11月15日周五 下午4:47写道:
>>
>>>
>>> Hi all ,
>>>
>>> I am facing one issue , when i have high back pressure my checkpoints
>>> start failing . please let me know how to deal with this kind  of
>>> situations .
>>>
>>>
>>> Thanks
>>> Yubraj Singh .
>>>
>>>
>>> [image: Mailtrack]
>>> 
>>>  Sender
>>> notified by
>>> Mailtrack
>>> 
>>>  11/15/19,
>>> 02:15:52 PM
>>>
>>


Re: Flink on Yarn resource arrangement

2019-11-13 Thread vino yang
Hi Alex,

Which Flink version are you using?

AFAIK, since Flink 1.8+, the config option "-yn" for Flink on YARN job
cluster mode does not take effect (it is always 1 and will be overridden).

So the config options "-ys" and "-p" decide the number of TMs.

The first example: -p(20)/-ys(3) is rounded up to 7 to match the required
parallelism. So the total slots should be 7 * 3 = 21.

The second example: -p(20)/-ys(4) = 5. So total slots should be 5 * 4 = 20.

Best,
Vino

qq <471237...@qq.com> 于2019年11月14日周四 下午3:26写道:

> Hi all,
>
>Could you list details how  Flink job on Yarn resources managed ?
>
>   I used command “-p 20 -yn 5 -ys 3 -yjm 2048m -ytm 2048m” to run flink
> job. I got
> containers vcores
> 8 22
> Task Managers 7 Total Task Slots 21
>
>
> I used command “-p 20 -yn 7 -ys 4 -yjm 2048m -ytm 2048m” to run flink job,
> I got
> containers vcores
>621
> Total Task Slots 20 Task Managers 5
>
> Could you help give the exactly resources formula ? Thanks very much.
>
>
>
> Alex Fu
> 2019/11/14


Re: Initialization of broadcast state before processing main stream

2019-11-13 Thread vino yang
Hi Vasily,

Currently, Flink does not coordinate a regular stream with a broadcast
stream; they are both just streams. Your scenario of using the broadcast
state is a special one. In the more general case, the state to be broadcast
is an unbounded stream, and its events may be broadcast downstream at any
time, so Flink cannot wait for the broadcast stream to finish before playing
the regular stream events.

For your scenario:

   - you can move the dimension table to external storage, e.g. Redis or
   MySQL, and do a stream-to-dimension-table lookup join;
   - you can inject a control event into your broadcast stream to mark that
   the dimension data is fully loaded, and let the fact stream buffer until
   the control event is received (a sketch follows below). Or you can
   introduce a third-party coordinator, e.g. ZooKeeper, to coordinate them;
   however, that would make your solution more complex.
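
To illustrate the second option, here is a rough sketch of a
KeyedBroadcastProcessFunction that buffers facts until a control event has
arrived on the broadcast side. The Tuple2 element types, the "__READY__"
marker and the enrich logic are made-up placeholders; for brevity, buffered
facts are flushed lazily on the next fact of the same key, while a production
version might flush eagerly via Context#applyToKeyedState or timers:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BufferUntilDimReady
        extends KeyedBroadcastProcessFunction<String, Tuple2<String, String>, Tuple2<String, String>, String> {

    // Broadcast (dimension) state: dimension key -> dimension value.
    public static final MapStateDescriptor<String, String> DIM_STATE =
            new MapStateDescriptor<>("dim", Types.STRING, Types.STRING);

    // Keyed state holding facts that arrived before the dimension data was complete.
    private final ListStateDescriptor<Tuple2<String, String>> bufferDescriptor =
            new ListStateDescriptor<>("buffered-facts", Types.TUPLE(Types.STRING, Types.STRING));

    @Override
    public void processElement(Tuple2<String, String> fact, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        ListState<Tuple2<String, String>> buffer = getRuntimeContext().getListState(bufferDescriptor);
        if (ctx.getBroadcastState(DIM_STATE).contains("__READY__")) {
            // Dimension data is complete: flush anything buffered for this key, then emit.
            Iterable<Tuple2<String, String>> buffered = buffer.get();
            if (buffered != null) {
                for (Tuple2<String, String> b : buffered) {
                    out.collect(enrich(b, ctx));
                }
            }
            buffer.clear();
            out.collect(enrich(fact, ctx));
        } else {
            // Dimension data not fully loaded yet: park the fact in keyed state.
            buffer.add(fact);
        }
    }

    @Override
    public void processBroadcastElement(Tuple2<String, String> dim, Context ctx, Collector<String> out)
            throws Exception {
        // The injected control event is just a record with the key "__READY__".
        ctx.getBroadcastState(DIM_STATE).put(dim.f0, dim.f1);
    }

    private String enrich(Tuple2<String, String> fact, ReadOnlyContext ctx) throws Exception {
        String dimValue = ctx.getBroadcastState(DIM_STATE).get(fact.f0);
        return fact.f1 + " enriched with " + dimValue;
    }
}

It would be wired up with something like
factStream.keyBy(f -> f.f0).connect(dimStream.broadcast(BufferUntilDimReady.DIM_STATE)).process(new BufferUntilDimReady()).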

Best,
Vino


Vasily Melnik  于2019年11月14日周四
下午1:28写道:

> Hi all.
>
> In our task we have two Kafka topics:
> - one with fact stream (web traffic)
> - one with dimension
>
> We would like to put dimension data into broadcast state and lookup on int
> with facts. But we see that not all dimension records are put into state
> before first fact record is processed, so lookup gives no data.
>
> The question is: how could we read fact topic with some "delay" to give
> dimension enough time to initialize state?
>
>
> С уважением,
> Василий Мельник
>


Re: Flink (Local) Environment Thread Leaks?

2019-11-13 Thread vino yang
Hi Theo,

If you think there is a thread leakage problem, you can create a JIRA issue
and write a detailed description.

Ping @Gary Yao and @Zhu Zhu: could you help locate and analyze this problem?

Best,
Vino

Theo Diefenthal  于2019年11月14日周四 上午3:16写道:

> I included a Solr End2End test in my project, inheriting from Junit 4
> SolrCloudTestCase.
>
> The solr-test-framework for junit 4 makes use of 
> com.carrotsearch.randomizedtesting
> which automatically tests for thread leakages on test end. In my other
> projects, that tool doesn't produce any problems.
> When used in a test together with a Flink LocalExecutionEnvironment, it
> will prevent the test from suceeding due the following error at shutdown
> phase:
>
> com.carrotsearch.randomizedtesting.ThreadLeakError: 3 threads leaked from
> SUITE scope at somepackage.E2ETest:
>1) Thread[id=170, name=FlinkCompletableFutureDelayScheduler-thread-1,
> state=TIMED_WAITING, group=TGRP-E2ETest]
> at sun.misc.Unsafe.park(Native Method)
> at
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>2) Thread[id=29, name=metrics-meter-tick-thread-2, state=WAITING,
> group=TGRP-E2ETest]
> at sun.misc.Unsafe.park(Native Method)
> at
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>3) Thread[id=28, name=metrics-meter-tick-thread-1, state=TIMED_WAITING,
> group=TGRP-E2ETest]
> at sun.misc.Unsafe.park(Native Method)
> at
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> at __randomizedtesting.SeedInfo.seed([CC6ED531AFECBAF6]:0)
>
> Note that I can suppress the errors easily via setting
> @ThreadLeakScope(ThreadLeakScope.Scope.NONE) in my tests, but I just want
> to point out possible thread leaks in the mailing list here. As the first
> thread is named FlinkCompletableFutureDelayScheduler, I suggest that Flink
> doesn't shut down some of its multitude of threads nicely in a local
> execution environment. My question: Is that some kind of problem / thread
> leakage in Flink or is it just a false warning?
>
>
>
>


Re: Document an example pattern that makes sources and sinks pluggable in the production code for testing

2019-11-11 Thread vino yang
Hi Hung,

Your suggestion is reasonable. Giving an example of a pluggable source and
sink would make the docs more user-friendly; you can open a JIRA issue to see
if anyone wants to improve this.

IMO, it's not very difficult to implement, because sources and sinks in
Flink have two unified abstract interfaces: `SourceFunction` and
`SinkFunction`. You can pass them into the method that builds your DAG
(everything except the source and sink). Then you can provide different
implementations of the source and sink for production and testing purposes.
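
As a minimal sketch of that pattern (the String element type, the
toUpperCase stand-in logic and the CollectSink below are made up for
illustration):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public final class Pipeline {

    /** Business logic only; the source and sink are injected by the caller. */
    public static void build(StreamExecutionEnvironment env,
                             SourceFunction<String> source,
                             SinkFunction<String> sink) {
        env.addSource(source)
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();   // stand-in for the real transformations
                    }
                })
                .addSink(sink);
    }

    /** A tiny test sink that collects results into a static list (test scope only). */
    public static class CollectSink implements SinkFunction<String> {
        public static final List<String> VALUES = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(String value, Context context) {
            VALUES.add(value);
        }
    }
}

In production the caller would pass e.g. a Kafka source and sink; in a test it
would pass a bounded SourceFunction plus the CollectSink above and assert on
CollectSink.VALUES after env.execute().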

Best,
Vino

Hung  于2019年11月11日周一 下午8:44写道:

> Hi guys,
>
> I found the testing part mentioned
>
> make sources and sinks pluggable in your production code and inject special
> test sources and test sinks in your tests.
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#testing-flink-jobs
>
> I think it would be useful to have a documented example as the section
> *testing stateful operato*r does, which demonstrates by WindowOperatorTest
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#unit-testing-stateful-or-timely-udfs--custom-operators
>
> or, is there perhaps already a test that plugs sources and sinks?
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Job Distribution Strategy On Cluster.

2019-11-11 Thread vino yang
Hi srikanth,

What's your job's parallelism?

In some scenarios, many operators are chained with each other; if the
parallelism is 1, the job will just use a single slot.

Best,
Vino

srikanth flink  于2019年11月6日周三 下午10:03写道:

> Hi there,
>
> I'm running Flink with 3 node cluster.
> While running my jobs(both SQL client and jar submission), the jobs are
> being assigned to single machine instead of distribution among the cluster.
> How could I achieve the job distribution to make use of the computation
> power?
>
> Thanks
> Srikanth
>


Re: static table in flink

2019-11-09 Thread vino yang
Hi Jaqie,

If I understand your question correctly, it seems you are looking for a
solution for joining a stream table with a dimension table (what you called
a static table).

Many users have asked this question before. I have linked some replies
here[1][2] for you to consider.

Best,
Vino

[1]:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/How-to-Join-a-dimension-table-in-flink-sql-td20729.html
[2]: http://www.codeha.us/apache-flink-users/msg12421.html

Jaqie Chan  于2019年11月9日周六 下午3:04写道:

> Hello,
>
> I have questions about static table in flink.
>
> Join the stream table with static table. I'm looking at temporal table,
> while the time based table would grow exponentially over period. Any
> suggestions?
>
> Stream tables checks the contains in static table(updates once everyday
> with new set of data). Trying to approach this with views.
>
> Thanks for any suggestions.
>
> Regards
> 嘉琪
>
>


Re: Retrieving Flink job ID/YARN Id programmatically

2019-11-07 Thread vino yang
Hi Lei Nie,

You can use
`StreamExecutionEnvironment#getStreamGraph#getJobGraph#getJobID` to get the
job id.

Best,
Vino

Lei Nie  于2019年11月8日周五 上午8:38写道:

> Hello,
> I am currently executing streaming jobs via StreamExecutionEnvironment. Is
> it possible to retrieve the Flink job ID/YARN ID within the context of a
> job? I'd like to be able to automatically register the job such that
> monitoring jobs can run (REST api requires for example job id).
>
> Thanks
>


Re: flink's hard dependency on zookeeper for HA

2019-11-07 Thread vino yang
Hi Vishwas,

In the standalone cluster HA mode, Flink heavily depends on ZooKeeper. Not
only for leader election, but also for:


   - Checkpoint metadata info;
   - JobGraph store;
   - 

So you should make sure your ZooKeeper cluster works normally. For more
details, please see [1][2].

Best,
Vino

[1]:
https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability
[2]:
https://ci.apache.org/projects/flink/flink-docs-master/ops/jobmanager_high_availability.html

Vishwas Siravara  于2019年11月7日周四 上午12:07写道:

> Hi all,
> I am using flink 1.7.2 as a standalone cluster in high availability mode
> with zookeeper. I have noticed that all flink processes go down once
> zookeeper goes down ? Is this expected behavior since the leader election
> has already happened and the job has been running for several hours.
>
>
> Best,
> Vishwas
>


Re: When using udaf, the startup job has a “Cannot determine simple type name 'com' ” exception(Flink version 1.7.2)

2019-11-06 Thread vino yang
Hi mailtolrl,

Can you share more context about your program and your UDAF?

Best,
Vino

mailtolrl  于2019年11月7日周四 下午3:05写道:

> My flink streaming job use a udaf, set 60 parallelisms,submit job in yarn
> cluster mode,and then happens every time I start.
>
>
>
>


Re: What is the slot vs cpu ratio?

2019-11-06 Thread vino yang
Hi srikanth,

Quoted from the official documentation:

"Each Flink TaskManager provides processing slots in the cluster. The
number of slots is typically proportional to the number of available CPU
cores of each TaskManager. As a general recommendation, the number of
available CPU cores is a good default for taskmanager.numberOfTaskSlots."

So based on the recommendation, you can have at most 3 * 16 = 48 slots. The
details of the calculation are described here[1].

Additionally, here is some material about task slots[2].

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#configuring-taskmanager-processing-slots
[2]:
https://ci.apache.org/projects/flink/flink-docs-master/concepts/runtime.html#task-slots-and-resources

Best,
Vino

srikanth flink  于2019年11月6日周三 下午9:45写道:

> Hi there,
>
> I've 3 node cluster with 16cores each. How many slots could I utilize at
> max and how to I do the calculation?
>
>
> Thanks
> Srikanth
>


Re: RocksDB and local file system

2019-11-06 Thread vino yang
Hi Jaqie,

For testing, you can use the local file system scheme (e.g. "file:///").
Technically speaking, it's OK to specify the path string you mentioned.

However, in a production environment, we do not recommend using the local
file system, because it does not provide high availability.
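
For example, a minimal sketch for such a test setup; it assumes the
flink-statebackend-rocksdb dependency is on the classpath, and the checkpoint
path is a placeholder:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalRocksDbCheckpoints {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A local path is fine for testing; in production prefer a distributed FS (HDFS, S3, ...).
        env.setStateBackend(new RocksDBStateBackend("file:///data/flink/checkpoints"));
        env.enableCheckpointing(60_000);
        // ... build and execute the job as usual ...
    }
}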

Best,
Vino

Jaqie Chan  于2019年11月6日周三 下午11:29写道:

> Hello
>
> I am using Flink rocksDb state backend, the documentation seems to imply i
> can use a regular file system such as: file:///data/flink/checkpoints,
> but the code javadoc only mentions hdfs or s3 option here.
>
> I am wondering if it's possible to use local file system with flink
> rocksdb backend.
>
>
> Thanks
>
> 嘉琪
>


Re: Limit max cpu usage per TaskManager

2019-11-06 Thread vino yang
Hi Lu,

When using Flink on YARN, it will rely on YARN's resource management
capabilities, and Flink cannot currently limit CPU usage.

Also, what version of Flink do you use? As far as I know, since Flink 1.8,
the -yn parameter will not work.

Best,
Vino

Lu Niu  于2019年11月6日周三 下午1:29写道:

> Hi,
>
> When run flink application in yarn mode, is there a way to limit maximum
> cpu usage per TaskManager?
>
> I tried this application with just source and sink operator.
> parallelism of source is 60 and parallelism of sink is 1. When running in
> default config, there are 60 TaskManager assigned. I notice one TaskManager
> process cpu usage could be 200% white the rest below 50%.
>
> When I set -yn = 2 (default is 1), I notice # of TaskManger dropped down
> to 30. and one TaskManger process cpu usage could be 600% while the rest
> below 50%.
>
> Tried to set yarn.containers.vcores = 2,  all tasks are in start state
> forever, application is not able to turn to running state.
>
> Best
> Lu
>


Re: Partitioning based on key flink kafka sink

2019-11-06 Thread vino yang
Hi Vishwas,

You should pay attention to the other args.

The constructor you provided has a `KeyedSerializationSchema` arg, while
the constructor whose comments confused you only has a `SerializationSchema`
arg. That's the difference.
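
In other words, for your case you can keep the second constructor and simply
pass Optional.empty() for the partitioner; a minimal sketch, assuming the 1.7
universal Kafka connector and your own KeyedSerializationSchema
implementation:

import java.util.Optional;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

public class KeyPartitionedProducer {

    public static FlinkKafkaProducer<String> create(
            String topic, KeyedSerializationSchema<String> schema, Properties props) {
        // Passing an empty Optional disables the fixed partitioner, so records are
        // partitioned by the key returned from KeyedSerializationSchema#serializeKey.
        return new FlinkKafkaProducer<>(
                topic, schema, props, Optional.<FlinkKafkaPartitioner<String>>empty());
    }
}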

Best,
Vino

Vishwas Siravara  于2019年11月6日周三 上午9:16写道:

> Hi all,
> I am using flink 1.7.0 and using this constructor
>
> FlinkKafkaProducer(String topicId, KeyedSerializationSchema 
> serializationSchema, Properties producerConfig)
>
> From the doc it says this constructor uses fixed partitioner. I want to
> partition based on key , so I tried to use this
>
> public FlinkKafkaProducer(
>String defaultTopicId,
>KeyedSerializationSchema serializationSchema,
>Properties producerConfig,
>Optional> customPartitioner)
>
> What should I pass in the optional field ? From the doc it says
>
> @param customPartitioner A serializable partitioner for assigning messages to 
> Kafka partitions.
> *  If a partitioner is not provided, records will be 
> partitioned by the key of each record
> *  (determined by {@link 
> KeyedSerializationSchema#serializeKey(Object)}). If the keys
> *  are {@code null}, then records will be distributed 
> to Kafka partitions in a
> *  round-robin fashion.
>
> This is super confusing(contradicting in a way) since the previous
> constructor says that fixedpartitioner will be used if customPartioner is
> not present.
>
> Best,
> Vishwas
>


Re: Checkpoint in FlinkSQL

2019-11-04 Thread vino yang
Hi Simon,

Absolutely, yes. Before using Flink SQL, you need to initialize a
StreamExecutionEnvironment instance[1], then call
StreamExecutionEnvironment#setStateBackend
or StreamExecutionEnvironment#enableCheckpointing to specify the settings
you want.

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#specifying-a-query
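
For example, a minimal sketch against the 1.9 Java Table API; the checkpoint
interval, the FsStateBackend and the HDFS path are placeholders:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class SqlJobWithCheckpoints {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint settings are configured on the underlying streaming environment.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // ... register tables and run SQL as usual, e.g.
        // tEnv.sqlUpdate("INSERT INTO sink_table SELECT ... FROM source_table");
        // tEnv.execute("sql-job-with-checkpoints");
    }
}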

Best,
Vino

Simon Su  于2019年11月5日周二 上午10:38写道:

> Hi All
>
> Does current Flink support to set checkpoint properties while using Flink
> SQL ?
> For example,  statebackend choices, checkpoint interval and so on ...
>
> Thanks,
> SImon
>
>


Re: Async operator with a KeyedStream

2019-11-01 Thread vino yang
Hi Bastien,

Your analysis of using a KeyedStream with Async I/O is correct: the async
operator will not take the key into account.

In your scenario, a good practice for interacting with the DB is async I/O +
a thread pool[1] + a connection pool.

You can use a connection pool to reuse and limit the MySQL connections.
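
A rough sketch of that combination is below. HikariCP is assumed as the
connection pool purely for illustration, and the JDBC URL, credentials, table
name and pool/thread sizes are placeholders:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public class AsyncDbWriter extends RichAsyncFunction<String, String> {

    private transient ExecutorService executor;
    private transient HikariDataSource dataSource;

    @Override
    public void open(Configuration parameters) {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://db-host:3306/mydb");   // placeholder URL
        config.setUsername("user");                            // placeholder credentials
        config.setPassword("password");
        config.setMaximumPoolSize(4);   // caps concurrent MySQL connections per subtask
        dataSource = new HikariDataSource(config);
        executor = Executors.newFixedThreadPool(4);
    }

    @Override
    public void asyncInvoke(String record, ResultFuture<String> resultFuture) {
        executor.submit(() -> {
            try (Connection conn = dataSource.getConnection();
                 PreparedStatement stmt =
                         conn.prepareStatement("INSERT INTO my_table (payload) VALUES (?)")) {
                stmt.setString(1, record);
                stmt.executeUpdate();
                resultFuture.complete(Collections.singleton(record));
            } catch (Exception e) {
                resultFuture.completeExceptionally(e);
            }
        });
    }

    @Override
    public void close() {
        if (executor != null) {
            executor.shutdown();
        }
        if (dataSource != null) {
            dataSource.close();
        }
    }
}

It would be applied with AsyncDataStream.unorderedWait(stream, new
AsyncDbWriter(), timeout, TimeUnit.SECONDS, capacity); the capacity together
with the pool size bounds how many writes are in flight per subtask.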

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#implementation-tips

bastien dine  于2019年10月31日周四 下午4:36写道:

> Hello,
>
> I would like to know if you can use a KeyedStream with the Async operator :
> I want to use the async operator to insert some stuff in my database but I
> want to limit 1 request per element (with key=id) at a time
> With a regular keyBy / map, it's working, but it's too slow (i don't have
> enough ressources to increase my parallelism),
>
> As far as I have seen, this is not possible
> When I write something like
> Async.orderedWait(myStream.keyBy(myKeyselector)), the keyBy is totally
> ignored
>
> Have you a solution for this?
>
> Best Regards,
> Bastien
>
> --
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io
>


Re: Stateful functions presentation code (UI part)

2019-10-31 Thread vino yang
Hi Flavio,

Please see this link.[1]

Best,
Vino

[1]:
https://github.com/ververica/stateful-functions/tree/master/stateful-functions-examples/stateful-functions-ridesharing-example

Flavio Pompermaier  于2019年10月31日周四 下午4:53写道:

> Hi to all,
> yould it be possible to provide also the source code of the UI part of the
> ride sharing example? It would be interesting to me how the UI is reading
> the data from the Kafka egress.
>
> Best,
> Flavio
>


Re: Sending custom statsd tags

2019-10-31 Thread vino yang
Hi Prakhar,

You need to customize StatsDReporter[1] in the Flink source.

If you want to flexibly read configurable tags from the configuration
file[2], you can refer to the implementation of DatadogHttpReporter#open[3]
(for reference only on how to read the tags).

Best,
Vino

[1]:
https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java#L52
[2]:
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#datadog-orgapacheflinkmetricsdatadogdatadoghttpreporter
[3]:
https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java#L114


Prakhar Mathur  于2019年10月31日周四 下午2:35写道:

> Hi Chesnay,
>
> Thanks for the response, can you point me to some existing example for
> this?
>
> On Wed, Oct 30, 2019 at 5:30 PM Chesnay Schepler 
> wrote:
>
>> Not possible, you'll have to extend the StatsDReporter yourself to add
>> arbitrary tags.
>>
>> On 30/10/2019 12:52, Prakhar Mathur wrote:
>>
>> Hi,
>>
>> We are running Flink 1.6.2. We are using flink-metrics-statsd jar in
>> order to send metrics to telegraf. In order to send custom metrics, we are
>> using MetricGroups. Currently, we are trying to send a few custom tags
>> but unable to find any examples or documentation regarding the same.
>>
>> Regards
>> Prakhar Mathur
>>
>>
>>


Re: Flink S3 error

2019-10-30 Thread vino yang
Hi Harrison,

So did you check whether the file exists or not? And what's your question?

Best,
Vino

Harrison Xu  于2019年10月31日周四 上午5:24写道:

> I'm seeing this exception with the S3 uploader - it claims a previously
> part file was not found. Full jobmanager logs attached. (Flink 1.8)
>
> java.io.FileNotFoundException: No such file or directory: 
> s3a://qcache/tmp/kafka/meta/rq_features/dt=2019-10-30T15/partition_1/_part-4-1169_tmp_21400e5e-3921-4f33-a980-ac953b50b4b7
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
>   at 
> org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.getObject(HadoopS3AccessHelper.java:98)
>   at 
> org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.recoverInProgressPart(S3RecoverableMultipartUploadFactory.java:97)
>   at 
> org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.recoverRecoverableUpload(S3RecoverableMultipartUploadFactory.java:75)
>   at 
> org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.recover(S3RecoverableWriter.java:95)
>   at 
> org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.recover(S3RecoverableWriter.java:50)
>   at 
> com.quora.dataInfra.s3connector.flink.filesystem.Bucket.restoreInProgressFile(Bucket.java:146)
>   at 
> com.quora.dataInfra.s3connector.flink.filesystem.Bucket.(Bucket.java:133)
>   at 
> com.quora.dataInfra.s3connector.flink.filesystem.Bucket.restore(Bucket.java:404)
>   at 
> com.quora.dataInfra.s3connector.flink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:67)
>   at 
> com.quora.dataInfra.s3connector.flink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:182)
>   at 
> com.quora.dataInfra.s3connector.flink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:170)
>   at 
> com.quora.dataInfra.s3connector.flink.filesystem.Buckets.initializeState(Buckets.java:154)
>   at 
> com.quora.dataInfra.s3connector.flink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:344)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
>
>


Re: flink run jar throw java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

2019-10-30 Thread vino yang
Hi Franke,

From the information provided by Alex:

>> mvn build jar include com.mysql.jdbc.Driver.

it seems he has packaged a fat jar?

Best,
Vino

Jörn Franke  于2019年10月30日周三 下午2:47写道:

>
>
> You can create a fat jar (also called Uber jar) that includes all
> dependencies in your application jar.
>
> I would avoid to put things in the Flink lib directory as it can make
> maintenance difficult. Eg deployment is challenging, upgrade of flink,
> providing it on new nodes etc.
>
>
> Am 30.10.2019 um 04:46 schrieb Alex Wang :
>
> 
> Hello everyone, I am a newbie.
> I am learning the flink-sql-submit project. From @Jark Wu  :
> https://github.com/wuchong/flink-sql-submit
>
> My local environment is:
> 1. flink1.9.0 standalone
> 2. kafka_2.11-2.2.0 single
>
> I configured Flink Connectors and Formats jars to $FLINK_HOME/lib .
> Reference:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors
>
> Then I run flink-sql-submit , sh run.sh q1
> Throw  java.lang.ClassNotFoundException: com.mysql.jdbc.Driver.
>
> My question is:
> I configured mysql-connector-java in the pom.xml file, mvn build jar
> include com.mysql.jdbc.Driver.
> Why is this error still reported? I put the jar package in $FLINK_HOME/lib
> and the problem can be solved.
> Do you need to put these jars in $FLINK_HOME/lib when the project relies
> on too many jar packages?
> If I don't put mysql-connector-java.jar in $FLINK_HOME/lib, how can I
> solve this problem?
>
> Can @Jark Wu  give me some advice? Or can someone give me some advice?
> Thank you.
>
> 1. pom.xml
>
> 
>> mysql
>> mysql-connector-java
>> 5.1.38
>> 
>
> 2. mvn clean; mvn package
>
> $ ll -rth target
>>
>>  [±master ●]
>> total 32312
>> drwxr-xr-x  3 alex  staff96B Oct 30 11:32 generated-sources
>> drwxr-xr-x  5 alex  staff   160B Oct 30 11:32 classes
>> drwxr-xr-x  3 alex  staff96B Oct 30 11:32 maven-archiver
>> -rw-r--r--  1 alex  staff   7.2M Oct 30 11:32
>> flink-sql-submit-1.0-SNAPSHOT.jar
>> -rw-r--r--  1 alex  staff   8.2M Oct 30 11:32 flink-sql-submit.jar
>>
>
> 3. flink-sql-submit.jar include java.sql.Driver
>
> " zip.vim version v28
>> " Browsing zipfile
>> /Users/alex/IdeaProjects/alex/flink_learn/flink-sql-submit/target/flink-sql-submit.jar
>> " Select a file with cursor and press ENTER
>>
>> META-INF/MANIFEST.MF
>> META-INF/
>> q1.sql
>> user_behavior.log
>> com/
>> com/github/
>> com/github/wuchong/
>> com/github/wuchong/sqlsubmit/
>> com/github/wuchong/sqlsubmit/SqlSubmit$1.class
>> com/github/wuchong/sqlsubmit/SqlSubmit.class
>> com/github/wuchong/sqlsubmit/SourceGenerator.class
>> com/github/wuchong/sqlsubmit/cli/
>> com/github/wuchong/sqlsubmit/cli/SqlCommandParser$SqlCommandCall.class
>> com/github/wuchong/sqlsubmit/cli/SqlCommandParser.class
>> com/github/wuchong/sqlsubmit/cli/SqlCommandParser$SqlCommand.class
>> com/github/wuchong/sqlsubmit/cli/CliOptions.class
>> com/github/wuchong/sqlsubmit/cli/CliOptionsParser.class
>> META-INF/maven/
>> META-INF/maven/com.github.wuchong/
>> META-INF/maven/com.github.wuchong/flink-sql-submit/
>> META-INF/maven/com.github.wuchong/flink-sql-submit/pom.xml
>> META-INF/maven/com.github.wuchong/flink-sql-submit/pom.properties
>> META-INF/services/
>> META-INF/services/java.sql.Driver
>> com/mysql/
>> com/mysql/fabric/
>> com/mysql/fabric/FabricCommunicationException.class
>> com/mysql/fabric/FabricConnection.class
>> com/mysql/fabric/FabricStateResponse.class
>> com/mysql/fabric/HashShardMapping$ReverseShardIndexSorter.class
>> com/mysql/fabric/HashShardMapping.class
>> com/mysql/fabric/RangeShardMapping$RangeShardIndexSorter.class
>> com/mysql/fabric/RangeShardMapping.class
>> com/mysql/fabric/Response.class
>> com/mysql/fabric/Server.class
>> com/mysql/fabric/ServerGroup.class
>> com/mysql/fabric/ServerMode.class
>> com/mysql/fabric/ServerRole.class
>> etc ...
>>
>
>
>
> $FLINK_DIR/bin/flink run -d -p 3 target/flink-sql-submit.jar -w
> "${PROJECT_DIR}"/src/main/resources/ -f "$1".sql
> Error:
> 2019-10-30 10:27:35
> java.lang.IllegalArgumentException: JDBC driver class not found.
> At
> org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.open(JDBCUpsertOutputFormat.java:112)
> At
> org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.open(JDBCUpsertSinkFunction.java:42)
> At
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> At
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> At
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
> At
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
> At
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
> At org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> At org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> At java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassNotFoundException: 
