Re: Use different functions for different signal values

2019-04-02 Thread Hequn Cheng
Hi Marke,

Ken is right. We can use split and select to achieve what you want.

Besides, I am thinking if there is a lot of ingesting signals with unique
Id's, why not use one function and process different logic in the function.
For example, we can call different methods in the function according to the
value of the input "id field".  This can make the code more simple and
generic, IMO.
Is it because the return types of these functions are different?

Best, Hequn

On Wed, Apr 3, 2019 at 7:17 AM Ken Krugler 
wrote:

> Hi Marke,
>
> You can use DataStream.split() to create a SplitStream, and then call
> SplitStream.select() to create the three different paths to the three
> functions.
>
> See
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#datastream-transformations
>
> — Ken
>
> On Apr 2, 2019, at 8:41 AM, Marke Builder  wrote:
>
> Hi,
>
> I want to implement the following behavior:
>
> 
> There are a lot of ingest signals with unique Id's, I would use for each
> signal set a special function. E.g. Signal1, Signal2 ==> function1,
> Signal3, Signal4 ==> function2.
> What is the recommended way to implement this pattern?
>
> Thanks!
>
>
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>


请教大佬们,blink提交yarn集群的问题

2019-04-02 Thread 苏 欣
我在fink-conf.yaml文件中配置了principal和keytab,可以提交到带有kerberos认证的yarn集群中,现在我有两个问题:
1.同一客户机切换到不同的yarn集群时,提交作业之前需要改变HADOOP_CONF_DIR,krb5.conf和fink-conf.yaml的配置。这样做有点不太方便,也不太好处理同时提交的问题。
blink目前能否通过提交命令传参的方式来切换票据,或者有没有什么使用上的建议呢?
2.我看到文档上说,缓存票据目前只支持在yarn上的独立集群,这句话的意思是指目前缓存票据只能用在flink yarn session模式中吗?

发送自 Windows 10 版邮件应用



?????? ??????????????Flink????

2019-04-02 Thread ????


??lib??(??:Flink??Kafka)Flink??-C??jar??jarJobGraph??JobManager
 

StreamExecutionEnvironment.getExecutionEnvironment().getStreamGraph().getJobGraph()??JobGraphJobGraph??JobManager
   ??




--  --
??: "Yuan Yifan";
: 2019??4??2??(??) 3:24
??: "user-zh";

: Re:?? ??Flink



??env.??JAR??








?? 2019-04-02 14:39:45??"" <1010467...@qq.com> ??
>
>   
> gitjenkinsjar??shell??JobManager??JobGraph??JobGraphJobManagerjar??
>??
>
>
>
>
>--  --
>??: ""<1010467...@qq.com>;
>: 2019??3??29??(??) 2:19
>??: "user-zh";
>
>: ?? ??Flink
>
>
>
>
>
>
>
>
>--  --
>??: "Lifei Chen";
>: 2019??3??29??(??) 11:10
>??: "user-zh";
>
>: Re: ??Flink
>
>
>
>go cli, jarflink manager
>
>https://github.com/ing-bank/flink-deployer
>
>??
>
>Kaibo Zhou  ??2019??3??29?? 11:08??
>
>> ?? flink ?? Restful API ??upload  jar ?? run??
>>
>> ??
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-upload
>> ?? https://files.alicdn.com/tpsservice/a8d224d6a3b8b82d03aa84e370c008cc.pdf
>> ??
>>
>>  <1010467...@qq.com> ??2019??3??28?? 9:06??
>>
>> > 
>> >
>> >
>> Flink???(Flinkjarweb??)
>>

答复: 回复: 方案询问

2019-04-02 Thread 戴嘉诚
这样写好复杂。弊端和性能方面具体就不清楚,但肯定是可比MapState弱一点的

写个简单的MapState demo吧,如下:
env
.addSource(flinkKafkaConsumer)
.process(new ProcessFunction() {
  private static final long serialVersionUID = -8357959184126038977L;

  private MapState accumulateState;

  @Override
  public void open(Configuration parameters) throws Exception {
MapStateDescriptor accumulateStateDescriptor =
new MapStateDescriptor<>(
"map_state",
StringSerializer.INSTANCE,
StringSerializer.INSTANCE);
accumulateState = 
getRuntimeContext().getMapState(accumulateStateDescriptor);
  }

  @Override
  public void processElement(String value, Context ctx, Collector 
out)
  throws Exception {
  String key = null;
if (accumulateState.contains(key)) {
  String  存在的订单号 = accumulateState.get(key);
  存在订单号 和 value 合并;
out.collect(合并的订单);
} else {
  accumulateState.put(key, value);
}
  }
})
;



发件人: 1900
发送时间: 2019年4月2日 20:59
收件人: paullin3280
主题: 回复: 方案询问

MapState 暂时还不知道怎么做,后面继续研究,我现在做了个版本


1.将收到的流分成两份流,一份初始状态的流,一份终态的流
2.watermark用订单的eventtime,采用滑动窗口进行流的切分
3.根据订单号进行合并,采用CoGroupFunction进行流的处理
4.在CoGroupFunction中合并两个流,流1跟流2进行过滤合并,同一个订单号最终只有一条数据,最终变成一个流


不知道现在这样写怎么样?有没有什么弊端?性能怎么样?会不会造成数据丢失什么的?


-- 原始邮件 --
发件人: "paullin3280";
发送时间: 2019年4月2日(星期二) 下午2:10
收件人: "user-zh";

主题: Re: 方案询问



Hi,

推荐可以维护两个 MapState 分别缓存尚未匹配的两种订单。一条订单数据进来首先查找另一种订单的 MapState,若找到则输出合并的数据并删除对应的 
entry,否则放入所属订单类型的 MapState。

Best,
Paul Lam

> 在 2019年4月2日,13:46,1900 <575209...@qq.com> 写道:
> 
> 现在有个需求,从kafka接收订单信息,每条订单信息有1-2条数据(一般第一条是订单初始状态数据,第二条是订单终态数据);时间间隔不等(一般5秒以内),
> 如何能将数据进行合并,最终合并成一条数据?
> 
> 
> 现在有一个考虑,根据订单号keyby分组后处理,这样的话是不是开启的窗口太多了?



Re: Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple

2019-04-02 Thread Timothy Victor
Flink needs type information for serializing and deserializing objects, and
that is lost due to Java type erasure.   The only way to workaround this is
to specify the return type of the function called in the lambda.

Fabian's answer here explains it well.

https://stackoverflow.com/questions/50945509/apache-flink-return-type-of-function-could-not-be-determined-automatically-due/50947554

Tim

On Tue, Apr 2, 2019, 7:03 PM Vijay Balakrishnan  wrote:

