Re: Flink client trying to submit jobs to old session cluster (which was killed)

2019-12-12 Thread Yang Wang
Hi Pankaj,

Always using `-yid` to submit a Flink job to an existing YARN session
cluster is the safe way. For example: `flink run -d -yid application_1234
examples/streaming/WordCount.jar`. The magic properties file may be
removed in the future.



Best,

Yang

Pankaj Chand wrote on Fri, Dec 13, 2019 at 1:16 PM:

> Vino and Kostas:
>
> Thank you for the info!
>
> I was using Flink 1.9.1 with Pre-bundled Hadoop 2.7.5.
>
> Cloudlab has quarantined my cluster experiment without notice, so I'll
> let you know if and when they allow me to access the files in the future.
>
> regards,
>
> Pankaj
>
> On Thu, Dec 12, 2019 at 8:35 AM Kostas Kloudas  wrote:
>
>> Hi Pankaj,
>>
>> When you start a session cluster with the bin/yarn-session.sh script,
>> Flink will create the cluster and then write a "Yarn Properties file"
>> named ".yarn-properties-YOUR_USER_NAME" in the directory:
>> either the one specified by the option "yarn.properties-file.location"
>> in the flink-conf.yaml or in your local
>> System.getProperty("java.io.tmpdir"). This file will contain the
>> applicationId of the cluster and
>> it will be picked up by any future calls to `flink run`. Could you
>> check if this file exists and if it is updated every time you create a
>> cluster?
>>
>> Thanks,
>> Kostas
>>
>> On Thu, Dec 12, 2019 at 2:22 PM vino yang  wrote:
>> >
>> > Hi Pankaj,
>> >
>> > Can you tell us what's Flink version do you use?  And can you share the
>> Flink client and job manager log with us?
>> >
>> > This information would help us to locate your problem.
>> >
>> > Best,
>> > Vino
>> >
>> > Pankaj Chand wrote on Thu, Dec 12, 2019 at 7:08 PM:
>> >>
>> >> Hello,
>> >>
>> >> When using Flink on YARN in session mode, each Flink job client would
>> automatically know the YARN cluster to connect to. It says this somewhere
>> in the documentation.
>> >>
>> >> So, I killed the Flink session cluster by simply killing the YARN
>> application using the "yarn kill" command. However, when starting a new
>> Flink session cluster and trying to submit Flink jobs to yarn-session,
>> Flink complains that the old cluster (it gave the port number and YARN
>> application ID) is not available.
>> >>
>> >> It seems like the details of the old cluster were still stored
>> somewhere in Flink. So, I had to completely replace the Flink folder with a
>> new one.
>> >>
>> >> Does anyone know the proper way to kill a Flink+YARN session cluster
>> to completely remove it so that jobs will get submitted to a new Flink
>> session cluster?
>> >>
>> >> Thanks,
>> >>
>> >> Pankaj
>>
>


Re: Flink client trying to submit jobs to old session cluster (which was killed)

2019-12-12 Thread Pankaj Chand
Vino and Kostas:

Thank you for the info!

I was using Flink 1.9.1 with Pre-bundled Hadoop 2.7.5.

Cloudlab has quarantined my cluster experiment without notice, so I'll
let you know if and when they allow me to access the files in the future.

regards,

Pankaj

On Thu, Dec 12, 2019 at 8:35 AM Kostas Kloudas  wrote:

> Hi Pankaj,
>
> When you start a session cluster with the bin/yarn-session.sh script,
> Flink will create the cluster and then write a "Yarn Properties file"
> named ".yarn-properties-YOUR_USER_NAME" in the directory:
> either the one specified by the option "yarn.properties-file.location"
> in the flink-conf.yaml or in your local
> System.getProperty("java.io.tmpdir"). This file will contain the
> applicationId of the cluster and
> it will be picked up by any future calls to `flink run`. Could you
> check if this file exists and if it is updated every time you create a
> cluster?
>
> Thanks,
> Kostas
>
> On Thu, Dec 12, 2019 at 2:22 PM vino yang  wrote:
> >
> > Hi Pankaj,
> >
> > Can you tell us what's Flink version do you use?  And can you share the
> Flink client and job manager log with us?
> >
> > This information would help us to locate your problem.
> >
> > Best,
> > Vino
> >
> > Pankaj Chand wrote on Thu, Dec 12, 2019 at 7:08 PM:
> >>
> >> Hello,
> >>
> >> When using Flink on YARN in session mode, each Flink job client would
> automatically know the YARN cluster to connect to. It says this somewhere
> in the documentation.
> >>
> >> So, I killed the Flink session cluster by simply killing the YARN
> application using the "yarn kill" command. However, when starting a new
> Flink session cluster and trying to submit Flink jobs to yarn-session,
> Flink complains that the old cluster (it gave the port number and YARN
> application ID) is not available.
> >>
> >> It seems like the details of the old cluster were still stored
> somewhere in Flink. So, I had to completely replace the Flink folder with a
> new one.
> >>
> >> Does anyone know the proper way to kill a Flink+YARN session cluster to
> completely remove it so that jobs will get submitted to a new Flink session
> cluster?
> >>
> >> Thanks,
> >>
> >> Pankaj
>


How to understand watermark creation for Kafka partitions

2019-12-12 Thread qq
Hi all,

  I am confused about watermarks for each Kafka partition. As far as I know, watermarks
are created at the data stream level. So why do the docs also say that watermarks are created
for each Kafka topic partition? In my tests, watermarks still appeared to be created globally,
even when I ran my job in parallel and assigned the watermarks on the Kafka consumer. Thanks.

Below text copied from flink web.


you can use Flink’s Kafka-partition-aware watermark generation. Using that 
feature, watermarks are generated inside the Kafka consumer, per Kafka 
partition, and the per-partition watermarks are merged in the same way as 
watermarks are merged on stream shuffles.

For example, if event timestamps are strictly ascending per Kafka partition, 
generating per-partition watermarks with the ascending timestamps watermark 
generator will result in perfect overall watermarks.

The illustrations below show how to use the per-Kafka-partition watermark 
generation, and how watermarks propagate through the streaming dataflow in that 
case.
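
For context, the per-partition generation described above is enabled by assigning the
timestamp/watermark assigner on the Kafka consumer itself rather than on the stream returned
by addSource(). A minimal sketch of that wiring (broker address, topic name, and the
timestamp extraction are placeholders, not the actual job):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class PerPartitionWatermarksSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "watermark-demo");          // placeholder group

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

        // Because the assigner is set on the consumer, watermarks are generated inside
        // the source, per Kafka partition, and merged like on stream shuffles.
        consumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
            @Override
            public long extractAscendingTimestamp(String element) {
                // Placeholder: assume the payload is just an epoch-millisecond timestamp.
                return Long.parseLong(element);
            }
        });

        DataStream<String> stream = env.addSource(consumer);
        stream.print();
        env.execute("per-partition watermarks sketch");
    }
}

If the same assigner is applied to the DataStream after addSource() instead, watermarks are
generated per parallel source instance over whatever mix of partitions it reads, which is
the "global" behavior described above.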




Thanks 
Alex Fu

Re: State Processor API: StateMigrationException for keyed state

2019-12-12 Thread vino yang
Hi pwestermann,

Can you share the relevant detailed exception message?

Best,
Vino

pwestermann wrote on Fri, Dec 13, 2019 at 2:00 AM:

> I am trying to get the new State Processor API to work, but I am having trouble with
> keyed state (this is for Flink 1.9.1 with RocksDB on S3 as the backend).
> I can read keyed state for simple key types such as Strings, but whenever I
> try to read state with a more complex key type - such as a named tuple
> type (for example ), I get a StateMigrationException:
>
>
>
> Any idea what could be wrong?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink on Kubernetes seems to ignore log4j.properties

2019-12-12 Thread Li Peng
Ok I think I identified the issue:

1. I accidentally bundled another version of slf4j in my job jar, which
resulted in an incompatibility with the slf4j jar bundled with flink/bin.
Apparently slf4j in this case defaults to something that ignores the conf?
Once I removed slf4j from my job jar, the logger properties were properly
consumed.
2. It looks like the line log4j.appender.file.file=${log.file} in the default
properties didn't work properly (resulting in log4j null errors); it
started working after I just set it manually to opt/flink/logs/output.log.

Thanks for your guidance!
Li

On Thu, Dec 12, 2019 at 12:09 PM Li Peng  wrote:

> Hey ouywl, interesting, I figured something like that would happen. I
> actually replaced all the log4j-x files with the same config I originally
> posted, including log4j-console, but that didn't change the behavior either.
>
> Hey Yang, yes I verified the properties files are as I configured, and
> that the logs don't match up with it. Here are the JVM arguments, if that's
> what you were looking for:
>
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM:
> OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.232-b09
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>  Maximum heap size: 989 MiBytes
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>  JAVA_HOME: /usr/local/openjdk-8
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>  Hadoop version: 2.8.3
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM
> Options:
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Xms1024m
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Xmx1024m
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>  Program Arguments:
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> --configDir
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> /opt/flink/conf
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Djobmanager.rpc.address=myjob-manager
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Dparallelism.default=2
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Dblob.server.port=6124
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Dqueryable-state.server.ports=6125
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Djobmanager.heap.size=3000m
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Dtaskmanager.heap.size=3000m
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Dmetrics.reporter.stsd.class=org.apache.flink.metrics.statsd.StatsDReporter
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Dmetrics.reporter.stsd.host=myhost.com
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Dmetrics.reporter.stsd.port=8125
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Dmetrics.system-resource=true
>
> Thanks,
> Li
>
> On Thu, Dec 12, 2019 at 4:40 AM ouywl  wrote:
>
>>  @Li Peng
>>I found your problems.  Your start cmd use args “start-foreground”,
>> It will run “exec "${FLINK_BIN_DIR}"/flink-console.sh ${ENTRY_POINT_NAME}
>> "${ARGS[@]}””, and In ' flink-console.sh’, the code is “log_setting=(
>> "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties"
>> "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")”
>> . So the log4j.properties not work. It need log4j-console.properties and
>> logback-console.xml.
>>
>> ouywl
>> ou...@139.com
>>
>> 
>> Signature customized by NetEase Mail Master
>>
>> On 12/12/2019 15:35,ouywl  wrote:
>>
>> HI yang,
>>Could you give more info detail? log4j.properties content, and The k8s
>> yaml. Is use the dockerfile in flink-container? When I test it use the
>> default per-job yaml in flick-container? It is only show logs in docker
>> infos. And not logs in /opt/flink/log.
>>
>> ouywl
>> ou...@139.com
>>
>> 
>> Signature customized by NetEase Mail Master
>>
>> On 12/12/2019 13:47,Yang Wang
>>  wrote:
>>
>> Hi Peng,
>>
>> What i mean is to use `docker exec` into the 

Re: Flink on Kubernetes seems to ignore log4j.properties

2019-12-12 Thread Li Peng
Hey ouywl, interesting, I figured something like that would happen. I
actually replaced all the log4j-x files with the same config I originally
posted, including log4j-console, but that didn't change the behavior either.

Hey Yang, yes I verified the properties files are as I configured, and that
the logs don't match up with it. Here are the JVM arguments, if that's what
you were looking for:

[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM:
OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.232-b09
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
 Maximum heap size: 989 MiBytes
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
 JAVA_HOME: /usr/local/openjdk-8
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Hadoop
version: 2.8.3
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM
Options:
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Xms1024m
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Xmx1024m
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
 Program Arguments:
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
--configDir
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
/opt/flink/conf
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Djobmanager.rpc.address=myjob-manager
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dparallelism.default=2
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dblob.server.port=6124
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dqueryable-state.server.ports=6125
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Djobmanager.heap.size=3000m
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dtaskmanager.heap.size=3000m
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dmetrics.reporter.stsd.class=org.apache.flink.metrics.statsd.StatsDReporter
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dmetrics.reporter.stsd.host=myhost.com
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dmetrics.reporter.stsd.port=8125
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dmetrics.system-resource=true

Thanks,
Li

On Thu, Dec 12, 2019 at 4:40 AM ouywl  wrote:

>  @Li Peng
>I found your problems.  Your start cmd use args “start-foreground”, It
> will run “exec "${FLINK_BIN_DIR}"/flink-console.sh ${ENTRY_POINT_NAME} "$
> {ARGS[@]}””, and In ' flink-console.sh’, the code is “log_setting=(
> "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties"
> "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")”
> . So the log4j.properties not work. It need log4j-console.properties and
> logback-console.xml.
>
> ouywl
> ou...@139.com
>
> 
> Signature customized by NetEase Mail Master
>
> On 12/12/2019 15:35,ouywl  wrote:
>
> HI yang,
>Could you give more info detail? log4j.properties content, and The k8s
> yaml. Is use the dockerfile in flink-container? When I test it use the
> default per-job yaml in flick-container? It is only show logs in docker
> infos. And not logs in /opt/flink/log.
>
> ouywl
> ou...@139.com
>
> 
> Signature customized by NetEase Mail Master
>
> On 12/12/2019 13:47,Yang Wang
>  wrote:
>
> Hi Peng,
>
> What i mean is to use `docker exec` into the running pod and `ps` to get
> the real
> command that is running for jobmanager.
> Do you have checked the /opt/flink/conf/log4j.properties is right?
>
> I have tested standalone per-job on my kubernetes cluster, the logs show
> up as expected.
>
>
> Best,
> Yang
>
> Li Peng wrote on Thu, Dec 12, 2019 at 2:59 AM:
>
>> Hey Yang, here are the commands:
>>
>> "/opt/flink/bin/taskmanager.sh",
>> "start-foreground",
>> "-Djobmanager.rpc.address={{ .Chart.Name }}-job-manager",
>> "-Dtaskmanager.numberOfTaskSlots=1"
>>
>> "/opt/flink/bin/standalone-job.sh",
>> "start-foreground",
>> "-Djobmanager.rpc.address={{ .Chart.Name }}-job-manager",
>> "-Dparallelism.default={{ .Values.task.replicaCount }}"
>>
>> Yes it's very curious that I don't see any logs actually written to
>> /opt/flink/log.
>>
>> On 

State Processor API: StateMigrationException for keyed state

2019-12-12 Thread pwestermann
I am trying to get the new State Processor API to work, but I am having trouble with
keyed state (this is for Flink 1.9.1 with RocksDB on S3 as the backend).
I can read keyed state for simple key types such as Strings, but whenever I
try to read state with a more complex key type - such as a named tuple
type (for example ), I get a StateMigrationException:



Any idea what could be wrong?
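
For reference, reading keyed state with the 1.9 State Processor API generally follows the
pattern sketched below (the uid, state name, key/value types, and paths are placeholders,
not the real job; the problem above shows up when the key type is a composite type instead
of String):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadKeyedStateSketch {

    // Hypothetical reader: key type String, a single ValueState<Long> named "count".
    public static class CountReader extends KeyedStateReaderFunction<String, String> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Types.LONG));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            out.collect(key + " -> " + count.value());
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
        // Placeholder paths and uid; the state backend must match the one the job used.
        ExistingSavepoint savepoint = Savepoint.load(
                bEnv,
                "s3://bucket/savepoints/savepoint-xyz",
                new RocksDBStateBackend("s3://bucket/checkpoints"));
        DataSet<String> counts = savepoint.readKeyedState("my-operator-uid", new CountReader());
        counts.print();
    }
}

The key type declared on the KeyedStateReaderFunction has to be compatible with the key
serializer the operator used when the savepoint was taken; a mismatch typically surfaces as
a StateMigrationException.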



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Join a datastream with tables stored in Hive

2019-12-12 Thread Krzysztof Zarzycki
Hello dear Flinkers,
If this kind of question was asked on the groups, I'm sorry for a
duplicate. Feel free to just point me to the thread.
I have to solve a probably pretty common case of joining a datastream to a
dataset.
Let's say I have the following setup:
* I have a high pace stream of events coming in Kafka.
* I have some dimension tables stored in Hive. These tables are changed
daily. I can keep a snapshot for each day.

Now conceptually, I would like to join the stream of incoming events to the
dimension tables (simple hash join). we can consider two cases:
1) simpler, where I join the stream with the most recent version of the
dictionaries. (So the result is accepted to be nondeterministic if the job
is retried).
2) more advanced, where I would like to do a temporal join of the stream with
the dictionary snapshots that were valid at the time of the event. (This
result should be deterministic).

The end goal is to do aggregation of that joined stream, store results in
Hive or more real-time analytical store (Druid).

Now, could you please help me understand: is either of these cases
implementable with the declarative Table/SQL API, using temporal joins,
catalogs, the Hive integration, JDBC connectors, or whatever beta features
there are now? (I've read quite a lot of Flink docs about each of those,
but I have trouble compiling this information into a final design.)
Could you please help me understand how these components should cooperate?
If that is impossible with the Table API, can we come up with the easiest
implementation using the DataStream API?
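
For concreteness, here is a rough sketch of what the temporal-join part of case 2 could look
like with the Table API, assuming (hypothetically) that the Kafka stream and the versioned
dimension rows are already registered as tables "Events" and "DimHistory" with an event-time
attribute "ts" and a key column "id". This is not a tested solution, and exposing the
Hive-backed snapshots as such a history table is exactly the part I am unsure about:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;

public class TemporalJoinSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Assumption: "Events" and "DimHistory" are registered tables with a time
        // attribute "ts" and key "id"; "attr" is a placeholder dimension column.
        Table dimHistory = tEnv.sqlQuery("SELECT * FROM DimHistory");

        // Expose the dimension history as a temporal table function keyed by "id".
        TemporalTableFunction dim = dimHistory.createTemporalTableFunction("ts", "id");
        tEnv.registerFunction("Dim", dim);

        // Temporal join: each event is joined against the dimension version that was
        // valid at the event's timestamp.
        Table joined = tEnv.sqlQuery(
                "SELECT e.id, e.ts, d.attr " +
                "FROM Events AS e, LATERAL TABLE (Dim(e.ts)) AS d " +
                "WHERE e.id = d.id");

        tEnv.toAppendStream(joined, Row.class).print();
        env.execute("temporal join sketch");
    }
}

Case 1 (always join against the latest snapshot) could presumably be approximated with a
processing-time temporal table or a plain lookup inside a DataStream operator instead.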

Thanks a lot for any help!
Krzysztof


Re: Flink ML feature

2019-12-12 Thread Rong Rong
Hi guys,

Yes, as Till mentioned. The community is working on a new ML library and we
are working closely with the Alink project to bring the algorithms.

You can find more information regarding the new ML design architecture in
FLIP-39 [1].
One of the major change is that the new ML library [2] will be based on the
Table API [3], instead of depending on the dataset/datastream API.

I've cc-ed @Xu Yang , who has been a major
contributor to the Alink project to provide more information.

--
Rong

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-39+Flink+ML+pipeline+and+ML+libs
[2] https://github.com/apache/flink/tree/master/flink-ml-parent
[3]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html

On Wed, Dec 11, 2019 at 1:11 AM Till Rohrmann  wrote:

> Hi guys,
>
> it is true that we dropped Flink-ML with 1.9. The reason is that the
> community started working on a new ML library which you can find under
> flink-ml-parent [1]. This module contains the framework for building ML
> pipelines but not yet too many algorithms iirc. The plan is to extend this
> library with algorithms from Alink in the near future to grow Flink's
> machine learning library.
>
> [1] https://github.com/apache/flink/tree/master/flink-ml-parent
>
> Cheers,
> Till
>
> On Wed, Dec 11, 2019 at 3:42 AM vino yang  wrote:
>
>> Hi Benoit,
>>
>> I can only try to ping @Till Rohrmann  @Kurt Young
>>   who may know more information to answer this
>> question.
>>
>> Best,
>> Vino
>>
>>> Benoît Paris wrote on Tue, Dec 10, 2019 at 7:06 PM:
>>
>>> Is there any information as to whether Alink is going to be contributed
>>> to Apache Flink as the official ML Lib?
>>>
>>>
>>> On Tue, Dec 10, 2019 at 7:11 AM vino yang  wrote:
>>>
 Hi Chandu,

 AFAIK, there is a project named Alink[1] which is the Machine Learning
 algorithm platform based on Flink, developed by the PAI team of Alibaba
 computing platform. FYI

 Best,
 Vino

 [1]: https://github.com/alibaba/Alink

 Tom Blackwood wrote on Tue, Dec 10, 2019 at 2:07 PM:

> You may try Spark ML, which is a production ready library for ML stuff.
>
> regards.
>
> On Tue, Dec 10, 2019 at 1:04 PM chandu soa 
> wrote:
>
>> Hello Community,
>>
>> Can you please give me some pointers for implementing Machine
>> Learning using Flink.
>>
>> I see Flink ML libraries were dropped in v1.9. It looks like ML
>> feature in Flink going to be enhanced.
>>
>> What is the recommended approach for implementing production grade ML
>> based apps using Flink? v1.9 is ok?or should wait for 1.10?
>>
>> Thanks,
>> Chandu
>>
>
>>>
>>> --
>>> Benoît Paris
>>> Ingénieur Machine Learning Explicable
>>> Tél : +33 6 60 74 23 00
>>> http://benoit.paris
>>> http://explicable.ml
>>>
>>


Flink 1.9.1 SQL backward-incompatibility problem

2019-12-12 Thread 李佟
We recently upgraded Flink, migrating our programs from the old cluster (running fine on 1.8.0) to a new cluster (1.9.1). While deploying, we found that a Flink
SQL program that used to run fine cannot be executed on the 1.9.1 cluster; the exception is as follows:




org.apache.flink.table.api.ValidationException: Window can only be defined over 
a time attribute column.
at 
org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)


at 
org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:90)
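
The "time attribute" the exception refers to is the rowtime attribute that a StreamTableSource
can expose via DefinedRowtimeAttributes. For reference, such a declaration typically looks
roughly like the sketch below, using the UserActionTime field from the query in the test case.
This is a generic sketch, not the original author's code, and the classes it uses
(RowtimeAttributeDescriptor, ExistingField, AscendingTimestamps, Collections) are already
imported in the listing that follows:

    // Sketch: inside a StreamTableSource that also implements DefinedRowtimeAttributes.
    @Override
    public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
        RowtimeAttributeDescriptor descriptor = new RowtimeAttributeDescriptor(
                "UserActionTime",                    // field exposed as the event-time attribute
                new ExistingField("UserActionTime"), // read the timestamp from this existing field
                new AscendingTimestamps());          // watermark strategy
        return Collections.singletonList(descriptor);
    }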




Tracing into the decompiled Scala file and setting a breakpoint, I found that the part highlighted in the red box in the screenshot below is not executed and is skipped directly.




The functionality is very simple: summing values over a time window. The test case is as follows:




package org.flowmatrix.isp.traffic.accounting.test;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.scala.typeutils.Types;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
import org.apache.flink.types.Row;
import org.junit.Test;

import javax.annotation.Nullable;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TestSql {
@Test
public void testAccountingSql() {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);

try {
StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env);

SimpleTableSource source = new SimpleTableSource();
Table t = tableEnv.fromTableSource(source);

String interval = "5"; //5 second
System.out.println("source schema is " + source.getTableSchema());

Table sqlResult = tableEnv.sqlQuery("SELECT " +
" TUMBLE_START(UserActionTime, INTERVAL '" + interval + "' 
SECOND) as rowTime, " +
" Username," +
" SUM(Data) as Data " +
" FROM  " + t +
" GROUP BY TUMBLE(UserActionTime, INTERVAL '" + interval + 
"' SECOND),Username");


String[] fieldNames = {
"rowTime",
"Username", "Data"};
TypeInformation[] fieldTypes = {
TypeInformation.of(Timestamp.class),
TypeInformation.of(String.class),
TypeInformation.of(Long.class)};

TableSink sink1 = new CsvTableSink("/tmp/data.log", ",");
sink1 = sink1.configure(fieldNames, fieldTypes);
tableEnv.registerTableSink("EsSinkTable", sink1);
System.out.println("sql result schema is " + sqlResult.getSchema());

tableEnv.sqlUpdate("insert into EsSinkTable select  " +
"rowTime,Username,Data from " + sqlResult + "");

env.execute("test");
} catch (Exception e) {
e.printStackTrace();
System.err.println("start program error. FlowMatrix --zookeeper 
 --config " +
" --name  --interval  
--indexName ");
System.err.println(e.toString());
return;
}
}

public static class SimpleTableSource implements StreamTableSource, 
DefinedRowtimeAttributes {
@Override
public DataStream getDataStream(StreamExecutionEnvironment env) {
return 
env.fromCollection(genertateData()).assignTimestampsAndWatermarks(new 
AssignerWithPunctuatedWatermarks() {
private long lastWaterMarkMillSecond = -1;
private long waterMarkPeriodMillSecond = 1000;
@Nullable
@Override
public Watermark checkAndGetNextWatermark(Row lastElement, long 
extractedTimestamp) {
if(extractedTimestamp - lastWaterMarkMillSecond >= 
waterMarkPeriodMillSecond){
lastWaterMarkMillSecond = extractedTimestamp;
return new Watermark(extractedTimestamp);
}
return null;
}

@Override
public long extractTimestamp(Row element, long 
previousElementTimestamp) {
   

SQL compatibility problem between Flink 1.9.1 and 1.8.x

2019-12-12 Thread 李佟
Hi All:




We recently upgraded Flink, migrating our programs from the old cluster (running fine on 1.8.0 and 1.8.3) to a new cluster (1.9.1). While deploying, we found that a Flink
SQL program that used to run fine cannot be executed on the 1.9.1 cluster; the exception is as follows:




org.apache.flink.table.api.ValidationException: Window can only be defined over 
a time attribute column.
at 
org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)


at 
org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:90)

..







The functionality is very simple: summing values over a time window. The test case is as follows:





import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
import org.apache.flink.types.Row;
import org.junit.Test;

import javax.annotation.Nullable;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TestSql {
@Test
public void testAccountingSql() {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);

try {
StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env);

SimpleTableSource source = new SimpleTableSource();
Table t = tableEnv.fromTableSource(source);

String interval = "5"; //5 second
System.out.println("source schema is " + source.getTableSchema());

Table sqlResult = tableEnv.sqlQuery("SELECT " +
" TUMBLE_START(UserActionTime, INTERVAL '" + interval + "' 
SECOND) as rowTime, " +
" Username," +
" SUM(Data) as Data " +
" FROM  " + t +
" GROUP BY TUMBLE(UserActionTime, INTERVAL '" + interval + 
"' SECOND),Username");


String[] fieldNames = {
"rowTime",
"Username", "Data"};
TypeInformation[] fieldTypes = {
TypeInformation.of(Timestamp.class),
TypeInformation.of(String.class),
TypeInformation.of(Long.class)};

TableSink sink1 = new CsvTableSink("d:/data.log", ",");

sink1 = sink1.configure(fieldNames, fieldTypes);
tableEnv.registerTableSink("EsSinkTable", sink1);
System.out.println("SQL result schema is " + sqlResult.getSchema());

tableEnv.sqlUpdate("insert into EsSinkTable select " +
" rowTime,Username,Data from " + sqlResult + "");

env.execute("test");
} catch (Exception e) {
e.printStackTrace();
System.err.println("start program error. FlowMatrix --zookeeper 
 --config " +
" --name  --interval  
--indexName ");
System.err.println(e.toString());
return;
}
}

public static class SimpleTableSource implements StreamTableSource, 
DefinedRowtimeAttributes {
@Override
public DataStream getDataStream(StreamExecutionEnvironment env) {
return 
env.fromCollection(genertateData()).assignTimestampsAndWatermarks(new 
AssignerWithPunctuatedWatermarks() {
private long lastWaterMarkMillSecond = -1;
private long waterMarkPeriodMillSecond = 1000;

@Nullable
@Override
public Watermark checkAndGetNextWatermark(Row lastElement, long 
extractedTimestamp) {
if (extractedTimestamp - lastWaterMarkMillSecond >= 
waterMarkPeriodMillSecond) {
lastWaterMarkMillSecond = extractedTimestamp;
return new Watermark(extractedTimestamp);
}
return null;
}

@Override
public long extractTimestamp(Row element, long 
previousElementTimestamp) {
return ((Timestamp) element.getField(2)).getTime();

Re: Flink 1.9.1 KafkaConnector missing data (1M+ records)

2019-12-12 Thread Kostas Kloudas
Hi Harrison,

Really sorry for the late reply.
Do you have any insight on whether the missing records were read by
the consumer and just the StreamingFileSink failed to write their
offsets, or the Kafka consumer did not even read them or dropped them
for some reason? I'm asking this in order to narrow down the problem. In
addition, did you see anything out of the ordinary in the logs?

I am also cc'ing Becket who may know a bit more on the kafka consumer
side of things.

Cheers,
Kostas

On Mon, Dec 2, 2019 at 10:00 PM Harrison Xu  wrote:
>
> Thank you for your reply,
>
> Some clarification:
>
> We have configured the BucketAssigner to use the Kafka record timestamp. 
> Exact bucketing behavior as follows:
> private static final DateTimeFormatter formatter = DateTimeFormatter
> .ofPattern("-MM-dd'T'HH");
>
> @Override
> public String getBucketId(KafkaRecord record, BucketAssigner.Context context) 
> {
> return String.format(
> "%s/dt=%s/partition_%s",
> record.getTopic(),
> Instant.ofEpochMilli(record.getTimestamp()).atZone(ZoneOffset.UTC).format(formatter),
> record.getPartition());
> }
>
> For each record, we write only its offset to the S3 object as a sanity check. 
> It is easy to detect missing or duplicate offsets. To answer your questions:
>
> Could this explain the fact that dt 2019-11-24T01 and 2019-11-24T02
> are entirely skipped?
> No, because even if the producer were idle during these datetimes, we would 
> expect no missing offsets. We observed both millions of missing records, in 
> addition to missing partitions (2019-11-24T01 and 2019-11-24T02). Further, 
> the producer was very active during this time.
> I want to emphasize that we noticed that the consumer for this exact 
> TopicPartition was falling behind (>1 hour lag); this degree of lag was only 
> observed for this partition. (The consumer eventually caught up). It's normal 
> for the consumer to fall behind the producer for short bursts, but we 
> definitely do not expect missing records as a result. There were millions of 
> records whose timestamps fall into (dt 2019-11-24T01 and 2019-11-24T02) - 
> they were entirely skipped by the writer.
>
>
> what does TT stand for?
> It's simply convention for datetime serialization as string.
>
>
> Can it be that there are a lot of events for partition 4 that fill up
> 2 part files for that duration?
> We are using the BulkWriter. I am under the impression that this writer 
> should only produce one file per checkpoint interval, which we have 
> configured to be 5 minutes. You see that the preceding commits follow this 
> pattern of one commit per checkpoint interval, which is what we expect. It's 
> very strange that two files for the same TopicPartition (same TaskManager) 
> are committed.
>
>
> I am eager to hear your reply and understand what we're seeing.
>
> Thanks,
> Harrison
>
> On Thu, Nov 28, 2019 at 6:43 AM Kostas Kloudas  wrote:
>>
>> Hi Harrison,
>>
>> One thing to keep in mind is that Flink will only write files if there
>> is data to write. If, for example, your partition is not active for a
>> period of time, then no files will be written.
>> Could this explain the fact that dt 2019-11-24T01 and 2019-11-24T02
>> are entirely skipped?
>>
>> In addition, for the "duplicates", it would help if you could share a
>> bit more information about your BucketAssigner.
>> How are these names assigned to the files and what does TT stand for?
>> Can it be that there are a lot of events for partition 4 that fill up
>> 2 part files for that duration? I am
>> asking because the counter of the 2 part files differ.
>>
>> Cheers,
>> Kostas
>>
>> On Tue, Nov 26, 2019 at 1:09 AM Harrison Xu  wrote:
>> >
>> > Hello,
>> >
>> > We're seeing some strange behavior with flink's KafkaConnector010 (Kafka 
>> > 0.10.1.1) arbitrarily skipping data.
>> >
>> > Context
>> > KafkaConnector010 is used as source, and StreamingFileSink/BulkPartWriter 
>> > (S3) as sink with no intermediate operators. Recently, we noticed that 
>> > millions of Kafka records were missing for one topic partition (this job 
>> > is running for 100+ topic partitions, and such behavior was only observed 
>> > for one). This job is run on YARN, and hosts were healthy with no hardware 
>> > faults observed. No exceptions in jobmanager or taskmanager logs at this 
>> > time.
>> >
>> > How was this detected?
>> > As a sanity check, we dual-write Kafka metadata (offsets) to a separate 
>> > location in S3, and have monitoring to ensure that written offsets are 
>> > contiguous with no duplicates.
>> > Each Kafka record is bucketed into hourly datetime partitions (UTC) in S3.
>> >
>> > (Condensed) Taskmanager logs
>> > 2019-11-24 02:36:50,140 INFO  
>> > org.apache.flink.fs.s3.common.writer.S3Committer  - Committing 
>> > kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5252 
>> > with MPU ID 3XG...
>> > 2019-11-24 02:41:27,966 INFO  
>> > org.apache.flink.fs.s3.common.writer.S3Committer  - 

Re: Flink client trying to submit jobs to old session cluster (which was killed)

2019-12-12 Thread Kostas Kloudas
Hi Pankaj,

When you start a session cluster with the bin/yarn-session.sh script,
Flink will create the cluster and then write a "Yarn Properties file"
named ".yarn-properties-YOUR_USER_NAME" in the directory:
either the one specified by the option "yarn.properties-file.location"
in the flink-conf.yaml or in your local
System.getProperty("java.io.tmpdir"). This file will contain the
applicationId of the cluster and
it will be picked up by any future calls to `flink run`. Could you
check if this file exists and if it is updated every time you create a
cluster?
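
A tiny sketch for checking whether that file exists in the default location (this assumes
"yarn.properties-file.location" is not set, so the file lives under java.io.tmpdir):

import java.io.File;

public class YarnPropertiesFileCheck {
    public static void main(String[] args) {
        // Default location described above: <java.io.tmpdir>/.yarn-properties-<user name>
        String path = System.getProperty("java.io.tmpdir")
                + File.separator
                + ".yarn-properties-" + System.getProperty("user.name");
        System.out.println("Expected YARN properties file: " + path);
        System.out.println("Exists: " + new File(path).exists());
    }
}

If the file still points at a killed application, removing it (or starting a new session,
which rewrites it) should keep `flink run` from targeting the old cluster.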

Thanks,
Kostas

On Thu, Dec 12, 2019 at 2:22 PM vino yang  wrote:
>
> Hi Pankaj,
>
> Can you tell us what's Flink version do you use?  And can you share the Flink 
> client and job manager log with us?
>
> This information would help us to locate your problem.
>
> Best,
> Vino
>
> Pankaj Chand wrote on Thu, Dec 12, 2019 at 7:08 PM:
>>
>> Hello,
>>
>> When using Flink on YARN in session mode, each Flink job client would 
>> automatically know the YARN cluster to connect to. It says this somewhere in 
>> the documentation.
>>
>> So, I killed the Flink session cluster by simply killing the YARN 
>> application using the "yarn kill" command. However, when starting a new 
>> Flink session cluster and trying to submit Flink jobs to yarn-session, Flink 
>> complains that the old cluster (it gave the port number and YARN application 
>> ID) is not available.
>>
>> It seems like the details of the old cluster were still stored somewhere in 
>> Flink. So, I had to completely replace the Flink folder with a new one.
>>
>> Does anyone know the proper way to kill a Flink+YARN session cluster to 
>> completely remove it so that jobs will get submitted to a new Flink session 
>> cluster?
>>
>> Thanks,
>>
>> Pankaj


??????countWindow??????????

2019-12-12 Thread cs
??countWindowgloble window??
kafkakafkawindow??





----
??:"Jimmy Wong"

Re: Flink client trying to submit jobs to old session cluster (which was killed)

2019-12-12 Thread vino yang
Hi Pankaj,

Can you tell us what's Flink version do you use?  And can you share the
Flink client and job manager log with us?

This information would help us to locate your problem.

Best,
Vino

Pankaj Chand wrote on Thu, Dec 12, 2019 at 7:08 PM:

> Hello,
>
> When using Flink on YARN in session mode, each Flink job client would
> automatically know the YARN cluster to connect to. It says this somewhere
> in the documentation.
>
> So, I killed the Flink session cluster by simply killing the YARN
> application using the "yarn kill" command. However, when starting a new
> Flink session cluster and trying to submit Flink jobs to yarn-session,
> Flink complains that the old cluster (it gave the port number and YARN
> application ID) is not available.
>
> It seems like the details of the old cluster were still stored somewhere
> in Flink. So, I had to completely replace the Flink folder with a new one.
>
> Does anyone know the proper way to kill a Flink+YARN session cluster to
> completely remove it so that jobs will get submitted to a new Flink session
> cluster?
>
> Thanks,
>
> Pankaj
>


Re: flink savepoint checkpoint

2019-12-12 Thread Congxian Qiu
Flink also supports resuming from a retained checkpoint; see the documentation [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
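
A minimal sketch of enabling retained checkpoints in the job code (the interval is a
placeholder), so that the latest checkpoint survives a cancel and can later be restored
with `flink run -s <checkpointPath> ...`:

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 60 seconds (placeholder interval).
        env.enableCheckpointing(60_000L);

        // Keep the latest completed checkpoint when the job is cancelled, so it can
        // later be restored with: flink run -s <path-to-retained-checkpoint> ...
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // ... define sources/operators/sinks here, then call env.execute("job").
    }
}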
Best,
Congxian


陈帅 wrote on Wed, Dec 11, 2019 at 9:34 PM:

> Flink 1.9 supports the cancel-job-with-savepoint feature:
>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#cancel-job-with-savepoint
> Checkpoints may be incremental, but savepoints are always full. For the specific differences, see
>
> https://www.ververica.com/blog/differences-between-savepoints-and-checkpoints-in-flink
>
>
> lucas.wu wrote on Wed, Dec 11, 2019 at 11:56 AM:
>
> > Hi all:
> >
> > There is a question I'd like to discuss: why are Flink's savepoints designed to be manual? If no savepoint is taken when the program is stopped, the previously saved state cannot be used on restart. Why not follow Spark's approach of taking checkpoints on a schedule, so that at startup you just specify the checkpoint path and continue from where the last run left off?
>


??????countWindow??????????

2019-12-12 Thread Jimmy Wong
 
??windowwindow??


| |
Jimmy Wong
|
|
wangzmk...@163.com
|
??


??2019??12??12?? 19:07??cs<58683...@qq.com> ??
??countWindow

Re: Flink on Kubernetes seems to ignore log4j.properties

2019-12-12 Thread ouywl
@Li Peng
    I found your problem. Your start cmd uses the argument "start-foreground", so it runs
`exec "${FLINK_BIN_DIR}"/flink-console.sh ${ENTRY_POINT_NAME} "${ARGS[@]}"`, and in
flink-console.sh the code is
`log_setting=("-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties"
"-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")`.
So log4j.properties does not take effect; it needs log4j-console.properties and
logback-console.xml instead.

ouywl
ou...@139.com

On 12/12/2019 15:35, ouywl wrote:

Hi Yang,
    Could you give more detail? The log4j.properties content, and the k8s yaml. Does it use
the dockerfile in flink-container? When I test it with the default per-job yaml in
flink-container, it only shows logs in the docker output, and there are no logs in
/opt/flink/log.

ouywl
ou...@139.com

On 12/12/2019 13:47, Yang Wang wrote:

Hi Peng,
What I mean is to use `docker exec` into the running pod and `ps` to get the real command
that is running for the jobmanager. Have you checked that /opt/flink/conf/log4j.properties
is right?
I have tested standalone per-job on my kubernetes cluster, the logs show up as expected.

Best,
Yang

Li Peng wrote on Thu, Dec 12, 2019 at 2:59 AM:

Hey Yang, here are the commands:

"/opt/flink/bin/taskmanager.sh",
"start-foreground",
"-Djobmanager.rpc.address={{ .Chart.Name }}-job-manager",
"-Dtaskmanager.numberOfTaskSlots=1"

"/opt/flink/bin/standalone-job.sh",
"start-foreground",
"-Djobmanager.rpc.address={{ .Chart.Name }}-job-manager",
"-Dparallelism.default={{ .Values.task.replicaCount }}"

Yes it's very curious that I don't see any logs actually written to /opt/flink/log.

On Tue, Dec 10, 2019 at 11:17 PM Yang Wang wrote:

Could you find the logs under /opt/flink/log/jobmanager.log? If not, please share the
commands the JobManager and TaskManager are using? If the command is correct and the log4j
under /opt/flink/conf is expected, it is so curious why we could not get the logs.

Best,
Yang

Li Peng wrote on Wed, Dec 11, 2019 at 1:24 PM:

Ah I see. I think the Flink app is reading files from /opt/flink/conf correctly as it is,
since changes I make to flink-conf are picked up as expected, it's just the log4j properties
that are either not being used, or don't apply to stdout or whatever source k8 uses for its
logs? Given that the pods don't seem to have logs written to file anywhere, contrary to the
properties, I'm inclined to say it's the former and that the log4j properties just aren't
being picked up. Still have no idea why though.

On Tue, Dec 10, 2019 at 6:56 PM Yun Tang wrote:

Sure, /opt/flink/conf is mounted as a volume from the configmap.

Best
Yun Tang

From: Li Peng
Date: Wednesday, December 11, 2019 at 9:37 AM
To: Yang Wang
Cc: vino yang, user
Subject: Re: Flink on Kubernetes seems to ignore log4j.properties

1. Hey Yun, I'm calling /opt/flink/bin/standalone-job.sh and /opt/flink/bin/taskmanager.sh
on my job and task managers respectively. It's based on the setup described here:
http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/ . I haven't
tried the configmap approach yet, does it also replace the conf files in /opt/flink/conf?

2. Hey Vino, here's a sample of the kubernetes: https://pastebin.com/fqJrgjZu  I didn't
change any patterns from the default, so the string patterns should look the same, but as
you can see it's full of info checkpoint logs that I originally was trying to suppress.
Based on my log4j.properties, the level should be set to WARN. I couldn't actually find any
.out files on the pod, this is from the kubectl logs command. I also didn't see any files in
/opt/flink/log, which I

Re: Scala case class TypeInformation and Serializer

2019-12-12 Thread Timo Walther

Hi,

the serializers are created from TypeInformation. So you can simply 
inspect the type information. E.g. by using this in the Scala API:


val typeInfo = createTypeInformation[MyClassToAnalyze]

And going through the object using a debugger.

Actually, I don't understand why scala.Tuple2 is treated as a generic 
type. The problem might be that an outer class is treated as a Java POJO, 
so you leave the Scala type analyzer stack and switch to the Java 
analyzer stack for its fields (currently there is no way back). For 
improving the performance, you have 3 options:


1. Make sure every type is a proper Scala type (all case classes, no POJOs).

2. Use the @TypeInfo annotation for specifying a factory. This has 
highest precedence in all APIs.


3. Register a Kryo serializer in the execution config. This might be the 
easiest way.
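
For option 3, a minimal sketch of registering a custom Kryo serializer on the execution
config (the LegacyEvent type and its serializer are made-up placeholders):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KryoRegistrationSketch {

    // Placeholder type that would otherwise fall back to Kryo's default serialization.
    public static class LegacyEvent {
        public String id;
        public long ts;
    }

    // Hand-written Kryo serializer for the placeholder type.
    public static class LegacyEventSerializer extends Serializer<LegacyEvent> {
        @Override
        public void write(Kryo kryo, Output output, LegacyEvent event) {
            output.writeString(event.id);
            output.writeLong(event.ts);
        }

        @Override
        public LegacyEvent read(Kryo kryo, Input input, Class<LegacyEvent> type) {
            LegacyEvent event = new LegacyEvent();
            event.id = input.readString();
            event.ts = input.readLong();
            return event;
        }
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Option 3: the type still goes through Kryo, but with this serializer.
        env.getConfig().registerTypeWithKryoSerializer(LegacyEvent.class, LegacyEventSerializer.class);
        // ... build the pipeline as usual.
    }
}

This keeps Kryo as the fallback but makes it use the registered serializer for that type,
whereas option 2 (a @TypeInfo factory) avoids the generic/Kryo path entirely.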


I hope this helps.

Regards,
Timo

On 12.12.19 10:38, 杨光 wrote:
Actually, the original source code has too many third-party classes to simplify
easily. The question I want to ask is: is there any way for me to find out
which class is serialized/deserialized by which serializer class, so that we can
tune it or write a custom serializer to improve performance?

Yun Tang <myas...@live.com> wrote on Thu, Dec 12, 2019 at 12:45 AM:

    Hi

    Would you please give related code? I think it might be due to an
    insufficient hint to type information.

    Best
    Yun Tang

    From: 杨光 <laolang...@gmail.com>
    Date: Wednesday, December 11, 2019 at 7:20 PM
    To: user <user@flink.apache.org>
    Subject: Scala case class TypeInformation and Serializer

    Hi, I'm working on writing a Flink streaming job with the Scala API. How
    should I find out which class is serialized by a Flink type serializer
    and which one falls back to the generic Kryo serializer?
    And if a class falls back to the Kryo serializer, how can I extend
    Flink's TypeInfo classes or apply some other customization to
    improve performance?

    Below are the errors I got when I set disableGenericTypes, so I know it
    will fall back to Kryo:

    Exception in thread "main" java.lang.UnsupportedOperationException:
    Generic types have been disabled in the ExecutionConfig and type
    scala.Tuple2 is treated as a generic type.
    at
    org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
    at
    org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:258)
    at
    org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:649)





countWindow??????????

2019-12-12 Thread cs
??countWindow

Flink client trying to submit jobs to old session cluster (which was killed)

2019-12-12 Thread Pankaj Chand
Hello,

When using Flink on YARN in session mode, each Flink job client would
automatically know the YARN cluster to connect to. It says this somewhere
in the documentation.

So, I killed the Flink session cluster by simply killing the YARN
application using the "yarn kill" command. However, when starting a new
Flink session cluster and trying to submit Flink jobs to yarn-session,
Flink complains that the old cluster (it gave the port number and YARN
application ID) is not available.

It seems like the details of the old cluster were still stored somewhere in
Flink. So, I had to completely replace the Flink folder with a new one.

Does anyone know the proper way to kill a Flink+YARN session cluster to
completely remove it so that jobs will get submitted to a new Flink session
cluster?

Thanks,

Pankaj


Re: Sample Code for querying Flink's default metrics

2019-12-12 Thread Pankaj Chand
Thank you, Chesnay!

On Thu, Dec 12, 2019 at 5:46 AM Chesnay Schepler  wrote:

> Yes, when a cluster was started it takes a few seconds for (any) metrics
> to be available.
>
> On 12/12/2019 11:36, Pankaj Chand wrote:
>
> Hi Vino,
>
> Thank you for the links regarding backpressure!
>
> I am currently using code to get metrics by calling REST API via curl.
> However, many times the REST API via curl gives an empty JSON object/array.
> Piped through JQ (for filtering JSON) it produces a null value. This is
> breaking my code.
> Example in a Yarn cluster session mode, the following metric
> "metrics?get=Status.JVM.CPU.Load" randomly (I think) returns an empty json
> object/array or an actual value.
>
> Is it possible that for CPU Load, the empty JSON object is returned when
> the job is newly started less than 10 seconds ago.
>
> Thanks,
>
> Pankaj
>
>
>
> On Mon, Dec 9, 2019 at 4:21 AM vino yang  wrote:
>
>> Hi Pankaj,
>>
>> > Is there any sample code for how to read such default metrics?  Is
>> there any way to query the default metrics, such as CPU usage and Memory,
>> without using REST API or Reporters?
>>
>> What's your real requirement? Can you use code to call REST API?  Why
>> does it not match your requirements?
>>
>> > Additionally, how do I query Backpressure using code, or is it still
>> only visually available via the dashboard UI? Consequently, is there any
>> way to infer Backpressure by querying one (or more) of the Memory metrics
>> of the TaskManager?
>>
>> The backpressure is related to not only memory metrics but also IO and
>> network metrics, for more details about measure backpressure please see
>> this blog.[1][2]
>>
>> [1]: https://flink.apache.org/2019/06/05/flink-network-stack.html
>> [2]: https://flink.apache.org/2019/07/23/flink-network-stack-2.html
>>
>> Best,
>> Vino
>>
>> Pankaj Chand wrote on Mon, Dec 9, 2019 at 12:07 PM:
>>
>>> Hello,
>>>
>>> Using Flink on Yarn, I could not understand the documentation for how to
>>> read the default metrics via code. In particular, I want to read
>>> throughput, i.e. CPU usage, Task/Operator's numRecordsOutPerSecond, and
>>> Memory.
>>>
>>> Is there any sample code for how to read such default metrics?  Is there
>>> any way to query the default metrics, such as CPU usage and Memory, without
>>> using REST API or Reporters?
>>>
>>> Additionally, how do I query Backpressure using code, or is it still
>>> only visually available via the dashboard UI? Consequently, is there any
>>> way to infer Backpressure by querying one (or more) of the Memory metrics
>>> of the TaskManager?
>>>
>>> Thank you,
>>>
>>> Pankaj
>>>
>>
>


Re: Sample Code for querying Flink's default metrics

2019-12-12 Thread Chesnay Schepler
Yes, when a cluster was started it takes a few seconds for (any) metrics 
to be available.


On 12/12/2019 11:36, Pankaj Chand wrote:

Hi Vino,

Thank you for the links regarding backpressure!

I am currently using code to get metrics by calling REST API via curl. 
However, many times the REST API via curl gives an empty JSON 
object/array. Piped through JQ (for filtering JSON) it produces a null 
value. This is breaking my code.
Example in a Yarn cluster session mode, the following metric 
"metrics?get=Status.JVM.CPU.Load" randomly (I think) returns an empty 
json object/array or an actual value.


Is it possible that for CPU Load, the empty JSON object is returned 
when the job is newly started less than 10 seconds ago.


Thanks,

Pankaj


On Mon, Dec 9, 2019 at 4:21 AM vino yang wrote:


Hi Pankaj,

> Is there any sample code for how to read such default metrics? 
Is there any way to query the default metrics, such as CPU usage
and Memory, without using REST API or Reporters?

What's your real requirement? Can you use code to call REST API? 
Why does it not match your requirements?

> Additionally, how do I query Backpressure using code, or is it
still only visually available via the dashboard UI? Consequently,
is there any way to infer Backpressure by querying one (or more)
of the Memory metrics of the TaskManager?

The backpressure is related to not only memory metrics but also IO
and network metrics, for more details about measure backpressure
please see this blog.[1][2]

[1]: https://flink.apache.org/2019/06/05/flink-network-stack.html
[2]: https://flink.apache.org/2019/07/23/flink-network-stack-2.html

Best,
Vino

Pankaj Chand <pankajchanda...@gmail.com> wrote on Mon, Dec 9, 2019
at 12:07 PM:

Hello,

Using Flink on Yarn, I could not understand the documentation
for how to read the default metrics via code. In particular, I
want to read throughput, i.e. CPU usage, Task/Operator's
numRecordsOutPerSecond, and Memory.

Is there any sample code for how to read such default metrics?
Is there any way to query the default metrics, such as CPU
usage and Memory, without using REST API or Reporters?

Additionally, how do I query Backpressure using code, or is it
still only visually available via the dashboard UI?
Consequently, is there any way to infer Backpressure by
querying one (or more) of the Memory metrics of the TaskManager?

Thank you,

Pankaj





Re: Sample Code for querying Flink's default metrics

2019-12-12 Thread Pankaj Chand
Hi Vino,

Thank you for the links regarding backpressure!

I am currently using code to get metrics by calling REST API via curl.
However, many times the REST API via curl gives an empty JSON object/array.
Piped through JQ (for filtering JSON) it produces a null value. This is
breaking my code.
Example in a Yarn cluster session mode, the following metric
"metrics?get=Status.JVM.CPU.Load" randomly (I think) returns an empty json
object/array or an actual value.

Is it possible that for CPU Load, the empty JSON object is returned when
the job is newly started less than 10 seconds ago.
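
For what it's worth, here is a small sketch of polling that metric from plain Java with a
retry, assuming the default REST port 8081 on localhost (in a YARN session the JobManager's
actual REST address and port would be used instead):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class CpuLoadPoller {
    public static void main(String[] args) throws Exception {
        // Placeholder address; the JobManager metrics endpoint with the query used above.
        String endpoint = "http://localhost:8081/jobmanager/metrics?get=Status.JVM.CPU.Load";
        for (int attempt = 0; attempt < 10; attempt++) {
            HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
            conn.setRequestMethod("GET");
            StringBuilder body = new StringBuilder();
            try (BufferedReader reader =
                         new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    body.append(line);
                }
            }
            // The endpoint may return an empty array until the metric is registered.
            if (body.length() > 0 && !body.toString().equals("[]")) {
                System.out.println("CPU load response: " + body);
                return;
            }
            Thread.sleep(1000L); // back off and retry while the cluster warms up
        }
        System.out.println("Metric not available yet.");
    }
}

Treating the empty response as "not reported yet" and retrying avoids the null values that
break the jq pipeline.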

Thanks,

Pankaj



On Mon, Dec 9, 2019 at 4:21 AM vino yang  wrote:

> Hi Pankaj,
>
> > Is there any sample code for how to read such default metrics?  Is there
> any way to query the default metrics, such as CPU usage and Memory, without
> using REST API or Reporters?
>
> What's your real requirement? Can you use code to call REST API?  Why does
> it not match your requirements?
>
> > Additionally, how do I query Backpressure using code, or is it still
> only visually available via the dashboard UI? Consequently, is there any
> way to infer Backpressure by querying one (or more) of the Memory metrics
> of the TaskManager?
>
> The backpressure is related to not only memory metrics but also IO and
> network metrics, for more details about measure backpressure please see
> this blog.[1][2]
>
> [1]: https://flink.apache.org/2019/06/05/flink-network-stack.html
> [2]: https://flink.apache.org/2019/07/23/flink-network-stack-2.html
>
> Best,
> Vino
>
> Pankaj Chand wrote on Mon, Dec 9, 2019 at 12:07 PM:
>
>> Hello,
>>
>> Using Flink on Yarn, I could not understand the documentation for how to
>> read the default metrics via code. In particular, I want to read
>> throughput, i.e. CPU usage, Task/Operator's numRecordsOutPerSecond, and
>> Memory.
>>
>> Is there any sample code for how to read such default metrics?  Is there
>> any way to query the default metrics, such as CPU usage and Memory, without
>> using REST API or Reporters?
>>
>> Additionally, how do I query Backpressure using code, or is it still only
>> visually available via the dashboard UI? Consequently, is there any way to
>> infer Backpressure by querying one (or more) of the Memory metrics of the
>> TaskManager?
>>
>> Thank you,
>>
>> Pankaj
>>
>


Re: Scala case class TypeInformation and Serializer

2019-12-12 Thread 杨光
Actually, the original source code has too many third-party classes to simplify
easily. The question I want to ask is: is there any way for me to find out
which class is serialized/deserialized by which serializer class, so that we can
tune it or write a custom serializer to improve performance?

Yun Tang wrote on Thu, Dec 12, 2019 at 12:45 AM:

> Hi
>
>
>
> Would you please give related code? I think it might due to insufficient
> hint to type information.
>
>
>
> Best
>
> Yun Tang
>
>
>
>
>
>
>
> From: 杨光
> Date: Wednesday, December 11, 2019 at 7:20 PM
> To: user
> Subject: Scala case class TypeInformation and Serializer
>
>
>
> Hi, I'm working on writing a Flink streaming job with the Scala API. How should I
> find out which class is serialized by a Flink type serializer and which one
> falls back to the generic Kryo serializer?
> And if a class falls back to the Kryo serializer, how can I
> extend Flink's TypeInfo classes or apply some other customization to
> improve performance?
>
>
>
> below is some errors I got when I set disableGenericTypes,so I know if
> will fall back to Kryo
>
>
>
> Exception in thread "main" java.lang.UnsupportedOperationException:
> Generic types have been disabled in the ExecutionConfig and type
> scala.Tuple2 is treated as a generic type.
> at
> org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
> at
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:258)
> at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:649)
>


Re: Window deduplication

2019-12-12 Thread Jimmy Wong
Thanks everyone, I have come up with a solution (see the sketch below):
Scenario 1: trigger a computation for every incoming record, and clear the state when the window's computation finishes.
Scenario 2: we indeed have to wait for the window computation to finish.
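
A minimal sketch of the Scenario 1 idea (emit the first record per id immediately and clear
the flag at the 5-minute boundary), assuming event time and watermarks are already assigned
and using Tuple2<id, timestamp> as a placeholder record type:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class FirstInWindowDedup
        extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

    private static final long WINDOW_MS = 5 * 60 * 1000L;
    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen-in-window", Boolean.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> record, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        if (seen.value() == null) {
            seen.update(true);
            out.collect(record); // emit the first record for this id right away
            // Clear the flag when the current 5-minute window ends
            // (assumes the element carries an assigned event timestamp).
            long windowEnd = (ctx.timestamp() / WINDOW_MS + 1) * WINDOW_MS;
            ctx.timerService().registerEventTimeTimer(windowEnd);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, Long>> out) throws Exception {
        seen.clear(); // the next window deduplicates independently
    }
}

Usage would be roughly stream.keyBy(r -> r.f0).process(new FirstInWindowDedup()). Scenario 2
(keep the record with the smallest timestamp per id) still has to wait for the window, e.g.
a 5-minute tumbling window with a ReduceFunction that keeps the minimum-timestamp element.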


Jimmy Wong
wangzmk...@163.com


On Dec 11, 2019 at 16:26, yanggang_it_job wrote:
I think it can be handled like this: 1. first register your stream (whether one or several streams) as a table; 2. then express the business logic on that table with Flink SQL; 3. finally use the deduplication syntax provided by Flink SQL to specify the fields to deduplicate on. Note: keep the state size under control. Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#deduplication
On 2019-12-11 15:53:00, "Jimmy Wong" wrote:
They belong to different windows: deduplication is within a window, not across windows.


Jimmy Wong
wangzmk...@163.com


On Dec 11, 2019 at 12:08, 梁溪 wrote:
If the data is deduplicated, why are there still two records with id 2?




梁溪
Email: lx_la...@163.com

On Dec 11, 2019 at 11:19, Jimmy Wong wrote:
Hi Yuan,Youjun, thanks. Your approach is from the SQL side, right? How should this be handled with DataStream operators?


Jimmy Wong
wangzmk...@163.com


On Dec 11, 2019 at 09:04, Yuan,Youjun wrote:
For the first case, use an aggregate function like FIRST_VALUE; for the second case, use the MIN aggregate function and group by id. Isn't that the result you want?

-----Original Message-----
From: Jimmy Wong
Sent: Tuesday, December 10, 2019 4:40 PM
To: user-zh@flink.apache.org
Subject: Window deduplication

Hi All:
I have a question about a real-time scenario: the data within every 5 minutes needs to be deduplicated and then sent to a sink.
For example, given the data:
{ts: 2019-12-10 16:24:00 id: 1}
{ts: 2019-12-10 16:22:00 id: 1}
{ts: 2019-12-10 16:23:00 id: 2}
{ts: 2019-12-10 16:21:00 id: 1}
{ts: 2019-12-10 16:29:00 id: 2}
{ts: 2019-12-10 16:27:00 id: 3}
{ts: 2019-12-10 16:26:00 id: 2}


Scenario 1, deduplicating without considering the timestamp, the result is:
{ts: 2019-12-10 16:24:00 id: 1}
{ts: 2019-12-10 16:23:00 id: 2}
{ts: 2019-12-10 16:29:00 id: 2}
{ts: 2019-12-10 16:27:00 id: 3}


Scenario 2, deduplicating considering the timestamp, the result is:
{ts: 2019-12-10 16:21:00 id: 1}
{ts: 2019-12-10 16:23:00 id: 2}
{ts: 2019-12-10 16:26:00 id: 2}
{ts: 2019-12-10 16:27:00 id: 3}


Is there an efficient real-time solution for each of these two scenarios? Thanks. I thought a 5-minute window with a ProcessWindowFunction could solve it, but the ProcessWindowFunction has to buffer 5 minutes of window data, which adds latency.




Jimmy Wong
wangzmk...@163.com



Re: [ANNOUNCE] Apache Flink 1.8.3 released

2019-12-12 Thread Zhu Zhu
Thanks Hequn for driving the release and everyone who makes this release
possible!

Thanks,
Zhu Zhu

Wei Zhong wrote on Thu, Dec 12, 2019 at 3:45 PM:

> Thanks Hequn for being the release manager. Great work!
>
> Best,
> Wei
>
> On Dec 12, 2019, at 15:27, Jingsong Li wrote:
>
> Thanks Hequn for your driving, 1.8.3 fixed a lot of issues and it is very
> useful to users.
> Great work!
>
> Best,
> Jingsong Lee
>
> On Thu, Dec 12, 2019 at 3:25 PM jincheng sun 
> wrote:
>
>> Thanks for being the release manager and the great work Hequn :)
>> Also thanks to the community making this release possible!
>>
>> Best,
>> Jincheng
>>
>> Jark Wu wrote on Thu, Dec 12, 2019 at 3:23 PM:
>>
>>> Thanks Hequn for helping out this release and being the release manager.
>>> Great work!
>>>
>>> Best,
>>> Jark
>>>
>>> On Thu, 12 Dec 2019 at 15:02, Jeff Zhang  wrote:
>>>
>>> > Great work, Hequn
>>> >
>>> > Dian Fu wrote on Thu, Dec 12, 2019 at 2:32 PM:
>>> >
>>> >> Thanks Hequn for being the release manager and everyone who
>>> contributed
>>> >> to this release.
>>> >>
>>> >> Regards,
>>> >> Dian
>>> >>
>>> >> On Dec 12, 2019, at 2:24 PM, Hequn Cheng wrote:
>>> >>
>>> >> Hi,
>>> >>
>>> >> The Apache Flink community is very happy to announce the release of
>>> >> Apache Flink 1.8.3, which is the third bugfix release for the Apache
>>> Flink
>>> >> 1.8 series.
>>> >>
>>> >> Apache Flink® is an open-source stream processing framework for
>>> >> distributed, high-performing, always-available, and accurate data
>>> streaming
>>> >> applications.
>>> >>
>>> >> The release is available for download at:
>>> >> https://flink.apache.org/downloads.html
>>> >>
>>> >> Please check out the release blog post for an overview of the
>>> >> improvements for this bugfix release:
>>> >> https://flink.apache.org/news/2019/12/11/release-1.8.3.html
>>> >>
>>> >> The full release notes are available in Jira:
>>> >> https://issues.apache.org/jira/projects/FLINK/versions/12346112
>>> >>
>>> >> We would like to thank all contributors of the Apache Flink community
>>> who
>>> >> made this release possible!
>>> >> Great thanks to @Jincheng as a mentor during this release.
>>> >>
>>> >> Regards,
>>> >> Hequn
>>> >>
>>> >>
>>> >>
>>> >
>>> > --
>>> > Best Regards
>>> >
>>> > Jeff Zhang
>>> >
>>>
>>
>
> --
> Best, Jingsong Lee
>
>
>