Re: Flink restart strategy on specific exception

2020-05-12 Thread Zhu Zhu
Hi Ken,

Custom restart strategies were an experimental feature and have been
deprecated since 1.10. [1]
That's why you cannot find any documentation for them.

The old RestartStrategy was deprecated and replaced by
RestartBackoffTimeStrategy since 1.10
(unless you are using the legacy scheduler which was also deprecated).
The new restart strategy, RestartBackoffTimeStrategy, is able to see the
exact failure cause.
However, the new restart strategy does not support customization at the
moment.
Your requirement sounds reasonable to me, and I think a custom (new) restart
strategy could be something to support later.
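
For illustration only, the kind of rule such a custom strategy would have to
evaluate can be sketched in plain Java, without any Flink classes. The names
below (RestartDecision, shouldRestart) are hypothetical and not part of Flink;
the sketch only assumes that the failure cause is available as a Throwable.

```java
import java.io.IOException;
import java.util.List;

/** Hypothetical helper, not part of Flink: decides whether a failure should trigger a restart. */
final class RestartDecision {

    /** Returns true if the failure, or any exception in its cause chain, is an instance of a listed type. */
    static boolean shouldRestart(Throwable failure, List<Class<? extends Throwable>> restartOn) {
        // Walk the cause chain, because the interesting exception is often wrapped.
        for (Throwable t = failure; t != null; t = t.getCause()) {
            for (Class<? extends Throwable> type : restartOn) {
                if (type.isInstance(t)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Class<? extends Throwable>> restartOn = List.of(IOException.class);
        Throwable wrapped = new RuntimeException("task failed", new IOException("connection reset"));
        System.out.println(shouldRestart(wrapped, restartOn));
        System.out.println(shouldRestart(new IllegalStateException("bad state"), restartOn));
    }
}
```

Matching on the cause chain rather than on the top-level exception is a design
choice of this sketch; a real strategy might also want to match on messages.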

@Till Rohrmann @Gary Yao what do you think?

[1]
https://lists.apache.org/thread.html/6ed95eb6a91168dba09901e158bc1b6f4b08f1e176db4641f79de765%40%3Cdev.flink.apache.org%3E

Thanks,
Zhu Zhu

On Wed, May 13, 2020 at 7:34 AM Ken Krugler wrote:

> Hi Till,
>
> Sorry, missed the key question…in the RestartStrategy.restart() method, I
> don’t see any good way to get at the underlying exception.
>
> I can cast the RestartCallback to an ExecutionGraphRestartCallback, but I
> still need access to the private execGraph to be able to get at the failure
> info. Is there some other way in the restart handler to get at this?
>
> And yes, I meant to note you’d mentioned the required static method in
> your email, I was asking about documentation for it.
>
> Thanks,
>
> — Ken
>
> ===
> Sorry to resurface an ancient question, but is there a working example
> anywhere of setting a custom restart strategy?
>
> Asking because I’ve been wandering through the Flink 1.9 code base for a
> while, and the restart strategy implementation is…pretty tangled.
>
> From what I’ve been able to figure out, you have to provide a factory
> class, something like this:
>
> Configuration config = new Configuration();
> config.setString(ConfigConstants.RESTART_STRATEGY,
> MyRestartStrategyFactory.class.getCanonicalName());
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createLocalEnvironment(4, config);
>
> That factory class should extend RestartStrategyFactory, but it also needs
> to implement a static method that looks like:
>
> public static MyRestartStrategyFactory
> createFactory(Configuration config) {
> return new MyRestartStrategyFactory();
> }
>
> I wasn’t able to find any documentation that mentioned this particular
> method being a requirement.
>
> And also the documentation at
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#fault-tolerance
> doesn’t mention you can set a custom class name for the restart-strategy.
>
> Thanks,
>
> — Ken
>
>
> On Nov 22, 2018, at 8:18 AM, Till Rohrmann  wrote:
>
> Hi Kasif,
>
> I think in this situation it is best if you defined your own custom
> RestartStrategy by specifying a class which has a `RestartStrategyFactory
> createFactory(Configuration configuration)` method as `restart-strategy:
> MyRestartStrategyFactoryFactory` in `flink-conf.yaml`.
>
> Cheers,
> Till
>
> On Thu, Nov 22, 2018 at 7:18 AM Ali, Kasif  wrote:
>
>> Hello,
>>
>>
>>
>> The existing restart strategies are fairly generic. We have a
>> requirement to restart the job only in case of specific exceptions/issues.
>>
>> What would be the best way to have a restart strategy based on a few
>> rules, like looking at a particular type of exception or some extra
>> condition checks that are application-specific?
>>
>> Just as background, one specific issue that prompted this requirement is
>> slots not getting released when the job finishes. In our applications, we
>> keep track of the jobs submitted and the amount of parallelism allotted to
>> each. Once a job finishes we assume that its slots are free and try to
>> submit the next set of jobs, which at times fail with the error “not enough
>> slots available”.
>>
>> So we think a job restart can solve this issue, but we want to restart
>> only if this particular situation is encountered.
>>
>> Please let us know if there are better ways to solve this problem other
>> than a restart strategy.
>>
>>
>>
>> Thanks,
>>
>> Kasif
>>
>>
>>
>> --
>>
>> Your Personal Data: We may collect and process information about you that
>> may be subject to data protection laws. For more information about how we
>> use and disclose your personal data, how we protect your information, our
>> legal basis to use your information, your rights and who you can contact,
>> please refer to: www.gs.com/privacy-notices
>>
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>
>


How To subscribe a Kinesis Stream using enhance fanout?

2020-05-12 Thread Xiaolong Wang
Hello Flink Community!

  I'm currently working on a project that relies on AWS Kinesis. With the
provided connector (flink-connector-kinesis_2.11;1.10.0), I can consume the
messages.

 But as the main stream is shared among several other teams, I am
required to use Kinesis enhanced fan-out. I checked the connector code
and found no implementation of it.

 Has anyone run into this before?

Thanks for your help.


Re: Flink Streaming Job Tuning help

2020-05-12 Thread Zhijiang
Hi Kumar,

I can give some general ideas for further analysis. 

> We are finding that flink lags seriously behind when we introduce the keyBy 
> (presumably because of shuffle across the network)
The `keyBy` breaks the operator chain, so it can have a noticeable
performance impact in practice. I guess that without the keyBy your job could
make use of operator chaining:
the follow-up operator consumes the records emitted by the preceding
operator directly, with no need for the buffer serialization -> network shuffle
-> buffer deserialization steps.
That matters especially because your record size of 10K is a bit large.
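
As a rough back-of-the-envelope check of what the shuffle has to carry, the
figures from the original question (8-9 million records a minute, each record
10k) can be turned into a data rate. The sketch below assumes that "10k" means
10,000 bytes and that every record crosses the network once; the class name
ShuffleVolume is made up for this example.

```java
/** Back-of-the-envelope estimate of the data volume a full shuffle has to move. */
final class ShuffleVolume {

    /** Bytes per second crossing the network if every record is shuffled exactly once. */
    static double bytesPerSecond(long recordsPerMinute, long bytesPerRecord) {
        return recordsPerMinute * (double) bytesPerRecord / 60.0;
    }

    public static void main(String[] args) {
        // Assumption: "10k" in the question means 10,000 bytes per record.
        long bytesPerRecord = 10_000L;
        for (long recordsPerMinute : new long[] {8_000_000L, 9_000_000L}) {
            double mbPerSecond = bytesPerSecond(recordsPerMinute, bytesPerRecord) / 1_000_000.0;
            System.out.printf("%d records/min -> %.0f MB/s%n", recordsPerMinute, mbPerSecond);
        }
    }
}
```

Under these assumptions the shuffle moves roughly 1.3 to 1.5 GB per second
across the cluster, before any serialization overhead.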

If the keyBy is necessary in your case, you can look for the current
bottleneck. E.g. check whether there is back pressure, which you can monitor
from the web UI. If so, find which task is the
bottleneck causing the back pressure; you can trace it with the
network-related metrics.

Also check whether there is data skew in your case, meaning that some tasks
process more records than others. If so, maybe we can increase the parallelism
to balance the load.
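
To get a feeling for skew before touching the cluster, a sample of keys can be
bucketed offline. The following is a simplified sketch: it assigns keys with
plain hashCode modulo parallelism, which is an assumption for illustration and
not how Flink actually maps keys to subtasks. KeySkew and its methods are
hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified skew check: how unevenly would a sample of keys spread over N subtasks? */
final class KeySkew {

    /** Counts records per subtask, using hashCode modulo parallelism as a stand-in for the real assignment. */
    static long[] recordsPerSubtask(List<String> keys, int parallelism) {
        long[] counts = new long[parallelism];
        for (String key : keys) {
            counts[Math.floorMod(key.hashCode(), parallelism)]++;
        }
        return counts;
    }

    /** Load of the busiest subtask divided by the average load; 1.0 means perfectly even. */
    static double skewRatio(long[] counts) {
        long max = 0;
        long total = 0;
        for (long c : counts) {
            max = Math.max(max, c);
            total += c;
        }
        return total == 0 ? 1.0 : max / ((double) total / counts.length);
    }

    public static void main(String[] args) {
        List<String> sample = List.of("device-1", "device-1", "device-1", "device-2", "device-3");
        long[] counts = recordsPerSubtask(sample, 2);
        System.out.println(Arrays.toString(counts) + " skew ratio: " + skewRatio(counts));
    }
}
```

Note that all records of a single hot key still land on one subtask, whatever
the parallelism is.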

Best,
Zhijiang
--
From:Senthil Kumar 
Send Time: Wednesday, May 13, 2020, 00:49
To:user@flink.apache.org 
Subject:Re: Flink Streaming Job Tuning help

I forgot to mention, we are consuming said records from AWS kinesis and writing 
out to S3.

From: Senthil Kumar 
Date: Tuesday, May 12, 2020 at 10:47 AM
To: "user@flink.apache.org" 
Subject: Flink Streaming Job Tuning help

Hello Flink Community!

We have a fairly intensive flink streaming application, processing 8-9 million 
records a minute, with each record being 10k.
One of our steps is a keyBy operation. We are finding that flink lags seriously 
behind when we introduce the keyBy (presumably because of shuffle across the 
network).

We are trying to tune it ourselves (size of nodes, memory, network buffers 
etc), but before we spend way too much time on
this; would it be better to hire some “flink tuning expert” to get us through?

If so what resources are recommended on this list?

Cheers
Kumar



Re: Prometheus Pushgateway Reporter Can not DELETE metrics on pushgateway

2020-05-12 Thread Thomas Huang
I ran into this issue three months ago. In the end we concluded that the
Prometheus Pushgateway cannot handle high-throughput metric data, and we
solved the issue via service discovery instead. We changed the Prometheus metric
reporter code, adding registration logic so that the job exposes its host
and port to a discovery service. We then wrote a plugin for Prometheus that
fetches the service list and pulls the metrics from the Flink jobs.


From: 李佳宸 
Sent: Wednesday, May 13, 2020 11:26:26 AM
To: user@flink.apache.org 
Subject: Prometheus Pushgateway Reporter Can not DELETE metrics on pushgateway

Hi,

I got stuck using Prometheus and Pushgateway to collect metrics. Here is my
reporter configuration:

metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: localhost
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: true

And the version information:
Flink 1.9.1
Prometheus 2.18
PushGateway 1.2 & 0.9 (I have already tried them both)

I found that when the Flink cluster restarts, metrics show up under a new
jobName with a new random suffix, but the metrics under the jobName from
before the restart still exist (their values stop updating). Since Prometheus
still periodically pulls the data from the Pushgateway, I end up with a bunch
of time series whose values never change.

It looks like:


# HELP flink_jobmanager_Status_JVM_CPU_Load Load (scope: jobmanager_Status_JVM_CPU)
# TYPE flink_jobmanager_Status_JVM_CPU_Load gauge
flink_jobmanager_Status_JVM_CPU_Load{host="localhost",instance="",job="myJobae71620b106e8c2fdf86cb5c65fd6414"} 0
flink_jobmanager_Status_JVM_CPU_Load{host="localhost",instance="",job="myJobe50caa3be194aeb2ff71a64bced17cea"} 0.0006602344673593189
# HELP flink_jobmanager_Status_JVM_CPU_Time Time (scope: jobmanager_Status_JVM_CPU)
# TYPE flink_jobmanager_Status_JVM_CPU_Time gauge
flink_jobmanager_Status_JVM_CPU_Time{host="localhost",instance="",job="myJobae71620b106e8c2fdf86cb5c65fd6414"} 4.54512e+09
flink_jobmanager_Status_JVM_CPU_Time{host="localhost",instance="",job="myJobe50caa3be194aeb2ff71a64bced17cea"} 8.24809e+09
# HELP flink_jobmanager_Status_JVM_ClassLoader_ClassesLoaded ClassesLoaded (scope: jobmanager_Status_JVM_ClassLoader)
# TYPE flink_jobmanager_Status_JVM_ClassLoader_ClassesLoaded gauge
flink_jobmanager_Status_JVM_ClassLoader_ClassesLoaded{host="localhost",instance="",job="myJobae71620b106e8c2fdf86cb5c65fd6414"} 5984
flink_jobmanager_Status_JVM_ClassLoader_ClassesLoaded{host="localhost",instance="",job="myJobe50caa3be194aeb2ff71a64bced17cea"} 6014
# HELP flink_jobmanager_Status_JVM_ClassLoader_ClassesUnloaded ClassesUnloaded (scope: jobmanager_Status_JVM_ClassLoader)
# TYPE flink_jobmanager_Status_JVM_ClassLoader_ClassesUnloaded gauge
flink_jobmanager_Status_JVM_ClassLoader_ClassesUnloaded{host="localhost",instance="",job="myJobae71620b106e8c2fdf86cb5c65fd6414"} 0
flink_jobmanager_Status_JVM_ClassLoader_ClassesUnloaded{host="localhost",instance="",job="myJobe50caa3be194aeb2ff71a64bced17cea"} 0

Ps: This cluster has one JobManager.

In my understanding, when I set metrics.reporter.promgateway.deleteOnShutdown
to true, the old metrics should be deleted from the Pushgateway, but somehow
that didn't work.
Is my understanding of this configuration right? Is there any way to delete
the metrics from the Pushgateway?

Thanks!


Re: changing the output files names in Streamfilesink from part-00 to something else

2020-05-12 Thread Yun Gao
Hi Dhurandar:

Currently StreamingFileSink should let you change the prefix and suffix
of the file name [1]; it could be changed to something like -0-0.
Could this solve your problem?


 Best,
  Yun




[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#part-file-configuration
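
As far as I understand the linked page for Flink 1.10, the prefix and suffix
are supplied through an OutputFileConfig (withPartPrefix / withPartSuffix)
that is passed to the sink builder. The snippet below does not use Flink at
all; it is only a plain-Java model of how the resulting part file name appears
to be composed, under the assumption that the name is
prefix-subtaskIndex-partCounter followed by the suffix. PartFileName is a
hypothetical helper name.

```java
/** Plain-Java model of the assumed part file naming scheme; not Flink's own code. */
final class PartFileName {

    /** Builds "prefix-subtaskIndex-partCounter" and appends the suffix, e.g. a file extension. */
    static String partFileName(String prefix, int subtaskIndex, long partCounter, String suffix) {
        return prefix + "-" + subtaskIndex + "-" + partCounter + suffix;
    }

    public static void main(String[] args) {
        // Default-looking name, with "part" as prefix and an empty suffix.
        System.out.println(partFileName("part", 0, 0, ""));
        // Custom prefix and suffix, as one might configure them.
        System.out.println(partFileName("events", 3, 12, ".json"));
    }
}
```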



--
From: dhurandar S
Date: 2020-05-13 05:13:04
To: user;
Subject: changing the output files names in Streamfilesink from part-00 to something else

We want to change the names of the files generated as the output of our
StreamingFileSink.
When files are generated they are named part-00*; is there a way we can
change the name?

In Hadoop, we can change RecordWriters and MultipleOutputs. May I please get
some help in this regard? This is a blocker for us and will force us to move
to an MR job.

-- 
Thank you and regards,
Dhurandar




Prometheus Pushgateway Reporter Can not DELETE metrics on pushgateway

2020-05-12 Thread 李佳宸
Hi,

I got stuck using Prometheus and Pushgateway to collect metrics. Here is my
reporter configuration:

metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: localhost
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: true

And the version information:
Flink 1.9.1
Prometheus 2.18
PushGateway 1.2 & 0.9 (I have already tried them both)

I found that when the Flink cluster restarts, metrics show up under a new
jobName with a new random suffix, but the metrics under the jobName from
before the restart still exist (their values stop updating). Since Prometheus
still periodically pulls the data from the Pushgateway, I end up with a bunch
of time series whose values never change.

It looks like:

# HELP flink_jobmanager_Status_JVM_CPU_Load Load (scope: jobmanager_Status_JVM_CPU)
# TYPE flink_jobmanager_Status_JVM_CPU_Load gauge
flink_jobmanager_Status_JVM_CPU_Load{host="localhost",instance="",job="myJobae71620b106e8c2fdf86cb5c65fd6414"} 0
flink_jobmanager_Status_JVM_CPU_Load{host="localhost",instance="",job="myJobe50caa3be194aeb2ff71a64bced17cea"} 0.0006602344673593189
# HELP flink_jobmanager_Status_JVM_CPU_Time Time (scope: jobmanager_Status_JVM_CPU)
# TYPE flink_jobmanager_Status_JVM_CPU_Time gauge
flink_jobmanager_Status_JVM_CPU_Time{host="localhost",instance="",job="myJobae71620b106e8c2fdf86cb5c65fd6414"} 4.54512e+09
flink_jobmanager_Status_JVM_CPU_Time{host="localhost",instance="",job="myJobe50caa3be194aeb2ff71a64bced17cea"} 8.24809e+09
# HELP flink_jobmanager_Status_JVM_ClassLoader_ClassesLoaded ClassesLoaded (scope: jobmanager_Status_JVM_ClassLoader)
# TYPE flink_jobmanager_Status_JVM_ClassLoader_ClassesLoaded gauge
flink_jobmanager_Status_JVM_ClassLoader_ClassesLoaded{host="localhost",instance="",job="myJobae71620b106e8c2fdf86cb5c65fd6414"} 5984
flink_jobmanager_Status_JVM_ClassLoader_ClassesLoaded{host="localhost",instance="",job="myJobe50caa3be194aeb2ff71a64bced17cea"} 6014
# HELP flink_jobmanager_Status_JVM_ClassLoader_ClassesUnloaded ClassesUnloaded (scope: jobmanager_Status_JVM_ClassLoader)
# TYPE flink_jobmanager_Status_JVM_ClassLoader_ClassesUnloaded gauge
flink_jobmanager_Status_JVM_ClassLoader_ClassesUnloaded{host="localhost",instance="",job="myJobae71620b106e8c2fdf86cb5c65fd6414"} 0
flink_jobmanager_Status_JVM_ClassLoader_ClassesUnloaded{host="localhost",instance="",job="myJobe50caa3be194aeb2ff71a64bced17cea"} 0

Ps: This cluster has one JobManager.

In my understanding, when I set metrics.reporter.promgateway.deleteOnShutdown
to true, the old metrics should be deleted from the Pushgateway, but somehow
that didn't work.
Is my understanding of this configuration right? Is there any way to delete
the metrics from the Pushgateway?
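
One way to clean up stale groups by hand is the Pushgateway's HTTP API, which
accepts a DELETE on /metrics/job/ followed by the job name. The sketch below
only builds such a request with the Java 11 HttpClient API and prints it;
actually sending it is left as a comment. It assumes the job name needs no URL
escaping and that the group has no grouping labels besides the job name.
PushgatewayCleanup and deleteGroupRequest are hypothetical names.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Builds the HTTP request that asks a Pushgateway to drop all metrics pushed under one job name. */
final class PushgatewayCleanup {

    /** Assumes jobName contains only characters that are safe in a URL path segment. */
    static HttpRequest deleteGroupRequest(String host, int port, String jobName) {
        URI uri = URI.create("http://" + host + ":" + port + "/metrics/job/" + jobName);
        return HttpRequest.newBuilder(uri).DELETE().build();
    }

    public static void main(String[] args) {
        HttpRequest request =
                deleteGroupRequest("localhost", 9091, "myJobae71620b106e8c2fdf86cb5c65fd6414");
        System.out.println(request.method() + " " + request.uri());
        // To send it for real:
        // java.net.http.HttpClient.newHttpClient()
        //         .send(request, java.net.http.HttpResponse.BodyHandlers.discarding());
    }
}
```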

Thanks!

Re: Flink Memory analyze on AWS EMR

2020-05-12 Thread Xintong Song
Hi Jacky,

I don't think ${FLINK_LOG_PREFIX} is available for Flink Yarn deployment.
This is just my guess, that the actual file name becomes ".jit". You can
try to verify that by looking for the hidden file.

If it is indeed this problem, you can try to replace "${FLINK_LOG_PREFIX}"
with "/your-file-name.jit". The token "" should be
replaced with proper log directory path by Yarn automatically.

I noticed that the usage of ${FLINK_LOG_PREFIX} is recommended by Flink's
documentation [1]. This is IMO a bit misleading. I'll try to file an issue
to improve the docs.

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/application_profiling.html#profiling-with-jitwatch

On Wed, May 13, 2020 at 2:45 AM Jacky D  wrote:

> hi, Arvid
>
> thanks for the advice. I removed the quotes and it did create a yarn
> session on EMR, but I didn't find any jit log file generated.
>
> The config with quotes works on a standalone cluster. I also tried to
> pass the property dynamically within the yarn session command:
>
> flink-yarn-session -n 1 -d -nm testSession -yD 
> env.java.opts="-XX:+UnlockDiagnosticVMOptions
> -XX:+TraceClassLoading -XX:+LogCompilation
> -XX:LogFile=${FLINK_LOG_PREFIX}.jit -XX:+PrintAssembly"
>
>
> but I get the same result: the session is created, but I cannot find any
> jit log file under the container log.
>
>
> Thanks
>
> Jacky
>
> On Tue, May 12, 2020 at 12:57 PM Arvid Heise wrote:
>
>> Hi Jacky,
>>
>> I suspect that the quotes are the actual issue. Could you try to remove
>> them? See also [1].
>>
>> [1]
>> http://blogs.perl.org/users/tinita/2018/03/strings-in-yaml---to-quote-or-not-to-quote.html
>>
>> On Tue, May 12, 2020 at 4:03 PM Jacky D  wrote:
>>
>>> hi, Xintong
>>>
>>> Thanks for the reply. I attached the lines below for the application
>>> master start command:
>>>
>>>
>>> 2020-05-11 21:16:16,635 DEBUG
>>> org.apache.hadoop.util.PerformanceAdvisory- Crypto
>>> codec org.apache.hadoop.crypto.OpensslAesCtrCryptoCodec is not available.
>>> 2020-05-11 21:16:16,635 DEBUG
>>> org.apache.hadoop.util.PerformanceAdvisory- Using
>>> crypto codec org.apache.hadoop.crypto.JceAesCtrCryptoCodec.
>>> 2020-05-11 21:16:16,636 DEBUG org.apache.hadoop.hdfs.DataStreamer
>>>- DataStreamer block
>>> BP-1519523618-98.94.65.144-1581106168138:blk_1073745139_4315 sending packet
>>> packet seqno: 0 offsetInBlock: 0 lastPacketInBlock: false
>>> lastByteOffsetInBlock: 1697
>>> 2020-05-11 21:16:16,637 DEBUG org.apache.hadoop.hdfs.DataStreamer
>>>- DFSClient seqno: 0 reply: SUCCESS
>>> downstreamAckTimeNanos: 0 flag: 0
>>> 2020-05-11 21:16:16,637 DEBUG org.apache.hadoop.hdfs.DataStreamer
>>>- DataStreamer block
>>> BP-1519523618-98.94.65.144-1581106168138:blk_1073745139_4315 sending packet
>>> packet seqno: 1 offsetInBlock: 1697 lastPacketInBlock: true
>>> lastByteOffsetInBlock: 1697
>>> 2020-05-11 21:16:16,638 DEBUG org.apache.hadoop.hdfs.DataStreamer
>>>- DFSClient seqno: 1 reply: SUCCESS
>>> downstreamAckTimeNanos: 0 flag: 0
>>> 2020-05-11 21:16:16,638 DEBUG org.apache.hadoop.hdfs.DataStreamer
>>>- Closing old block
>>> BP-1519523618-98.94.65.144-1581106168138:blk_1073745139_4315
>>> 2020-05-11 21:16:16,641 DEBUG org.apache.hadoop.ipc.Client
>>> - IPC Client (1954985045) connection to
>>> ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop sending #70
>>> org.apache.hadoop.hdfs.protocol.ClientProtocol.complete
>>> 2020-05-11 21:16:16,643 DEBUG org.apache.hadoop.ipc.Client
>>> - IPC Client (1954985045) connection to
>>> ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop got value #70
>>> 2020-05-11 21:16:16,643 DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine
>>>- Call: complete took 2ms
>>> 2020-05-11 21:16:16,643 DEBUG org.apache.hadoop.ipc.Client
>>> - IPC Client (1954985045) connection to
>>> ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop sending #71
>>> org.apache.hadoop.hdfs.protocol.ClientProtocol.setTimes
>>> 2020-05-11 21:16:16,645 DEBUG org.apache.hadoop.ipc.Client
>>> - IPC Client (1954985045) connection to
>>> ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop got value #71
>>> 2020-05-11 21:16:16,645 DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine
>>>- Call: setTimes took 2ms
>>> 2020-05-11 21:16:16,647 DEBUG org.apache.hadoop.ipc.Client
>>> - IPC Client (1954985045) connection to
>>> ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop sending #72
>>> org.apache.hadoop.hdfs.protocol.ClientProtocol.setPermission
>>> 2020-05-11 21:16:16,648 DEBUG org.apache.hadoop.ipc.Client
>>> - IPC Client (1954985045) connection to
>>> ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop got value #72
>>> 2020-05-11 21:16:16,648 DEBUG 

Re: Flink Metrics in kubernetes

2020-05-12 Thread Averell
Hi Gary,

Thanks for the help.
Below is the output from jstack. It does not seem to be blocked.



In my JobManager log, there's this WARN, I am not sure whether it's relevant
at all.


Attached is the full jstack dump, k8xDump.txt.

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Problem monitoring Flink with Prometheus Pushgateway

2020-05-12 Thread 李佳宸
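Thank you for your answer!
Do you mean that deleteOnShutdown actually only applies to Flink jobs?
What I want to achieve is that, after the cluster is stopped, all Flink-related data in the Pushgateway gets cleared.
The reason is that with randomJobNameSuffix enabled, a cluster restart produces a new set of job names, while the previous data remains in the Pushgateway (its values stop updating), so Prometheus keeps producing time series for the old data.
Is there a solution for this? Or is restarting the Pushgateway the only option?

My understanding of the source code here is that it deletes by jobName, so shouldn't that be at the cluster level?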

public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {

    private PushGateway pushGateway;
    private String jobName;
    private boolean deleteOnShutdown;

    @Override
    public void open(MetricConfig config) {
        super.open(config);

        String host = config.getString(HOST.key(), HOST.defaultValue());
        int port = config.getInteger(PORT.key(), PORT.defaultValue());
        String configuredJobName = config.getString(JOB_NAME.key(), JOB_NAME.defaultValue());
        boolean randomSuffix = config.getBoolean(RANDOM_JOB_NAME_SUFFIX.key(), RANDOM_JOB_NAME_SUFFIX.defaultValue());
        deleteOnShutdown = config.getBoolean(DELETE_ON_SHUTDOWN.key(), DELETE_ON_SHUTDOWN.defaultValue());

        if (host == null || host.isEmpty() || port < 1) {
            throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
        }

        if (randomSuffix) {
            this.jobName = configuredJobName + new AbstractID();
        } else {
            this.jobName = configuredJobName;
        }

        pushGateway = new PushGateway(host + ':' + port);
        log.info("Configured PrometheusPushGatewayReporter with {host:{}, port:{}, jobName: {}, randomJobNameSuffix:{}, deleteOnShutdown:{}}",
            host, port, jobName, randomSuffix, deleteOnShutdown);
    }

    @Override
    public void report() {
        try {
            pushGateway.push(CollectorRegistry.defaultRegistry, jobName);
        } catch (Exception e) {
            log.warn("Failed to push metrics to PushGateway with jobName {}.", jobName, e);
        }
    }

    @Override
    public void close() {
        if (deleteOnShutdown && pushGateway != null) {
            try {
                pushGateway.delete(jobName);
            } catch (IOException e) {
                log.warn("Failed to delete metrics from PushGateway with jobName {}.", jobName, e);
            }
        }
        super.close();
    }
}
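
The behaviour described in this thread can be reproduced with a toy model. The
class below is an in-memory stand-in, not the real Pushgateway or its client:
it assumes that a push replaces everything stored under a job name and that a
delete removes exactly that one job name. If the reporter's close() never runs
(for example because the process is killed), the group with the old random
suffix stays behind. FakePushGateway and the suffixes "aaaa" and "bbbb" are
made up for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Toy in-memory stand-in for a Pushgateway, keyed by job name; not the real server or client. */
final class FakePushGateway {

    private final Map<String, Map<String, Double>> groups = new HashMap<>();

    /** Replaces everything stored under the job name (assumed push semantics). */
    void push(String jobName, Map<String, Double> metrics) {
        groups.put(jobName, new HashMap<>(metrics));
    }

    /** Removes only the group stored under exactly this job name. */
    void delete(String jobName) {
        groups.remove(jobName);
    }

    Set<String> jobNames() {
        return groups.keySet();
    }

    public static void main(String[] args) {
        FakePushGateway gateway = new FakePushGateway();
        // First cluster start: the reporter picks a random suffix.
        gateway.push("myJob" + "aaaa", Map.of("cpu_load", 0.0));
        // The cluster is killed here, so close() and its delete call never run.
        // After the restart the reporter picks a new suffix and knows nothing about the old one.
        gateway.push("myJob" + "bbbb", Map.of("cpu_load", 0.0007));
        // Both groups are still stored, so a scraper keeps seeing the stale one.
        System.out.println(gateway.jobNames());
    }
}
```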


On Tue, May 12, 2020 at 10:10 PM yanggang_it_job wrote:

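> Hi 佳宸,
> Let me explain the purpose of these parameters.
> metrics.reporter.promgateway.deleteOnShutdown: this parameter controls whether, when a job is taken offline via stop or cancel, the metrics cached in the Pushgateway's memory are cleaned up. If the job is terminated via yarn kill, they are not cleared.
>
> metrics.reporter.promgateway.randomJobNameSuffix: this parameter controls whether a random suffix is appended to the jobName we defined, to distinguish the metrics of different containers of the same job. Otherwise the pushes overwrite each other, which is the incomplete-metrics problem you described. The principle is: once a job starts there are at least two containers (one JM and one TM), and every container pushes metrics to the Pushgateway. If this parameter is not set to true, they all push under the same jobName, so a later push overwrites the earlier one, and you see the JM's metrics at one moment and the TM's at the next. With this parameter set, each container's jobName is different, so nothing gets overwritten.
>
> Best regards,
> 杨纲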
>
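> On 2020-05-12 18:25:10, "李佳宸" wrote:
> >Hi everyone,
> >
> >When I use Prometheus Pushgateway to monitor Flink, the setting
> >metrics.reporter.promgateway.deleteOnShutdown: true has no effect.
> >When the Flink cluster shuts down, the metrics data is still stored in the Pushgateway.
> >The full reporter configuration is:
> >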
> >metrics.reporter.promgateway.class:
> >org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
> >
> >metrics.reporter.promgateway.host: localhost
> >
> >metrics.reporter.promgateway.port: 9091
> >
> >metrics.reporter.promgateway.jobName: myJob
> >
> >metrics.reporter.promgateway.randomJobNameSuffix: *true*
> >
> >metrics.reporter.promgateway.deleteOnShutdown: *true*
> >
> >
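> >The Flink version is 1.9.1. I tried Pushgateway versions 0.9 and 1.2, with the same problem.
> >
> >
> >I don't know whether this is a bug.
> >
> >Has anyone got this working successfully?
> >
> >Thanks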
>


Re: Flink restart strategy on specific exception

2020-05-12 Thread Ken Krugler
Hi Till,

Sorry, missed the key question…in the RestartStrategy.restart() method, I don’t 
see any good way to get at the underlying exception.

I can cast the RestartCallback to an ExecutionGraphRestartCallback, but I still 
need access to the private execGraph to be able to get at the failure info. Is 
there some other way in the restart handler to get at this?

And yes, I meant to note you’d mentioned the required static method in your 
email, I was asking about documentation for it.

Thanks,

— Ken

===
Sorry to resurface an ancient question, but is there a working example anywhere 
of setting a custom restart strategy?

Asking because I’ve been wandering through the Flink 1.9 code base for a while, 
and the restart strategy implementation is…pretty tangled.

From what I’ve been able to figure out, you have to provide a factory class, 
something like this:

Configuration config = new Configuration();
config.setString(ConfigConstants.RESTART_STRATEGY,
    MyRestartStrategyFactory.class.getCanonicalName());
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.createLocalEnvironment(4, config);

That factory class should extend RestartStrategyFactory, but it also needs to 
implement a static method that looks like:

public static MyRestartStrategyFactory createFactory(Configuration config) {
    return new MyRestartStrategyFactory();
}

I wasn’t able to find any documentation that mentioned this particular method 
being a requirement.

And also the documentation at 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#fault-tolerance
doesn’t mention you can set a custom class name for the restart-strategy.

Thanks,

— Ken


> On Nov 22, 2018, at 8:18 AM, Till Rohrmann wrote:
> 
> Hi Kasif,
> 
> I think in this situation it is best if you defined your own custom 
> RestartStrategy by specifying a class which has a `RestartStrategyFactory 
> createFactory(Configuration configuration)` method as `restart-strategy: 
> MyRestartStrategyFactoryFactory` in `flink-conf.yaml`.
> 
> Cheers,
> Till
> 
> On Thu, Nov 22, 2018 at 7:18 AM Ali, Kasif wrote:
> Hello,
> 
>  
> 
> The existing restart strategies are fairly generic. We have a
> requirement to restart the job only in case of specific exceptions/issues.
>
> What would be the best way to have a restart strategy based on a few
> rules, like looking at a particular type of exception or some extra condition
> checks that are application-specific?
>
> Just as background, one specific issue that prompted this requirement is
> slots not getting released when the job finishes. In our applications, we
> keep track of the jobs submitted and the amount of parallelism allotted to
> each. Once a job finishes we assume that its slots are free and try to submit
> the next set of jobs, which at times fail with the error “not enough slots
> available”.
>
> So we think a job restart can solve this issue, but we want to restart
> only if this particular situation is encountered.
>
> Please let us know if there are better ways to solve this problem other than
> a restart strategy.
> 
>  
> 
> Thanks,
> 
> Kasif
> 
>  
> 
> 
> 

--
Ken Krugler
http://www.scaleunlimited.com 
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Re: Flink restart strategy on specific exception

2020-05-12 Thread Ken Krugler
Hi Till,

Sorry to resurface an ancient question, but is there a working example anywhere 
of setting a custom restart strategy?

Asking because I’ve been wandering through the Flink 1.9 code base for a while, 
and the restart strategy implementation is…pretty tangled.

From what I’ve been able to figure out, you have to provide a factory class, 
something like this:

Configuration config = new Configuration();
config.setString(ConfigConstants.RESTART_STRATEGY,
    MyRestartStrategyFactory.class.getCanonicalName());
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.createLocalEnvironment(4, config);

That factory class should extend RestartStrategyFactory, but it also needs to 
implement a static method that looks like:

public static MyRestartStrategyFactory createFactory(Configuration config) {
    return new MyRestartStrategyFactory();
}

I wasn’t able to find any documentation that mentioned this particular method 
being a requirement.

And also the documentation at 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#fault-tolerance
doesn’t mention you can set a custom class name for the restart-strategy.

Thanks,

— Ken


> On Nov 22, 2018, at 8:18 AM, Till Rohrmann  wrote:
> 
> Hi Kasif,
> 
> I think in this situation it is best if you defined your own custom 
> RestartStrategy by specifying a class which has a `RestartStrategyFactory 
> createFactory(Configuration configuration)` method as `restart-strategy: 
> MyRestartStrategyFactoryFactory` in `flink-conf.yaml`.
> 
> Cheers,
> Till
> 
> On Thu, Nov 22, 2018 at 7:18 AM Ali, Kasif wrote:
> Hello,
> 
>  
> 
> The existing restart strategies are fairly generic. We have a
> requirement to restart the job only in case of specific exceptions/issues.
>
> What would be the best way to have a restart strategy based on a few
> rules, like looking at a particular type of exception or some extra condition
> checks that are application-specific?
>
> Just as background, one specific issue that prompted this requirement is
> slots not getting released when the job finishes. In our applications, we
> keep track of the jobs submitted and the amount of parallelism allotted to
> each. Once a job finishes we assume that its slots are free and try to submit
> the next set of jobs, which at times fail with the error “not enough slots
> available”.
>
> So we think a job restart can solve this issue, but we want to restart
> only if this particular situation is encountered.
>
> Please let us know if there are better ways to solve this problem other than
> a restart strategy.
> 
>  
> 
> Thanks,
> 
> Kasif
> 
>  
> 
> 
> 

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Register time attribute while converting a DataStream to Table

2020-05-12 Thread Jiahui Jiang
Hello Flink friends, I have a retract stream in the format of
'DataStream' that I want to register into my table environment, while also
exposing a processing time column in the table.

For a regular datastream, I have been doing
'tableEnvironment.createTemporaryView(path, dataStream, 'field1,field2,
..,__processing_time_column.proctime')' with no issue. But for this retract
stream, I get the error "org.apache.flink.table.api.ValidationException:
Too many fields referenced from an atomic type."

Digging a little bit deeper, in 
TypeInfoUtils#extractFieldInformation,
 it doesn't handle CRowTypeInfo as a known case. Looking at the behavior of

Since it's a standard CompositeType, instead of only handling 'if (inputType 
instanceof PojoTypeInfo)', can we just add CRowTypeInfo here too? Is there any 
risk that I'm not aware of?

Thank you!


Re: Need suggestion on Flink-Kafka stream processing design

2020-05-12 Thread hemant singh
Hi Arvid,

I don't want to aggregate all events; rather, I want to create a record for
a device that combines data from multiple events. Each of these events gives
me a metric for a device. For example, if I want one record for device-id=1,
the record will look like metric1, metric2, metric3, where metric1
comes from event1, metric2 from event2, and so on.
From each event I take the latest data to form a kind of snapshot of device
performance across the metrics.
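
To make the idea concrete, here is a minimal plain-Java sketch of the
"latest value per metric per device" snapshot, without any Flink classes. In a
Flink job this logic would presumably live in keyed state behind a keyBy on
the device id, but that part is not shown. DeviceSnapshots, onEvent and
snapshot are hypothetical names, and the event shape (device id, metric name,
value, timestamp) is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

/** Keeps, for every device, the most recent value seen for each metric. */
final class DeviceSnapshots {

    /** A metric value together with the timestamp of the event it came from. */
    private static final class Reading {
        final double value;
        final long timestamp;

        Reading(double value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // deviceId -> (metric name -> latest reading)
    private final Map<String, Map<String, Reading>> latest = new HashMap<>();

    /** Stores the value unless a newer reading for the same device and metric is already known. */
    void onEvent(String deviceId, String metric, double value, long timestamp) {
        Map<String, Reading> perDevice = latest.computeIfAbsent(deviceId, id -> new HashMap<>());
        Reading previous = perDevice.get(metric);
        if (previous == null || timestamp >= previous.timestamp) {
            perDevice.put(metric, new Reading(value, timestamp));
        }
    }

    /** Returns the current snapshot for one device: metric name -> latest value. */
    Map<String, Double> snapshot(String deviceId) {
        Map<String, Double> result = new HashMap<>();
        latest.getOrDefault(deviceId, Map.of())
                .forEach((metric, reading) -> result.put(metric, reading.value));
        return result;
    }

    public static void main(String[] args) {
        DeviceSnapshots snapshots = new DeviceSnapshots();
        snapshots.onEvent("1", "metric1", 0.5, 100L);
        snapshots.onEvent("1", "metric2", 42.0, 101L);
        snapshots.onEvent("1", "metric1", 0.7, 102L); // newer reading replaces the older one
        System.out.println(snapshots.snapshot("1"));
    }
}
```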

Thanks,
Hemant

On Wed, May 13, 2020 at 1:38 AM Arvid Heise  wrote:

> Hi Hemant,
>
> In general, you want to keep all data coming from one device in one Kafka
> partition, such that the timestamps of that device are monotonically
> increasing. Thus, when processing data from one device, you have ensured
> that no out-of-order events with respect to this device happen.
>
> If you now want to aggregate all events of a given timestamp for a device,
> it is a matter of keying by device id and applying a custom window. There
> is no need for joins.
>
> On Tue, May 12, 2020 at 9:05 PM hemant singh  wrote:
>
>> Hello Flink Users,
>>
>> Any views on this question of mine.
>>
>> Thanks,
>> Hemant
>>
>> On Tue, May 12, 2020 at 7:00 PM hemant singh 
>> wrote:
>>
>>> Hello Roman,
>>>
>>> Thanks for your response.
>>>
>>> I think partitioning you described (event type + protocol type) is
>>> subject to data skew. Including a device ID should solve this problem.
>>> Also, including "protocol_type" into the key and having topic per
>>> protocol_type seems redundant.
>>> Each protocol is in single topic and event_type is key to distribute
>>> data to a specific partition.
>>>
>>> Furthermore, do you have any particular reason to maintain multiple
>>> topics?
>>> I could imagine protocols have different speeds or other
>>> characteristics, so you can tune Flink accordingly.
>>> Otherwise, having a single topic partitioned only by device ID would
>>> simplify deployment and reduce data skew.
>>> Yes, you are right. These protocols have separate characteristics like
>>> speed, data format. If I do have only one topic with data partitioned by
>>> device_id then it could be that events from faster protocol is processed
>>> faster and the joins which I want to do will not have enough matching data.
>>> I have a question here how are you referring to tune Flink to handle
>>> different characteristics like speed of streams as reading from kafka could
>>> result in uneven processing of data?
>>>
>>> > By consume do you mean the downstream system?
>>> My downstream is TSDB and other DBs where the data will be written to.
>>> All these is time-series data.
>>>
>>> Thanks,
>>> Hemant
>>>
>>>
>>>
>>> On Tue, May 12, 2020 at 5:28 PM Khachatryan Roman <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
 Hello Hemant,

 Thanks for your reply.

 I think partitioning you described (event type + protocol type) is
 subject to data skew. Including a device ID should solve this problem.
 Also, including "protocol_type" into the key and having topic per
 protocol_type seems redundant.

 Furthermore, do you have any particular reason to maintain multiple
 topics?
 I could imagine protocols have different speeds or other
 characteristics, so you can tune Flink accordingly.
 Otherwise, having a single topic partitioned only by device ID would
 simplify deployment and reduce data skew.

 > By consume do you mean the downstream system?
 Yes.

 Regards,
 Roman


 On Mon, May 11, 2020 at 11:30 PM hemant singh 
 wrote:

> Hello Roman,
>
> PFB my response -
>
> As I understand, each protocol has a distinct set of event types
> (where event type == metrics type); and a distinct set of devices. Is this
> correct?
> Yes, correct. distinct events and devices. Each device emits these
> event.
>
> > Based on data protocol I have 4-5 topics. Currently the data for a
> single event is being pushed to a partition of the kafka topic(producer 
> key
> -> event_type + data_protocol).
> Here you are talking about the source (to Flink job), right?
> Yes, you are right.
>
> Can you also share how are you going to consume these data?
> By consume do you mean the downstream system?
> If yes then this data will be written to a DB, some metrics goes to
> TSDB(Influx) as well.
>
> Thanks,
> Hemant
>
> On Tue, May 12, 2020 at 2:08 AM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Hemant,
>>
>> As I understand, each protocol has a distinct set of event types
>> (where event type == metrics type); and a distinct set of devices. Is 
>> this
>> correct?
>>
>> > Based on data protocol I have 4-5 topics. Currently the data for a
>> single event is being pushed to a partition of the kafka topic(producer 
>> key
>> -> event_type + data_protocol).
>> 

Re: Statefun 2.0 questions

2020-05-12 Thread Igal Shilman
Hi Wouter,

Triggering a stateful function from a frontend indeed requires an ingress
between them, so the way you've approached this is also the way we were
thinking of.
As Gordon mentioned, a potential improvement might be an HTTP ingress that
would allow triggering stateful functions directly from the front-end
servers.
But this kind of ingress is not implemented yet.

Regarding scaling: Your understanding is correct, you can scale both the
Flink cluster and the remote "python-stateful-function" cluster
independently.
Scaling the Flink cluster, though, requires taking a savepoint, bumping the
job parallelism, and starting the cluster with more workers from the
savepoint taken previously.

Scaling "python-stateful-function" workers can be done transparently to the
Flink cluster, but the exact details are deployment specific.
- For example the python workers are a k8s service.
- Or the python workers are deployed behind a load balancer
- Or you add new entries to the DNS record of your python worker.

I didn't understand "ensuring that it ends op in the correct Flink job". Can
you please clarify?
Flink would be the one contacting the remote workers, not the other way
around. So as long as the new instances
are visible to Flink, they would be reached with the same shared state.

I'd recommend watching [1] and the demo at the end, and [2] for a demo
using stateful functions on AWS lambda.

[1] https://youtu.be/NF0hXZfUyqE
[2] https://www.youtube.com/watch?v=tuSylBadNSo

It seems like you are on the correct path!
Good luck!
Igal.


On Tue, May 12, 2020 at 11:18 PM Wouter Zorgdrager 
wrote:

> Hi Igal, all,
>
> In the meantime we found a way to serve Flink stateful functions in a
> frontend. We decided to add another (set of) Flask application(s) which
> link to Kafka topics. These Kafka topics then serve as ingress and egress
> for the statefun cluster. However, we're wondering how we can scale this
> cluster. On the documentation page some nice figures are provided for
> different setups but no implementation details are given. In our case we
> are using a remote cluster so we have a Docker instance containing the
> `python-stateful-function` and of course the Flink cluster containing a
> `master` and `worker`. If I understood correctly, in a remote setting, we
> can scale both the Flink cluster and the `python-stateful-function`.
> Scaling the Flink cluster is trivial because I can add just more
> workers/task-managers (providing more taskslots) just by scaling the worker
> instance. However, how can I scale the stateful function also ensuring that
> it ends op in the correct Flink job (because we need shared state there). I
> tried scaling the Docker instance as well but that didn't seem to work.
>
> Hope you can give me some leads there.
> Thanks in advance!
>
> Kind regards,
> Wouter
>
> On Thu, May 7, 2020 at 5:17 PM Wouter Zorgdrager wrote:
>
>> Hi Igal,
>>
>> Thanks for your quick reply. Getting back to point 2, I was wondering if
>> you could trigger indeed a stateful function directly from Flask and also
>> get the reply there instead of using Kafka in between. We want to
>> experiment running stateful functions behind a front-end (which should be
>> able to trigger a function), but we're a bit afraid that using Kafka
>> doesn't scale well if on the frontend side a user has to consume all Kafka
>> messages to find the correct reply/output for a certain request/input. Any
>> thoughts?
>>
>> Thanks in advance,
>> Wouter
>>
>> On Thu, May 7, 2020 at 10:51 AM Igal Shilman wrote:
>>
>>> Hi Wouter!
>>>
>>> Glad to read that you are using Flink for quite some time, and also
>>> exploring with StateFun!
>>>
>>> 1) yes it is correct and you can follow the Dockerhub contribution PR at
>>> [1]
>>>
>>> 2) I’m not sure I understand what do you mean by trigger from the
>>> browser.
>>> If you mean, for testing / illustration purposes triggering the function
>>> independently of StateFun, you would need to write some JavaScript and
>>> preform the POST (assuming CORS are enabled)
>>> Let me know if you’d like getting further information of how to do it.
>>> Broadly speaking, GET is traditionally used to get data from a resource
>>> and POST to send data (the data is the invocation batch in our case).
>>>
>>> One easier walk around for you would be to expose another endpoint in
>>> your Flask application, and call your stateful function directly from there
>>> (possibly populating the function argument with values taken from the query
>>> params)
>>>
>>> 3) I would expect a performance loss when going from the embedded SDK to
>>> the remote one, simply because the remote function is at a different
>>> process, and a round trip is required. There are different ways of
>>> deployment even for remote functions.
>>> For example they can be co-located with the Task managers and
>>> communicate via the loop back device /Unix domain socket, or they can be
>>> deployed behind a load balancer with an auto-scaler, and thus 

Re: Statefun 2.0 questions

2020-05-12 Thread Wouter Zorgdrager
Hi Igal, all,

In the meantime we found a way to serve Flink stateful functions in a
frontend. We decided to add another (set of) Flask application(s) which
link to Kafka topics. These Kafka topics then serve as ingress and egress
for the statefun cluster. However, we're wondering how we can scale this
cluster. On the documentation page some nice figures are provided for
different setups but no implementation details are given. In our case we
are using a remote cluster so we have a Docker instance containing the
`python-stateful-function` and of course the Flink cluster containing a
`master` and `worker`. If I understood correctly, in a remote setting, we
can scale both the Flink cluster and the `python-stateful-function`.
Scaling the Flink cluster is trivial because I can just add more
workers/task-managers (providing more task slots) by scaling the worker
instance. However, how can I scale the stateful function while also ensuring that
it ends up in the correct Flink job (because we need shared state there)? I
tried scaling the Docker instance as well, but that didn't seem to work.

Hope you can give me some leads there.
Thanks in advance!

Kind regards,
Wouter

On Thu, May 7, 2020 at 5:17 PM Wouter Zorgdrager wrote:

> Hi Igal,
>
> Thanks for your quick reply. Getting back to point 2, I was wondering if
> you could trigger indeed a stateful function directly from Flask and also
> get the reply there instead of using Kafka in between. We want to
> experiment running stateful functions behind a front-end (which should be
> able to trigger a function), but we're a bit afraid that using Kafka
> doesn't scale well if on the frontend side a user has to consume all Kafka
> messages to find the correct reply/output for a certain request/input. Any
> thoughts?
>
> Thanks in advance,
> Wouter
>
> On Thu, May 7, 2020 at 10:51 AM Igal Shilman wrote:
>
>> Hi Wouter!
>>
>> Glad to read that you are using Flink for quite some time, and also
>> exploring with StateFun!
>>
>> 1) yes it is correct and you can follow the Dockerhub contribution PR at
>> [1]
>>
>> 2) I’m not sure I understand what do you mean by trigger from the browser.
>> If you mean, for testing / illustration purposes triggering the function
>> independently of StateFun, you would need to write some JavaScript and
>> preform the POST (assuming CORS are enabled)
>> Let me know if you’d like getting further information of how to do it.
>> Broadly speaking, GET is traditionally used to get data from a resource
>> and POST to send data (the data is the invocation batch in our case).
>>
>> One easier walk around for you would be to expose another endpoint in
>> your Flask application, and call your stateful function directly from there
>> (possibly populating the function argument with values taken from the query
>> params)
>>
>> 3) I would expect a performance loss when going from the embedded SDK to
>> the remote one, simply because the remote function is at a different
>> process, and a round trip is required. There are different ways of
>> deployment even for remote functions.
>> For example they can be co-located with the Task managers and communicate
>> via the loop back device /Unix domain socket, or they can be deployed
>> behind a load balancer with an auto-scaler, and thus reacting to higher
>> request rate/latency increases by spinning new instances (something that is
>> not yet supported with the embedded API)
>>
>> Good luck,
>> Igal.
>>
>>
>>
>>
>>
>> [1] https://github.com/docker-library/official-images/pull/7749
>>
>>
>> On Wednesday, May 6, 2020, Wouter Zorgdrager 
>> wrote:
>>
>>> Hi all,
>>>
>>> I've been using Flink for quite some time now and for a university
>>> project I'm planning to experiment with statefun. During the walkthrough
>>> I've run into some issues, I hope you can help me with.
>>>
>>> 1) Is it correct that the Docker image of statefun is not yet published?
>>> I couldn't find it anywhere, but was able to run it by building the image
>>> myself.
>>> 2) In the example project using the Python SDK, it uses Flask to expose
>>> a function using POST. Is there also a way to serve GET request so that you
>>> can trigger a stateful function by for instance using your browser?
>>> 3) Do you expect a lot of performance loss when using the Python SDK
>>> over Java?
>>>
>>> Thanks in advance!
>>>
>>> Regards,
>>> Wouter
>>>
>>


changing the output files names in Streamfilesink from part-00 to something else

2020-05-12 Thread dhurandar S
We want to change the name of the files generated as the output of our
StreamFileSink.
When files are generated they are named part-00*. Is there a way that we
can change the name?

In Hadoop, we can change RecordWriters and MultipleOutputs. May I please
have some help in this regard? This is a blocker for us and will force us
to move to an MR job.

-- 
Thank you and regards,
Dhurandar


Re: Window processing in Stateful Functions

2020-05-12 Thread Igal Shilman
Hi,
One way to keep the state size under control would be:
1) attach to every incoming edge its "insertion time" in the vertex
function's state.
2) in addition, the vertex function would send a delayed message, with a
delay of insertion time + expiration duration
3) once a delayed message arrives, iterate over your edge state and remove
all the edges with "insertion time" <= now()

To reduce the number of delayed messages, you can make sure to send a
single delayed message once per fixed expiration interval
(a.k.a. tumbling window).
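
The bookkeeping in the steps above can be sketched in plain Java, outside of
StateFun. Everything here is a hypothetical illustration: ExpiringEdges,
addEdge and purge are made-up names, the map stands in for the vertex
function's persisted state, and purge() stands in for the handler of the
delayed message. The sketch reads step 3 as "remove an edge once its insertion
time plus the expiration duration has passed", which is an interpretation and
not something StateFun enforces.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical stand-in for a vertex function's edge state with manual expiration. */
class ExpiringEdges {
    private final long expirationMillis;
    // edge id -> insertion time in milliseconds (step 1)
    private final Map<String, Long> insertionTime = new HashMap<>();

    ExpiringEdges(long expirationMillis) {
        this.expirationMillis = expirationMillis;
    }

    /** Stores an edge together with the time it was inserted. */
    void addEdge(String edge, long nowMillis) {
        insertionTime.put(edge, nowMillis);
    }

    /** Called when the delayed message arrives (step 3); returns how many edges were removed. */
    int purge(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = insertionTime.entrySet().iterator();
        while (it.hasNext()) {
            // An edge is expired once insertion time + expiration duration has passed
            if (it.next().getValue() + expirationMillis <= nowMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return insertionTime.size();
    }

    public static void main(String[] args) {
        ExpiringEdges edges = new ExpiringEdges(1000);
        edges.addEdge("a->b", 0);
        edges.addEdge("a->c", 600);
        System.out.println("removed at t=1000: " + edges.purge(1000));
        System.out.println("remaining: " + edges.size());
    }
}
```

With one purge per fixed interval, a single delayed message per interval is
enough, at the cost of edges living up to one interval longer than their
expiration duration.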

A better way to deal with that would be to wait until [1] is
implemented in StateFun (I don't believe it should take more than a couple of
weeks).
Then you can simply define your state with expiration and StateFun would
make sure that the edge state is purged automatically some configured
time
after insertion.

I hope this helps,
Good luck!
Igal.


[1] https://issues.apache.org/jira/browse/FLINK-17644

On Fri, May 8, 2020 at 1:00 PM m@xi  wrote:

> Dear Igal, Very insightful answer. Thanks.
>
> Igal Shilman wrote
> An alternative approach would be to implement a *tumbling window* per
> vertex (a stateful function instance) by sending to itself a delayed
> message [2]. When that specific delayed message arrives you would have to
> purge the oldest edges by examining the edges in state.
>
> Indeed, the delayed asynchronous messages are a workaround for *tumbling
> window* simulation in SF. I believe you assume a message received by a
> stateful function contains multiple edges, i.e. which can all be delayed by
> a certain amount of time. Therefore, when a function receives a message, it
> purges all of its existing edges and incorporates the new (delayed) ones.
> Correct? Nevertheless, if you think of it, the delay is essentially the
> *window slide*. Now, what about the *window range*?
>
> Igal Shilman wrote
> Data stream windows are not yet supported in statefun, but it seems like
> the main motivation here is to purge old edges? If this is the case perhaps
> we need to integrate state TTL [1] into persisted values/persistedtables.
>
> I was not aware of the TTL, very interesting and handy. Essentially,
> the TTL can enforce the *window range*, i.e., attach to each tuple
> received by a stateful function its lifespan/duration. So, the first TTL
> attribute sets the range *StateTtlConfig.newBuilder(Time.seconds(window
> range))*. Therefore, by *combining TTL and SF Delayed Messaging* we can
> *simulate sliding window* processing on a stateful function basis. However,
> TTL is a Flink construct and I am not sure if I got it correctly. You said
>
> Igal Shilman wrote
> If this is the case perhaps *we need to integrate* state TTL [1] into
> persisted values/persistedtables.
>
> If this is the case, then I believe it would be great to integrate TTL
> into Persisted Values/Tables.
>
> --
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>


Re: Need suggestion on Flink-Kafka stream processing design

2020-05-12 Thread Arvid Heise
Hi Hemant,

In general, you want to keep all data coming from one device in one Kafka
partition, such that the timestamps of that device are monotonically
increasing. Thus, when processing data from one device, you have ensured
that no out-of-order events with respect to this device happen.

If you now want to aggregate all events of a given timestamp for a device,
it is a matter of keying by device id and applying a custom window. There
is no need for joins.

On Tue, May 12, 2020 at 9:05 PM hemant singh  wrote:

> Hello Flink Users,
>
> Any views on this question of mine.
>
> Thanks,
> Hemant
>
> On Tue, May 12, 2020 at 7:00 PM hemant singh  wrote:
>
>> Hello Roman,
>>
>> Thanks for your response.
>>
>> I think partitioning you described (event type + protocol type) is
>> subject to data skew. Including a device ID should solve this problem.
>> Also, including "protocol_type" into the key and having topic per
>> protocol_type seems redundant.
>> Each protocol is in single topic and event_type is key to distribute data
>> to a specific partition.
>>
>> Furthermore, do you have any particular reason to maintain multiple
>> topics?
>> I could imagine protocols have different speeds or other characteristics,
>> so you can tune Flink accordingly.
>> Otherwise, having a single topic partitioned only by device ID would
>> simplify deployment and reduce data skew.
>> Yes, you are right. These protocols have separate characteristics like
>> speed, data format. If I do have only one topic with data partitioned by
>> device_id then it could be that events from faster protocol is processed
>> faster and the joins which I want to do will not have enough matching data.
>> I have a question here how are you referring to tune Flink to handle
>> different characteristics like speed of streams as reading from kafka could
>> result in uneven processing of data?
>>
>> > By consume do you mean the downstream system?
>> My downstream is TSDB and other DBs where the data will be written to.
>> All these is time-series data.
>>
>> Thanks,
>> Hemant
>>
>>
>>
>> On Tue, May 12, 2020 at 5:28 PM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hello Hemant,
>>>
>>> Thanks for your reply.
>>>
>>> I think partitioning you described (event type + protocol type) is
>>> subject to data skew. Including a device ID should solve this problem.
>>> Also, including "protocol_type" into the key and having topic per
>>> protocol_type seems redundant.
>>>
>>> Furthermore, do you have any particular reason to maintain multiple
>>> topics?
>>> I could imagine protocols have different speeds or other
>>> characteristics, so you can tune Flink accordingly.
>>> Otherwise, having a single topic partitioned only by device ID would
>>> simplify deployment and reduce data skew.
>>>
>>> > By consume do you mean the downstream system?
>>> Yes.
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Mon, May 11, 2020 at 11:30 PM hemant singh 
>>> wrote:
>>>
 Hello Roman,

 PFB my response -

 As I understand, each protocol has a distinct set of event types (where
 event type == metrics type); and a distinct set of devices. Is this 
 correct?
 Yes, correct. distinct events and devices. Each device emits these
 event.

 > Based on data protocol I have 4-5 topics. Currently the data for a
 single event is being pushed to a partition of the kafka topic(producer key
 -> event_type + data_protocol).
 Here you are talking about the source (to Flink job), right?
 Yes, you are right.

 Can you also share how are you going to consume these data?
 By consume do you mean the downstream system?
 If yes then this data will be written to a DB, some metrics goes to
 TSDB(Influx) as well.

 Thanks,
 Hemant

 On Tue, May 12, 2020 at 2:08 AM Khachatryan Roman <
 khachatryan.ro...@gmail.com> wrote:

> Hi Hemant,
>
> As I understand, each protocol has a distinct set of event types
> (where event type == metrics type); and a distinct set of devices. Is this
> correct?
>
> > Based on data protocol I have 4-5 topics. Currently the data for a
> single event is being pushed to a partition of the kafka topic(producer 
> key
> -> event_type + data_protocol).
> Here you are talking about the source (to Flink job), right?
>
> Can you also share how are you going to consume these data?
>
>
> Regards,
> Roman
>
>
> On Mon, May 11, 2020 at 8:57 PM hemant singh 
> wrote:
>
>> Hi,
>>
>> I have different events from a device which constitutes different
>> metrics for same device. Each of these event is produced by the device in
>> interval of few milli seconds to a minute.
>>
>> Event1(Device1) -> Stream1 -> Metric 1
>> Event2 (Device1) -> Stream2 -> Metric 2 ...
>> ..
>> ...
>> Event100(Device1) -> Stream100 -> Metric100
>>
>> 

Re: ProducerRecord with Kafka Sink for 1.8.0

2020-05-12 Thread Nick Bendtner
Hi Gary,
It's because the Flink distribution of the cluster is 1.7.2. We use a
standalone cluster, so in the lib directory in Flink the artifact is
flink-core-1.7.2.jar. I need to pack flink-core-1.9.0.jar with the application
and use child-first class loading to use the newer version of flink-core. If I
have it as provided scope, sure, it will work in IntelliJ but not outside of
it.

Best,
Nick

On Tue, May 12, 2020 at 2:53 PM Gary Yao  wrote:

> Hi Nick,
>
> Can you explain why it is required to package flink-core into your
> application jar? Usually flink-core is a dependency with provided
> scope [1]
>
> Best,
> Gary
>
> [1]
> https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#dependency-scope
>
> On Tue, May 12, 2020 at 5:41 PM Nick Bendtner  wrote:
> >
> > Hi Gary,
> > Thanks for the info. I am aware this feature is available in 1.9.0
> onwards. Our cluster is still very old and have CICD challenges,I was
> hoping not to bloat up the application jar by packaging even flink-core
> with it. If its not possible to do this with older version without writing
> our own kafka sink implementation similar to the flink provided version in
> 1.9.0 then I think we will pack flink-core 1.9.0 with the application and
> follow the approach that you suggested. Thanks again for getting back to me
> so quickly.
> >
> > Best,
> > Nick
> >
> > On Tue, May 12, 2020 at 3:37 AM Gary Yao  wrote:
> >>
> >> Hi Nick,
> >>
> >> Are you able to upgrade to Flink 1.9? Beginning with Flink 1.9 you can
> use
> >> KafkaSerializationSchema to produce a ProducerRecord [1][2].
> >>
> >> Best,
> >> Gary
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-11693
> >> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.html
> >>
> >> On Mon, May 11, 2020 at 10:59 PM Nick Bendtner 
> wrote:
> >> >
> >> > Hi guys,
> >> > I use 1.8.0 version for flink-connector-kafka. Do you have any
> recommendations on how to produce a ProducerRecord from a kafka sink.
> Looking to add support to kafka headers therefore thinking about
> ProducerRecord. If you have any thoughts its highly appreciated.
> >> >
> >> > Best,
> >> > Nick.
>


Re: ProducerRecord with Kafka Sink for 1.8.0

2020-05-12 Thread Gary Yao
Hi Nick,

Can you explain why it is required to package flink-core into your
application jar? Usually flink-core is a dependency with provided
scope [1]

Best,
Gary

[1] 
https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#dependency-scope

On Tue, May 12, 2020 at 5:41 PM Nick Bendtner  wrote:
>
> Hi Gary,
> Thanks for the info. I am aware this feature is available in 1.9.0 onwards. 
> Our cluster is still very old and have CICD challenges,I was hoping not to 
> bloat up the application jar by packaging even flink-core with it. If its not 
> possible to do this with older version without writing our own kafka sink 
> implementation similar to the flink provided version in 1.9.0 then I think we 
> will pack flink-core 1.9.0 with the application and follow the approach that 
> you suggested. Thanks again for getting back to me so quickly.
>
> Best,
> Nick
>
> On Tue, May 12, 2020 at 3:37 AM Gary Yao  wrote:
>>
>> Hi Nick,
>>
>> Are you able to upgrade to Flink 1.9? Beginning with Flink 1.9 you can use
>> KafkaSerializationSchema to produce a ProducerRecord [1][2].
>>
>> Best,
>> Gary
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-11693
>> [2] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.html
>>
>> On Mon, May 11, 2020 at 10:59 PM Nick Bendtner  wrote:
>> >
>> > Hi guys,
>> > I use 1.8.0 version for flink-connector-kafka. Do you have any 
>> > recommendations on how to produce a ProducerRecord from a kafka sink. 
>> > Looking to add support to kafka headers therefore thinking about 
>> > ProducerRecord. If you have any thoughts its highly appreciated.
>> >
>> > Best,
>> > Nick.


Re: Flink Metrics in kubernetes

2020-05-12 Thread Gary Yao
Hi Averell,

If you are seeing the log message from [1] and Scheduled#report() is
not called, the thread in the "Flink-MetricRegistry" thread pool might
be blocked. You can use the jstack utility to see on which task the
thread pool is blocked.
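
To illustrate the failure mode described above, here is a small plain-Java
sketch, not Flink code. It assumes the reporter runs on a single-threaded
scheduled executor; BlockedReporterDemo and its run() method are hypothetical
names. A task that never returns occupies the only thread, so the periodic
"report" task stays queued until that task is released.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo: a blocked task starves a periodic task on a single-threaded pool. */
class BlockedReporterDemo {
    /** Returns {report count while the pool was blocked, report count after unblocking}. */
    static int[] run() {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch reported = new CountDownLatch(1);
        AtomicInteger reports = new AtomicInteger();
        try {
            // This task occupies the pool's only thread until the latch is released
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Stand-in for the periodic report() call; it is queued behind the blocked task
            pool.scheduleWithFixedDelay(() -> {
                reports.incrementAndGet();
                reported.countDown();
            }, 0, 50, TimeUnit.MILLISECONDS);

            Thread.sleep(300);
            int whileBlocked = reports.get();    // the report task has not had a thread yet
            release.countDown();                 // unblock the pool
            reported.await(5, TimeUnit.SECONDS); // wait for the first report
            return new int[] {whileBlocked, reports.get()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("reports while blocked: " + counts[0]);
        System.out.println("reports after release: " + counts[1]);
    }
}
```

In a thread dump taken with jstack, the equivalent of the first task would
show up as the stack trace of the pool's thread, which is how the blocking
call can be identified.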

Best,
Gary

[1] 
https://github.com/apache/flink/blob/e346215edcf2252cc60c5cef507ea77ce2ac9aca/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java#L141

On Tue, May 12, 2020 at 4:32 PM Averell  wrote:
>
> Hi,
>
> I'm trying to config Flink running in Kubernetes native to push some metrics
> to NewRelic (using a custom ScheduledDropwizardReporter).
>
> From the logs, I could see that an instance of ScheduledDropwizardReporter
> has already been created successfully (the overridden getReporter() method
> was called).
> An instance of MetricRegistryImpl was also created successfully (this log
> was shown: /Periodically reporting metrics in intervals of 30 SECONDS for
> reporter my_newrelic_reporter/).
>
> However, the report() method was not called.
> When running on my laptop, there's no issue at all.
> Are there any special things that I need to care for when running in
> Kubernetes?
>
> Thanks a lot.
>
> Regards,
> Averell
>
>
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Need suggestion on Flink-Kafka stream processing design

2020-05-12 Thread hemant singh
Hello Flink Users,

Any views on this question of mine.

Thanks,
Hemant

On Tue, May 12, 2020 at 7:00 PM hemant singh  wrote:

> Hello Roman,
>
> Thanks for your response.
>
> I think partitioning you described (event type + protocol type) is subject
> to data skew. Including a device ID should solve this problem.
> Also, including "protocol_type" into the key and having topic per
> protocol_type seems redundant.
> Each protocol is in single topic and event_type is key to distribute data
> to a specific partition.
>
> Furthermore, do you have any particular reason to maintain multiple
> topics?
> I could imagine protocols have different speeds or other characteristics,
> so you can tune Flink accordingly.
> Otherwise, having a single topic partitioned only by device ID would
> simplify deployment and reduce data skew.
> Yes, you are right. These protocols have separate characteristics like
> speed, data format. If I do have only one topic with data partitioned by
> device_id then it could be that events from faster protocol is processed
> faster and the joins which I want to do will not have enough matching data.
> I have a question here how are you referring to tune Flink to handle
> different characteristics like speed of streams as reading from kafka could
> result in uneven processing of data?
>
> > By consume do you mean the downstream system?
> My downstream is TSDB and other DBs where the data will be written to. All
> these is time-series data.
>
> Thanks,
> Hemant
>
>
>
> On Tue, May 12, 2020 at 5:28 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hello Hemant,
>>
>> Thanks for your reply.
>>
>> I think partitioning you described (event type + protocol type) is
>> subject to data skew. Including a device ID should solve this problem.
>> Also, including "protocol_type" into the key and having topic per
>> protocol_type seems redundant.
>>
>> Furthermore, do you have any particular reason to maintain multiple
>> topics?
>> I could imagine protocols have different speeds or other characteristics,
>> so you can tune Flink accordingly.
>> Otherwise, having a single topic partitioned only by device ID would
>> simplify deployment and reduce data skew.
>>
>> > By consume do you mean the downstream system?
>> Yes.
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, May 11, 2020 at 11:30 PM hemant singh 
>> wrote:
>>
>>> Hello Roman,
>>>
>>> PFB my response -
>>>
>>> As I understand, each protocol has a distinct set of event types (where
>>> event type == metrics type); and a distinct set of devices. Is this correct?
>>> Yes, correct. distinct events and devices. Each device emits these event.
>>>
>>> > Based on data protocol I have 4-5 topics. Currently the data for a
>>> single event is being pushed to a partition of the kafka topic(producer key
>>> -> event_type + data_protocol).
>>> Here you are talking about the source (to Flink job), right?
>>> Yes, you are right.
>>>
>>> Can you also share how are you going to consume these data?
>>> By consume do you mean the downstream system?
>>> If yes then this data will be written to a DB, some metrics goes to
>>> TSDB(Influx) as well.
>>>
>>> Thanks,
>>> Hemant
>>>
>>> On Tue, May 12, 2020 at 2:08 AM Khachatryan Roman <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
 Hi Hemant,

 As I understand, each protocol has a distinct set of event types (where
 event type == metrics type); and a distinct set of devices. Is this 
 correct?

 > Based on data protocol I have 4-5 topics. Currently the data for a
 single event is being pushed to a partition of the kafka topic(producer key
 -> event_type + data_protocol).
 Here you are talking about the source (to Flink job), right?

 Can you also share how are you going to consume these data?


 Regards,
 Roman


 On Mon, May 11, 2020 at 8:57 PM hemant singh 
 wrote:

> Hi,
>
> I have different events from a device which constitutes different
> metrics for same device. Each of these event is produced by the device in
> interval of few milli seconds to a minute.
>
> Event1(Device1) -> Stream1 -> Metric 1
> Event2 (Device1) -> Stream2 -> Metric 2 ...
> ..
> ...
> Event100(Device1) -> Stream100 -> Metric100
>
> The number of events can go up to a few hundred for each data protocol, and
> we have around 4-5 data protocols. Metrics from different streams make up
> a record,
> for example, from the example above for device 1:
>
> Device1 -> Metric1, Metric 2, Metric15 form a single record for the
> device. Currently, in the development phase, I am using an interval join to achieve
> this, that is, to create a record with the latest data from different
> streams (events).
>
> Based on data protocol I have 4-5 topics. Currently the data for a
> single event is being pushed to a partition of the kafka topic(producer 
> key
> -> event_type + 

Incremental state with purging

2020-05-12 Thread Annemarie Burger
Hi,

I'm trying to implement the most efficient way to incrementally put incoming
DataStream elements in my (map)state, while removing old elements (older
than x) from that same state. I then want to output the state every y
seconds. I've looked into using the ProcessFunction with onTimer, or
building my own Trigger for a window function, but I struggle with putting
all this together in a logical and efficient way. Since the state is very
big I don't want to duplicate it over multiple (sliding)windows. Does
anybody know the best way to achieve this? Some pseudo code would be very
helpful. 

Thanks!
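
For the question above, a minimal sketch of the purge-then-emit logic may help. It is plain Java rather than Flink code: PurgingState, add and onTimer are hypothetical names, and the TreeMap only stands in for keyed MapState. In a Flink job the same steps would presumably live in a KeyedProcessFunction, with add playing the role of processElement and onTimer the role of the timer callback; that mapping is an assumption, not something stated in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Plain-Java sketch of the per-key logic; no Flink types are used. */
class PurgingState {
    // "x" from the question: elements older than this are dropped.
    private final long maxAgeMs;
    // Elements kept sorted by timestamp so old entries can be cut off cheaply.
    private final TreeMap<Long, List<String>> byTimestamp = new TreeMap<>();

    PurgingState(long maxAgeMs) {
        this.maxAgeMs = maxAgeMs;
    }

    /** Incrementally adds one element to the single shared state. */
    void add(long timestamp, String value) {
        byTimestamp.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
    }

    /** Fires every "y" seconds: purge old entries, then output what is left. */
    List<String> onTimer(long now) {
        // Remove everything with a timestamp strictly below (now - maxAgeMs).
        byTimestamp.headMap(now - maxAgeMs, false).clear();
        List<String> snapshot = new ArrayList<>();
        for (List<String> values : byTimestamp.values()) {
            snapshot.addAll(values);
        }
        return snapshot;
    }
}
```

The state is kept once and only trimmed in place, so nothing is duplicated across overlapping windows; the cost is that each timer firing walks the whole remaining state to emit it.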



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink Memory analyze on AWS EMR

2020-05-12 Thread Jacky D
hi, Arvid

Thanks for the advice. I removed the quotes and it did create a YARN
session on EMR, but I didn't find any JIT log file generated.

The config with quotes works on a standalone cluster. I also tried to
pass the property dynamically in the yarn session command:

flink-yarn-session -n 1 -d -nm testSession -yD
env.java.opts="-XX:+UnlockDiagnosticVMOptions
-XX:+TraceClassLoading -XX:+LogCompilation
-XX:LogFile=${FLINK_LOG_PREFIX}.jit -XX:+PrintAssembly"


but I get the same result: the session is created, but I cannot find any JIT
log file under the container logs.


Thanks

Jacky

On Tue, May 12, 2020 at 12:57 PM Arvid Heise wrote:

> Hi Jacky,
>
> I suspect that the quotes are the actual issue. Could you try to remove
> them? See also [1].
>
> [1]
> http://blogs.perl.org/users/tinita/2018/03/strings-in-yaml---to-quote-or-not-to-quote.html
>
> On Tue, May 12, 2020 at 4:03 PM Jacky D  wrote:
>
>> hi, Xintong
>>
>> Thanks for the reply. I attached the lines around the application master
>> start command below:
>>
>>
>> 2020-05-11 21:16:16,635 DEBUG org.apache.hadoop.util.PerformanceAdvisory
>>   - Crypto codec
>> org.apache.hadoop.crypto.OpensslAesCtrCryptoCodec is not available.
>> 2020-05-11 21:16:16,635 DEBUG org.apache.hadoop.util.PerformanceAdvisory
>>   - Using crypto codec
>> org.apache.hadoop.crypto.JceAesCtrCryptoCodec.
>> 2020-05-11 21:16:16,636 DEBUG org.apache.hadoop.hdfs.DataStreamer
>>- DataStreamer block
>> BP-1519523618-98.94.65.144-1581106168138:blk_1073745139_4315 sending packet
>> packet seqno: 0 offsetInBlock: 0 lastPacketInBlock: false
>> lastByteOffsetInBlock: 1697
>> 2020-05-11 21:16:16,637 DEBUG org.apache.hadoop.hdfs.DataStreamer
>>- DFSClient seqno: 0 reply: SUCCESS
>> downstreamAckTimeNanos: 0 flag: 0
>> 2020-05-11 21:16:16,637 DEBUG org.apache.hadoop.hdfs.DataStreamer
>>- DataStreamer block
>> BP-1519523618-98.94.65.144-1581106168138:blk_1073745139_4315 sending packet
>> packet seqno: 1 offsetInBlock: 1697 lastPacketInBlock: true
>> lastByteOffsetInBlock: 1697
>> 2020-05-11 21:16:16,638 DEBUG org.apache.hadoop.hdfs.DataStreamer
>>- DFSClient seqno: 1 reply: SUCCESS
>> downstreamAckTimeNanos: 0 flag: 0
>> 2020-05-11 21:16:16,638 DEBUG org.apache.hadoop.hdfs.DataStreamer
>>- Closing old block
>> BP-1519523618-98.94.65.144-1581106168138:blk_1073745139_4315
>> 2020-05-11 21:16:16,641 DEBUG org.apache.hadoop.ipc.Client
>>   - IPC Client (1954985045) connection to
>> ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop sending #70
>> org.apache.hadoop.hdfs.protocol.ClientProtocol.complete
>> 2020-05-11 21:16:16,643 DEBUG org.apache.hadoop.ipc.Client
>>   - IPC Client (1954985045) connection to
>> ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop got value #70
>> 2020-05-11 21:16:16,643 DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine
>>- Call: complete took 2ms
>> 2020-05-11 21:16:16,643 DEBUG org.apache.hadoop.ipc.Client
>>   - IPC Client (1954985045) connection to
>> ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop sending #71
>> org.apache.hadoop.hdfs.protocol.ClientProtocol.setTimes
>> 2020-05-11 21:16:16,645 DEBUG org.apache.hadoop.ipc.Client
>>   - IPC Client (1954985045) connection to
>> ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop got value #71
>> 2020-05-11 21:16:16,645 DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine
>>- Call: setTimes took 2ms
>> 2020-05-11 21:16:16,647 DEBUG org.apache.hadoop.ipc.Client
>>   - IPC Client (1954985045) connection to
>> ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop sending #72
>> org.apache.hadoop.hdfs.protocol.ClientProtocol.setPermission
>> 2020-05-11 21:16:16,648 DEBUG org.apache.hadoop.ipc.Client
>>   - IPC Client (1954985045) connection to
>> ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop got value #72
>> 2020-05-11 21:16:16,648 DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine
>>- Call: setPermission took 2ms
>> 2020-05-11 21:16:16,654 DEBUG
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Application
>> Master start command: $JAVA_HOME/bin/java -Xmx424m
>> "-XX:+UnlockDiagnosticVMOptions -XX:+TraceClassLoading -XX:+LogCompilation
>> -XX:LogFile=${FLINK_LOG_PREFIX}.jit -XX:+PrintAssembly"
>> -Dlog.file="/jobmanager.log"
>> -Dlog4j.configuration=file:log4j.properties
>> org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint  1>
>> /jobmanager.out 2> /jobmanager.err
>> 2020-05-11 21:16:16,654 DEBUG org.apache.hadoop.ipc.Client
>>   - stopping client from cache:
>> org.apache.hadoop.ipc.Client@28194a50
>> 2020-05-11 21:16:16,656 DEBUG
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor$ApplicationSubmissionContextReflector
>> - 

Re: Re: Re: Writing _SUCCESS Files (Streaming and Batch)

2020-05-12 Thread Peter Groesbeck
Robert, Yun, and Theo,

Thanks for the responses! I'm very much looking forward to upgrading once
those changes are made.

I hacked this in myself but likely in a much less elegant way than Theo.
For anybody who is curious - I extended the DateTimeBucketAssigner class
and overrode the getBucketId method so that I could use the AWS SDK to
directly write an empty success file whenever a new bucket was opened. I
keep a Map of the last bucket that I've already written to, as well as the
current bucket, so that I don't overwhelm S3 with requests (getBucketId
appears to be called quite frequently).

It's far from ideal but it's performing quite nicely and generally I'm not
seeing any significant race conditions between files closing in the
previous bucket and opening in the next. I of course only write the
_SUCCESS file to the previous bucket not the current one.
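
A minimal sketch of the bookkeeping described above, in plain Java. SuccessMarkerTracker and onBucketId are hypothetical names, the list of written markers stands in for the S3 put requests, and the sketch assumes bucket ids sort lexicographically in time order (as a date-based format such as 2020-05-12--10 would). It is not the actual DateTimeBucketAssigner subclass.

```java
import java.util.ArrayList;
import java.util.List;

/** Tracks the current bucket and marks the previous one when a newer bucket appears. */
class SuccessMarkerTracker {
    // Newest bucket id seen so far; null until the first element arrives.
    private String currentBucket;
    // Stands in for the empty _SUCCESS objects that would be written to S3.
    final List<String> writtenMarkers = new ArrayList<>();

    /** Called for every element, like getBucketId; returns the id unchanged. */
    String onBucketId(String bucketId) {
        if (currentBucket == null) {
            currentBucket = bucketId;
        } else if (bucketId.compareTo(currentBucket) > 0) {
            // A newer bucket opened: mark the previous bucket, never the current one.
            writtenMarkers.add(currentBucket + "/_SUCCESS");
            currentBucket = bucketId;
        }
        // Same or older bucket id: nothing to write, so frequent calls stay cheap.
        return bucketId;
    }
}
```

Because a marker is written only on the transition to a newer bucket, repeated calls for the same bucket and late elements for an older bucket trigger no additional requests.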

I will keep an eye on that ticket, and very much appreciate the effort!

Warm regards,
Peter

On Tue, May 12, 2020 at 11:00 AM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi Yun,
>
> For me, that sounds quite nice. I implemented the same for my application
> a few weeks ago, but of course tailored only to my app.
> What I did:
> 1. I wrapped the Parquet-StreamingFileSink into a Process-Function.
> 2. I extended the default ProcessOperator and instead of
> "notifyCheckpointComplete(long checkpointId)", I provided my
> WrappedProcessFunction a "notifyCheckpointComplete(checkpointId,
> lastCommitWatermark)".
> 3. I added a custom sink with parallelism 1 behind the
> WrappedProcessFunction.
> 4. From my WrappedProcessFunction, in notifyCheckpointComplete, I send a
> message downstream to the parallelism 1 sink containing data about which
> partitions were written to in the phase since the last checkpoint.
> 5. In the parallelism 1 sink, I make sure that I get the messages from all
> upstream tasks (the constructor takes an int parameter telling it the
> parallelism of the WrappedProcessFunction) and then perform my parallelism
> 1 operation, in my case, telling Impala which partitions were added or got
> new data. Luckily, in the case of Impala, that operation can be made idempotent,
> so I only needed to make sure that I have at-least-once processing from
> the state perspective here.
>
> I had to go for notifyCheckpointComplete as only there, the parquet files
> are ultimately committed and thus available for spark, impala and so on.
>
> So if you go on with that issue, I'd be really happy to be able to
> customize the solution and e.g. get rid of my custom setup by only
> specifying a kind of lambda function which should be run with parallelism
> 1 and update Impala. That function would however still need the info about which
> partitions were updated/added.
> And in my case, I was not really interested in the watermark (I sent it
> downstream only for metric purposes) but want to tell Impala after each
> commit which partitions changed, regardless of the value of the watermark.
>
> Best regards
> Theo
>
> --
> *From: *"Yun Gao" 
> *To: *"Robert Metzger" , "Jingsong Li" <
> jingsongl...@gmail.com>
> *CC: *"Peter Groesbeck" , "user" <
> user@flink.apache.org>
> *Sent: *Tuesday, May 12, 2020 10:36:59
> *Subject: *Re: Re: Writing _SUCCESS Files (Streaming and Batch)
>
> Hi Peter,
>
> Sorry for missing the question and responding late. I'm currently
> working together with Jingsong on the issue to support "global committing"
> (like writing a _SUCCESS file or adding partitions to the Hive store) after
> buckets are terminated. In 1.11 we may first support watermark/time-related
> buckets in the Table/SQL API, and we are also thinking of supporting "global
> committing" for arbitrary bucket assigner policies for StreamingFileSink
> users. The current rough thought is to let users specify when a bucket is
> terminated on a single task, and the OperatorCoordinator[1] of the sink
> will aggregate the information from all subtasks about this bucket and do
> the global committing if the bucket has been finished on all the subtasks,
> but this is still under consideration and discussion. Any thoughts or
> requirements on this issue are warmly welcome.
>
> Best,
>  Yun
>
>
> [1] OperatorCoordinator is introduced in FLIP-27:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> This is a component that resides in the JobManager and can communicate with all
> the subtasks of the corresponding operator, so it can be used to
> aggregate status from subtasks.
>
> --Original message--
> *From:* Robert Metzger 
> *Sent:* Tue May 12 15:36:26 2020
> *To:* Jingsong Li 
> *CC:* Peter Groesbeck , user <
> user@flink.apache.org>
> *Subject:* Re: Writing _SUCCESS Files (Streaming and Batch)
>
>> Hi Peter,
>> I filed a ticket for this feature request:
>> https://issues.apache.org/jira/browse/FLINK-17627 (feel free to add your
>> thoughts / requirements to the ticket)
>>
>> Best,
>> Robert
>>
>>
>> On Wed, 

Re: Flink Memory analyze on AWS EMR

2020-05-12 Thread Arvid Heise
Hi Jacky,

I suspect that the quotes are the actual issue. Could you try to remove
them? See also [1].

[1]
http://blogs.perl.org/users/tinita/2018/03/strings-in-yaml---to-quote-or-not-to-quote.html

On Tue, May 12, 2020 at 4:03 PM Jacky D  wrote:

> hi, Xintong
>
> Thanks for the reply. I attached the lines around the application master
> start command below:
>
>
> 2020-05-11 21:16:16,635 DEBUG org.apache.hadoop.util.PerformanceAdvisory
>   - Crypto codec
> org.apache.hadoop.crypto.OpensslAesCtrCryptoCodec is not available.
> 2020-05-11 21:16:16,635 DEBUG org.apache.hadoop.util.PerformanceAdvisory
>   - Using crypto codec
> org.apache.hadoop.crypto.JceAesCtrCryptoCodec.
> 2020-05-11 21:16:16,636 DEBUG org.apache.hadoop.hdfs.DataStreamer
>  - DataStreamer block
> BP-1519523618-98.94.65.144-1581106168138:blk_1073745139_4315 sending packet
> packet seqno: 0 offsetInBlock: 0 lastPacketInBlock: false
> lastByteOffsetInBlock: 1697
> 2020-05-11 21:16:16,637 DEBUG org.apache.hadoop.hdfs.DataStreamer
>  - DFSClient seqno: 0 reply: SUCCESS
> downstreamAckTimeNanos: 0 flag: 0
> 2020-05-11 21:16:16,637 DEBUG org.apache.hadoop.hdfs.DataStreamer
>  - DataStreamer block
> BP-1519523618-98.94.65.144-1581106168138:blk_1073745139_4315 sending packet
> packet seqno: 1 offsetInBlock: 1697 lastPacketInBlock: true
> lastByteOffsetInBlock: 1697
> 2020-05-11 21:16:16,638 DEBUG org.apache.hadoop.hdfs.DataStreamer
>  - DFSClient seqno: 1 reply: SUCCESS
> downstreamAckTimeNanos: 0 flag: 0
> 2020-05-11 21:16:16,638 DEBUG org.apache.hadoop.hdfs.DataStreamer
>  - Closing old block
> BP-1519523618-98.94.65.144-1581106168138:blk_1073745139_4315
> 2020-05-11 21:16:16,641 DEBUG org.apache.hadoop.ipc.Client
>   - IPC Client (1954985045) connection to
> ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop sending #70
> org.apache.hadoop.hdfs.protocol.ClientProtocol.complete
> 2020-05-11 21:16:16,643 DEBUG org.apache.hadoop.ipc.Client
>   - IPC Client (1954985045) connection to
> ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop got value #70
> 2020-05-11 21:16:16,643 DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine
>  - Call: complete took 2ms
> 2020-05-11 21:16:16,643 DEBUG org.apache.hadoop.ipc.Client
>   - IPC Client (1954985045) connection to
> ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop sending #71
> org.apache.hadoop.hdfs.protocol.ClientProtocol.setTimes
> 2020-05-11 21:16:16,645 DEBUG org.apache.hadoop.ipc.Client
>   - IPC Client (1954985045) connection to
> ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop got value #71
> 2020-05-11 21:16:16,645 DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine
>  - Call: setTimes took 2ms
> 2020-05-11 21:16:16,647 DEBUG org.apache.hadoop.ipc.Client
>   - IPC Client (1954985045) connection to
> ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop sending #72
> org.apache.hadoop.hdfs.protocol.ClientProtocol.setPermission
> 2020-05-11 21:16:16,648 DEBUG org.apache.hadoop.ipc.Client
>   - IPC Client (1954985045) connection to
> ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop got value #72
> 2020-05-11 21:16:16,648 DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine
>  - Call: setPermission took 2ms
> 2020-05-11 21:16:16,654 DEBUG
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Application
> Master start command: $JAVA_HOME/bin/java -Xmx424m
> "-XX:+UnlockDiagnosticVMOptions -XX:+TraceClassLoading -XX:+LogCompilation
> -XX:LogFile=${FLINK_LOG_PREFIX}.jit -XX:+PrintAssembly"
> -Dlog.file="/jobmanager.log"
> -Dlog4j.configuration=file:log4j.properties
> org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint  1>
> /jobmanager.out 2> /jobmanager.err
> 2020-05-11 21:16:16,654 DEBUG org.apache.hadoop.ipc.Client
>   - stopping client from cache:
> org.apache.hadoop.ipc.Client@28194a50
> 2020-05-11 21:16:16,656 DEBUG
> org.apache.flink.yarn.AbstractYarnClusterDescriptor$ApplicationSubmissionContextReflector
> - org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext supports
> method setApplicationTags.
> 2020-05-11 21:16:16,656 DEBUG
> org.apache.flink.yarn.AbstractYarnClusterDescriptor$ApplicationSubmissionContextReflector
> - org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext supports
> method setAttemptFailuresValidityInterval.
> 2020-05-11 21:16:16,656 DEBUG
> org.apache.flink.yarn.AbstractYarnClusterDescriptor$ApplicationSubmissionContextReflector
> - org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext supports
> method setKeepContainersAcrossApplicationAttempts.
> 2020-05-11 21:16:16,656 DEBUG
> org.apache.flink.yarn.AbstractYarnClusterDescriptor$ApplicationSubmissionContextReflector
> - 

Re: Flink Streaming Job Tuning help

2020-05-12 Thread Senthil Kumar
I forgot to mention: we are consuming said records from AWS Kinesis and writing 
out to S3.

From: Senthil Kumar 
Date: Tuesday, May 12, 2020 at 10:47 AM
To: "user@flink.apache.org" 
Subject: Flink Streaming Job Tuning help

Hello Flink Community!

We have a fairly intensive Flink streaming application, processing 8-9 million 
records a minute, with each record being 10k.
One of our steps is a keyBy operation. We are finding that Flink lags seriously 
behind when we introduce the keyBy (presumably because of the shuffle across the 
network).

We are trying to tune it ourselves (size of nodes, memory, network buffers, 
etc.), but before we spend way too much time on
this, would it be better to hire some “Flink tuning expert” to get us through?

If so, what resources are recommended on this list?

Cheers
Kumar


Flink Streaming Job Tuning help

2020-05-12 Thread Senthil Kumar
Hello Flink Community!

We have a fairly intensive Flink streaming application, processing 8-9 million 
records a minute, with each record being 10k.
One of our steps is a keyBy operation. We are finding that Flink lags seriously 
behind when we introduce the keyBy (presumably because of the shuffle across the 
network).

We are trying to tune it ourselves (size of nodes, memory, network buffers, 
etc.), but before we spend way too much time on
this, would it be better to hire some “Flink tuning expert” to get us through?

If so, what resources are recommended on this list?

Cheers
Kumar


Re: Re: Re: Writing _SUCCESS Files (Streaming and Batch)

2020-05-12 Thread Theo Diefenthal
Hi Yun, 

For me, that sounds quite nice. I implemented the same for my application a few 
weeks ago, but of course tailored only to my app. 
What I did: 
1. I wrapped the Parquet-StreamingFileSink into a Process-Function. 
2. I extended the default ProcessOperator and instead of 
"notifyCheckpointComplete(long checkpointId)", I provided my 
WrappedProcessFunction a "notifyCheckpointComplete(checkpointId, 
lastCommitWatermark)". 
3. I added a custom sink with parallelism 1 behind the WrappedProcessFunction. 
4. From my WrappedProcessFunction, in notifyCheckpointComplete, I send a 
message downstream to the parallelism 1 sink containing data about which 
partitions were written to in the phase since the last checkpoint. 
5. In the parallelism 1 sink, I make sure that I get the messages from all 
upstream tasks (the constructor takes an int parameter telling it the parallelism 
of the WrappedProcessFunction) and then perform my parallelism 1 operation, in 
my case, telling Impala which partitions were added or got new data. Luckily, 
in the case of Impala, that operation can be made idempotent, so I only needed to 
make sure that I have at-least-once processing from the state perspective 
here. 

I had to go for notifyCheckpointComplete as only there, the parquet files are 
ultimately committed and thus available for spark, impala and so on. 
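
Step 5 above can be sketched in plain Java as follows. GlobalCommitAggregator and onMessage are hypothetical names, and the message fields (checkpoint id, subtask index, touched partitions) are assumptions about what the WrappedProcessFunction sends downstream; the real sink would wrap logic like this and then issue the Impala statement.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

/** Collects per-checkpoint messages until every upstream subtask has reported. */
class GlobalCommitAggregator {
    // Parallelism of the upstream function, passed in through the constructor.
    private final int upstreamParallelism;
    private final Map<Long, Set<Integer>> reportedSubtasks = new HashMap<>();
    private final Map<Long, Set<String>> touchedPartitions = new HashMap<>();

    GlobalCommitAggregator(int upstreamParallelism) {
        this.upstreamParallelism = upstreamParallelism;
    }

    /** Returns the merged partitions once all subtasks reported, else empty. */
    Optional<Set<String>> onMessage(long checkpointId, int subtaskIndex, Set<String> partitions) {
        touchedPartitions.computeIfAbsent(checkpointId, id -> new TreeSet<>()).addAll(partitions);
        Set<Integer> seen = reportedSubtasks.computeIfAbsent(checkpointId, id -> new HashSet<>());
        seen.add(subtaskIndex);
        if (seen.size() < upstreamParallelism) {
            return Optional.empty();
        }
        // Every subtask has reported for this checkpoint: hand over and forget it.
        reportedSubtasks.remove(checkpointId);
        return Optional.of(touchedPartitions.remove(checkpointId));
    }
}
```

A duplicate message from the same subtask only re-adds its partitions, which fits the at-least-once requirement as long as the downstream operation is idempotent.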

So if you go on with that issue, I'd be really happy to be able to customize 
the solution and e.g. get rid of my custom setup by only specifying a kind of 
lambda function which should be run with parallelism 1 and update Impala. That 
function would however still need the info about which partitions were updated/added. 
And in my case, I was not really interested in the watermark (I sent it 
downstream only for metric purposes) but want to tell Impala after each commit 
which partitions changed, regardless of the value of the watermark. 

Best regards 
Theo 


From: "Yun Gao"
To: "Robert Metzger", "Jingsong Li"
CC: "Peter Groesbeck", "user"
Sent: Tuesday, May 12, 2020 10:36:59
Subject: Re: Re: Writing _SUCCESS Files (Streaming and Batch)

Hi Peter, 

Sorry for missing the question and responding late. I'm currently working 
together with Jingsong on the issue to support "global committing" (like 
writing a _SUCCESS file or adding partitions to the Hive store) after buckets 
are terminated. In 1.11 we may first support watermark/time-related buckets in the 
Table/SQL API, and we are also thinking of supporting "global committing" for 
arbitrary bucket assigner policies for StreamingFileSink users. The current rough 
thought is to let users specify when a bucket is terminated on a single task, 
and the OperatorCoordinator[1] of the sink will aggregate the information from 
all subtasks about this bucket and do the global committing if the bucket has 
been finished on all the subtasks, but this is still under consideration and 
discussion. Any thoughts or requirements on this issue are warmly welcome. 

Best, 
Yun 


[1] OperatorCoordinator is introduced in FLIP-27: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
This is a component that resides in the JobManager and can communicate with all 
the subtasks of the corresponding operator, so it can be used to aggregate 
status from subtasks. 




--Original message--
From: Robert Metzger
Sent: Tue May 12 15:36:26 2020
To: Jingsong Li
CC: Peter Groesbeck, user
Subject: Re: Writing _SUCCESS Files (Streaming and Batch)

Hi Peter, 
I filed a ticket for this feature request: 
https://issues.apache.org/jira/browse/FLINK-17627 (feel free to add your 
thoughts / requirements to the ticket) 

Best, 
Robert 


On Wed, May 6, 2020 at 3:41 AM Jingsong Li <jingsongl...@gmail.com> wrote: 

Hi Peter, 
The tricky part is how to know the "ending" of a bucket in a streaming job. 
In 1.11, we are trying to implement a watermark-related bucket ending 
mechanism[1] in Table/SQL. 

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table

Best, 
Jingsong Lee 

On Tue, May 5, 2020 at 7:40 AM Peter Groesbeck <peter.groesb...@gmail.com> wrote: 

I am replacing an M/R job with a Streaming job using the StreamingFileSink and 
there is a requirement to generate an empty _SUCCESS file like the old Hadoop 
job. I have to implement a similar Batch job to read from backup files in case 
of outages or downtime. 

The Batch job question was answered here and appears to be still relevant 
although if someone could confirm for me that would be great. 
[ https://stackoverflow.com/a/39413810 | 

Re: ProducerRecord with Kafka Sink for 1.8.0

2020-05-12 Thread Nick Bendtner
Hi Gary,
Thanks for the info. I am aware this feature is available from 1.9.0 onwards.
Our cluster is still very old and we have CI/CD challenges, so I was hoping not to
bloat the application jar by packaging even flink-core with it. If it's
not possible to do this with the older version without writing our own Kafka
sink implementation similar to the one Flink provides in 1.9.0, then I
think we will package flink-core 1.9.0 with the application and follow the
approach that you suggested. Thanks again for getting back to me so
quickly.

Best,
Nick

On Tue, May 12, 2020 at 3:37 AM Gary Yao  wrote:

> Hi Nick,
>
> Are you able to upgrade to Flink 1.9? Beginning with Flink 1.9 you can use
> KafkaSerializationSchema to produce a ProducerRecord [1][2].
>
> Best,
> Gary
>
> [1] https://issues.apache.org/jira/browse/FLINK-11693
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.html
>
> On Mon, May 11, 2020 at 10:59 PM Nick Bendtner  wrote:
> >
> > Hi guys,
> > I use 1.8.0 version for flink-connector-kafka. Do you have any
> recommendations on how to produce a ProducerRecord from a kafka sink.
> Looking to add support to kafka headers therefore thinking about
> ProducerRecord. If you have any thoughts its highly appreciated.
> >
> > Best,
> > Nick.
>


Re: Re: flink10 error reading from Kafka

2020-05-12 Thread PCL



Thanks for the reply!
The strange thing is that running sqlQuery works without any problem:
/*Table tb1 =tableEnv.sqlQuery("select sum(amount),TUMBLE_END(proctime, 
INTERVAL '5' SECOND)" +
" from sourceTable group by TUMBLE(proctime, INTERVAL '5' SECOND) ");
tb1.printSchema();*/
After uncommenting it, the printed schema is:
root
 |-- EXPR$0: DOUBLE
 |-- EXPR$1: TIMESTAMP(3)











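On 2020-05-12 22:36:17, "忝忝向仧" <153488...@qq.com> wrote:
>The TIMESTAMP(3) time format is probably wrong, isn't it?
>
>
>--Original message--
>From: "PCL"  Sent: Tuesday, May 12, 2020, 9:43 PM
>To: "user-zh"
>Subject: flink10 error reading from Kafka
>
>
>
>Hi all,
>Has anyone run into this problem: Window aggregate can only be defined over a time attribute 
>column, but TIMESTAMP(3) encountered.
>The error occurs with both event time and processing time; the Flink and Blink planners report much the same error.
>The versions are as follows:
>The code is as follows: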
>// Get the execution environment
>StreamExecutionEnvironment env =
>    StreamExecutionEnvironment.getExecutionEnvironment();
>EnvironmentSettings settings =
>    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>//EnvironmentSettings settings =
>//    EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
>// Create a tableEnvironment
>StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
>
>Schema schema = new Schema()
>    //.field("id", "VARCHAR").from("id")
>    .field("id", "STRING")
>    //.field("name", "VARCHAR")
>    .field("amount", "DOUBLE")
>    .field("proctime", Types.SQL_TIMESTAMP).proctime()
>    //.field("rowtime", Types.SQL_TIMESTAMP)
>    //.rowtime(
>    //    new Rowtime()
>    //        .timestampsFromField("eventtime")
>    //        .watermarksPeriodicBounded(2000))
>    ;
>
>// "0.8", "0.9", "0.10", "0.11", and "universal"
>tableEnv.connect(new Kafka().version("universal")
>        .topic("source0511")
>        .property("zookeeper.connect", "172.16.44.28:7758")
>        .property("bootstrap.servers", "172.16.44.28:9096")
>        .property("group.id", "source0511-group")
>        .startFromEarliest()
>    )
>    .withFormat(new Csv())
>    .withSchema(schema)
>    .inAppendMode()
>    .createTemporaryTable("sourceTable");
>
>tableEnv.connect(
>        new Kafka()
>            .version("universal")
>            // "0.8", "0.9", "0.10", "0.11", and "universal"
>            .topic("sink0511")
>            .property("acks", "all")
>            .property("retries", "0")
>            .property("batch.size", "16384")
>            .property("linger.ms", "10")
>            .property("zookeeper.connect", "172.16.44.28:7758")
>            .property("bootstrap.servers", "172.16.44.28:9096")
>            .sinkPartitionerFixed())
>    .inAppendMode()
>    .withFormat(new Json())
>    .withSchema(
>        new Schema().field("totalamount", "DOUBLE")
>            //.field("total", "INT")
>            .field("time", Types.SQL_TIMESTAMP)
>    )
>    .createTemporaryTable("sinkTable");
>
>tableEnv.sqlUpdate("insert into sinkTable"
>    + " select sum(amount),TUMBLE_END(proctime, INTERVAL '5' SECOND) "
>    + "from sourceTable group by TUMBLE(proctime, INTERVAL '5' SECOND)");
>//SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
>// FROM user_actions
>// GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
>env.execute("test");


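Re: Re: Problem monitoring Flink with Prometheus Pushgateway

2020-05-12 Thread 972684638
Thanks to 杨纲 for the answer.
Previously I had crudely changed the way the flink-metrics-prometheus package pushes data to the Pushgateway to POST and rebuilt the package, so that the JM and TM data no longer overwrote each other. It seems I only understood part of the picture.
Only after reading your explanation of metrics.reporter.promgateway.randomJobNameSuffix did I understand the cause.



---Original message---
From: "yanggang_it_job"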

Re: flink10 error reading from Kafka

2020-05-12 Thread 忝忝向仧
The TIMESTAMP(3) time format is probably wrong, isn't it?


--Original message--
From: "PCL"

Flink Metrics in kubernetes

2020-05-12 Thread Averell
Hi,

I'm trying to configure Flink running on native Kubernetes to push some metrics
to NewRelic (using a custom ScheduledDropwizardReporter).

From the logs, I could see that an instance of ScheduledDropwizardReporter
has already been created successfully (the overridden getReporter() method
was called).
An instance of MetricRegistryImpl was also created successfully (this log
line was shown: "Periodically reporting metrics in intervals of 30 SECONDS
for reporter my_newrelic_reporter").

However, the report() method was not called.
When running on my laptop, there's no issue at all.
Are there any special things that I need to care for when running in
Kubernetes?

Thanks a lot.

Regards,
Averell





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


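Re: Problem monitoring Flink with Prometheus Pushgateway

2020-05-12 Thread yanggang_it_job
Hi 佳宸,
Let me explain the purpose of these parameters.
metrics.reporter.promgateway.deleteOnShutdown: this parameter controls whether the metrics cached in the Pushgateway's memory are cleaned up when a job is taken offline via stop or cancel. If the job is killed via yarn kill, they are not removed.
metrics.reporter.promgateway.randomJobNameSuffix: this parameter controls whether a random suffix is appended to the jobName we define, to distinguish the metric_name of different containers of the same job. Otherwise the writes overwrite each other, which is the incomplete-metrics problem you describe. The reason is this: once a job starts there are at least two containers (one JM and one TM), and each container pushes metrics to the Pushgateway. If this parameter is not set to true, they all push under the same jobName, so the later push overwrites the earlier one, and you see the JM's metrics one moment and the TM's the next. With the parameter set, each container's jobName is different, so nothing is overwritten.

Best regards,
杨纲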
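
The overwrite behaviour described above can be illustrated with a toy model in plain Java. ToyPushgateway is a hypothetical class, not the real Pushgateway or the Flink reporter; it only assumes that a push under an existing job name replaces the metrics stored for that name, and the suffixes "a1" and "b2" are made up.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Toy stand-in for a Pushgateway: one metric group per job name. */
class ToyPushgateway {
    private final Map<String, Map<String, Double>> groups = new HashMap<>();

    /** A push under an existing job name replaces that group's metrics. */
    void push(String jobName, Map<String, Double> metrics) {
        groups.put(jobName, new HashMap<>(metrics));
    }

    /** All metric names currently held, across every group. */
    Set<String> metricNames() {
        Set<String> names = new TreeSet<>();
        for (Map<String, Double> group : groups.values()) {
            names.addAll(group.keySet());
        }
        return names;
    }
}
```

If the JM and the TM both push as "myJob", only the metrics of whichever pushed last remain; if they push as "myJob" + "a1" and "myJob" + "b2", both groups survive side by side.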

















On 2020-05-12 18:25:10, "李佳宸" wrote:
>Hi everyone,
>
>When I monitor Flink with the Prometheus Pushgateway, the setting
>metrics.reporter.promgateway.deleteOnShutdown: true has no effect:
>when the Flink cluster shuts down, the metrics data is still present in the Pushgateway.
>The full reporter configuration is:
>
>metrics.reporter.promgateway.class:
>org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
>
>metrics.reporter.promgateway.host: localhost
>
>metrics.reporter.promgateway.port: 9091
>
>metrics.reporter.promgateway.jobName: myJob
>
>metrics.reporter.promgateway.randomJobNameSuffix: true
>
>metrics.reporter.promgateway.deleteOnShutdown: true
>
>
>The Flink version is 1.9.1; I tried Pushgateway versions 0.9 and 1.2, with the same problem.
>
>
>I don't know whether this is a bug.
>
>Has anyone got this working?
>
>Thanks


Re: Flink Memory analyze on AWS EMR

2020-05-12 Thread Jacky D
hi, Xintong

Thanks for the reply. I attached the lines around the application master
start command below:


2020-05-11 21:16:16,635 DEBUG org.apache.hadoop.util.PerformanceAdvisory
- Crypto codec
org.apache.hadoop.crypto.OpensslAesCtrCryptoCodec is not available.
2020-05-11 21:16:16,635 DEBUG org.apache.hadoop.util.PerformanceAdvisory
- Using crypto codec
org.apache.hadoop.crypto.JceAesCtrCryptoCodec.
2020-05-11 21:16:16,636 DEBUG org.apache.hadoop.hdfs.DataStreamer
 - DataStreamer block
BP-1519523618-98.94.65.144-1581106168138:blk_1073745139_4315 sending packet
packet seqno: 0 offsetInBlock: 0 lastPacketInBlock: false
lastByteOffsetInBlock: 1697
2020-05-11 21:16:16,637 DEBUG org.apache.hadoop.hdfs.DataStreamer
 - DFSClient seqno: 0 reply: SUCCESS
downstreamAckTimeNanos: 0 flag: 0
2020-05-11 21:16:16,637 DEBUG org.apache.hadoop.hdfs.DataStreamer
 - DataStreamer block
BP-1519523618-98.94.65.144-1581106168138:blk_1073745139_4315 sending packet
packet seqno: 1 offsetInBlock: 1697 lastPacketInBlock: true
lastByteOffsetInBlock: 1697
2020-05-11 21:16:16,638 DEBUG org.apache.hadoop.hdfs.DataStreamer
 - DFSClient seqno: 1 reply: SUCCESS
downstreamAckTimeNanos: 0 flag: 0
2020-05-11 21:16:16,638 DEBUG org.apache.hadoop.hdfs.DataStreamer
 - Closing old block
BP-1519523618-98.94.65.144-1581106168138:blk_1073745139_4315
2020-05-11 21:16:16,641 DEBUG org.apache.hadoop.ipc.Client
- IPC Client (1954985045) connection to
ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop sending #70
org.apache.hadoop.hdfs.protocol.ClientProtocol.complete
2020-05-11 21:16:16,643 DEBUG org.apache.hadoop.ipc.Client
- IPC Client (1954985045) connection to
ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop got value #70
2020-05-11 21:16:16,643 DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine
 - Call: complete took 2ms
2020-05-11 21:16:16,643 DEBUG org.apache.hadoop.ipc.Client
- IPC Client (1954985045) connection to
ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop sending #71
org.apache.hadoop.hdfs.protocol.ClientProtocol.setTimes
2020-05-11 21:16:16,645 DEBUG org.apache.hadoop.ipc.Client
- IPC Client (1954985045) connection to
ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop got value #71
2020-05-11 21:16:16,645 DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine
 - Call: setTimes took 2ms
2020-05-11 21:16:16,647 DEBUG org.apache.hadoop.ipc.Client
- IPC Client (1954985045) connection to
ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop sending #72
org.apache.hadoop.hdfs.protocol.ClientProtocol.setPermission
2020-05-11 21:16:16,648 DEBUG org.apache.hadoop.ipc.Client
- IPC Client (1954985045) connection to
ip-98-94-65-144.ec2.internal/98.94.65.144:8020 from hadoop got value #72
2020-05-11 21:16:16,648 DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine
 - Call: setPermission took 2ms
2020-05-11 21:16:16,654 DEBUG
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Application
Master start command: $JAVA_HOME/bin/java -Xmx424m
"-XX:+UnlockDiagnosticVMOptions -XX:+TraceClassLoading -XX:+LogCompilation
-XX:LogFile=${FLINK_LOG_PREFIX}.jit -XX:+PrintAssembly"
-Dlog.file="/jobmanager.log"
-Dlog4j.configuration=file:log4j.properties
org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint  1>
/jobmanager.out 2> /jobmanager.err
2020-05-11 21:16:16,654 DEBUG org.apache.hadoop.ipc.Client
- stopping client from cache:
org.apache.hadoop.ipc.Client@28194a50
2020-05-11 21:16:16,656 DEBUG
org.apache.flink.yarn.AbstractYarnClusterDescriptor$ApplicationSubmissionContextReflector
- org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext supports
method setApplicationTags.
2020-05-11 21:16:16,656 DEBUG
org.apache.flink.yarn.AbstractYarnClusterDescriptor$ApplicationSubmissionContextReflector
- org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext supports
method setAttemptFailuresValidityInterval.
2020-05-11 21:16:16,656 DEBUG
org.apache.flink.yarn.AbstractYarnClusterDescriptor$ApplicationSubmissionContextReflector
- org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext supports
method setKeepContainersAcrossApplicationAttempts.
2020-05-11 21:16:16,656 DEBUG
org.apache.flink.yarn.AbstractYarnClusterDescriptor$ApplicationSubmissionContextReflector
- org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext supports
method setNodeLabelExpression.

Xintong Song wrote on Mon, May 11, 2020 at 10:11 PM:

> Hi Jacky,
>
> Could you search for "Application Master start command:" in the debug log
> and post the result and a few lines before & after that? This is not
> included in the clip of attached log file.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, May 12, 2020 at 5:33 AM Jacky D  wrote:
>
>> hi, Robert
>>
>> Thanks so much for quick reply 

Flink 1.10: error when reading from Kafka

2020-05-12 Thread PCL
Hi all,
 Has anyone run into this problem: "Window aggregate can only be defined over
a time attribute column, but TIMESTAMP(3) encountered."
I get this error with both event time and processing time; the flink and blink
planners report much the same error.
Versions:
1.10.0
2.11
The code is as follows:
// Get the execution environment
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
// Create a TableEnvironment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

Schema schema = new Schema()
//.field("id", "VARCHAR").from("id")
.field("id", "STRING")
//.field("name", "VARCHAR")
.field("amount", "DOUBLE")
.field("proctime", Types.SQL_TIMESTAMP).proctime()
//.field("rowtime", Types.SQL_TIMESTAMP)
//.rowtime(
//new Rowtime()
//.timestampsFromField(
   // "eventtime")
//.watermarksPeriodicBounded(2000))
;

//   "0.8", "0.9", "0.10", "0.11", and "universal"
tableEnv.connect(new Kafka().version("universal")
.topic("source0511")
.property("zookeeper.connect", 
"172.16.44.28:7758")
.property("bootstrap.servers", 
"172.16.44.28:9096")
.property("group.id", "source0511-group")
.startFromEarliest()
)
.withFormat(new Csv())
.withSchema(schema)
.inAppendMode()
.createTemporaryTable("sourceTable");

tableEnv.connect(
new Kafka()
.version("universal")
// "0.8", "0.9", "0.10", "0.11", and "universal"
.topic("sink0511")
.property("acks", "all")
.property("retries", "0")
.property("batch.size", "16384")
.property("linger.ms", "10")
.property("zookeeper.connect", "172.16.44.28:7758")
.property("bootstrap.servers", "172.16.44.28:9096")
.sinkPartitionerFixed())
.inAppendMode()
.withFormat(new Json())
.withSchema(
new Schema().field("totalamount", "DOUBLE")
//.field("total", "INT")
.field("time", Types.SQL_TIMESTAMP)
)
.createTemporaryTable("sinkTable");

tableEnv.sqlUpdate("insert into sinkTable"
+ " select sum(amount),TUMBLE_END(proctime, INTERVAL '5' SECOND) "
+ "from sourceTable group by TUMBLE(proctime, INTERVAL '5' SECOND)");
// SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
// FROM user_actions
// GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
env.execute("test");

Re: Need suggestion on Flink-Kafka stream processing design

2020-05-12 Thread hemant singh
Hello Roman,

Thanks for your response.

> I think partitioning you described (event type + protocol type) is subject
> to data skew. Including a device ID should solve this problem.
> Also, including "protocol_type" into the key and having topic per
> protocol_type seems redundant.
Each protocol is in a single topic, and event_type is the key used to
distribute data to a specific partition.

> Furthermore, do you have any particular reason to maintain multiple topics?
> I could imagine protocols have different speeds or other characteristics,
> so you can tune Flink accordingly.
> Otherwise, having a single topic partitioned only by device ID would
> simplify deployment and reduce data skew.
Yes, you are right. These protocols have separate characteristics such as
speed and data format. If I have only one topic with data partitioned by
device_id, events from a faster protocol could be processed faster, and the
joins I want to do would not have enough matching data.
I have a question here: how would you tune Flink to handle streams with
different characteristics such as speed, given that reading from Kafka could
result in uneven processing of data?

> By consume do you mean the downstream system?
My downstream is a TSDB and other DBs where the data will be written. All
of this is time-series data.

Thanks,
Hemant



On Tue, May 12, 2020 at 5:28 PM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hello Hemant,
>
> Thanks for your reply.
>
> I think partitioning you described (event type + protocol type) is subject
> to data skew. Including a device ID should solve this problem.
> Also, including "protocol_type" into the key and having topic per
> protocol_type seems redundant.
>
> Furthermore, do you have any particular reason to maintain multiple
> topics?
> I could imagine protocols have different speeds or other characteristics,
> so you can tune Flink accordingly.
> Otherwise, having a single topic partitioned only by device ID would
> simplify deployment and reduce data skew.
>
> > By consume do you mean the downstream system?
> Yes.
>
> Regards,
> Roman
>
>
> On Mon, May 11, 2020 at 11:30 PM hemant singh 
> wrote:
>
>> Hello Roman,
>>
>> PFB my response -
>>
>> As I understand, each protocol has a distinct set of event types (where
>> event type == metrics type); and a distinct set of devices. Is this correct?
>> Yes, correct. distinct events and devices. Each device emits these event.
>>
>> > Based on data protocol I have 4-5 topics. Currently the data for a
>> single event is being pushed to a partition of the kafka topic(producer key
>> -> event_type + data_protocol).
>> Here you are talking about the source (to Flink job), right?
>> Yes, you are right.
>>
>> Can you also share how are you going to consume these data?
>> By consume do you mean the downstream system?
>> If yes then this data will be written to a DB, some metrics goes to
>> TSDB(Influx) as well.
>>
>> Thanks,
>> Hemant
>>
>> On Tue, May 12, 2020 at 2:08 AM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi Hemant,
>>>
>>> As I understand, each protocol has a distinct set of event types (where
>>> event type == metrics type); and a distinct set of devices. Is this correct?
>>>
>>> > Based on data protocol I have 4-5 topics. Currently the data for a
>>> single event is being pushed to a partition of the kafka topic(producer key
>>> -> event_type + data_protocol).
>>> Here you are talking about the source (to Flink job), right?
>>>
>>> Can you also share how are you going to consume these data?
>>>
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Mon, May 11, 2020 at 8:57 PM hemant singh 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have different events from a device which constitutes different
>>>> metrics for same device. Each of these event is produced by the device in
>>>> interval of few milli seconds to a minute.
>>>>
>>>> Event1(Device1) -> Stream1 -> Metric 1
>>>> Event2 (Device1) -> Stream2 -> Metric 2 ...
>>>> ..
>>>> ...
>>>> Event100(Device1) -> Stream100 -> Metric100
>>>>
>>>> The number of events can go up to few 100s for each data protocol and
>>>> we have around 4-5 data protocols. Metrics from different streams makes up
>>>> a records
>>>> like for example from above example for device 1 -
>>>>
>>>> Device1 -> Metric1, Metric 2, Metric15 forms a single record for the
>>>> device. Currently in development phase I am using interval join to achieve
>>>> this, that is to create a record with latest data from different
>>>> streams(events).
>>>>
>>>> Based on data protocol I have 4-5 topics. Currently the data for a
>>>> single event is being pushed to a partition of the kafka topic(producer key
>>>> -> event_type + data_protocol). So essentially one topic is made up of many
>>>> streams. I am filtering on the key to define the streams.
>>>>
>>>> My question is - Is this correct way to stream the data, I had thought
>>>> of maintaining different topic for an event, however in that case number of
>>>> topics could go 

Re: TableConfig TTL and Watermark TTL

2020-05-12 Thread lec ssmi
Thank you for the clarification.

Benchao Li wrote on Tue, May 12, 2020 at 8:48 PM:

> Yes, you are right.
>
> If you add time constraints, it will be translated
> [Proc/Row]TimeBoundedStreamJoin,
> If not, it will be translated into StreamingJoinOperator.
> They are totally different two operators.
>
> lec ssmi wrote on Tue, May 12, 2020 at 8:43 PM:
>
>> Then if I don't write time constraints,
>> will it expire with the TTL time configured by TableConfig?
>> Benchao Li wrote on Tue, May 12, 2020 at 8:27 PM:
>>
>>> The state will be cleaned with watermark movement.
>>>
>>> lec ssmi wrote on Tue, May 12, 2020 at 5:55 PM:
>>>
>>>> Hi:
>>>>   If I  join two streams in SQL, the time range is used as a condition,
>>>> similar to the time interval join in DataStream.  So, will this join state
>>>> expire as the watermark moves, or will it expire with the TTL time
>>>> configured by TableConfig? Or both?
>>>>
>>>>  Best
>>>> Lec Ssmi

>>>
>>>
>>> --
>>>
>>> Benchao Li
>>> School of Electronics Engineering and Computer Science, Peking University
>>> Tel:+86-15650713730
>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>
>>>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


Re: TableConfig TTL and Watermark TTL

2020-05-12 Thread Benchao Li
Yes, you are right.

If you add time constraints, it will be translated into
[Proc/Row]TimeBoundedStreamJoin;
if not, it will be translated into StreamingJoinOperator.
They are two totally different operators.
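
For readers skimming the thread, here is a minimal sketch of the two query shapes being contrasted, written as plain Java string constants so it carries no Flink dependency. The table names (orders, shipments), the column names and the 4-hour bound are invented for illustration; the only thing that matters is whether the time-range predicate is present.

```java
/** Two hypothetical join queries; table and column names are illustrative only. */
final class JoinQueryShapes {

    // Join WITH a time constraint: the BETWEEN predicate relates the two time
    // attributes, which is the case where state is cleaned as the watermark moves.
    static final String INTERVAL_JOIN =
            "SELECT o.id, s.ship_time "
                    + "FROM orders o, shipments s "
                    + "WHERE o.id = s.order_id "
                    + "AND s.ship_time BETWEEN o.order_time "
                    + "AND o.order_time + INTERVAL '4' HOUR";

    // Join WITHOUT a time constraint: per this thread, its state expires only
    // according to the TTL configured on TableConfig.
    static final String REGULAR_JOIN =
            "SELECT o.id, s.ship_time "
                    + "FROM orders o JOIN shipments s ON o.id = s.order_id";

    public static void main(String[] args) {
        System.out.println(INTERVAL_JOIN);
        System.out.println(REGULAR_JOIN);
    }
}
```

Either string would be handed to the table environment as usual. For the second one, the retention would presumably be whatever idle-state retention was set on TableConfig; that detail is my reading of the thread, not something verified here.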

lec ssmi wrote on Tue, May 12, 2020 at 8:43 PM:

> Then if I don't write time constraints,
> will it expire with the TTL time configured by TableConfig?
> Benchao Li wrote on Tue, May 12, 2020 at 8:27 PM:
>
>> The state will be cleaned with watermark movement.
>>
>> lec ssmi wrote on Tue, May 12, 2020 at 5:55 PM:
>>
>>> Hi:
>>>   If I  join two streams in SQL, the time range is used as a condition,
>>> similar to the time interval join in DataStream.  So, will this join state
>>> expire as the watermark moves, or will it expire with the TTL time
>>> configured by TableConfig? Or both?
>>>
>>>  Best
>>> Lec Ssmi
>>>
>>
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: TableConfig TTL and Watermark TTL

2020-05-12 Thread lec ssmi
Then if I don't write time constraints,
will it expire with the TTL time configured by TableConfig?
Benchao Li wrote on Tue, May 12, 2020 at 8:27 PM:

> The state will be cleaned with watermark movement.
>
> lec ssmi wrote on Tue, May 12, 2020 at 5:55 PM:
>
>> Hi:
>>   If I  join two streams in SQL, the time range is used as a condition,
>> similar to the time interval join in DataStream.  So, will this join state
>> expire as the watermark moves, or will it expire with the TTL time
>> configured by TableConfig? Or both?
>>
>>  Best
>> Lec Ssmi
>>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


Re: TableConfig TTL and Watermark TTL

2020-05-12 Thread Benchao Li
The state will be cleaned with watermark movement.

lec ssmi wrote on Tue, May 12, 2020 at 5:55 PM:

> Hi:
>   If I  join two streams in SQL, the time range is used as a condition,
> similar to the time interval join in DataStream.  So, will this join state
> expire as the watermark moves, or will it expire with the TTL time
> configured by TableConfig? Or both?
>
>  Best
> Lec Ssmi
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Need suggestion on Flink-Kafka stream processing design

2020-05-12 Thread Khachatryan Roman
Hello Hemant,

Thanks for your reply.

I think partitioning you described (event type + protocol type) is subject
to data skew. Including a device ID should solve this problem.
Also, including "protocol_type" into the key and having topic per
protocol_type seems redundant.

Furthermore, do you have any particular reason to maintain multiple topics?
I could imagine protocols have different speeds or other characteristics,
so you can tune Flink accordingly.
Otherwise, having a single topic partitioned only by device ID would
simplify deployment and reduce data skew.
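
To illustrate the skew argument, here is a rough, self-contained sketch that counts how many records land in the busiest partition under each keying scheme. Everything in it is an assumption made for illustration: the 12 partitions, the 80/10/10 traffic split across three event types, the 1000 device IDs, and the use of `String.hashCode()` as a stand-in for Kafka's real partitioner hash.

```java
import java.util.function.IntFunction;

/** Compares partition load for two hypothetical keying schemes on synthetic data. */
final class PartitionSkewSketch {

    // Stand-in for the producer's partitioner: hash of the key modulo partition count.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Returns the record count of the busiest partition for the given key function. */
    static int maxPartitionLoad(int records, int numPartitions, IntFunction<String> keyOf) {
        int[] load = new int[numPartitions];
        for (int i = 0; i < records; i++) {
            load[partitionFor(keyOf.apply(i), numPartitions)]++;
        }
        int max = 0;
        for (int l : load) {
            max = Math.max(max, l);
        }
        return max;
    }

    // Key = event type + protocol: only three distinct keys, one of them carrying 80% of traffic.
    static String eventTypeKey(int i) {
        int r = i % 10;
        String eventType = r < 8 ? "temperature" : (r == 8 ? "pressure" : "humidity");
        return eventType + "|mqtt";
    }

    // Key = device ID: 1000 distinct keys with equal traffic.
    static String deviceKey(int i) {
        return "device-" + (i % 1000);
    }

    public static void main(String[] args) {
        int records = 10_000;
        int partitions = 12;
        System.out.println("busiest partition, key = event_type + protocol: "
                + maxPartitionLoad(records, partitions, PartitionSkewSketch::eventTypeKey));
        System.out.println("busiest partition, key = device ID: "
                + maxPartitionLoad(records, partitions, PartitionSkewSketch::deviceKey));
    }
}
```

With few distinct keys, the hottest key pins its whole volume to one partition no matter how many partitions exist; a high-cardinality key such as a device ID spreads the same volume far more evenly.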

> By consume do you mean the downstream system?
Yes.

Regards,
Roman


On Mon, May 11, 2020 at 11:30 PM hemant singh  wrote:

> Hello Roman,
>
> PFB my response -
>
> As I understand, each protocol has a distinct set of event types (where
> event type == metrics type); and a distinct set of devices. Is this correct?
> Yes, correct. distinct events and devices. Each device emits these event.
>
> > Based on data protocol I have 4-5 topics. Currently the data for a
> single event is being pushed to a partition of the kafka topic(producer key
> -> event_type + data_protocol).
> Here you are talking about the source (to Flink job), right?
> Yes, you are right.
>
> Can you also share how are you going to consume these data?
> By consume do you mean the downstream system?
> If yes then this data will be written to a DB, some metrics goes to
> TSDB(Influx) as well.
>
> Thanks,
> Hemant
>
> On Tue, May 12, 2020 at 2:08 AM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Hemant,
>>
>> As I understand, each protocol has a distinct set of event types (where
>> event type == metrics type); and a distinct set of devices. Is this correct?
>>
>> > Based on data protocol I have 4-5 topics. Currently the data for a
>> single event is being pushed to a partition of the kafka topic(producer key
>> -> event_type + data_protocol).
>> Here you are talking about the source (to Flink job), right?
>>
>> Can you also share how are you going to consume these data?
>>
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, May 11, 2020 at 8:57 PM hemant singh 
>> wrote:
>>
>>> Hi,
>>>
>>> I have different events from a device which constitutes different
>>> metrics for same device. Each of these event is produced by the device in
>>> interval of few milli seconds to a minute.
>>>
>>> Event1(Device1) -> Stream1 -> Metric 1
>>> Event2 (Device1) -> Stream2 -> Metric 2 ...
>>> ..
>>> ...
>>> Event100(Device1) -> Stream100 -> Metric100
>>>
>>> The number of events can go up to few 100s for each data protocol and we
>>> have around 4-5 data protocols. Metrics from different streams makes up a
>>> records
>>> like for example from above example for device 1 -
>>>
>>> Device1 -> Metric1, Metric 2, Metric15 forms a single record for the
>>> device. Currently in development phase I am using interval join to achieve
>>> this, that is to create a record with latest data from different
>>> streams(events).
>>>
>>> Based on data protocol I have 4-5 topics. Currently the data for a
>>> single event is being pushed to a partition of the kafka topic(producer key
>>> -> event_type + data_protocol). So essentially one topic is made up of many
>>> streams. I am filtering on the key to define the streams.
>>>
>>> My question is - Is this correct way to stream the data, I had thought
>>> of maintaining different topic for an event, however in that case number of
>>> topics could go to few thousands and that is something which becomes little
>>> challenging to maintain and not sure if kafka handles that well.
>>>
>>> I know there are traditional ways to do this like pushing it to
>>> timeseries db and then joining data for different metric but that is
>>> something which will never scale, also this processing should be as
>>> realtime as possible.
>>>
>>> Are there better ways to handle this use case or I am on correct path.
>>>
>>> Thanks,
>>> Hemant
>>>
>>


Re: Flink SQL join usecase

2020-05-12 Thread Benchao Li
Yes. Flink SQL supports this syntax.

shadowell wrote on Tue, May 12, 2020 at 3:25 PM:

> Hi,
>
> I am new to Flink SQL, I want to know whether Flink SQL(Flink-1.10)
> supports the following join syntax:
>
>  ```
>select a.id, a.col_1, b.col_1, c.col_1 from topic_a a
>  inner join topic_b b on a.id = b.id
>  left join topic_c c on a.id = c.id and a.col_1 = c.col_1 and
> b.col_1 = c.col_1;
>  ```
>
> Best Regards,
> Jie Feng
>
> Jie Feng
> shadow...@126.com
>
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Flink BLOB server port exposed externally

2020-05-12 Thread Arvid Heise
Hi Omar,

Wouldn't it be possible to just create an iptables rule that allows access
to port 1098 only from localhost? I don't think you can open a socket just for
localhost programmatically (at least not from Java).
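
A side note on the last sentence: the JDK itself does allow a listening socket to be bound to the loopback interface only, as the minimal sketch below shows. Whether Flink's BLOB server can be configured to bind this way is a separate question that this thread does not settle; the class and method names here are made up for the example.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

/** Hypothetical helper showing a server socket that only accepts local connections. */
final class LoopbackOnlyListener {

    /** Opens a listening socket bound to the loopback address; port 0 picks any free port. */
    static ServerSocket openOnLoopback(int port) throws IOException {
        // The third argument restricts the bind address; without it the socket
        // listens on the wildcard address (shown as 0.0.0.0 or ::: in netstat).
        return new ServerSocket(port, 50, InetAddress.getLoopbackAddress());
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket server = openOnLoopback(0)) {
            System.out.println("listening on " + server.getLocalSocketAddress());
        }
    }
}
```

A socket opened this way shows up in netstat with a loopback local address, like the postgres entries in the listing quoted below.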

Best,

Arvid

On Tue, May 12, 2020 at 12:51 PM Omar Gawi  wrote:

> Hi All,
>
> I have Apache Flink running as part of our java program , on a linux
> machine.
> The Flink runs on thread(s) within the same java process.
> I see that the machine has the BLOB server port 1098 exposed to the
> outside :
>
> davc@sdavc:~$ netstat -anp | grep LISTEN
> (Not all processes could be identified, non-owned process info
>  will not be shown, you would have to be root to see it all.)
> tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      -
> tcp        0      0 127.0.0.1:5432          0.0.0.0:*               LISTEN      311/postgres
> tcp6       0      0 :::8080                 :::*                    LISTEN      -
> tcp6       0      0 :::21                   :::*                    LISTEN      -
> tcp6       0      0 :::22                   :::*                    LISTEN      -
> tcp6       0      0 ::1:5432                :::*                    LISTEN      311/postgres
> tcp6       0      0 :::8443                 :::*                    LISTEN      -
> tcp6       0      0 :::1098                 :::*                    LISTEN      -
>
>
> This bring to our team security concerns , when other external user/system
> open connection (for telnet or other protocols) to this port
> (accidentally or not), we get below error in the java app log:
>
> 2020-04-23 07:54:58 ERROR BlobServerConnection:131 - Error while executing
> BLOB connection.
>
> java.io.IOException: Unknown operation 3
>
>at
> org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:122)
>
>
> My question if is there a way to avoid exposing this port  to the outside,
> and keep it available only for it's original purpose : serving the
> localhost/127.0.0.1 requests which come from the flink engine.
>
>
> Thank you and stay safe.
>
> Omar
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Broadcast state vs data enrichment

2020-05-12 Thread Khachatryan Roman
Thanks for the clarification.

Apparently, the second option (with enricher) creates more load by adding
configuration to every event. Unless events are much bigger than the
configuration, this will significantly increase network, memory, and CPU
usage.
Btw, I think you don't need a broadcast in the 2nd option, since the
interested subtask will receive the configuration anyways.
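
As a back-of-the-envelope illustration of that extra load, the sketch below counts the bytes shipped between operators per event under each option. It is only a toy model under stated assumptions: the 100-byte event and 16-byte config field are made-up numbers, serialization framing and operator chaining are ignored, and the occasional broadcast of the config message itself is left out because the config stream is described as sporadic.

```java
/** Toy per-event byte accounting for the two designs; all sizes are hypothetical. */
final class EnrichmentOverheadSketch {

    /** Option 1: config is broadcast separately, so each hop carries only the event. */
    static long broadcastBytesPerEvent(long eventBytes, int operators) {
        return eventBytes * operators;
    }

    /**
     * Option 2: one hop into the enricher with the bare event, then each hop carries
     * the event plus the config fields that have not been stripped off yet.
     */
    static long enrichedBytesPerEvent(long eventBytes, long fieldBytes, int operators) {
        long total = eventBytes; // source -> enricher
        for (int remaining = operators; remaining >= 1; remaining--) {
            total += eventBytes + remaining * fieldBytes;
        }
        return total;
    }

    public static void main(String[] args) {
        long eventBytes = 100; // assumed event size
        long fieldBytes = 16;  // assumed size of one config field
        System.out.println("broadcast: " + broadcastBytesPerEvent(eventBytes, 3));
        System.out.println("enricher:  " + enrichedBytesPerEvent(eventBytes, fieldBytes, 3));
    }
}
```

The gap grows with the size of the config relative to the event, which matches the caveat above about events being much bigger than the configuration.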

Regards,
Roman


On Tue, May 12, 2020 at 5:57 AM Manas Kale  wrote:

> Sure. Apologies for not making this clear enough.
>
> > each operator only stores what it needs.
> Lets imagine this setup :
>
> BROADCAST STREAM
> config-stream -----+-------------+-------------+
>                    |             |             |
> event-stream --> operator1 --> operator2 --> operator3
>
> In this scenario, all 3 operators will be BroadcastProcessFunctions. Each
> of them will receive the whole config message in their
> processBroadcastElement method, but each one will only store what it
> needs in their state store. So even though operator1 will receive
>  config = {
> "config1" : 1,
> "config2" : 2,
> "config3" : 3
> }
> it will only store config1.
>
> > each downstream operator will "strip off" the config parameter that it
> needs.
>
> BROADCAST STREAM
> config-stream -----+
>                    |
> event-stream --> enricher --> operator1 --> operator2 --> operator3
>
> In this case, the enricher operator will store the whole config message.
> When an event message arrives, this operator will append config1, config2
> and config3 to it. Operator 1 will extract and use config1, and output a
> message that has config1 stripped off.
>
> I hope that helps!
>
> Perhaps I am being too pedantic but I would like to know if these two
> methods have comparable performance differences and if so which one would
> be preferred.
>
>
>
>
> On Mon, May 11, 2020 at 11:46 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Manas,
>>
>> The approaches you described looks the same:
>> > each operator only stores what it needs.
>> > each downstream operator will "strip off" the config parameter that it
>> needs.
>>
>> Can you please explain the difference?
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, May 11, 2020 at 8:07 AM Manas Kale  wrote:
>>
>>> Hi,
>>> I have a single broadcast message that contains configuration data
>>> consumed by different operators. For eg:
>>> config = {
>>> "config1" : 1,
>>> "config2" : 2,
>>> "config3" : 3
>>> }
>>>
>>> Operator 1 will consume config1 only, operator 2 will consume config2
>>> only etc.
>>>
>>>
>>>- Right now in my implementation the config message gets broadcast
>>>over operators 1,2,3 and each operator only stores what it needs.
>>>
>>>
>>>- A different approach would be to broadcast the config message to a
>>>single root operator. This will then enrich event data flowing through it
>>>with config1,config2 and config3 and each downstream operator will "strip
>>>off" the config parameter that it needs.
>>>
>>>
>>> *I was wondering which approach would be the best to go with performance
>>> wise. *I don't really have the time to implement both and compare, so
>>> perhaps someone here already knows if one approach is better or both
>>> provide similar performance.
>>>
>>> FWIW, the config stream is very sporadic compared to the event stream.
>>>
>>> Thank you,
>>> Manas Kale
>>>
>>>
>>>
>>>


Re: Tumbling window per key

2020-05-12 Thread Arvid Heise
Hi Navneeth,

Your understanding is correct.

In the image, all windows across the keys for the same timespan are grouped
together, which makes sense from a logical perspective as you would talk
about the first, second, ... window.

But technically, there are 15 small windows involved instead of the
depicted 5 windows. I guess, for you, the technical representation would be
easier to understand.

For session windows, for example, only the technical view makes sense. So
you could also look at examples of these windows if it's easier for you.
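
Here is a small sketch of the "technical view" for the sample events from the question, in plain Java rather than the Flink API. It assumes window boundaries are aligned to fixed multiples of the window size (which, as far as I know, is how the built-in tumbling assigners behave by default) rather than starting at each key's first element. Times are handled as seconds of the day to keep the example short.

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Assigns "key@HH:mm:ss" events to per-key tumbling windows (illustrative only). */
final class TumblingWindowPerKeySketch {

    /** Start of the aligned tumbling window that contains the timestamp (both in seconds). */
    static int windowStart(int timestampSec, int sizeSec) {
        return timestampSec - (timestampSec % sizeSec);
    }

    /** Groups events by (key, window); each map entry is one technical window. */
    static Map<String, List<String>> assign(List<String> events, int sizeSec) {
        Map<String, List<String>> windows = new TreeMap<>();
        for (String event : events) {
            String[] parts = event.split("@");
            int ts = LocalTime.parse(parts[1]).toSecondOfDay();
            int start = windowStart(ts, sizeSec);
            String label = parts[0] + " [" + LocalTime.ofSecondOfDay(start)
                    + ", " + LocalTime.ofSecondOfDay(start + sizeSec) + ")";
            windows.computeIfAbsent(label, k -> new ArrayList<>()).add(parts[1]);
        }
        return windows;
    }

    /** The five sample events from the question. */
    static List<String> sampleEvents() {
        return Arrays.asList(
                "user1@10:01:01", "user2@10:01:02", "user1@10:01:05",
                "user2@10:01:06", "user2@10:01:08");
    }

    public static void main(String[] args) {
        assign(sampleEvents(), 30).forEach((window, times) ->
                System.out.println(window + " -> " + times));
    }
}
```

Under that alignment assumption, all five sample timestamps fall into the same 30-second span starting at 10:01:00, so the sketch yields two windows, one per key: user1 with two events and user2 with three (10:01:08 included).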

On Tue, May 12, 2020 at 10:05 AM Navneeth Krishnan 
wrote:

> Hi All,
>
> I was looking at the documentation for windows and got a little confused.
> As per my understanding tumbling window per key will create a non
> overlapping window based on when the data for that key arrived. For example
> consider a tumbling window of 30 seconds
> user1 - 10:01:01
> user2 - 10:01:02
> user1 - 10:01:05
> user2 - 10:01:06
> user2 - 10:01:08
>
> Result:
> user1 (10:01:01 & 10:01:05)
> user2 (10:01:02 & 10:01:06)
> user2 (10:01:08)
>
> Is this the right understanding?
>
> But as per the below image in docs, it looks like the window is not per
> key, please correct me if i'm wrong.
> [image: image.png]
>
> Thanks
>


-- 

Arvid Heise | Senior Java Developer


Flink BLOB server port exposed externally

2020-05-12 Thread Omar Gawi
Hi All,

I have Apache Flink running as part of our Java program on a Linux
machine.
Flink runs on thread(s) within the same Java process.
I see that the machine has the BLOB server port 1098 exposed to the outside:

davc@sdavc:~$ netstat -anp | grep LISTEN
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      -
tcp        0      0 127.0.0.1:5432          0.0.0.0:*               LISTEN      311/postgres
tcp6       0      0 :::8080                 :::*                    LISTEN      -
tcp6       0      0 :::21                   :::*                    LISTEN      -
tcp6       0      0 :::22                   :::*                    LISTEN      -
tcp6       0      0 ::1:5432                :::*                    LISTEN      311/postgres
tcp6       0      0 :::8443                 :::*                    LISTEN      -
tcp6       0      0 :::1098                 :::*                    LISTEN      -


This raises security concerns for our team. When an external user or system
opens a connection (telnet or another protocol) to this port
(accidentally or not), we get the error below in the Java app log:

2020-04-23 07:54:58 ERROR BlobServerConnection:131 - Error while executing BLOB connection.
java.io.IOException: Unknown operation 3
   at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:122)


My question is whether there is a way to avoid exposing this port to the
outside and keep it available only for its original purpose: serving the
localhost/127.0.0.1 requests that come from the Flink engine.


Thank you and stay safe.

Omar


Problem monitoring Flink with Prometheus Pushgateway

2020-05-12 Thread 李佳宸
Hi all,

When I use the Prometheus Pushgateway to monitor Flink, the setting
metrics.reporter.promgateway.deleteOnShutdown: true has no effect:
after the Flink cluster shuts down, the metrics data is still present in the
Pushgateway.
The full reporter configuration is:

metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: localhost
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: true

The Flink version is 1.9.1. I tried Pushgateway versions 0.9 and 1.2, with the
same problem.

I am not sure whether this is a bug.
Has anyone got this working?

Thanks


TableConfig TTL and Watermark TTL

2020-05-12 Thread lec ssmi
Hi:
  If I  join two streams in SQL, the time range is used as a condition,
similar to the time interval join in DataStream.  So, will this join state
expire as the watermark moves, or will it expire with the TTL time
configured by TableConfig? Or both?

 Best
Lec Ssmi


Re: ProducerRecord with Kafka Sink for 1.8.0

2020-05-12 Thread Gary Yao
Hi Nick,

Are you able to upgrade to Flink 1.9? Beginning with Flink 1.9 you can use
KafkaSerializationSchema to produce a ProducerRecord [1][2].

Best,
Gary

[1] https://issues.apache.org/jira/browse/FLINK-11693
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.html

On Mon, May 11, 2020 at 10:59 PM Nick Bendtner  wrote:
>
> Hi guys,
> I use 1.8.0 version for flink-connector-kafka. Do you have any 
> recommendations on how to produce a ProducerRecord from a kafka sink. Looking 
> to add support to kafka headers therefore thinking about ProducerRecord. If 
> you have any thoughts its highly appreciated.
>
> Best,
> Nick.


Re: Re: Writing _SUCCESS Files (Streaming and Batch)

2020-05-12 Thread Yun Gao
Hi Peter,

Sorry for missing the question and for the late response. I'm currently working 
together with Jingsong on the issue to support "global committing" (like 
writing a _SUCCESS file or adding partitions to the Hive store) after buckets 
are terminated. In 1.11 we may first support watermark/time-related buckets in 
the Table/SQL API, and we are also thinking of supporting "global committing" 
for arbitrary bucket assigner policies for StreamingFileSink users. The current 
rough idea is to let users specify when a bucket is terminated on a single task; 
the OperatorCoordinator[1] of the sink will then aggregate the information from 
all subtasks about this bucket and do the global committing once the bucket has 
been finished on all the subtasks. This is still under consideration and 
discussion. Any thoughts or requirements on this issue are warmly welcome. 
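
The rough idea can be sketched in plain Java as follows. This is not the OperatorCoordinator API, whose shape is not given in this thread; the class name, method names and bucket ID format are hypothetical, and adding a bucket to a list stands in for the real commit action such as writing a _SUCCESS file.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical coordinator that commits a bucket once every subtask has finished it. */
final class GlobalCommitSketch {
    private final int parallelism;
    private final Map<String, Set<Integer>> finishedSubtasks = new HashMap<>();
    private final List<String> committedBuckets = new ArrayList<>();

    GlobalCommitSketch(int parallelism) {
        this.parallelism = parallelism;
    }

    /**
     * Records that a subtask finished a bucket.
     * Returns true only for the report that completes the bucket and triggers the commit.
     */
    boolean bucketFinished(String bucketId, int subtaskIndex) {
        if (committedBuckets.contains(bucketId)) {
            return false; // already committed, ignore late or duplicate reports
        }
        Set<Integer> done = finishedSubtasks.computeIfAbsent(bucketId, k -> new HashSet<>());
        done.add(subtaskIndex); // a set makes repeated reports from one subtask harmless
        if (done.size() < parallelism) {
            return false;
        }
        finishedSubtasks.remove(bucketId);
        committedBuckets.add(bucketId); // stand-in for writing the _SUCCESS marker
        return true;
    }

    List<String> committedBuckets() {
        return committedBuckets;
    }

    public static void main(String[] args) {
        GlobalCommitSketch coordinator = new GlobalCommitSketch(2);
        System.out.println(coordinator.bucketFinished("dt=2020-05-12", 0));
        System.out.println(coordinator.bucketFinished("dt=2020-05-12", 1));
        System.out.println(coordinator.committedBuckets());
    }
}
```

A real implementation would also have to survive failover, for example by checkpointing which subtasks have reported; that part is deliberately left out of the sketch.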

Best,
 Yun


[1] OperatorCoordinator is introduced in FLIP-27: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
It is a component that resides in the JobManager and can communicate with all 
the subtasks of the corresponding operator, so it can be used to aggregate 
status from the subtasks. 


 -- Original Message --
From: Robert Metzger 
Sent: Tue May 12 15:36:26 2020
To: Jingsong Li 
Cc: Peter Groesbeck , user 
Subject: Re: Writing _SUCCESS Files (Streaming and Batch)

Hi Peter,

I filed a ticket for this feature request: 
https://issues.apache.org/jira/browse/FLINK-17627 (feel free to add your 
thoughts / requirements to the ticket)

Best,
Robert


On Wed, May 6, 2020 at 3:41 AM Jingsong Li  wrote:

Hi Peter,

The tricky part is how to know the "end" of a bucket in a streaming job.
In 1.11, we are trying to implement a watermark-related bucket-ending 
mechanism[1] in Table/SQL.

[1]https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table

Best,
Jingsong Lee
On Tue, May 5, 2020 at 7:40 AM Peter Groesbeck  
wrote:

I am replacing an M/R job with a Streaming job using the StreamingFileSink and 
there is a requirement to generate an empty _SUCCESS file like the old Hadoop 
job. I have to implement a similar Batch job to read from backup files in case 
of outages or downtime.

The Batch job question was answered here and appears to be still relevant 
although if someone could confirm for me that would be great.
https://stackoverflow.com/a/39413810

The question of the Streaming job came up back in 2018 here:
http://mail-archives.apache.org/mod_mbox/flink-user/201802.mbox/%3cff74eed5-602f-4eaa-9bc1-6cdf56611...@gmail.com%3E

But the solution to use or extend the BucketingSink class seems out of date now 
that BucketingSink has been deprecated.

Is there a way to implement a similar solution for StreamingFileSink?

I'm currently on 1.8.1 although I hope to update to 1.10 in the near future.

Thank you,
Peter

-- 
Best, Jingsong Lee

Tumbling window per key

2020-05-12 Thread Navneeth Krishnan
Hi All,

I was looking at the documentation for windows and got a little confused.
As per my understanding tumbling window per key will create a non
overlapping window based on when the data for that key arrived. For example
consider a tumbling window of 30 seconds
user1 - 10:01:01
user2 - 10:01:02
user1 - 10:01:05
user2 - 10:01:06
user2 - 10:01:08

Result:
user1 (10:01:01 & 10:01:05)
user2 (10:01:02 & 10:01:06)
user2 (10:01:08)

Is this the right understanding?

But as per the below image in docs, it looks like the window is not per
key, please correct me if i'm wrong.
[image: image.png]

Thanks


Re: Not able to implement an usecase

2020-05-12 Thread Jingsong Li
Thanks Roman for involving me.

Hi Jaswin,

FLIP-115 [1] will complete the Kafka -> Hive/Filesystem path, and it will be
released in 1.11.

We will provide two connectors in the Table API:
- File system connector: this connector manages partitions and files by file
system paths. You can define a file system table with the Parquet/ORC format;
it should behave consistently with Hive, except for Hive metastore support.
- Hive connector: this connector manages partitions and files through the Hive
metastore and supports automatically adding partitions to the Hive metastore.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table

Best,
Jingsong Lee

On Tue, May 12, 2020 at 3:52 PM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> AFAIK, yes, you can write streams.
>
> I'm pulling in Jingsong Li and Rui Li as they might know better.
>
> Regards,
> Roman
>
>
> On Mon, May 11, 2020 at 10:21 PM Jaswin Shah 
> wrote:
>
>> If I go with the Table API, can I write streams to Hive, or is it only
>> for batch processing as of now?
>>
>> --
>> *From:* Khachatryan Roman 
>> *Sent:* Tuesday, May 12, 2020 1:49:10 AM
>> *To:* Jaswin Shah 
>> *Cc:* user@flink.apache.org 
>> *Subject:* Re: Not able to implement an usecase
>>
>> Hi Jaswin,
>>
>> Currently, the DataStream API doesn't support outer joins.
>> As a workaround, you can use the coGroup function [1].
>>
>> Hive is also not supported by the DataStream API, though it is supported by
>> the Table API [2].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/CoGroupFunction.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/read_write_hive.html
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, May 11, 2020 at 6:03 PM Jaswin Shah 
>> wrote:
>>
>> Hi,
>> I want to implement the following use case in my application:
>> I am doing an interval join between two data streams and then, in a process
>> function, catching the discrepant results of the join. The join is done on
>> the key orderId. Now I want to identify all the messages in both data streams
>> that are not joined. That is, for a message in the left stream, if I do not
>> find any message in the right stream within the defined interval, that
>> message should be caught; likewise, messages in the right stream that have
>> no corresponding message in the left stream should be caught.
>> I need help with how to achieve this use case. I know this can be
>> done with an outer join, but as far as I know the interval join and tumbling
>> event-time window joins only support inner joins. I do not want to use the
>> Table/SQL API here; I want to stay with the DataStream API.
>>
>> Currently I am using the code below, which works for 90% of the cases. The
>> remaining 10%, where a large delay can happen and messages in the left or
>> right stream are missing, are not handled by this
>> implementation:
>>
>> /**
>>  * Join cart and pg streams on mid and orderId, and the interval specified.
>>  *
>>  * @param leftStream
>>  * @param rightStream
>>  * @return
>>  */
>> public SingleOutputStreamOperator intervalJoinCartAndPGStreams(
>>         DataStream leftStream, DataStream rightStream, ParameterTool parameter) {
>>     // Discrepant results are sent to Kafka from CartPGProcessFunction.
>>     return leftStream
>>         .keyBy(new CartJoinColumnsSelector())
>>         .intervalJoin(rightStream.keyBy(new PGJoinColumnsSelector()))
>>         .between(
>>             Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_LOWERBOUND))),
>>             Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_UPPERBOUND))))
>>         .process(new CartPGProcessFunction());
>> }
>>
>>
>>
>> Secondly, I am unable to find streaming support for writing the
>> data streams I am reading from Kafka to Hive, which I then want to batch
>> process with Flink.
>>
>> Please help me resolve these use cases.
>>
>> Thanks,
>> Jaswin
>>
>>

-- 
Best, Jingsong Lee
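
The coGroup workaround mentioned in the quoted thread above can be sketched as follows. This is a plain-Java illustration of the idea only, not Flink API code: the Event type, the class and the method names are hypothetical. In Flink, a CoGroupFunction receives, for each key and window, all elements from the left input and all elements from the right input; if one side is empty, the elements on the other side are the unmatched ones an outer join would report.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration of outer-join semantics via co-grouping; not Flink API code. */
class CoGroupOuterJoinSketch {

    /** Hypothetical event type: an order id plus a payload. */
    static final class Event {
        final String orderId;
        final String payload;

        Event(String orderId, String payload) {
            this.orderId = orderId;
            this.payload = payload;
        }
    }

    /** Plays the role of CoGroupFunction.coGroup for one key: an empty side means "unmatched". */
    static void coGroup(String key, List<Event> left, List<Event> right, List<String> out) {
        if (right.isEmpty()) {
            for (Event l : left) {
                out.add("LEFT_ONLY " + key + " " + l.payload);
            }
        } else if (left.isEmpty()) {
            for (Event r : right) {
                out.add("RIGHT_ONLY " + key + " " + r.payload);
            }
        } else {
            for (Event l : left) {
                for (Event r : right) {
                    out.add("JOINED " + key + " " + l.payload + "+" + r.payload);
                }
            }
        }
    }

    /** Groups both inputs by orderId and calls coGroup once per key, as one window would. */
    static List<String> fullOuterJoin(List<Event> left, List<Event> right) {
        Map<String, List<Event>> leftByKey = new LinkedHashMap<>();
        Map<String, List<Event>> rightByKey = new LinkedHashMap<>();
        for (Event e : left) {
            leftByKey.computeIfAbsent(e.orderId, k -> new ArrayList<>()).add(e);
        }
        for (Event e : right) {
            rightByKey.computeIfAbsent(e.orderId, k -> new ArrayList<>()).add(e);
        }
        // Keys seen on the left first, then keys that only exist on the right.
        List<String> keys = new ArrayList<>(leftByKey.keySet());
        for (String k : rightByKey.keySet()) {
            if (!leftByKey.containsKey(k)) {
                keys.add(k);
            }
        }
        List<String> out = new ArrayList<>();
        for (String k : keys) {
            coGroup(k,
                    leftByKey.getOrDefault(k, Collections.emptyList()),
                    rightByKey.getOrDefault(k, Collections.emptyList()),
                    out);
        }
        return out;
    }
}
```

In a real job the per-key logic would sit in a CoGroupFunction used with leftStream.coGroup(rightStream).where(...).equalTo(...).window(...).apply(...). Note that this matches elements per window rather than per interval, so pairs that straddle a window boundary would be reported as unmatched; whether that is acceptable depends on the use case.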


Re: Not able to implement an usecase

2020-05-12 Thread Rui Li
The Hive table sink is only for batch processing in Flink 1.10. There are
some ongoing efforts to support writing streams to Hive, and we intend to
make it available in 1.11. Stay tuned :)


-- 
Cheers,
Rui Li


Re: Not able to implement an usecase

2020-05-12 Thread Khachatryan Roman
AFAIK, yes, you can write streams.

I'm pulling in Jingsong Li and Rui Li as they might know better.

Regards,
Roman




Re: Question about jobs getting stuck with Flink on Kubernetes

2020-05-12 Thread shao.hongxiao
Hi, did you manage to solve your problem? If so, how did you solve it? Thanks.


邵红晓
Email: 17611022...@163.com

On May 8, 2020 at 10:08, LakeShen wrote:
Hi,

You could check your memory configuration to see whether it is set too low, leaving too few network buffers.

For details, see the documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html

Best,
LakeShen

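a511955993 wrote on Thursday, May 7, 2020 at 9:54 PM:

hi, all


Cluster information:
Flink version 1.10, deployed on Kubernetes; Kubernetes version 1.17.4, Docker version 19.03,
with Weave as the CNI.


Symptoms:
While the job is running, an operator occasionally gets stuck: downstream receives no data, the watermark cannot advance, the upstream is backpressured, and after a while the job gets killed.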


A fragment of the stack trace obtained with jstack:


"Map (152/200)" #155 prio=5 os_prio=0 tid=0x7f67a4076800 nid=0x31f waiting on condition [0x7f66b04ed000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000608f3c600> (a java.util.concurrent.CompletableFuture$Signaller)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:209)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:145)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)




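I suspected a virtualized-network problem and added the following parameters, with no effect:
taskmanager.network.request-backoff.max: 30
akka.ask.timeout: 120s
akka.watch.heartbeat.interval: 10s


I also tried adjusting the number of buffers, with no effect:
taskmanager.network.memory.floating-buffers-per-gate: 16
taskmanager.network.memory.buffers-per-channel: 6




That is the situation I am currently facing. I would appreciate any replies or explanations. Thank you very much.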

Looking forward to your reply and help.

Best
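
For a rough sense of scale, the two buffer settings above can be turned into a per-gate buffer count. This is only a back-of-the-envelope sketch under stated assumptions: that a gate needs about channels x buffers-per-channel + floating-buffers-per-gate buffers, that the segment size is Flink's default of 32 KB, and that there are 200 channels (guessed from the "Map (152/200)" thread name). The class and method names are made up for illustration and are not a Flink API.

```java
/** Hypothetical calculator for illustration; not a Flink API. */
class NetworkBufferEstimate {

    /** Exclusive buffers for every channel plus the floating buffers shared by the gate. */
    static long buffersPerGate(int channels, int buffersPerChannel, int floatingBuffersPerGate) {
        return (long) channels * buffersPerChannel + floatingBuffersPerGate;
    }

    /** Memory needed for that many buffers at the given segment size in bytes. */
    static long bytesPerGate(long buffers, int segmentSizeBytes) {
        return buffers * segmentSizeBytes;
    }

    public static void main(String[] args) {
        // Values from the configuration above; 200 channels and 32 KB segments are assumptions.
        long buffers = buffersPerGate(200, 6, 16);
        long bytes = bytesPerGate(buffers, 32 * 1024);
        System.out.println(buffers + " buffers, " + (bytes / (1024 * 1024)) + " MiB per gate");
    }
}
```

With these numbers a single gate comes to 1216 buffers, about 38 MiB. Comparing that figure, multiplied by the number of gates per TaskManager, against the configured network memory (the taskmanager.memory.network.* options in 1.10) is one way to follow up on the memory suggestion above.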






Re: Writing _SUCCESS Files (Streaming and Batch)

2020-05-12 Thread Robert Metzger
Hi Peter,

I filed a ticket for this feature request:
https://issues.apache.org/jira/browse/FLINK-17627 (feel free to add your
thoughts / requirements to the ticket)

Best,
Robert




Flink SQL join usecase

2020-05-12 Thread shadowell
Hi,


I am new to Flink SQL and want to know whether Flink SQL (Flink 1.10) supports 
the following join syntax:


 ``` 
 select a.id, a.col_1, b.col_1, c.col_1
 from topic_a a
 inner join topic_b b on a.id = b.id
 left join topic_c c on a.id = c.id and a.col_1 = c.col_1 and b.col_1 = c.col_1;
 ```


Best Regards,
Jie Feng


shadow...@126.com