> Hi,
> I am trying to use the KeyedStream with Tuple to handle diffrent types of
> Tuples including Tuple6.
> Keep getting the Exception:
> *Exception in thread "main"
> org.apache.flink.api.common.functions.InvalidTypesException: Usage of class
> Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
> Tuple2, etc.) instead*.
> Is there a way around Type Erasure here ?
> I want to use KeyedStream so that I can pass it on to
> treat Tuple6 as a Tuple like the monitoringTupleKeyedStream.
>
> Code below:
>
> KeyedStream monitoringTupleKeyedStream = null;
>> String keyOperationType = ;//provided
>> if (StringUtils.isNotEmpty(keyOperationType)) {
>> if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION)) {
>> monitoringTupleKeyedStream = kinesisStream.keyBy("deployment",
>> "gameId", "eventName", "component");
>> } else if
>> (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) {
>> monitoringTupleKeyedStream = kinesisStream.keyBy("deployment",
>> "gameId", "eventName", "component", "instance");
>> } else if
>> (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) {
>> TypeInformation> String>> info = TypeInformation.of(new TypeHint> String, String, String, String>>(){});
>> monitoringTupleKeyedStream = kinesisStream.keyBy(new
>> KeySelector() {
>> public Tuple getKey(Monitoring mon) throws Exception {
>> String key = "";
>> String keyName = "";
>> final String eventName = mon.getEventName();
>> if (eventName != null &&
>> ((eventName.equalsIgnoreCase(INGRESS_FPS)))
>> )) {
>> keyName = PCAM_ID;
>> key = mon.getEventDataMap() != null ? (String)
>> mon.getEventDataMap().get(PCAM_ID) : "";
>> } else if (eventName != null &&
>> (eventName.equalsIgnoreCase(EGRESS_FPS))) {
>> keyName = OUT_BITRATE;
>> key = mon.getEventDataMap() != null ? (String)
>> mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
>> }
>> mon.setKeyName(keyName);
>> mon.setKeyValue(key);
>> return new Tuple6<>(mon.getDeployment(), mon.getGameId(),
>> eventName, mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
>> }
>> }); //, info)
>> } else if
>> (keyOperationType.equalsIgnoreCase(COMPONENT_CONTAINER_OPERATION)) {
>> monitoringTupleKeyedStream = kinesisStream.keyBy("deployment",
>> "gameId", "eventName", "component", "instance", "container"); //<== this is
>> also a Tuple6 but no complaints ?
>> }
>> }
>
>
>
> This example below needs monitoringTupleKeyedStream  to be
> KeyedStream String>>
>
>> TypeInformation>
>> info = TypeInformation.of(new TypeHint> String, String, String>>(){});
>> monitoringTupleKeyedStream = kinesisStream.keyBy(new
>> KeySelector> String>>() {
>> @Override
>> public Tuple6> String> getKey(Monitoring mon) throws Exception {
>> String key = "";
>> String keyName = "";
>> //TODO: extract to a method to pull key to use
>> from a config file
>> final String eventName = mon.getEventName();
>> if (eventName != null &&
>> ((eventName.equalsIgnoreCase(INGRESS_FPS)))
>> )) {
>> keyName = PCAM_ID;
>> key = mon.getEventDataMap() != null ?
>> (String) mon.getEventDataMap().get(PCAM_ID) : "";
>> } else if (eventName != null &&
>> (eventName.equalsIgnoreCase(EGRESS_FPS))) {
>> keyName = OUT_BITRATE;
>> key = mon.getEventDataMap() != null ?
>> (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key
>> to use
>> }
>> mon.setKeyName(keyName);
>> mon.setKeyValue(key);
>> return new Tuple6<>(mon.getDeployment(),
>> mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(),
>> mon.getKeyValue());
>> }
>> }, info);
>
>
> TIA
>


Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple

2019-04-02 Thread Vijay Balakrishnan
Hi,
I am trying to use the KeyedStream with Tuple to handle diffrent types of
Tuples including Tuple6.
Keep getting the Exception:
*Exception in thread "main"
org.apache.flink.api.common.functions.InvalidTypesException: Usage of class
Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
Tuple2, etc.) instead*.
Is there a way around Type Erasure here ?
I want to use KeyedStream so that I can pass it on to
treat Tuple6 as a Tuple like the monitoringTupleKeyedStream.

Code below:

KeyedStream monitoringTupleKeyedStream = null;
> String keyOperationType = ;//provided
> if (StringUtils.isNotEmpty(keyOperationType)) {
> if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION)) {
> monitoringTupleKeyedStream = kinesisStream.keyBy("deployment",
> "gameId", "eventName", "component");
> } else if
> (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) {
> monitoringTupleKeyedStream = kinesisStream.keyBy("deployment",
> "gameId", "eventName", "component", "instance");
> } else if
> (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) {
> TypeInformation String>> info = TypeInformation.of(new TypeHint String, String, String, String>>(){});
> monitoringTupleKeyedStream = kinesisStream.keyBy(new
> KeySelector() {
> public Tuple getKey(Monitoring mon) throws Exception {
> String key = "";
> String keyName = "";
> final String eventName = mon.getEventName();
> if (eventName != null &&
> ((eventName.equalsIgnoreCase(INGRESS_FPS)))
> )) {
> keyName = PCAM_ID;
> key = mon.getEventDataMap() != null ? (String)
> mon.getEventDataMap().get(PCAM_ID) : "";
> } else if (eventName != null &&
> (eventName.equalsIgnoreCase(EGRESS_FPS))) {
> keyName = OUT_BITRATE;
> key = mon.getEventDataMap() != null ? (String)
> mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
> }
> mon.setKeyName(keyName);
> mon.setKeyValue(key);
> return new Tuple6<>(mon.getDeployment(), mon.getGameId(),
> eventName, mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
> }
> }); //, info)
> } else if
> (keyOperationType.equalsIgnoreCase(COMPONENT_CONTAINER_OPERATION)) {
> monitoringTupleKeyedStream = kinesisStream.keyBy("deployment",
> "gameId", "eventName", "component", "instance", "container"); //<== this is
> also a Tuple6 but no complaints ?
> }
> }



This example below needs monitoringTupleKeyedStream  to be
KeyedStream>

> TypeInformation>
> info = TypeInformation.of(new TypeHint String, String, String>>(){});
> monitoringTupleKeyedStream = kinesisStream.keyBy(new
> KeySelector String>>() {
> @Override
> public Tuple6 String> getKey(Monitoring mon) throws Exception {
> String key = "";
> String keyName = "";
> //TODO: extract to a method to pull key to use
> from a config file
> final String eventName = mon.getEventName();
> if (eventName != null &&
> ((eventName.equalsIgnoreCase(INGRESS_FPS)))
> )) {
> keyName = PCAM_ID;
> key = mon.getEventDataMap() != null ? (String)
> mon.getEventDataMap().get(PCAM_ID) : "";
> } else if (eventName != null &&
> (eventName.equalsIgnoreCase(EGRESS_FPS))) {
> keyName = OUT_BITRATE;
> key = mon.getEventDataMap() != null ? (String)
> mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
> }
> mon.setKeyName(keyName);
> mon.setKeyValue(key);
> return new Tuple6<>(mon.getDeployment(),
> mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(),
> mon.getKeyValue());
> }
> }, info);


TIA


Re: Use different functions for different signal values

2019-04-02 Thread Ken Krugler
Hi Marke,

You can use DataStream.split() to create a SplitStream, and then call 
SplitStream.select() to create the three different paths to the three functions.

See 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#datastream-transformations
 


— Ken

> On Apr 2, 2019, at 8:41 AM, Marke Builder  wrote:
> 
> Hi,
> 
> I want to implement the following behavior:
> 
> 
> There are a lot of ingest signals with unique Id's, I would use for each 
> signal set a special function. E.g. Signal1, Signal2 ==> function1, Signal3, 
> Signal4 ==> function2.
> What is the recommended way to implement this pattern?
> 
> Thanks!
> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: long lived standalone job session cluster in kubernetes

2019-04-02 Thread Till Rohrmann
Hi Heath,

I think some of the PRs are already open and ready for review [1, 2].

[1] https://issues.apache.org/jira/browse/FLINK-10932
[2] https://issues.apache.org/jira/browse/FLINK-10935

Cheers,
Till

On Wed, Feb 27, 2019 at 10:48 AM Heath Albritton  wrote:

> Great, my team is eager to get started.  I’m curious what progress had
> been made so far?
>
> -H
>
> On Feb 26, 2019, at 14:43, Chunhui Shi  wrote:
>
> Hi Heath and Till, thanks for offering help on reviewing this feature. I
> just reassigned the JIRAs to myself after offline discussion with Jin. Let
> us work together to get kubernetes integrated natively with flink. Thanks.
>
> On Fri, Feb 15, 2019 at 12:19 AM Till Rohrmann 
> wrote:
>
>> Alright, I'll get back to you once the PRs are open. Thanks a lot for
>> your help :-)
>>
>> Cheers,
>> Till
>>
>> On Thu, Feb 14, 2019 at 5:45 PM Heath Albritton 
>> wrote:
>>
>>> My team and I are keen to help out with testing and review as soon as
>>> there is a pill request.
>>>
>>> -H
>>>
>>> On Feb 11, 2019, at 00:26, Till Rohrmann  wrote:
>>>
>>> Hi Heath,
>>>
>>> I just learned that people from Alibaba already made some good progress
>>> with FLINK-9953. I'm currently talking to them in order to see how we can
>>> merge this contribution into Flink as fast as possible. Since I'm quite
>>> busy due to the upcoming release I hope that other community members will
>>> help out with the reviewing once the PRs are opened.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Feb 8, 2019 at 8:50 PM Heath Albritton 
>>> wrote:
>>>
 Has any progress been made on this?  There are a number of folks in
 the community looking to help out.


 -H

 On Wed, Dec 5, 2018 at 10:00 AM Till Rohrmann 
 wrote:
 >
 > Hi Derek,
 >
 > there is this issue [1] which tracks the active Kubernetes
 integration. Jin Sun already started implementing some parts of it. There
 should also be some PRs open for it. Please check them out.
 >
 > [1] https://issues.apache.org/jira/browse/FLINK-9953
 >
 > Cheers,
 > Till
 >
 > On Wed, Dec 5, 2018 at 6:39 PM Derek VerLee 
 wrote:
 >>
 >> Sounds good.
 >>
 >> Is someone working on this automation today?
 >>
 >> If not, although my time is tight, I may be able to work on a PR for
 getting us started down the path Kubernetes native cluster mode.
 >>
 >>
 >> On 12/4/18 5:35 AM, Till Rohrmann wrote:
 >>
 >> Hi Derek,
 >>
 >> what I would recommend to use is to trigger the cancel with
 savepoint command [1]. This will create a savepoint and terminate the job
 execution. Next you simply need to respawn the job cluster which you
 provide with the savepoint to resume from.
 >>
 >> [1]
 https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#cancel-job-with-savepoint
 >>
 >> Cheers,
 >> Till
 >>
 >> On Tue, Dec 4, 2018 at 10:30 AM Andrey Zagrebin <
 and...@data-artisans.com> wrote:
 >>>
 >>> Hi Derek,
 >>>
 >>> I think your automation steps look good.
 >>> Recreating deployments should not take long
 >>> and as you mention, this way you can avoid unpredictable old/new
 version collisions.
 >>>
 >>> Best,
 >>> Andrey
 >>>
 >>> > On 4 Dec 2018, at 10:22, Dawid Wysakowicz 
 wrote:
 >>> >
 >>> > Hi Derek,
 >>> >
 >>> > I am not an expert in kubernetes, so I will cc Till, who should
 be able
 >>> > to help you more.
 >>> >
 >>> > As for the automation for similar process I would recommend
 having a
 >>> > look at dA platform[1] which is built on top of kubernetes.
 >>> >
 >>> > Best,
 >>> >
 >>> > Dawid
 >>> >
 >>> > [1] https://data-artisans.com/platform-overview
 >>> >
 >>> > On 30/11/2018 02:10, Derek VerLee wrote:
 >>> >>
 >>> >> I'm looking at the job cluster mode, it looks great and I and
 >>> >> considering migrating our jobs off our "legacy" session cluster
 and
 >>> >> into Kubernetes.
 >>> >>
 >>> >> I do need to ask some questions because I haven't found a lot of
 >>> >> details in the documentation about how it works yet, and I gave
 up
 >>> >> following the the DI around in the code after a while.
 >>> >>
 >>> >> Let's say I have a deployment for the job "leader" in HA with
 ZK, and
 >>> >> another deployment for the taskmanagers.
 >>> >>
 >>> >> I want to upgrade the code or configuration and start from a
 >>> >> savepoint, in an automated way.
 >>> >>
 >>> >> Best I can figure, I can not just update the deployment
 resources in
 >>> >> kubernetes and allow the containers to restart in an arbitrary
 order.
 >>> >>
 >>> >> Instead, I expect sequencing is important, something along the
 lines
 >>> >> of this:
 >>> >>
 >>> >> 1. issue savepoint command on leader

Re: Metrics for received records per TaskManager

2019-04-02 Thread Yun Tang
Hi Benjamin

Flink could support to report its metrics to external system such as 
Prometheus, Graphite and so on [1]. And you could then use web front end such 
as Grafana to query those system. Take `numBytesInLocalPerSecond` metrics for 
example, it would have many metrics tags and one of them is `tm_id` (task 
manager id). And if you group this metrics by `tm_id` to a specific task 
manager node, you would view received bytes from local at that task manager.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter

Best
Yun Tang

From: Benjamin Burkhardt 
Sent: Wednesday, April 3, 2019 0:21
To: user@flink.apache.org; Yun Tang
Subject: Re: Metrics for received records per TaskManager

Hi Yun,

thank you for the advice, but how would you suggest doing it to get the metrics 
also for each TaskManager?
I do not urgently need to use REST because I’m running my code within Flink. 
Maybe there is another way to access it?

Thanks a lot.

Benjamin
Am 2. Apr. 2019, 18:26 +0200 schrieb Yun Tang :
Hi Benjamin

Try this
http://localhost:8081/jobs/{job-id}/vertices/{vertices-id}/subtasks/{subtask-index}/metrics?get=numBytesInLocalPerSecond

You could GET 
http://localhost:8081/jobs/
 to know running jobs,  and GET 
http://localhost:8081/jobs/{job-id}/vertices/
 to know all vertices similarly.

However, AFAIK, if you use REST API to query I'm afraid you cannot directly 
know the received records per task manager, and you have to gather these 
metrics per task.

Best
Yun Tang

From: Benjamin Burkhardt 
Sent: Tuesday, April 2, 2019 21:56
To: user@flink.apache.org; Yun Tang
Subject: Re: Metrics for received records per TaskManager

Hi Yun,

thanks for the hint. I tried to access the metric through the REST API calling 
http://localhost:8081/taskmanagers/2264f296385854f2d1fb4d121495822a/metrics?get=
 numBytesInRemotePerSecond.

Unfortunately the metric is not available...

Only these are avaiblable:
[{"id":"Status.Network.AvailableMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Count"},{"id":"Status.Network.TotalMemorySegments"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Count"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Time"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Time"},{„id“:“Status.JVM.Memory.NonHeap.Max“}]


How do I enable it, maybe in the flink-conf?

Thanks.

Benjamin
Am 2. Apr. 2019, 10:37 +0200 schrieb Yun Tang :
Hi Benjamin

I think 'numBytesInLocalPerSecond' and 'numBytesInRemotePerSecond' which 
indicate 'The number of bytes this task reads from a local source per second' 
and 'The number of bytes this task reads from a remote source per second' 
respectively could help you. If you want to track the information by each 
TaskManager, please group the metrics by tag 'tm_id'.

You could refer to 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#io
  for more information.

Best
Yun Tang


From: Benjamin Burkhardt 
Sent: Tuesday, April 2, 2019 15:00
To: user@flink.apache.org
Subject: Metrics for received records per TaskManager

Hi all,

I’m looking for a metric which allows me keeping track of the records or bytes 
each TaskManager has received or processed for the current task.

Can anyone help me getting this?

Thanks.

Benjamin


Re: BucketAssigner - Confusion

2019-04-02 Thread Jeff Crane
 According to my IDE (Jetbrains), I get an error with getBucketID(IN, Context) 
signature requiring a return of string (Flink 1.7 libs), so I still don't think 
the BucketID is a variable type.

I still don't understand the role of the:public 
SimpleVersionedSerializer getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}Where does that come into play, if the getBucketID makes a string anyway?



On Monday, April 1, 2019, 11:44:14 AM PDT, Jeff Crane 
 wrote:  
 
 I have had an issue understanding the documentation, in regard to 
BucketAssigner.BucketID getBucketId(IN element,
 BucketAssigner.Context 
context)SimpleVersionedSerializer getSerializer()First of all, I 
don't understand what type of "BucketID" means. I assume that's the returned 
type fo the getBucketID, which doesn't make sense. The description says 
getBucketId (returns?)  "A string representing the identifier of the bucket" So 
BucketID is not a type, it's always a string?Base on the docs, I implemented 
like this, which doesn't write anything!public final class CustomBucketAssigner 
implements BucketAssigner {

public String getBucketId(final MyEvent element, final Context context) {

DateTime dateTimeL = new DateTime(context.currentWatermark());

return String.join("_",
String.valueOf(dateTimeL.getYear()),
String.valueOf(dateTimeL.getMonthOfYear()),
String.valueOf(dateTimeL.getDayOfMonth()),
String.valueOf(dateTimeL.getHourOfDay()),
String.valueOf(dateTimeL.getMinuteOfHour())
);
}

// I assume  because BucketID is always string?
public SimpleVersionedSerializer getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}}
Can someone explain how bucketAssigned is supposed to do in plainer english. I 
don't think the docs are clear and I'm lost.  

Re: Metrics for received records per TaskManager

2019-04-02 Thread Benjamin Burkhardt
Hi Yun,

thank you for the advice, but how would you suggest doing it to get the metrics 
also for each TaskManager?
I do not urgently need to use REST because I’m running my code within Flink. 
Maybe there is another way to access it?

Thanks a lot.

Benjamin
Am 2. Apr. 2019, 18:26 +0200 schrieb Yun Tang :
> Hi Benjamin
>
> Try this
> http://localhost:8081/jobs/{job-id}/vertices/{vertices-id}/subtasks/{subtask-index}/metrics?get=numBytesInLocalPerSecond
>
> You could GET http://localhost:8081/jobs/ to know running jobs,  and GET 
> http://localhost:8081/jobs/{job-id}/vertices/ to know all vertices similarly.
>
> However, AFAIK, if you use REST API to query I'm afraid you cannot directly 
> know the received records per task manager, and you have to gather these 
> metrics per task.
>
> Best
> Yun Tang
> From: Benjamin Burkhardt 
> Sent: Tuesday, April 2, 2019 21:56
> To: user@flink.apache.org; Yun Tang
> Subject: Re: Metrics for received records per TaskManager
>
> Hi Yun,
>
> thanks for the hint. I tried to access the metric through the REST API 
> calling 
> http://localhost:8081/taskmanagers/2264f296385854f2d1fb4d121495822a/metrics?get=
>  numBytesInRemotePerSecond.
>
> Unfortunately the metric is not available...
>
> Only these are avaiblable:
> [{"id":"Status.Network.AvailableMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Count"},{"id":"Status.Network.TotalMemorySegments"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Count"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Time"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Time"},{„id“:“Status.JVM.Memory.NonHeap.Max“}]
>
>
> How do I enable it, maybe in the flink-conf?
>
> Thanks.
>
> Benjamin
> Am 2. Apr. 2019, 10:37 +0200 schrieb Yun Tang :
> > Hi Benjamin
> >
> > I think 'numBytesInLocalPerSecond' and 'numBytesInRemotePerSecond' which 
> > indicate 'The number of bytes this task reads from a local source per 
> > second' and 'The number of bytes this task reads from a remote source per 
> > second' respectively could help you. If you want to track the information 
> > by each TaskManager, please group the metrics by tag 'tm_id'.
> >
> > You could refer to 
> > https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#io
> >   for more information.
> >
> > Best
> > Yun Tang
> >
> > From: Benjamin Burkhardt 
> > Sent: Tuesday, April 2, 2019 15:00
> > To: user@flink.apache.org
> > Subject: Metrics for received records per TaskManager
> >
> > Hi all,
> >
> > I’m looking for a metric which allows me keeping track of the records or 
> > bytes each TaskManager has received or processed for the current task.
> >
> > Can anyone help me getting this?
> >
> > Thanks.
> >
> > Benjamin


Re: Metrics for received records per TaskManager

2019-04-02 Thread Yun Tang
Hi Benjamin

Try this
http://localhost:8081/jobs/{job-id}/vertices/{vertices-id}/subtasks/{subtask-index}/metrics?get=numBytesInLocalPerSecond

You could GET 
http://localhost:8081/jobs/
 to know running jobs,  and GET 
http://localhost:8081/jobs/{job-id}/vertices/
 to know all vertices similarly.

However, AFAIK, if you use REST API to query I'm afraid you cannot directly 
know the received records per task manager, and you have to gather these 
metrics per task.

Best
Yun Tang

From: Benjamin Burkhardt 
Sent: Tuesday, April 2, 2019 21:56
To: user@flink.apache.org; Yun Tang
Subject: Re: Metrics for received records per TaskManager

Hi Yun,

thanks for the hint. I tried to access the metric through the REST API calling 
http://localhost:8081/taskmanagers/2264f296385854f2d1fb4d121495822a/metrics?get=
 numBytesInRemotePerSecond.

Unfortunately the metric is not available...

Only these are avaiblable:
[{"id":"Status.Network.AvailableMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Count"},{"id":"Status.Network.TotalMemorySegments"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Count"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Time"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Time"},{„id“:“Status.JVM.Memory.NonHeap.Max“}]


How do I enable it, maybe in the flink-conf?

Thanks.

Benjamin
Am 2. Apr. 2019, 10:37 +0200 schrieb Yun Tang :
Hi Benjamin

I think 'numBytesInLocalPerSecond' and 'numBytesInRemotePerSecond' which 
indicate 'The number of bytes this task reads from a local source per second' 
and 'The number of bytes this task reads from a remote source per second' 
respectively could help you. If you want to track the information by each 
TaskManager, please group the metrics by tag 'tm_id'.

You could refer to 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#io
  for more information.

Best
Yun Tang


From: Benjamin Burkhardt 
Sent: Tuesday, April 2, 2019 15:00
To: user@flink.apache.org
Subject: Metrics for received records per TaskManager

Hi all,

I’m looking for a metric which allows me keeping track of the records or bytes 
each TaskManager has received or processed for the current task.

Can anyone help me getting this?

Thanks.

Benjamin


Use different functions for different signal values

2019-04-02 Thread Marke Builder
Hi,

I want to implement the following behavior:

[image: image.png]
There are a lot of ingest signals with unique Id's, I would use for each
signal set a special function. E.g. Signal1, Signal2 ==> function1,
Signal3, Signal4 ==> function2.
What is the recommended way to implement this pattern?

Thanks!


Re: How to run a job with job cluster mode on top of mesos?

2019-04-02 Thread Jacky Yin 殷传旺
Yes, it worked for me. However, just like what you said, it is not that 
straightforward, so I would like to learn from ` StandaloneJobClusterEntrypoint 
` and try to enhance the ` MesosJobClusterEntrypoint`. 

Jacky Yin
发件人: Till Rohrmann 
日期: 2019年4月2日 星期二 下午10:50
收件人: Jacky Yin 殷传旺 
抄送: "user@flink.apache.org" 
主题: Re: How to run a job with job cluster mode on top of mesos?

By the way, did the Mesos job mode work for you in the end?

On Tue, Apr 2, 2019 at 7:47 AM Till Rohrmann 
mailto:trohrm...@apache.org>> wrote:
Sure I will help with the review. Thanks for opening the PR Jacky.

Cheers,
Till

On Tue, Apr 2, 2019 at 2:17 AM Jacky Yin 殷传旺 
mailto:jacky@eoitek.com>> wrote:
Hello Till,

I submitted a PR(#8084) for this issue. Could you help review it?

Many thanks!

Jacky Yin
发件人: Till Rohrmann mailto:trohrm...@apache.org>>
日期: 2019年3月29日 星期五 下午11:06
收件人: Jacky Yin 殷传旺 mailto:jacky@eoitek.com>>
抄送: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
主题: Re: How to run a job with job cluster mode on top of mesos?

Thanks a lot Jacky.

Cheers,
Till

On Fri, Mar 29, 2019 at 1:50 PM Jacky Yin 殷传旺 
mailto:jacky@eoitek.com>> wrote:
Hello Till,

Thanks very much for the clear steps. And I noticed that you have already 
created a jira ticket for this issue and I would like to contribute it. (I have 
already assigned it to me). 


Jacky Yin
发件人: Till Rohrmann mailto:trohrm...@apache.org>>
日期: 2019年3月26日 星期二 下午6:31
收件人: Jacky Yin 殷传旺 mailto:jacky@eoitek.com>>
抄送: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
主题: Re: How to run a job with job cluster mode on top of mesos?

Hi Jacky,

you're right that we are currently lacking documentation for the 
`mesos-appmaster-job.sh` script. I've added a JIRA issue to cover this [1].

In order to use this script you first need to store a serialized version of the 
`JobGraph` you want to run somewhere where the Mesos appmaster can read it. 
Moreover, you need to make sure that the user code jars are on the classpath 
(e.g. putting them in the lib directory). See how the 
AbstractYarnClusterDescriptor serializes the `JobGraph` [2] for some details. 
The last thing before you can start the MesosJobClusterEntrypoint is to specify 
the path to the serialized job graph via the `internal.jobgraph-path` 
configuration option which you can specify in your flink-conf.yaml or pass in 
via dynamic property.

I know that this is not super streamlined and needs to be improved. E.g. one 
could do it similarly to the `StandaloneJobClusterEntrypoint` that one includes 
the user code jar and specifies the class name of the user code to load. That 
way one would not need to generate the JobGraph yourself and then serialize it.

I hope that I could help you a little bit.

[1] https://issues.apache.org/jira/browse/FLINK-12020
[2] 
https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L846

Cheers,
Till

On Fri, Mar 22, 2019 at 7:24 AM Jacky Yin 殷传旺 
mailto:jacky@eoitek.com>> wrote:
Anybody can help? I found the help documentation about how to do with the job 
cluster mode for yarn, docker and k8s. However only the help of session cluster 
mode is provided in flink web site for mesos.
It looks like the shell “mesos-appmaster-job.sh” should be the right one to run 
a job with the mode of job cluster on top of mesos.  However I cannot find any 
help or example for this shell.
Any help will be greatly appreciated.

Thanks!

Jacky Yin
发件人: Jacky Yin 殷传旺 mailto:jacky@eoitek.com>>
日期: 2019年3月21日 星期四 下午2:31
收件人: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
主题: Documentation of mesos-appmaster-job.sh

Hello All,

I cannot find any documentation or help about how to use 
$flin_home/bin/mesos-appmaster-job.sh.  Anybody help?

Thanks!

Jacky Yin


Re: How to run a job with job cluster mode on top of mesos?

2019-04-02 Thread Till Rohrmann
By the way, did the Mesos job mode work for you in the end?

On Tue, Apr 2, 2019 at 7:47 AM Till Rohrmann  wrote:

> Sure I will help with the review. Thanks for opening the PR Jacky.
>
> Cheers,
> Till
>
> On Tue, Apr 2, 2019 at 2:17 AM Jacky Yin 殷传旺  wrote:
>
>> Hello Till,
>>
>>
>>
>> I submitted a PR(#8084) for this issue. Could you help review it?
>>
>>
>>
>> Many thanks!
>>
>>
>>
>> *Jacky Yin*
>>
>> *发件人**: *Till Rohrmann 
>> *日期**: *2019年3月29日 星期五 下午11:06
>> *收件人**: *Jacky Yin 殷传旺 
>> *抄送**: *"user@flink.apache.org" 
>> *主题**: *Re: How to run a job with job cluster mode on top of mesos?
>>
>>
>>
>> Thanks a lot Jacky.
>>
>>
>>
>> Cheers,
>>
>> Till
>>
>>
>>
>> On Fri, Mar 29, 2019 at 1:50 PM Jacky Yin 殷传旺 
>> wrote:
>>
>> Hello Till,
>>
>>
>>
>> Thanks very much for the clear steps. And I noticed that you have already
>> created a jira ticket for this issue and I would like to contribute it. (I
>> have already assigned it to me). 
>>
>>
>>
>>
>>
>> *Jacky Yin *
>>
>> *发件人**: *Till Rohrmann 
>> *日期**: *2019年3月26日 星期二 下午6:31
>> *收件人**: *Jacky Yin 殷传旺 
>> *抄送**: *"user@flink.apache.org" 
>> *主题**: *Re: How to run a job with job cluster mode on top of mesos?
>>
>>
>>
>> Hi Jacky,
>>
>>
>>
>> you're right that we are currently lacking documentation for the
>> `mesos-appmaster-job.sh` script. I've added a JIRA issue to cover this [1].
>>
>>
>>
>> In order to use this script you first need to store a serialized version
>> of the `JobGraph` you want to run somewhere where the Mesos appmaster can
>> read it. Moreover, you need to make sure that the user code jars are on the
>> classpath (e.g. putting them in the lib directory). See how the
>> AbstractYarnClusterDescriptor serializes the `JobGraph` [2] for some
>> details. The last thing before you can start the MesosJobClusterEntrypoint
>> is to specify the path to the serialized job graph via the
>> `internal.jobgraph-path` configuration option which you can specify in your
>> flink-conf.yaml or pass in via dynamic property.
>>
>>
>>
>> I know that this is not super streamlined and needs to be improved. E.g.
>> one could do it similarly to the `StandaloneJobClusterEntrypoint` that one
>> includes the user code jar and specifies the class name of the user code to
>> load. That way one would not need to generate the JobGraph yourself and
>> then serialize it.
>>
>>
>>
>> I hope that I could help you a little bit.
>>
>>
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-12020
>>
>> [2]
>> https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L846
>>
>>
>>
>> Cheers,
>>
>> Till
>>
>>
>>
>> On Fri, Mar 22, 2019 at 7:24 AM Jacky Yin 殷传旺 
>> wrote:
>>
>> Anybody can help? I found the help documentation about how to do with the
>> job cluster mode for yarn, docker and k8s. However only the help of session
>> cluster mode is provided in flink web site for mesos.
>>
>> It looks like the shell “mesos-appmaster-job.sh” should be the right one
>> to run a job with the mode of job cluster on top of mesos.  However I
>> cannot find any help or example for this shell.
>>
>> Any help will be greatly appreciated.
>>
>>
>>
>> Thanks!
>>
>>
>>
>> *Jacky Yin *
>>
>> *发件人**: *Jacky Yin 殷传旺 
>> *日期**: *2019年3月21日 星期四 下午2:31
>> *收件人**: *"user@flink.apache.org" 
>> *主题**: *Documentation of mesos-appmaster-job.sh
>>
>>
>>
>> Hello All,
>>
>>
>>
>> I cannot find any documentation or help about how to use
>> $flin_home/bin/mesos-appmaster-job.sh.  Anybody help?
>>
>>
>>
>> Thanks!
>>
>>
>>
>> *Jacky Yin*
>>
>>


Re: How to run a job with job cluster mode on top of mesos?

2019-04-02 Thread Till Rohrmann
Sure I will help with the review. Thanks for opening the PR Jacky.

Cheers,
Till

On Tue, Apr 2, 2019 at 2:17 AM Jacky Yin 殷传旺  wrote:

> Hello Till,
>
>
>
> I submitted a PR(#8084) for this issue. Could you help review it?
>
>
>
> Many thanks!
>
>
>
> *Jacky Yin*
>
> *发件人**: *Till Rohrmann 
> *日期**: *2019年3月29日 星期五 下午11:06
> *收件人**: *Jacky Yin 殷传旺 
> *抄送**: *"user@flink.apache.org" 
> *主题**: *Re: How to run a job with job cluster mode on top of mesos?
>
>
>
> Thanks a lot Jacky.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Fri, Mar 29, 2019 at 1:50 PM Jacky Yin 殷传旺 
> wrote:
>
> Hello Till,
>
>
>
> Thanks very much for the clear steps. And I noticed that you have already
> created a jira ticket for this issue and I would like to contribute it. (I
> have already assigned it to me). 
>
>
>
>
>
> *Jacky Yin *
>
> *发件人**: *Till Rohrmann 
> *日期**: *2019年3月26日 星期二 下午6:31
> *收件人**: *Jacky Yin 殷传旺 
> *抄送**: *"user@flink.apache.org" 
> *主题**: *Re: How to run a job with job cluster mode on top of mesos?
>
>
>
> Hi Jacky,
>
>
>
> you're right that we are currently lacking documentation for the
> `mesos-appmaster-job.sh` script. I've added a JIRA issue to cover this [1].
>
>
>
> In order to use this script you first need to store a serialized version
> of the `JobGraph` you want to run somewhere where the Mesos appmaster can
> read it. Moreover, you need to make sure that the user code jars are on the
> classpath (e.g. putting them in the lib directory). See how the
> AbstractYarnClusterDescriptor serializes the `JobGraph` [2] for some
> details. The last thing before you can start the MesosJobClusterEntrypoint
> is to specify the path to the serialized job graph via the
> `internal.jobgraph-path` configuration option which you can specify in your
> flink-conf.yaml or pass in via dynamic property.
>
>
>
> I know that this is not super streamlined and needs to be improved. E.g.
> one could do it similarly to the `StandaloneJobClusterEntrypoint` that one
> includes the user code jar and specifies the class name of the user code to
> load. That way one would not need to generate the JobGraph yourself and
> then serialize it.
>
>
>
> I hope that I could help you a little bit.
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-12020
>
> [2]
> https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L846
>
>
>
> Cheers,
>
> Till
>
>
>
> On Fri, Mar 22, 2019 at 7:24 AM Jacky Yin 殷传旺 
> wrote:
>
> Anybody can help? I found the help documentation about how to do with the
> job cluster mode for yarn, docker and k8s. However only the help of session
> cluster mode is provided in flink web site for mesos.
>
> It looks like the shell “mesos-appmaster-job.sh” should be the right one
> to run a job with the mode of job cluster on top of mesos.  However I
> cannot find any help or example for this shell.
>
> Any help will be greatly appreciated.
>
>
>
> Thanks!
>
>
>
> *Jacky Yin *
>
> *发件人**: *Jacky Yin 殷传旺 
> *日期**: *2019年3月21日 星期四 下午2:31
> *收件人**: *"user@flink.apache.org" 
> *主题**: *Documentation of mesos-appmaster-job.sh
>
>
>
> Hello All,
>
>
>
> I cannot find any documentation or help about how to use
> $flin_home/bin/mesos-appmaster-job.sh.  Anybody help?
>
>
>
> Thanks!
>
>
>
> *Jacky Yin*
>
>


Re: kafka corrupt record exception

2019-04-02 Thread Dominik Wosiński
Hey,
As far as I understand the error is not caused by the deserialization but
really by the polling of the message, so custom deserialization schema
won't really help in this case. There seems to be an error in the messages
in Your topic.

You can see here
what
is the data that should be associated with the message. One thing you could
possibly do is simply find the offset of the corrupted message and start
reading after the record. However, You should probably verify what is the
reason for the message size being smaller than it should. One thing that
can cause this exact behavior may be a mismatch between Kafka versions on
broker and consumer.

Best Regards,
Dom.

wt., 2 kwi 2019 o 09:36 Ilya Karpov  napisał(a):

> According to docs (here:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema
>  ,
> last paragraph) that’s an expected behaviour. May be you should think about
> writing your own deserialisation schema to skip corrupted messages.
>
> 1 апр. 2019 г., в 18:19, Sushant Sawant 
> написал(а):
>
> Hi,
> Thanks for reply.
> But is there a way one could skip this corrupt record from Flink consumer?
> Flink job goes in a loop, it restarts and then fails again at same record.
>
>
> On Mon, 1 Apr 2019, 07:34 Congxian Qiu,  wrote:
>
>> Hi
>> As you said, consume from ubuntu terminal has the same error, maybe you
>> could send a email to kafka user maillist.
>>
>> Best, Congxian
>> On Apr 1, 2019, 05:26 +0800, Sushant Sawant ,
>> wrote:
>>
>> Hi team,
>> I am facing this exception,
>>
>> org.apache.kafka.common.KafkaException: Received exception when fetching
>> the next record from topic_log-3. If needed, please seek past the record to
>> continue consumption.
>>
>> at
>> org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1076)
>>
>> at
>> org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:944)
>>
>> at
>> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:567)
>>
>> at
>> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:528)
>>
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086)
>>
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
>>
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)
>>
>> Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record
>> size is less than the minimum record overhead (14)
>>
>>
>> Also, when I consume message from ubuntu terminal consumer, I get same
>> error.
>>
>> How can skip this corrupt record?
>>
>>
>>
>>
>>
>


Re: Metrics for received records per TaskManager

2019-04-02 Thread Benjamin Burkhardt
Hi Yun,

thanks for the hint. I tried to access the metric through the REST API calling 
http://localhost:8081/taskmanagers/2264f296385854f2d1fb4d121495822a/metrics?get=
 numBytesInRemotePerSecond.

Unfortunately the metric is not available...

Only these are avaiblable:
[{"id":"Status.Network.AvailableMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Count"},{"id":"Status.Network.TotalMemorySegments"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Count"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Time"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Time"},{„id“:“Status.JVM.Memory.NonHeap.Max“}]


How do I enable it, maybe in the flink-conf?

Thanks.

Benjamin
Am 2. Apr. 2019, 10:37 +0200 schrieb Yun Tang :
> Hi Benjamin
>
> I think 'numBytesInLocalPerSecond' and 'numBytesInRemotePerSecond' which 
> indicate 'The number of bytes this task reads from a local source per second' 
> and 'The number of bytes this task reads from a remote source per second' 
> respectively could help you. If you want to track the information by each 
> TaskManager, please group the metrics by tag 'tm_id'.
>
> You could refer to 
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#io
>   for more information.
>
> Best
> Yun Tang
>
> From: Benjamin Burkhardt 
> Sent: Tuesday, April 2, 2019 15:00
> To: user@flink.apache.org
> Subject: Metrics for received records per TaskManager
>
> Hi all,
>
> I’m looking for a metric which allows me keeping track of the records or 
> bytes each TaskManager has received or processed for the current task.
>
> Can anyone help me getting this?
>
> Thanks.
>
> Benjamin


?????? ????????

2019-04-02 Thread 1900
MapState ??


1.
2.watermarkeventtime,
3.CoGroupFunction
4.??CoGroupFunction12??


??


--  --
??: "paullin3280";
: 2019??4??2??(??) 2:10
??: "user-zh";

: Re: 



Hi,

 MapState 
 
MapState entry 
MapState??

Best,
Paul Lam

> ?? 2019??4??213:46??1900 <575209...@qq.com> ??
> 
> kafka1-25??
> ??
> 
> 
> ??keyby

Re: Any suggestions about which GC collector to use in Flink?

2019-04-02 Thread qi luo
+1. It would be great if someone could benchmark between difference GC in Flink 
(we may do it in next few months).

I’m told that the default parallel GC provides better throughput but longer 
pauses (we encountered 2min+ GC pauses in large dataset). Whereas the G1GC 
provides less pauses but also less throughput.

> On Apr 2, 2019, at 3:26 PM, 徐涛  wrote:
> 
> Hi Experts,
>   In my environment, when I submit the Flink program to yarn, I do not 
> specify which GC collector to use, in the web monitor page, I found it uses 
> PS_Scavenge as the young generation GC collector, PS_MarkSweep as the old 
> generation GC collector, I wonder if I can use G1 as the GC collector, does 
> anyone has evaluated the performance of which GC collector to use? I believe 
> that use G1 can reduce the old generation GC time, especially for the large 
> heap.
> 
> Best
> Henry



End to End Performance Testing

2019-04-02 Thread WILSON Frank
Hi,

I am looking for resources to help me test the performance of my Flink 
pipelines end-to-end. I want to verify that my pipelines meet throughput and 
latency requirements (so for a given number of events per second the latency of 
the output is under so many seconds). I read that Alibaba had developed a Blink 
test platform that offers performance and stability testing [1] that seem to be 
relevant to this problem.

I read also that Blink has been contributed back to Flink [2] and I was 
wondering if the test platform is also included in the branch [3]?

Thanks,


Frank Wilson


[1] 
https://hackernoon.com/from-code-quality-to-integration-optimizing-alibabas-blink-testing-framework-dc9c357319de#190e

[2] https://flink.apache.org/news/2019/02/13/unified-batch-streaming-blink.html

[3] https://github.com/apache/flink/tree/blink



InvalidProgramException when trying to sort a group within a dataset

2019-04-02 Thread Papadopoulos, Konstantinos
Hi all,

I am trying to sort a group within a dataset using KeySelector as follows:

in
  .groupBy("productId", "timePeriodId", "geographyId")
  .sortGroup(new KeySelector() {

@Override
public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {

  return 
Optional.ofNullable(thresholdAcvFact.getBasePrice()).orElse(thresholdAcvFact.getPromoPrice());

 }
  }, Order.ASCENDING)
  .reduceGroup(/* do something */)

And I am getting the following exception:

org.apache.flink.api.common.InvalidProgramException: KeySelector group-sorting 
keys can only be used with KeySelector grouping keys.

 at 
org.apache.flink.api.java.operators.UnsortedGrouping.sortGroup(UnsortedGrouping.java:318)
 at 
com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl.transform(ThresholdAcvCalcServiceImpl.java:91)
 at 
com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceTest.transform(ThresholdAcvCalcServiceTest.java:91)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:532)
 at 
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
 at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:171)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
 at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:167)
 at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:114)
 at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:59)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:108)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
 at java.util.ArrayList.forEach(ArrayList.java:1257)
 at 
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
 at java.util.ArrayList.forEach(ArrayList.java:1257)
 at 
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
 at 
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
 at 
org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
 at 
org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
 at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
 at 
org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
 at 
org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)
 at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)
 at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
 at 
com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
 at 
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
 at 
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
 at 

Re: How to run a job with job cluster mode on top of mesos?

2019-04-02 Thread Jacky Yin 殷传旺
Hello Till,

I submitted a PR(#8084) for this issue. Could you help review it?

Many thanks!

Jacky Yin
发件人: Till Rohrmann 
日期: 2019年3月29日 星期五 下午11:06
收件人: Jacky Yin 殷传旺 
抄送: "user@flink.apache.org" 
主题: Re: How to run a job with job cluster mode on top of mesos?

Thanks a lot Jacky.

Cheers,
Till

On Fri, Mar 29, 2019 at 1:50 PM Jacky Yin 殷传旺 
mailto:jacky@eoitek.com>> wrote:
Hello Till,

Thanks very much for the clear steps. And I noticed that you have already 
created a jira ticket for this issue and I would like to contribute it. (I have 
already assigned it to me). 


Jacky Yin
发件人: Till Rohrmann mailto:trohrm...@apache.org>>
日期: 2019年3月26日 星期二 下午6:31
收件人: Jacky Yin 殷传旺 mailto:jacky@eoitek.com>>
抄送: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
主题: Re: How to run a job with job cluster mode on top of mesos?

Hi Jacky,

you're right that we are currently lacking documentation for the 
`mesos-appmaster-job.sh` script. I've added a JIRA issue to cover this [1].

In order to use this script you first need to store a serialized version of the 
`JobGraph` you want to run somewhere where the Mesos appmaster can read it. 
Moreover, you need to make sure that the user code jars are on the classpath 
(e.g. putting them in the lib directory). See how the 
AbstractYarnClusterDescriptor serializes the `JobGraph` [2] for some details. 
The last thing before you can start the MesosJobClusterEntrypoint is to specify 
the path to the serialized job graph via the `internal.jobgraph-path` 
configuration option which you can specify in your flink-conf.yaml or pass in 
via dynamic property.

I know that this is not super streamlined and needs to be improved. E.g. one 
could do it similarly to the `StandaloneJobClusterEntrypoint` that one includes 
the user code jar and specifies the class name of the user code to load. That 
way one would not need to generate the JobGraph yourself and then serialize it.

I hope that I could help you a little bit.

[1] https://issues.apache.org/jira/browse/FLINK-12020
[2] 
https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L846

Cheers,
Till

On Fri, Mar 22, 2019 at 7:24 AM Jacky Yin 殷传旺 
mailto:jacky@eoitek.com>> wrote:
Anybody can help? I found the help documentation about how to do with the job 
cluster mode for yarn, docker and k8s. However only the help of session cluster 
mode is provided in flink web site for mesos.
It looks like the shell “mesos-appmaster-job.sh” should be the right one to run 
a job with the mode of job cluster on top of mesos.  However I cannot find any 
help or example for this shell.
Any help will be greatly appreciated.

Thanks!

Jacky Yin
发件人: Jacky Yin 殷传旺 mailto:jacky@eoitek.com>>
日期: 2019年3月21日 星期四 下午2:31
收件人: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
主题: Documentation of mesos-appmaster-job.sh

Hello All,

I cannot find any documentation or help about how to use 
$flin_home/bin/mesos-appmaster-job.sh.  Anybody help?

Thanks!

Jacky Yin


Re: Metrics for received records per TaskManager

2019-04-02 Thread Yun Tang
Hi Benjamin

I think 'numBytesInLocalPerSecond' and 'numBytesInRemotePerSecond' which 
indicate 'The number of bytes this task reads from a local source per second' 
and 'The number of bytes this task reads from a remote source per second' 
respectively could help you. If you want to track the information by each 
TaskManager, please group the metrics by tag 'tm_id'.

You could refer to 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#io
  for more information.

Best
Yun Tang


From: Benjamin Burkhardt 
Sent: Tuesday, April 2, 2019 15:00
To: user@flink.apache.org
Subject: Metrics for received records per TaskManager

Hi all,

I’m looking for a metric which allows me keeping track of the records or bytes 
each TaskManager has received or processed for the current task.

Can anyone help me getting this?

Thanks.

Benjamin


Re: How can I get the right TaskExecutor in ProcessFunction

2019-04-02 Thread Andrey Zagrebin
Hi,

What kind of information do you need about the TaskExecutor? This is
usually quite low level type of information which might change randomly,
e.g. after restore.
What is the original problem why you need it? Maybe, there is another
solution. E.g. you can get index of local parallel subtask of your
ProcessFunction:
RuntimeContext.getIndexOfThisSubtask
out of
RuntimeContext.getNumberOfParallelSubtasks.

Best,
Andrey

On Mon, Apr 1, 2019 at 11:33 AM peibin wang  wrote:

> Hi all
>
>  I want to get the right TaskExecutor where the  ProcessFunction run at.
> Is there any way to get it from getRuntimeContext() . Or any other
> solution??
>
>
>
> Best wishes,
>


????: ????: ????????

2019-04-02 Thread baiyg25...@hundsun.com
table ??join?? ??



baiyg25...@hundsun.com
 
 492341344
?? 2019-04-02 14:42
 user-zh
?? ??: 

??blink 
sql
??datastreamrichFunctionopen??(,)??processElement??open
sql
 
 
 
 
--  --
??: "??";
: 2019??4??2??(??) 2:34
??: "user-zh@flink.apache.org";
 
: : 
 
 
 

 
??: 492341344
: 2019??4??2?? 10:06
??: user-zh
: 
 



Re: kafka corrupt record exception

2019-04-02 Thread Ilya Karpov
According to docs (here: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema
 

 , last paragraph) that’s an expected behaviour. May be you should think about 
writing your own deserialisation schema to skip corrupted messages.

> 1 апр. 2019 г., в 18:19, Sushant Sawant  
> написал(а):
> 
> Hi,
> Thanks for reply. 
> But is there a way one could skip this corrupt record from Flink consumer?
> Flink job goes in a loop, it restarts and then fails again at same record.
> 
> 
> On Mon, 1 Apr 2019, 07:34 Congxian Qiu,  > wrote:
> Hi
> As you said, consume from ubuntu terminal has the same error, maybe you could 
> send a email to kafka user maillist.
> 
> Best, Congxian
> On Apr 1, 2019, 05:26 +0800, Sushant Sawant  >, wrote:
>> Hi team,
>> I am facing this exception,
>> org.apache.kafka.common.KafkaException: Received exception when fetching the 
>> next record from topic_log-3. If needed, please seek past the record to 
>> continue consumption.
>> 
>> at 
>> org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1076)
>> 
>> at 
>> org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:944)
>> 
>> at 
>> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:567)
>> 
>> at 
>> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:528)
>> 
>> at 
>> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086)
>> 
>> at 
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
>> 
>> at 
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)
>> 
>> Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record 
>> size is less than the minimum record overhead (14)
>> 
>> 
>> 
>> Also, when I consume message from ubuntu terminal consumer, I get same error.
>> 
>> How can skip this corrupt record?
>> 
>> 
>> 
>> 
>> 
>> 
>> 



Any suggestions about which GC collector to use in Flink?

2019-04-02 Thread 徐涛
Hi Experts,
In my environment, when I submit the Flink program to yarn, I do not 
specify which GC collector to use, in the web monitor page, I found it uses 
PS_Scavenge as the young generation GC collector, PS_MarkSweep as the old 
generation GC collector, I wonder if I can use G1 as the GC collector, does 
anyone has evaluated the performance of which GC collector to use? I believe 
that use G1 can reduce the old generation GC time, especially for the large 
heap.

Best
Henry

Re:回复: 远程提交代码到Flink集群

2019-04-02 Thread Yuan Yifan
获取生成的图是可以的,env.,但是有一个问题,你图中使用的各种依赖恐怕无法如期提交到各个节点上,所以运行的时候还是得打包的JAR的……我建议此事就不折腾了吧。








在 2019-04-02 14:39:45,"文报" <1010467...@qq.com> 写道:
>谢谢各位的回复。
>   
> 我通过将代码推到git上,利用jenkins打包生成jar文件,然后调用shell脚本完成了简单的自动化。今天看见了JobManager是可以直接接受JobGraph,那么我能不能在代码中直接获取到自己代码生成的JobGraph,提交到JobManager上,这样就不需要通过jar包的形式上传运行了。如果能实现,第一步我应该怎么去做。
>期待各位的回信,感谢。
>
>
>
>
>-- 原始邮件 --
>发件人: "我自己的邮箱"<1010467...@qq.com>;
>发送时间: 2019年3月29日(星期五) 下午2:19
>收件人: "user-zh";
>
>主题: 回复: 远程提交代码到Flink集群
>
>
>
>谢谢各位的解答,我试试。
>
>
>
>
>-- 原始邮件 --
>发件人: "Lifei Chen";
>发送时间: 2019年3月29日(星期五) 中午11:10
>收件人: "user-zh";
>
>主题: Re: 远程提交代码到Flink集群
>
>
>
>有一个小巧的go cli, 支持直接部署jar包到flink manager上。
>
>https://github.com/ing-bank/flink-deployer
>
>希望能帮到你!
>
>Kaibo Zhou  于2019年3月29日周五 上午11:08写道:
>
>> 可以用 flink 提供的 Restful API 接口,upload 上传 jar 包然后 run。
>>
>> 参考:
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-upload
>> 和 https://files.alicdn.com/tpsservice/a8d224d6a3b8b82d03aa84e370c008cc.pdf
>> 文档的介绍
>>
>> 文报 <1010467...@qq.com> 于2019年3月28日周四 下午9:06写道:
>>
>> > 各位好!
>> >
>> >
>> 请教一下各位,在本地开发完代码后,怎么样可以将编写好的代码直接提交到Flink集群上运行?(想做Flink任务的自动化,避免每次开发完成后都需要打jar包提交到web页面上)
>>


Re: 远程提交代码到Flink集群

2019-04-02 Thread Biao Liu
Hi, 由于你提供的细节并不多,无法很好地理解你的需求
你的作业中没有自定义代码 (例如 java/scala 代码) 吗?如果有的话,就必须上传 jar,Flink 接受的是编译后的字节码,并不提供编译功能

PS:我理解"自动化"和"上传 jar"并没有直接联系

文报 <1010467...@qq.com> 于2019年4月2日周二 下午2:40写道:

> 谢谢各位的回复。
>
>  
> 我通过将代码推到git上,利用jenkins打包生成jar文件,然后调用shell脚本完成了简单的自动化。今天看见了JobManager是可以直接接受JobGraph,那么我能不能在代码中直接获取到自己代码生成的JobGraph,提交到JobManager上,这样就不需要通过jar包的形式上传运行了。如果能实现,第一步我应该怎么去做。
> 期待各位的回信,感谢。
>
>
>
>
> -- 原始邮件 --
> 发件人: "我自己的邮箱"<1010467...@qq.com>;
> 发送时间: 2019年3月29日(星期五) 下午2:19
> 收件人: "user-zh";
>
> 主题: 回复: 远程提交代码到Flink集群
>
>
>
> 谢谢各位的解答,我试试。
>
>
>
>
> -- 原始邮件 --
> 发件人: "Lifei Chen";
> 发送时间: 2019年3月29日(星期五) 中午11:10
> 收件人: "user-zh";
>
> 主题: Re: 远程提交代码到Flink集群
>
>
>
> 有一个小巧的go cli, 支持直接部署jar包到flink manager上。
>
> https://github.com/ing-bank/flink-deployer
>
> 希望能帮到你!
>
> Kaibo Zhou  于2019年3月29日周五 上午11:08写道:
>
> > 可以用 flink 提供的 Restful API 接口,upload 上传 jar 包然后 run。
> >
> > 参考:
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-upload
> > 和
> https://files.alicdn.com/tpsservice/a8d224d6a3b8b82d03aa84e370c008cc.pdf
> > 文档的介绍
> >
> > 文报 <1010467...@qq.com> 于2019年3月28日周四 下午9:06写道:
> >
> > > 各位好!
> > >
> > >
> >
> 请教一下各位,在本地开发完代码后,怎么样可以将编写好的代码直接提交到Flink集群上运行?(想做Flink任务的自动化,避免每次开发完成后都需要打jar包提交到web页面上)
> >


Metrics for received records per TaskManager

2019-04-02 Thread Benjamin Burkhardt
Hi all,

I’m looking for a metric which allows me keeping track of the records or bytes 
each TaskManager has received or processed for the current task.

Can anyone help me getting this?

Thanks.

Benjamin


??????????: ????????

2019-04-02 Thread 492341344

??blink 
sql
??datastreamrichFunctionopen??(,)??processElement??open
sql




--  --
??: "??";
: 2019??4??2??(??) 2:34
??: "user-zh@flink.apache.org";

: : 





??: 492341344
: 2019??4??2?? 10:06
??: user-zh
: 



?????? ??????????????Flink????

2019-04-02 Thread ????

   
gitjenkinsjar??shell??JobManager??JobGraph??JobGraphJobManagerjar??
??




--  --
??: ""<1010467...@qq.com>;
: 2019??3??29??(??) 2:19
??: "user-zh";

: ?? ??Flink








--  --
??: "Lifei Chen";
: 2019??3??29??(??) 11:10
??: "user-zh";

: Re: ??Flink



go cli, jarflink manager

https://github.com/ing-bank/flink-deployer

??

Kaibo Zhou  ??2019??3??29?? 11:08??

> ?? flink ?? Restful API ??upload  jar ?? run??
>
> ??
>
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-upload
> ?? https://files.alicdn.com/tpsservice/a8d224d6a3b8b82d03aa84e370c008cc.pdf
> ??
>
>  <1010467...@qq.com> ??2019??3??28?? 9:06??
>
> > 
> >
> >
> Flink???(Flinkjarweb??)
>

答复: 批流结合

2019-04-02 Thread 戴嘉诚
是什么样的离线数据?要如何累加到实时流?

发件人: 492341344
发送时间: 2019年4月2日 10:06
收件人: user-zh
主题: 批流结合

各位好,项目中有一批历史离线的统计数据,需要累加到实时流的统计中。请问有什么好的方案吗?



Re: 方案询问

2019-04-02 Thread Paul Lam
Hi,

推荐可以维护两个 MapState 分别缓存尚未匹配的两种订单。一条订单数据进来首先查找另一种订单的 MapState,若找到则输出合并的数据并删除对应的 
entry,否则放入所属订单类型的 MapState。

Best,
Paul Lam

> 在 2019年4月2日,13:46,1900 <575209...@qq.com> 写道:
> 
> 现在有个需求,从kafka接收订单信息,每条订单信息有1-2条数据(一般第一条是订单初始状态数据,第二条是订单终态数据);时间间隔不等(一般5秒以内),
> 如何能将数据进行合并,最终合并成一条数据?
> 
> 
> 现在有一个考虑,根据订单号keyby分组后处理,这样的话是不是开启的窗口太多了?



Re:方案询问

2019-04-02 Thread Yuan Yifan
keyby不会“开启的窗口太多”,而是会产生较多的状态。








在 2019-04-02 13:46:48,"1900" <575209...@qq.com> 写道:
>现在有个需求,从kafka接收订单信息,每条订单信息有1-2条数据(一般第一条是订单初始状态数据,第二条是订单终态数据);时间间隔不等(一般5秒以内),
>如何能将数据进行合并,最终合并成一条数据?
>
>
>现在有一个考虑,根据订单号keyby分组后处理,这样的话是不是开启的窗口太多了?


Re: HA切换

2019-04-02 Thread Biao Liu
Hi wuzhixin,
HA 切换时会重启 job,Flink 社区版目前的实现是这样的
可以了解下 Blink, 我们在社区版基础上优化了 master failover 的策略,可以避免重启 job

马 敬源  于2019年4月2日周二 上午9:45写道:

> Hi,wuzhixin:
> 尝试改一下flink-conf.yaml 这个配置:
>
> jobmanager.execution.failover-strategy: individual
>
>
> 来自 Outlook
>
> 
> 发件人: wuzhixin 
> 发送时间: 2019年4月1日 16:37
> 收件人: user-zh@flink.apache.org
> 主题: HA切换
>
> Hi all:
>
> 今天我们standalone的集群,使用zookeeper做了HA机制,但是今天因为zookeeper的一些原因,来了一次HA切换,然后
> 我们发现所有的job都重启了,请问这是标准处理么?
> flink的这种机制是不是不太好
>