Re: Null result cannot be used for atomic types

2020-01-09 Thread godfrey he
hi sunfulin,

Which Flink version are you using?

best,
godfrey

sunfulin  于2020年1月10日周五 下午1:50写道:

> Hi, I am running a Flink app that reads Kafka records in JSON format.
> The connect code is as follows:
>
>
> tableEnv.connect(
>         new Kafka()
>             .version(kafkaInstance.getVersion())
>             .topic(chooseKafkaTopic(initPack.clusterMode))
>             .property("bootstrap.servers", kafkaInstance.getBrokerList())
>             .property("group.id", initPack.jobName)
>             .startFromEarliest()
>     ).withSchema(
>         new Schema()
>             // EVENT_TIME
>             .field("rowtime", Types.SQL_TIMESTAMP).rowtime(
>                 new Rowtime()
>                     .timestampsFromField("time")
>                     .watermarksPeriodicBounded(1000)
>             )
>             .field("type", Types.STRING)
>             .field("event", Types.STRING)
>             .field("user_id", Types.STRING)
>             .field("distinct_id", Types.STRING)
>             .field("project", Types.STRING)
>             .field("recv_time", Types.SQL_TIMESTAMP)
>             .field("properties", Types.ROW_NAMED(
>                 new String[] { "BROWSER_VERSION", "pathname", "search",
>                     "eventType", "message", "stack", "componentStack" },
>                 Types.STRING, Types.STRING, Types.STRING, Types.STRING,
>                 Types.STRING, Types.STRING, Types.STRING)
>             )
>     ).withFormat(
>         new Json().failOnMissingField(false)
>             .deriveSchema()
>     )
>     .inAppendMode()
>     .registerTableSource(getTableName());
>
>
>
> However, the application throws the following exception, which really
> confuses me. From the code above, the field types are only Types.STRING
> or Types.SQL_TIMESTAMP.
>
> Not sure which data field could lead to this. I'd appreciate some help
> from the community.
>
>
> Caused by: java.lang.NullPointerException: Null result cannot be used for
> atomic types.
>
>  at DataStreamSinkConversion$5.map(Unknown Source)
>
>  at
> org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:55)
>
>  at
> org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:34)
>
>  at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>
>  at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>
>  at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>
>  at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>
>  at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>
>  at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>
>  at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>
>  at
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>
>  at
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>
>  at DataStreamSourceConversion$2.processElement(Unknown Source)
>
>  at
> org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
>
>  at
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>
>  at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>
>  at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>
>  at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>
>  at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>
>  at org.apache.flink.streaming.
>
>
>
>
>


Null result cannot be used for atomic types

2020-01-09 Thread sunfulin
Hi, I am running a Flink app that reads Kafka records in JSON format. The
connect code is as follows:




tableEnv.connect(
        new Kafka()
            .version(kafkaInstance.getVersion())
            .topic(chooseKafkaTopic(initPack.clusterMode))
            .property("bootstrap.servers", kafkaInstance.getBrokerList())
            .property("group.id", initPack.jobName)
            .startFromEarliest()
    ).withSchema(
        new Schema()
            // EVENT_TIME
            .field("rowtime", Types.SQL_TIMESTAMP).rowtime(
                new Rowtime()
                    .timestampsFromField("time")
                    .watermarksPeriodicBounded(1000)
            )
            .field("type", Types.STRING)
            .field("event", Types.STRING)
            .field("user_id", Types.STRING)
            .field("distinct_id", Types.STRING)
            .field("project", Types.STRING)
            .field("recv_time", Types.SQL_TIMESTAMP)
            .field("properties", Types.ROW_NAMED(
                new String[] { "BROWSER_VERSION", "pathname", "search",
                    "eventType", "message", "stack", "componentStack" },
                Types.STRING, Types.STRING, Types.STRING, Types.STRING,
                Types.STRING, Types.STRING, Types.STRING)
            )
    ).withFormat(
        new Json().failOnMissingField(false)
            .deriveSchema()
    )
    .inAppendMode()
    .registerTableSource(getTableName());







However, the application throws the following exception, which really confuses
me. From the code above, the field types are only Types.STRING or
Types.SQL_TIMESTAMP.

Not sure which data field could lead to this. I'd appreciate some help from the community.




Caused by: java.lang.NullPointerException: Null result cannot be used for 
atomic types.

 at DataStreamSinkConversion$5.map(Unknown Source)

 at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:55)

 at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:34)

 at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)

 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)

 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)

 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)

 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)

 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)

 at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

 at 
org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)

 at 
org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)

 at DataStreamSourceConversion$2.processElement(Unknown Source)

 at 
org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)

 at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)

 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)

 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)

 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)

 at org.apache.flink.streaming.
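

This error typically surfaces when a null field value is converted to an atomic
(non-Row) type, which can happen here because failOnMissingField(false) yields
null for a missing JSON field. A minimal sketch of a defensive conversion,
assuming a registered table as above (the selected field and target type are
illustrative, not from the thread):

```
// Filter out rows whose field is null before converting the result to an
// atomic type; a null in this position is what triggers
// "Null result cannot be used for atomic types" during conversion.
Table result = tableEnv.sqlQuery(
    "SELECT user_id FROM " + getTableName() + " WHERE user_id IS NOT NULL");
// Converting to String (an atomic type) is safe once nulls are filtered out.
DataStream<String> userIds = tableEnv.toAppendStream(result, String.class);
```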




Re: Reply: Flink consumes no data from Kafka

2020-01-09 Thread sunfulin
Thanks for the reply. After investigating, it was indeed a hostname configuration issue.


The job then ran into another problem. Below is the Kafka connector configuration used for reading; a JSON schema is used for parsing. At runtime, however, it throws the following exception. Does anyone know the cause?

Caused by: java.lang.NullPointerException: Null result cannot be used for 
atomic types.

 at DataStreamSinkConversion$5.map(Unknown Source)

 at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:55)

 at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:34)

 at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)

 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)

 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)

 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)

 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)

 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)

 at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

 at 
org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)

 at 
org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)

 at DataStreamSourceConversion$2.processElement(Unknown Source)

 at 
org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)

 at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)

 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)

 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)

 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)

 at org.apache.flink.streaming.






tableEnv.connect(
        new Kafka()
            .version(kafkaInstance.getVersion())
            .topic(chooseKafkaTopic(initPack.clusterMode))
            .property("bootstrap.servers", kafkaInstance.getBrokerList())
            .property("group.id", initPack.jobName)
            .startFromEarliest()   // for testing; can be removed in production
    ).withSchema(
        new Schema()
            // timestamp field
            .field("rowtime", Types.SQL_TIMESTAMP).rowtime(
                new Rowtime()
                    .timestampsFromField("time")
                    .watermarksPeriodicBounded(1000)
            )
            .field("type", Types.STRING)
            .field("event", Types.STRING)
            .field("user_id", Types.STRING)
            .field("distinct_id", Types.STRING)
            .field("project", Types.STRING)
            .field("recv_time", Types.SQL_TIMESTAMP)
            .field("properties", Types.ROW_NAMED(
                new String[] { "BROWSER_VERSION", "pathname", "search",
                    "eventType", "message", "stack", "componentStack" },
                Types.STRING, Types.STRING, Types.STRING, Types.STRING,
                Types.STRING, Types.STRING, Types.STRING)
            )
    ).withFormat(
        new Json().failOnMissingField(false)
            .deriveSchema()
    )
    .inAppendMode()
    .registerTableSource(getTableName());












On 2020-01-10 09:53:52, "Evan" wrote:
>First, check the advertised.host.name configuration of the Sensors Analytics Kafka.
>
>
>
>
>-- Original message --
>From: "sunfulin"; Sent: Friday, Jan 10, 2020, 9:51 AM
>To: "user-zh@flink.apache.org"
>Subject: Flink consumes no data from Kafka
>
>
>
>I have a job that uses Flink to consume Kafka message data from Sensors Analytics (see https://manual.sensorsdata.cn/sa/latest/page-1573828.html). But after the job starts, the Flink task consumes no Kafka messages, while I can see a continuous stream of data through the Sensors Analytics Kafka console consumer.
>The Flink job itself reports no errors. How should I troubleshoot this situation?


Custom File Sink using EventTime and defined custom file name for parquet file

2020-01-09 Thread David Magalhães
Hi, I'm working with Flink for the first time, and I'm trying to create a
solution that will store events from Kafka into Parquet files in S3. This
should also support re-injection of events from Parquet files into a Kafka
topic.

Here is the code with a simple usage of StreamingFileSink with BulkEncode that
will get the events and store them in parquet files. The files will be
partitioned by account_id and year and month (MM). The issue with this approach
is that when running the backfill from a certain point in time, it will be hard
to avoid generating duplicated events, since we will not override the same
files, as the filename is generated by "part--".

To add predictability, I've used a tumbling window to aggregate multiple
GenericRecord, in order to write the parquet file with a list of them. For
that I've created a custom file sink, but I'm not sure which properties I
am going to lose compared to the Streaming File Sink. Here is
the code. Still, there is something missing in this solution: closing a
window with a given timeout, so that it can write the last events into the
sink if no more events are sent.

Another workaround would be to create a StreamingFileSink with a RowEncoder,
receive a List of GenericRecord, and create a custom Encoder with
AvroParquetWriter to write to a File. This way I have access to a custom
rolling policy. But this looks truly inefficient. Here is
the code.

Am I overthinking this solution? I know there are some issues (recently
closed) for the StreamingFileSink to support more custom rolling policies
in BulkEncode, like https://issues.apache.org/jira/browse/FLINK-13027, but
I just noticed that now.
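
For reference, a minimal sketch of the BulkEncode variant described above,
assuming an Avro schema is available (the S3 path and schema handling are
illustrative; with a bulk format the sink rolls on every checkpoint, which is
exactly the restriction FLINK-13027 is about):

```
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

class ParquetSinkSketch {
    // Builds a Parquet bulk-writing sink; the account_id/yyyyMM bucketing
    // mentioned above would be supplied via withBucketAssigner(...).
    static StreamingFileSink<GenericRecord> buildSink(Schema schema) {
        return StreamingFileSink
            .forBulkFormat(new Path("s3://bucket/events"),
                           ParquetAvroWriters.forGenericRecord(schema))
            .build();
    }
}
```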



Re: Elasticsink sometimes gives NoClassDefFoundError

2020-01-09 Thread Jayant Ameta
Also, the ES version I'm using is 5.6.7

Jayant


On Thu, Jan 9, 2020 at 10:39 AM Jayant Ameta  wrote:

> Hi,
> The elastic connector is packaged in the uber jar that is submitted. There
> is only 1 version of the connector:
> flink-connector-elasticsearch5_2.11:1.7.1
> I'm using Flink 1.7.1
>
> I couldn't figure out whether this error causes the job to fail, or
> whether I see this error when the job is restarting after some other
> failure.
> But the occurrences of this error and the job restarts are correlated.
>
>
> Jayant Ameta
>
>
> On Wed, Jan 8, 2020 at 6:47 PM Arvid Heise  wrote:
>
>> Hi Jayant,
>>
>> if you only see it sometimes, that indicates that you have the class in two
>> different versions of the connector, where class-loading order is
>> non-deterministic. Could you post the classpath?
>>
>> Btw, it's always good to add which Flink version you use.
>>
>> Best,
>>
>> Arvid
>>
>> On Wed, Jan 8, 2020 at 12:20 PM Jayant Ameta 
>> wrote:
>>
>>> Hi,
>>> I see the following error sometimes on my flink job, even though the
>>> class is present in my uber jar.
>>>
>>> java.lang.NoClassDefFoundError:
>>> org/apache/flink/streaming/connectors/elasticsearch5/shaded/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink$1
>>> at
>>> org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.connect(NioClientSocketPipelineSink.java:111)
>>> ... 17 common frames omitted Wrapped by:
>>> org.elasticsearch.ElasticsearchException: java.lang.NoClassDefFoundError:
>>> org/apache/flink/streaming/connectors/elasticsearch5/shaded/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink$1
>>> at
>>> org.elasticsearch.transport.netty3.Netty3Transport.exceptionCaught(Netty3Transport.java:325)
>>> at
>>> org.elasticsearch.transport.netty3.Netty3MessageChannelHandler.exceptionCaught(Netty3MessageChannelHandler.java:83)
>>> at
>>> org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112)
>>> at
>>> org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>>> at
>>> org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
>>> ... 23 frames truncated
>>>
>>>
>>> Jayant
>>>
>>


Re: Question about calling Dubbo services from a Flink cluster

2020-01-09 Thread Leonard Xu
Hi, 依辰

I'm not very familiar with Dubbo. The image in your mail didn't come through; you can upload it to an image-hosting service and send the link.

Best,
Leonard

> On Jan 10, 2020, at 11:12, 依辰 <431737...@qq.com> wrote:
> 
> Hi All,
> I currently need to consume MQ messages with a Flink cluster and then call a Dubbo service that sends pushes, to implement push distribution.
> I haven't found much material online about integrating Flink with the Spring framework, let alone about calling Dubbo services (my way of searching may also be limited).
> 
> I hope someone with experience can give me some pointers. I've actually already built a test demo, but I'd still like more reference material, mainly because I'm worried about problems with resource consumption, performance, and security, since I've only just started with Flink and my understanding is limited.
> If convenient, please share some sample code or links.
>   
> Thanks to everyone in the Flink community
> 
> PS: the picture below shows the current implementation; it feels too crude, and it's still missing the release of Spring resources on close()
> 
> 



Re: flink savepoint checkpoint

2020-01-09 Thread zhisheng
Hi, as I understand it, this option controls whether earlier checkpoints are
cleaned up when the job is cancelled, but such a checkpoint is not necessarily
the job's latest state. Whereas if you trigger a savepoint as part of the cancel
command, that state is the latest and most complete.

Best!
zhisheng

Px New <15701181132mr@gmail.com> wrote on Fri, Jan 10, 2020 at 10:58 AM:

> Hello, for this question there is a configuration for it in Flink: when the program stops, the checkpoint is additionally retained
> -->
>
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
>
> lucas.wu wrote on Wed, Dec 11, 2019 at 11:56 AM:
>
> > Hi all:
> >
> >
> I'd like to discuss a question: why is Flink's savepoint designed to be manual? If no savepoint is taken when stopping the program, the previously saved state cannot be used on restart. Why not follow Spark's approach: take checkpoints periodically, and on startup specify the checkpoint path to continue from where the last run left off?
>
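
A minimal sketch of the setup discussed above (the interval is an assumption):
periodic checkpoints plus externalized-checkpoint retention, so a restart can
resume from the last retained checkpoint path even without a manual savepoint.

```
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// take a checkpoint every 60 seconds
env.enableCheckpointing(60_000);
// keep the latest checkpoint around when the job is cancelled
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
```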


Re: When will the Flink Plan Visualizer be updated to the 1.9 style?

2020-01-09 Thread tison
cc Yadong

I've copied in the manager of the WebUI rework for you.

Best,
tison.


 wrote on Fri, Jan 10, 2020 at 11:26 AM:

> Wasn't the frontend UI in 1.9 reworked compared to 1.8? The visualizer page on the website still has the old 1.8 style.
>
> -- Original message --
> From: tison 
> Sent: Jan 8, 2020, 13:14
> To: user-zh 
> Subject: Re: When will the Flink Plan Visualizer be updated to the 1.9 style?
>
> What exactly do you mean by the 1.9 style? I remember there were recent discussions related to the visualizer, but there is no specific issue for this; you can file an issue directly on JIRA.
>
> Best,
> tison.
>
>
>  wrote on Wed, Jan 8, 2020 at 12:56 PM:
>
> > Can any expert answer this?
> >
> > -- Original message --
> > From: slle...@aliyun.com.INVALID 
> > Sent: Jan 6, 2020, 11:15
> > To: user-zh@flink.apache.org
> > Subject: When will the Flink Plan Visualizer be updated to the 1.9 style?
> >
> > Link: https://flink.apache.org/visualizer/index.html
> >
> >
>


Reply: When will the Flink Plan Visualizer be updated to the 1.9 style?

2020-01-09 Thread sllence
Wasn't the frontend UI in 1.9 reworked compared to 1.8? The visualizer page on the website still has the old 1.8 style.

-- Original message --
From: tison  
Sent: Jan 8, 2020, 13:14
To: user-zh 
Subject: Re: When will the Flink Plan Visualizer be updated to the 1.9 style?

What exactly do you mean by the 1.9 style? I remember there were recent discussions related to the visualizer, but there is no specific issue for this; you can file an issue directly on JIRA.

Best,
tison.


 wrote on Wed, Jan 8, 2020 at 12:56 PM:

> Can any expert answer this?
>
> -- Original message --
> From: slle...@aliyun.com.INVALID 
> Sent: Jan 6, 2020, 11:15
> To: user-zh@flink.apache.org
> Subject: When will the Flink Plan Visualizer be updated to the 1.9 style?
>
> Link: https://flink.apache.org/visualizer/index.html
>
>


Question about calling Dubbo services from a Flink cluster

2020-01-09 Thread 依辰
Hi All,
  
I currently need to consume MQ messages with a Flink cluster and then call a Dubbo service that sends pushes, to implement push distribution.
  
I haven't found much material online about integrating Flink with the Spring framework, let alone about calling Dubbo services (my way of searching may also be limited).
  
I hope someone with experience can give me some pointers. I've actually already built a test demo, but I'd still like more reference material, mainly because I'm worried about problems with resource consumption, performance, and security, since I've only just started with Flink and my understanding is limited. If convenient, please share some sample code or links.

Thanks to everyone in the Flink community


PS: the picture below shows the current implementation; it feels too crude, and it's still missing the release of Spring resources on close()
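

A minimal sketch of the pattern described above, including the close() handling
the author says is still missing. The service interface (PushService), the
Spring XML file name, and the push() call are assumptions for illustration, not
the author's actual code:

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.context.support.ClassPathXmlApplicationContext;

// hypothetical Dubbo service interface, assumed to be provided elsewhere
interface PushService {
    void push(String message);
}

public class DubboPushSink extends RichSinkFunction<String> {

    private transient ClassPathXmlApplicationContext ctx;
    private transient PushService pushService;

    @Override
    public void open(Configuration parameters) {
        // build the Spring/Dubbo context once per parallel task, not per record
        ctx = new ClassPathXmlApplicationContext("dubbo-consumer.xml");
        pushService = ctx.getBean(PushService.class);
    }

    @Override
    public void invoke(String message, Context context) {
        pushService.push(message); // hypothetical push call
    }

    @Override
    public void close() {
        // release the Spring resources when the task shuts down
        if (ctx != null) {
            ctx.close();
        }
    }
}
```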

Re: How to assign a UID to a KeyedStream?

2020-01-09 Thread Zhu Zhu
Hi Ken,

This is actually a bug: a Partition should not require a UID. It is
fixed in 1.9.2 and 1.10; see FLINK-14910
(https://issues.apache.org/jira/browse/FLINK-14910).

Thanks,
Zhu Zhu

Ken Krugler wrote on Fri, Jan 10, 2020 at 7:51 AM:

> Hi all,
>
> [Of course, right after hitting send I realized I could just do
> rides.getTransformation().setUid(“blah”), ditto for the fares stream. Might
> be something to add to the docs, or provide a .uid() method on KeyedStreams
> for syntactic sugar]
>
> Just for grins, I disabled auto-generated UIDs for the taxi rides/fares
> state example in the online tutorial.
>
> env.getConfig().disableAutoGeneratedUIDs();
>
> I then added UIDs for all operators, sources & sinks. But I still get the
> following when calling env.getExecutionPlan() or env.execute():
>
> java.lang.IllegalStateException: Auto generated UIDs have been disabled
> but no UID or hash has been assigned to operator Partition
> at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:297)
> at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformTwoInputTransform(StreamGraphGenerator.java:682)
> at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:252)
> at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1529)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionPlan(StreamExecutionEnvironment.java:1564)
> at com.citi.flink.RidesAndFaresTool.main(RidesAndFaresTool.java:63)
>
> The simple workflow is:
>
> DataStream<TaxiRide> rides = env
> .addSource(new CheckpointedTaxiRideSource(ridesFile,
> servingSpeedFactor))
> .uid("source: taxi rides")
> .name("taxi rides")
> .filter((TaxiRide ride) -> ride.isStart)
> .uid("filter: only start rides")
> .name("only start rides")
> .keyBy((TaxiRide ride) -> ride.rideId);
>
> DataStream<TaxiFare> fares = env
> .addSource(new CheckpointedTaxiFareSource(faresFile,
> servingSpeedFactor))
> .uid("source: taxi fares")
> .name("taxi fares")
> .keyBy((TaxiFare fare) -> fare.rideId);
>
> DataStreamSink<Tuple2<TaxiRide, TaxiFare>> enriched = rides
> .connect(fares)
> .flatMap(new EnrichmentFunction())
> .uid("function: enrich rides with fares")
> .name("enrich rides with fares")
> .addSink(sink)
> .uid("sink: enriched taxi rides")
> .name("enriched taxi rides");
>
> Internally the exception is thrown when the EnrichmentFunction (a
> RichCoFlatMapFunction) is being transformed by
> StreamGraphGenerator.transformTwoInputTransform().
>
> This calls StreamGraphGenerator.transform() with the two inputs, but the
> Transformation for each input is a PartitionTransformation.
>
> I don’t see a way to set the UID following the keyBy(), as a KeyedStream
> creates the PartitionTransformation without a UID.
>
> Any insight into setting the UID properly here? Or should
> StreamGraphGenerator.transform() skip the no-uid check for
> PartitionTransformation, since that’s not an operator with state?
>
> Thanks,
>
> — Ken
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>
>
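
A sketch of the workaround Ken mentions at the top of his message (the UID
strings are illustrative): set the UID on the keyed stream's underlying
transformation, since KeyedStream exposes no uid() method before the
FLINK-14910 fix.

```
// keyBy() produces a PartitionTransformation, which is reachable
// through getTransformation() on the resulting KeyedStream
rides.getTransformation().setUid("partition: rides by rideId");
fares.getTransformation().setUid("partition: fares by rideId");
```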


Re: flink savepoint checkpoint

2020-01-09 Thread Px New
Hello, for this question there is a configuration for it in Flink: when the program stops, the checkpoint is additionally retained
-->

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


lucas.wu wrote on Wed, Dec 11, 2019 at 11:56 AM:

> Hi all:
>
> I'd like to discuss a question: why is Flink's savepoint designed to be manual? If no savepoint is taken when stopping the program, the previously saved state cannot be used on restart. Why not follow Spark's approach: take checkpoints periodically, and on startup specify the checkpoint path to continue from where the last run left off?


Re: Flink Job cluster scalability

2020-01-09 Thread Zhu Zhu
Hi KristoffSC,

Did you increase the parallelism of the vertex that has the largest
parallelism?
Or did you explicitly put tasks in different slot sharing groups?
With the default slot sharing, the number of slots required/used equals
the max parallelism of a JobVertex, which is 6 in your case.

KristoffSC wrote on Thu, Jan 9, 2020 at 9:26 PM:

> Thank you David and Zhu Zhu,
> this helps a lot.
>
> I have follow-up questions though.
>
> Given this:
> /"Instead the Job must be stopped via a savepoint and restarted with a new
> parallelism"/
>
> and the slot sharing [1] feature, I got the impression that if I started my
> cluster with more than 6 task slots, Flink would try to deploy tasks across
> all resources, trying to use all available resources during job submission.
>
> I ran two tests with my original job.
> 1. I started a Job Cluster with 7 task slots (7 task managers, since in this
> case each task manager has one task slot).
> 2. I started a Session cluster with 28 task slots in total. In this case I
> had 7 task managers with 4 task slots each.
>
> For case 1, I used the "FLINK_JOB" variable as stated in [2]. For case 2, I
> submitted my job from the UI after Flink became operational.
>
>
> In both cases the job used only 6 task slots, so it was still sharing task
> slots. I had the impression that it would try to use as many available
> resources as it can.
>
> What do you think about this?
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#task-slots-and-resources
> [2]
>
> https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md
>
>
>
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
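
A minimal sketch of the explicit slot sharing group Zhu Zhu refers to (the
stream, operator, and group names are made up): operators taken out of the
default group get their own slots, so the job occupies more than
max-parallelism slots.

```
DataStream<String> processed = events
    .map(new HeavyMapper())   // hypothetical MapFunction<String, String>
    .name("heavy mapper")
    // subtasks of this operator no longer share slots with the default group
    .slotSharingGroup("heavy");
```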


[Question] Failed to submit flink job to secure yarn cluster

2020-01-09 Thread Ethan Li
Hello

I was following
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/yarn_setup.html#run-a-flink-job-on-yarn
and trying to submit a Flink job on YARN.

I downloaded flink-1.9.1 and pre-bundled Hadoop 2.8.3 from
https://flink.apache.org/downloads.html#apache-flink-191. I used default
configs except:

security.kerberos.login.keytab: userA.keytab
security.kerberos.login.principal: userA@REALM


I have a secure YARN cluster set up already. Then I ran “./bin/flink run -m
yarn-cluster -p 1 -yjm 1024m -ytm 1024m ./examples/streaming/WordCount.jar” and
got the following errors:


org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy 
Yarn session cluster
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:385)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:251)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.hadoop.yarn.exceptions.YarnException: Failed to submit 
application_1578605412668_0005 to YARN : Failed to renew token: Kind: kms-dt, 
Service: host3.com:3456, Ident: (owner=userA, renewer=adminB, realUser=, 
issueDate=1578606224956, maxDate=1579211024956, sequenceNumber=32, 
masterKeyId=52)
at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.submitApplication(YarnClientImpl.java:275)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1004)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:507)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:378)
... 9 more


Full client log: https://gist.github.com/Ethanlm/221284bcaa272270a799957dc05b94fd

Resource manager log: https://gist.github.com/Ethanlm/ecd0a3eb25582ad6b1552927fc0e5c47

Hostnames, IP addresses, usernames, etc. are anonymized.


Not sure how to proceed further. Wondering if anyone in the community has 
encountered this before. Thank you very much for your time!

Best,
Ethan



Please suggest helpful tools

2020-01-09 Thread Eva Eva
Hi,

I'm running a Flink job on version 1.9 with the Blink planner.

My checkpoints are timing out intermittently, and as the state grows they are
timing out more and more often, eventually killing the job.

The state is large, with Minimum=10.2MB and Maximum=49GB (the maximum
accumulated due to prior failed checkpoints), Average=8.44GB.

Although the state is huge, I have enough space on the EC2 instance on which
I'm running the job. I'm using RocksDB for checkpointing.

The logs do not have any useful information for understanding why the
checkpoints are expiring/failing. Can someone please point me to tools that can
be used to investigate and understand why checkpoints are failing?

Also, any other related suggestions are welcome.


Thanks,
Reva.
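

One common mitigation, sketched below on the assumption that full snapshots of
a ~50GB RocksDB state are what exceed the checkpoint timeout (the thread does
not confirm this): enable incremental checkpoints so only changed SST files are
uploaded, and give checkpoints more time before they expire.

```
import java.io.IOException;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

class CheckpointTuningSketch {
    static void configure(StreamExecutionEnvironment env) throws IOException {
        // 'true' enables incremental checkpoints; the URI is a placeholder
        env.setStateBackend(new RocksDBStateBackend("s3://bucket/checkpoints", true));
        // allow slow checkpoints more time before they are declared expired
        env.getCheckpointConfig().setCheckpointTimeout(30 * 60 * 1000L);
    }
}
```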


Re: checkpoint, state

2020-01-09 Thread Px New
Yes, a checkpoint is a packaged piece of meta information, and that packaged meta information is composed of the state of all operators.

hahaha sc wrote on Fri, Nov 29, 2019 at 4:12 PM:

>
> Since Flink checkpoints every record into a globally consistent distributed snapshot, why is local state still needed? Can it be understood that local state is simply a part of the consistent snapshot?
>   I watched the community livestream replay yesterday, and from the PMC member's introduction they don't seem to be the same thing.
>


Reply: Flink consumes no data from Kafka

2020-01-09 Thread ZhangChangjun
[The body of this message is garbled in the archive; it refers to the source Kafka configuration.]



-- Original message --
From: "sunfulin"
> I have a job that uses Flink to consume Kafka message data from Sensors Analytics (see https://manual.sensorsdata.cn/sa/latest/page-1573828.html). But after the job starts, the Flink task consumes no Kafka messages, while I can see a continuous stream of data through the Sensors Analytics Kafka console consumer.
> The Flink job itself reports no errors. How should I troubleshoot this situation?

Flink consumes no data from Kafka

2020-01-09 Thread sunfulin
I have a job that uses Flink to consume Kafka message data from Sensors Analytics (see https://manual.sensorsdata.cn/sa/latest/page-1573828.html). But after the job starts, the Flink task consumes no Kafka messages, while I can see a continuous stream of data through the Sensors Analytics Kafka console consumer.
The Flink job itself reports no errors. How should I troubleshoot this situation?

Re: Flink hits an NPE on the valueState itself

2020-01-09 Thread Kevin Liao
Thanks, I tracked it down to the same place.

Changed it to:

```

Boolean existed = uniqMark.value();
// if it has already appeared, filter it out
if (null != existed && existed) {
  return null;
}
uniqMark.update(true);

```

Problem solved.

Yun Tang wrote on Fri, Jan 10, 2020 at 3:05 AM:

> Hi Kevin
>
> I looked at your code and located the problem.
> The likely cause is that you configured the state visibility as
> ReturnExpiredIfNotCleanedUp: Flink first detects that the entry has expired, then triggers the clear operation, and only afterwards returns the expired value
> [1]. I suggest changing the condition on line 39 so that the value is checked for null on every read.
>
>
> [1]
> https://github.com/apache/flink/blob/dba5b9e0138b667c3ecd32f7b16645d531477720/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java#L96
>
> Best,
> Yun Tang
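
For context, a minimal sketch of the state and TTL setup implied by this thread
(the TTL duration and descriptor wiring are assumptions): with
ReturnExpiredIfNotCleanedUp, a read may return an expired value once and null
afterwards, which is why the fix above calls value() exactly once and
null-checks the result.

```
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1)) // assumed TTL
    .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
    .build();
ValueStateDescriptor<Boolean> uniqMarkDesc =
    new ValueStateDescriptor<>("uniqMark", Types.BOOLEAN);
uniqMarkDesc.enableTimeToLive(ttlConfig);
// in open(): uniqMark = getRuntimeContext().getState(uniqMarkDesc);
```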

How to assign a UID to a KeyedStream?

2020-01-09 Thread Ken Krugler
Hi all,

[Of course, right after hitting send I realized I could just do 
rides.getTransformation().setUid(“blah”), ditto for the fares stream. Might be 
something to add to the docs, or provide a .uid() method on KeyedStreams for 
syntactic sugar]

Just for grins, I disabled auto-generated UIDs for the taxi rides/fares state 
example in the online tutorial. 

env.getConfig().disableAutoGeneratedUIDs();

I then added UIDs for all operators, sources & sinks. But I still get the 
following when calling env.getExecutionPlan() or env.execute():

java.lang.IllegalStateException: Auto generated UIDs have been disabled but no 
UID or hash has been assigned to operator Partition
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:297)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformTwoInputTransform(StreamGraphGenerator.java:682)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:252)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1529)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionPlan(StreamExecutionEnvironment.java:1564)
at com.citi.flink.RidesAndFaresTool.main(RidesAndFaresTool.java:63)

The simple workflow is:

DataStream<TaxiRide> rides = env
.addSource(new CheckpointedTaxiRideSource(ridesFile, 
servingSpeedFactor))
.uid("source: taxi rides")
.name("taxi rides")
.filter((TaxiRide ride) -> ride.isStart)
.uid("filter: only start rides")
.name("only start rides")
.keyBy((TaxiRide ride) -> ride.rideId);

DataStream<TaxiFare> fares = env
.addSource(new CheckpointedTaxiFareSource(faresFile, 
servingSpeedFactor))
.uid("source: taxi fares")
.name("taxi fares")
.keyBy((TaxiFare fare) -> fare.rideId);

DataStreamSink<Tuple2<TaxiRide, TaxiFare>> enriched = rides
.connect(fares)
.flatMap(new EnrichmentFunction())
.uid("function: enrich rides with fares")
.name("enrich rides with fares")
.addSink(sink)
.uid("sink: enriched taxi rides")
.name("enriched taxi rides");

Internally the exception is thrown when the EnrichmentFunction (a
RichCoFlatMapFunction) is being transformed by 
StreamGraphGenerator.transformTwoInputTransform().

This calls StreamGraphGenerator.transform() with the two inputs, but the 
Transformation for each input is a PartitionTransformation.

I don’t see a way to set the UID following the keyBy(), as a KeyedStream 
creates the PartitionTransformation without a UID.

Any insight into setting the UID properly here? Or should 
StreamGraphGenerator.transform() skip the no-uid check for 
PartitionTransformation, since that’s not an operator with state?

Thanks,

— Ken

--
Ken Krugler
http://www.scaleunlimited.com 
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



How to assign a UID to a KeyedStream?

2020-01-09 Thread Ken Krugler
Hi all,

Just for grins, I disabled auto-generated UIDs for the taxi rides/fares state 
example in the online tutorial. 

env.getConfig().disableAutoGeneratedUIDs();

I then added UIDs for all operators, sources & sinks. But I still get the 
following when calling env.getExecutionPlan() or env.execute():

java.lang.IllegalStateException: Auto generated UIDs have been disabled but no 
UID or hash has been assigned to operator Partition
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:297)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformTwoInputTransform(StreamGraphGenerator.java:682)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:252)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1529)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionPlan(StreamExecutionEnvironment.java:1564)
at com.citi.flink.RidesAndFaresTool.main(RidesAndFaresTool.java:63)

The simple workflow is:

DataStream<TaxiRide> rides = env
.addSource(new CheckpointedTaxiRideSource(ridesFile, 
servingSpeedFactor))
.uid("source: taxi rides")
.name("taxi rides")
.filter((TaxiRide ride) -> ride.isStart)
.uid("filter: only start rides")
.name("only start rides")
.keyBy((TaxiRide ride) -> ride.rideId);

DataStream<TaxiFare> fares = env
.addSource(new CheckpointedTaxiFareSource(faresFile, 
servingSpeedFactor))
.uid("source: taxi fares")
.name("taxi fares")
.keyBy((TaxiFare fare) -> fare.rideId);

DataStreamSink<Tuple2<TaxiRide, TaxiFare>> enriched = rides
.connect(fares)
.flatMap(new EnrichmentFunction())
.uid("function: enrich rides with fares")
.name("enrich rides with fares")
.addSink(sink)
.uid("sink: enriched taxi rides")
.name("enriched taxi rides");

Internally the exception is thrown when the EnrichmentFunction (a
RichCoFlatMapFunction) is being transformed by 
StreamGraphGenerator.transformTwoInputTransform().

This calls StreamGraphGenerator.transform() with the two inputs, but the 
Transformation for each input is a PartitionTransformation.

I don’t see a way to set the UID following the keyBy(), as a KeyedStream 
creates the PartitionTransformation without a UID.

Any insight into setting the UID properly here? Or should 
StreamGraphGenerator.transform() skip the no-uid check for 
PartitionTransformation, since that’s not an operator with state?

Thanks,

— Ken

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Re: Flink hits an NPE on the valueState itself

2020-01-09 Thread Yun Tang
Hi Kevin

I looked at your code and located the problem.
The likely cause is that you configured the state visibility as
ReturnExpiredIfNotCleanedUp: Flink first detects that the entry has expired, then triggers the clear operation,
and only afterwards returns the expired value [1]. I suggest changing the condition on line 39 so that the value is checked for null on every read.


[1]
https://github.com/apache/flink/blob/dba5b9e0138b667c3ecd32f7b16645d531477720/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java#L96

Best,
Yun Tang



Re: How can I find out which key group belongs to which subtask

2020-01-09 Thread 杨东晓
Thanks Congxian!
 My purpose is not only to make data go to one same subtask, but to the
specific subtask that lives on the same taskmanager as the upstream record.
The key idea is to avoid shuffling between taskmanagers.
I think KeyGroupRangeAssignment.java
explains a lot about how to get the key group and the subtask context needed
to make that happen.
Do you know if serialization still happens while data is
transferred between operators in the same taskmanager?
Thanks.

Congxian Qiu wrote on Thu, Jan 9, 2020 at 1:55 AM:

> Hi
>
> If you just want to make sure some key goes into the same subtask, does
> custom key selector[1] help?
>
> For the keygroup and subtask information, you can ref to
> KeyGroupRangeAssignment[2] for more info, and the max parallelism logic you
> can ref to doc[3]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/api_concepts.html#define-keys-using-key-selector-functions
> [2]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
> [3]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#setting-the-maximum-parallelism
>
> Best,
> Congxian
>
>
> 杨东晓 wrote on Thu, Jan 9, 2020 at 7:47 AM:
>
>> Hi, I'm trying to do some optimization around Flink's 'keyBy' process function.
>> Is there any way I can find out which key-group a key belongs to,
>> and, by extension, which subtask a key-group belongs to?
>> The motivation is that we want to force data records
>> from upstream to go to a downstream subtask on the same taskmanager. That
>> means that even if we use a keyed-stream function, we still want no cross-JVM
>> communication to happen at run time.
>> And if we can achieve that, can we also avoid the expensive cost of
>> record serialization, because data is only transferred within the same
>> taskmanager JVM instance?
>>
>> Thanks.
>>
>
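
A small sketch of the lookup discussed above (the key and the parallelism
values are placeholders and must match the job's real settings):

```
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

int maxParallelism = 128;   // the job's max parallelism
int parallelism = 6;        // the operator's actual parallelism
Object key = "someKey";     // an example key

// which key group the key hashes into
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
// which subtask index owns that key group
int subtaskIndex = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
    maxParallelism, parallelism, keyGroup);
```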


Running Flink on java 11

2020-01-09 Thread KristoffSC
Hi guys,
we have a requirement in our project to use Java 11, although we would
really like to use Flink because it seems to match our needs perfectly.

We were testing it on Java 1.8 and all looked fine.
We tried to run it on Java 11 and it also looks fine, at least for now.

We were also running this as a Job Cluster, and since those images [1] are
based on openjdk:8-jre-alpine, we switched to java 13-jdk-alpine. The cluster
started and the job was submitted. All seemed fine.

The job and the 3rd-party library that this job is using were compiled with
Java 11.
I was looking for any posts related to Java 11 issues and I've found this
[2] one.
We are also aware of the ongoing FLINK-10725 [3], but this is assigned to the
Flink 1.10 release.

Having all of this, I would like to ask a few questions:

1. Is there any release date planned for 1.10?
2. Are you aware of any issues with running Flink on Java 11?
3. If my job code does not use any language features from Java 11, would Flink
handle running on Java 11? Or are there some internal functionalities
that would not work on Java 11 (things that use Unsafe or
reflection)?

Thanks,
Krzysztof


[1]
https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/UnsupportedOperationException-from-org-apache-flink-shaded-asm6-org-objectweb-asm-ClassVisitor-visit1-td28571.html
[3] https://issues.apache.org/jira/browse/FLINK-10725



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink hits an NPE on the valueState itself

2020-01-09 Thread Kevin Liao
Thanks for the answer. First, the code I pasted is indeed the program that runs.

Also, I just confirmed via a printed log that uniqMark == null is false.

My current suspicion is this spot:

if (null != uniqMark.value() && uniqMark.value()) {

The first uniqMark.value() returns a result (not null), but at the same time the TTL policy triggers
clear, so the second uniqMark.value() returns null
(apparently the compiler does not optimize this pattern; value() really is executed twice). Following the code seems to support this guess, so I plan to verify it tomorrow.

I'll follow up tomorrow, thanks.


Re: Flink hits an NPE on the valueState itself

2020-01-09 Thread Yun Tang
Hi Kevin

State TTL cleans up the data entries inside the state, not the state object itself in the map function. So in any case, uniqMark, being the value state object, will never become null because of TTL.

I noticed that your job hits this NPE immediately after recovering from a failover. I suspect that line 39 of the code you actually run is not the code you pasted; it very likely corresponds to line 34 of your code, i.e. the RDLog input of the map method being null. That would also match the scenario where the job restores successfully and then immediately fails over again, i.e. it keeps processing an invalid "dirty" record, causing the job to fail over continuously. I suggest you verify and rule this out along these lines.

Best,
Yun Tang

From: Kevin Liao 
Sent: Thursday, January 9, 2020 23:17
To: user-zh@flink.apache.org 
Subject: Re: Flink hits an NPE on the valueState itself

Thanks for the reply.
One more question, though:

After the valueState's TTL fires and it is reclaimed, the reference here should also be GC'ed and become null, right? Or is the thread with which the operator originally processed this key reclaimed along with it, so that the next time this key arrives a newly created thread serves it (which would certainly require calling open again)? I've read the code but haven't figured it out; I'd appreciate clarification, thanks.

Benchao Li wrote on Thu, Jan 9, 2020 at 8:59 PM:

> I don't see how `uniqMark` could reasonably become `null` here,
> unless a `StreamMap` was deserialized somewhere without `StreamMap.open()` being called.
> But `StreamTask` appears to guarantee that `open` is called before the operator's processing methods. I can't see a problem here either.
>
> Kevin Liao wrote on Thu, Jan 9, 2020 at 8:15 PM:
>
> > https://tva4.sinaimg.cn/large/63137227ly1gaqkn1nlykj20mm0wvgq8.jpg
> >
> > Sorry, try this one
> >
> > Benchao Li wrote on Thu, Jan 9, 2020 at 8:13 PM:
> >
> > > It opens as 403 Forbidden on my side
> > >
> > > Kevin Liao wrote on Thu, Jan 9, 2020 at 8:09 PM:
> > >
> > > >
> > > >
> > >
> >
> https://gm1.ggpht.com/FZGtbLggyPPZ_BoU0gt2SQTv7fyhNOKu03ZjsKq7G6DtqWJ5DY0NmL-2s64P-LUzedbTm8DE_FeggNtPAb4VEmypAoPfW8VFSFxOWxMGBvMi5G6xHoZ3THKPYHnAj8KydQ02OjvV-R3IEwBvwIDnZRmwCv3ohyjPF76gbvOOYrKzgaLb_pykWsQDpvROHr3lgU2rezH33Jt3xJEOjXGjHsUFUxiil0PYkQFdA0BP77lypYQLw4RL8BxMz3HfaCiNAGb_q5w8JNmckHLU3g9EuPgtqj6WP3XDv07PBuCXMvmfNcFbAciMeJuOOeE8VBqDCacjuiDtJzVrK1boxcBnzFvT_QazOwaJ27SSuJ_u5KCerTURen2vLBF1RN-x9eOVz9wg6w1oXyMAF7LMjGsYsVzUu3It5AyzLkm-_znosNtAJp2AW_qGmGo-k02fcrMjUoELiGvqn6W1kScnFI4gNWi_dpZe0Uoq1zF2m1crww1oNGOeRjFlCK_-iC19CPfsTVCtwN3tdKnaKdLe2TbfVdFA0DnBUz8NrhV-mvmZlEwi9-ngK-WOy8yjA4fin1zaE2SJCf2zfBSZwGR2eY_E_WZQiFRmSBI2A7vpoyFvTV3E99MIi0MC5PUAeRiu4v4JIVDkV_yUGIUvoa7pxdf7LpZN_DbikQVk7yES8kxxL5qG2Eae8vftWJuBVi5mWTxvElBgInyUntobXHdxfb2YR4JdBgVPN5QionJiIc9g5i0ClGECZbyHPbsQy4pEVw=s0-l75-ft-l75-ft
> > > >
> > > > Thanks, see if you can view it
> > > >
> > > > Benchao Li wrote on Thu, Jan 9, 2020 at 8:07 PM:
> > > >
> > > > > hi Kevin,
> > > > >
> > > > > Images can't be embedded in mail here; to share an image you need a third-party image host.
> > > > > Or you could just paste the text directly?
> > > > >
> > > > > Kevin Liao wrote on Thu, Jan 9, 2020 at 7:10 PM:
> > > > >
> > > > > > [image: B40C260D-DCC3-4B7D-A024-3839803C2234.png]
> > > > > >
> > > > > > Benchao Li wrote on Thu, Jan 9, 2020 at 6:42 PM:
> > > > > >
> > > > > >> hi Kevin,
> > > > > >>
> > > > > >> Could you paste the code at MyMapFunction2.java:39? From the log above I can't tell that the valueState is null.
> > > > > >>
> > > > > >> Kevin Liao wrote on Thu, Jan 9, 2020 at 5:57 PM:
> > > > > >>
> > > > > >> > This morning I found the job misbehaving; the task kept restarting. Checking the jm log, the earliest error was this:
> > > > > >> >
> > > > > >> > ```
> > > > > >> > 2020-01-09 05:14:04.087
> [flink-akka.actor.default-dispatcher-28]
> > > > INFO
> > > > > >> >  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Map
> > ->
> > > > > >> Filter ->
> > > > > >> > Sink: Unnamed (3/6) (d0e6c4a05d0274c18a4a3df41ab5ff1b)
> switched
> > > from
> > > > > >> > RUNNING to FAILED.
> > > > > >> > java.lang.NullPointerException: null
> > > > > >> > at com.sogou.qidian.MyMapFunction2.map(MyMapFunction2.java:39)
> > > > > >> > at com.sogou.qidian.MyMapFunction2.map(MyMapFunction2.java:25)
> > > > > >> > at
> > > > > >> >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> > > > > >> > at
> > > > > >> > org.apache.flink.streaming.runtime.io
> > > > > >> >
> > > > > >>
> > > > >
> > >
> .StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
> > > > > >> > at
> > > > > >> > org.apache.flink.streaming.runtime.io
> > > > > >> >
> > > > >
> > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
> > > > > >> > at
> > > > > >> >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
> > > > > >> > at
> > > > > >> >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
> > > > > >> > at
> > > > > >> >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
> > > > > >> > at
> > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> > > > > >> > at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> > > > > >> > at java.lang.Thread.run(Thread.java:748)
> > > > > >> > 2020-01-09 05:14:04.088
> [flink-akka.actor.default-dispatcher-28]
> > > > INFO
> > > > > >> >  o.a.f.r.e.failover.flip1.RestartPipelinedRegionStrategy  -
> > > > > Calculating
> > > > > >> > tasks to restart to recover the failed task
> > > > > >> > 90bea66de1c231edf33913ecd54406c1_2.
> > > > > >> > 2020-01-09 05:14:04.088
> [flink-akka.actor.default-dispatcher-28]
> > > > INFO
> > > > > >> >  o.a.f.r.e.failover.flip1.RestartPipelinedRegionStrategy  - 12
> > > tasks
> > > > > >> should
> > > > > >> > be restarted to recover the failed task
> > > > > >> 

Re: Flink logging issue with logback

2020-01-09 Thread Maximilian Michels

FYI, there is also a PR: https://github.com/apache/flink/pull/10811

On 09.01.20 01:53, Bajaj, Abhinav wrote:

Thanks Dawid, Max and Yang for confirming the issue and providing a potential
workaround.

On 1/8/20, 3:24 AM, "Maximilian Michels" wrote:

 Interesting that we came across this problem at the same time. We have
 observed this with Lyft's K8s operator which uses the Rest API for job
 submission, much like the Flink dashboard.
 
 Note that you can restore the original stdout/stderr in your program:
 
private static void restoreStdOutAndStdErr() {
  System.setOut(new PrintStream(
      new FileOutputStream(FileDescriptor.out)));
  System.setErr(new PrintStream(
      new FileOutputStream(FileDescriptor.err)));
}
 
 Just call restoreStdOutAndStdErr() before you start building the Flink
 job. Of course, this is just meant to be a workaround.
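
 A minimal usage sketch, assuming the helper above sits in the same job
 class (the pipeline itself is just a placeholder):

```java
// Assumed to live in the same class as restoreStdOutAndStdErr() above.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public static void main(String[] args) throws Exception {
    restoreStdOutAndStdErr(); // undo the capture before building the job

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromElements(1, 2, 3)
       .map(i -> i * 2)
       .print(); // now actually reaches the original stdout
    env.execute("stdout-restore-example");
}
```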
 
 I think an acceptable solution is to always print upon execution. For
 the plan preview we may keep the existing behavior.
 
 Cheers,

 Max
 
 On 07.01.20 17:39, Dawid Wysakowicz wrote:

 > A quick update. The suppression of stdout/stderr actually might soon be
 > dropped, see: https://issues.apache.org/jira/browse/FLINK-15504
 >
 > Best,
 >
 > Dawid
 >
 > On 07/01/2020 07:17, Yang Wang wrote:
 >> Hi Bajaj,
 >>
 >> I have tested just as you said, and found that logs from the user
 >> class do not show up when
 >> using ConsoleAppender. With FileAppender instead, everything works
 >> fine.
 >>
 >> It is quite weird and I have no idea how to debug it.
 >> Best,
 >> Yang
 >>
 >> Bajaj, Abhinav wrote on Tue, Jan 7, 2020 at 4:28 AM:
 >>
 >> Hi,
 >>
 >> Thanks much for the responses.
 >>
 >> Let me add some more details and clarify my question.
 >>
 >> _Setup_
 >>
 >>   * I used the WikipediaAnalysis example and added a log in main
 >> method.
 >>
 >> ……
 >>
 >> public static void main(String[] args) throws Exception {
 >>   StreamExecutionEnvironment see =
 >> StreamExecutionEnvironment.getExecutionEnvironment();
 >> LOG.info("Info log for test");
 >>
 >> DataStream<WikipediaEditEvent> edits = see.addSource(new
 >> WikipediaEditsSource());
 >>
 >> ……
 >>
 >>   * I am using the Flink 1.7.1 distribution and starting
 >> jobmanager and taskmanager locally using the below commands –
 >>   o ./bin/jobmanager.sh start-foreground
 >>   o ./bin/taskmanager.sh start-foreground
 >>   o Both jobmanager and taskmanager log in the console now
 >>   o JVM options are correctly set and verified from jobmanager
 >> & taskmanager logs
 >>
 >>   * I submit the WikipediaAnalysis job from Flink dashboard and
 >> checked the jobmanager logs
 >>
 >> _Run 1_: Flink is using the default log4j logging
 >>
 >>   * Jobmanager logs the added info log from the job
 >>   o 2020-01-06 11:55:37,422 INFO wikiedits.WikipediaAnalysis -
 >> Info log for test
 >>
 >> _Run 2_: Flink is set up to use logback as suggested in the Flink
 >> documentation here
 >> 

 >>
 >>   * Jobmanger does not log the added info log from the job
 >>
 >> So, it seems there is a logging behavior difference between using
 >> log4j & logback in Flink.
 >>
 >> Is this expected or a known difference?
 >>
 >> Thanks again,
 >>
 >> Abhinav Bajaj
 >>
 >> _PS_: Ahh. I see how my email was confusing the first time.
 >> Hopefully this one is better :P
 >>
 >> *From: *Dawid Wysakowicz
 >> *Date: *Monday, January 6, 2020 at 5:13 AM
 >> *Cc: *"Bajaj, Abhinav", "user@flink.apache.org"
 >> *Subject: *Re: Flink logging issue with logback
 >>
 >> Hi Bajaj,
 >>
 >> I am not entirely sure what is the actual issue you are seeking
 >> 

Re: Completed job wasn't saved to archive

2020-01-09 Thread Rong Rong
Hi Pavel,

Sorry for bringing this thread up so late. I was digging into the usage of
the Flink history server and I found one situation where there would be no
logs and no failure/success message from the cluster:
In a very rare case in our Flink-YARN session cluster, if an application
master (the container running the JobManager) fails and is restarted as a
YARN 2nd attempt (we haven't enabled HA), then no archiving logs show up
whatsoever. However, in this case a completely new AM container is brought
up running the JM again (i.e. with new log files).

I am not exactly sure whether this suits your scenario. Could you
describe a bit more how your cluster was configured?

Thanks,
Rong

On Mon, Nov 25, 2019 at 10:49 AM Chesnay Schepler wrote:

> I'm afraid I can't think of a solution. I don't see how this
> operation could succeed or fail without anything being logged.
>
> Is the cluster behaving normally afterwards? Could you check whether the
> numRunningJobs metric ticks down properly after the job was canceled?
>
>
> On 22/11/2019 13:27, Pavel Potseluev wrote:
>
> Hi Chesnay,
>
> We archive jobs to an S3 file system. We don't configure any throttling for
> write operations; afaik that isn't possible yet and will be implemented in
> FLINK-13251 <https://issues.apache.org/jira/browse/FLINK-13251>. And
> other write operations (like checkpoint saving) work fine. But I don't see
> the archived job or any message about an archiving failure at all. It looks
> like Flink just didn't try to save the job to the archive.
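
For reference, these are the settings that drive job archiving and the
history server; a flink-conf.yaml sketch, with the bucket path made up:

```
# The JobManager writes the archive of each completed job here
jobmanager.archive.fs.dir: s3://my-bucket/flink/completed-jobs
# The history server polls the same directory
historyserver.archive.fs.dir: s3://my-bucket/flink/completed-jobs
historyserver.archive.fs.refresh-interval: 10000
```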
>
> 21.11.2019, 17:17, "Chesnay Schepler" wrote:
>
> If the archiving fails there should be some log message, like "Failed to
> archive job" or "Could not archive completed job..." .
> If nothing of the sort is logged my first instinct would be that the
> operation is being slowed down, _a lot_.
>
> Where are you archiving them to? Could it be the write operation is being
> throttled heavily?
>
> On 21/11/2019 13:48, Pavel Potseluev wrote:
>
> Hi Vino,
>
> Usually Flink archives jobs correctly and the problem is rarely
> reproduced. So I think it isn't a problem with configuration.
>
> Job Manager log when job 5ec264a20bb5005cdbd8e23a5e59f136 was canceled:
>
> 771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:52:13.294 [Checkpoint
> Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  -
> Triggering checkpoint 1872 @ 1574092333218 for job
> 5ec264a20bb5005cdbd8e23a5e59f136.
>
> 771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:52:37.260
> [flink-akka.actor.default-dispatcher-30] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed
> checkpoint 1872 for job 5ec264a20bb5005cdbd8e23a5e59f136 (568048140 bytes
> in 23541 ms).
>
> 771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:13.314 [Checkpoint
> Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  -
> Triggering checkpoint 1873 @ 1574092393218 for job
> 5ec264a20bb5005cdbd8e23a5e59f136.
>
> 771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.279
> [flink-akka.actor.default-dispatcher-40] INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job
> bureau-user-offers-statistics-AUTORU-USERS_AUTORU
> (5ec264a20bb5005cdbd8e23a5e59f136) switched from state RUNNING to
> CANCELLING.
>
> 771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.279
> [flink-akka.actor.default-dispatcher-40] INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom
> File Source (1/1) (934d89cf3d7999b40225dd8009b5493c) switched from RUNNING
> to CANCELING.
>
> 771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.280
> [flink-akka.actor.default-dispatcher-40] INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source:
> kafka-source-moderation-update-journal-autoru -> Filter -> Flat Map (1/2)
> (47656a3c4fc70e19622acca31267e41f) switched from RUNNING to CANCELING.
>
> 771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.280
> [flink-akka.actor.default-dispatcher-40] INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source:
> kafka-source-moderation-update-journal-autoru -> Filter -> Flat Map (2/2)
> (be3c4562e65d3d6bdfda4f1632017c6c) switched from RUNNING to CANCELING.
>
> 771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.280
> [flink-akka.actor.default-dispatcher-40] INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  -
> user-offers-statistics-init-from-file -> Map (1/2)
> (4a45ed43b05e4d444e190a44b33514ac) switched from RUNNING to CANCELING.
>
> 771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.280
> [flink-akka.actor.default-dispatcher-40] INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  -
> user-offers-statistics-init-from-file -> Map (2/2)
> (bb3be311c5e53abaedb06b4d0148c23f) switched from RUNNING to CANCELING.
>
> 771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.280
> [flink-akka.actor.default-dispatcher-40] INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  - 

Re: flink hits an NPE on the valueState itself

2020-01-09 Thread Kevin Liao
Thanks for the reply, but I have one more question:

After the valueState's TTL fires and the state is reclaimed, shouldn't the
reference here also be GC'd and become null? Or is it that the thread the
operator originally used for this key gets reclaimed along with it, so the
next time this key arrives it is actually served by a newly created thread
(which would certainly require calling open() again)? I've read the code but
still haven't figured it out. Hoping for some clarification, thanks.

Benchao Li wrote on Thu, Jan 9, 2020 at 8:59 PM:

> I don't see how `uniqMark` could become `null` here, unless a `StreamMap`
> was deserialized somewhere without `StreamMap.open()` being called.
> But `StreamTask` seems to guarantee that `open` is called before the
> operator's processing methods, so I can't see anything wrong here either.
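
For readers following the thread, a minimal sketch of the pattern under
discussion (a hypothetical class, not the actual MyMapFunction2): the
ValueState handle created in open() stays non-null for the operator's
lifetime, and a state TTL only makes value() return null for an expired
key; it never nulls the handle, and no new thread or extra open() call is
involved.

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Must run on a keyed stream, e.g. stream.keyBy(...).flatMap(new UniqMarkFunction())
public class UniqMarkFunction extends RichFlatMapFunction<String, String> {

    private transient ValueState<Boolean> uniqMark; // the handle, not the value

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> desc =
                new ValueStateDescriptor<>("uniqMark", Boolean.class);
        // TTL reclaims the stored value per key after one day.
        desc.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(1)).build());
        uniqMark = getRuntimeContext().getState(desc); // never null after open()
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        // After TTL expiry, uniqMark.value() is null again; guard the value,
        // not the handle.
        if (uniqMark.value() == null) {
            uniqMark.update(Boolean.TRUE);
            out.collect(value); // first occurrence (or expired) of this key
        }
    }
}
```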

Re: UnsupportedOperationException from org.apache.flink.shaded.asm6.org.objectweb.asm.ClassVisitor.visitNestHostExperimental using Java 11

2020-01-09 Thread KristoffSC
Hi,
are there any plans to support Java 11?

Thanks,
Krzysztof



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Job Cluster vs Session Cluster deploying and configuration

2020-01-09 Thread KristoffSC
Hi all,
I'm researching docker/k8s deployment possibilities for Flink 1.9.1.

I'm after reading/watching [1][2][3][4].

Currently we think we will go with the Job Cluster approach, although we'd
like to know what the community trend is here. We'd rather not deploy more
than one job per Flink cluster.

Anyways, I was wondering about few things:

1. How can I change the number of task slots per task manager for Job and
Session Cluster? In my case I'm running docker on VirtualBox where I have 4
CPUs assigned to this machine. However, for the Job Cluster each task manager
is spawned with only one task slot. With the Session Cluster, on the same
machine, each task manager is spawned with 4 task slots.

In both cases Flink's UI shows that each Task manager has 4 CPUs.
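
For reference, the slot count is not derived from the CPUs the UI shows; it
comes from configuration. A minimal flink-conf.yaml sketch, value assumed:

```
# Each TaskManager offers this many slots, regardless of visible CPUs
taskmanager.numberOfTaskSlots: 4
```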


2. How can I resubmit a job if I'm using a Job Cluster? I'm referring to the
use case in [5]. Say I have to start the job again but with different
arguments. What is the procedure for this? I'm using checkpoints, btw.

Should I kill all task manager containers and rerun them with different
parameters?

3. How can I resubmit a job using a Session Cluster?

4. How can I provide a log config for a Job/Session cluster?
I have a case where I changed the log level and log format in log4j.properties,
and this works fine in the local (IDE) environment. However, when I build the
fat jar and run a Job Cluster based on this jar, it seems my log4j properties
are not passed to the cluster. I see the original format and the original
(INFO) level.
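
For what it's worth, the cluster processes in the stock images read their
logging configuration from the container's conf/ directory (e.g.
conf/log4j-console.properties), not from files packaged inside the job jar.
A sketch in the log4j 1.x style used there; level and pattern are assumed:

```
log4j.rootLogger=DEBUG, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```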

Thanks,


[1] https://youtu.be/w721NI-mtAA
[2] https://youtu.be/WeHuTRwicSw
[3]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html
[4]
https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md
[5]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Job-claster-scalability-td32027.html



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink Job cluster scalability

2020-01-09 Thread KristoffSC
Thank you David and Zhu Zhu,
this helps a lot.

I have follow up questions though.

Having this
"Instead the Job must be stopped via a savepoint and restarted with a new
parallelism"

and the slot sharing feature [1], I got the impression that if I started my
cluster with more than 6 task slots, Flink would try to deploy tasks across
all available resources during job submission.

I did two tests with my original task.
1. I started a Job Cluster with 7 task slots (7 task managers, since in this
case 1 task manager has one task slot).
2. I started a Session Cluster with 28 task slots in total. In this case I
had 7 task managers, with 4 task slots each.

For case 1, I used the "FLINK_JOB" variable as stated in [2]. For case 2, I
submitted my job from the UI once Flink was up.


In both cases the job used only 6 task slots, so it was still reusing task
slots. I had gotten the impression that it would try to use as many of the
available resources as it could.

What do you think about this?


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#task-slots-and-resources
[2]
https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md








--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: flink hits an NPE on the valueState itself

2020-01-09 Thread Benchao Li
I don't see how `uniqMark` could become `null` here, unless a `StreamMap`
was deserialized somewhere without `StreamMap.open()` being called.
But `StreamTask` seems to guarantee that `open` is called before the
operator's processing methods, so I can't see anything wrong here either.

Kevin Liao wrote on Thu, Jan 9, 2020 at 8:15 PM:

> https://tva4.sinaimg.cn/large/63137227ly1gaqkn1nlykj20mm0wvgq8.jpg
>
> Sorry, try this one.
>

Re: flink hits an NPE on the valueState itself

2020-01-09 Thread Kevin Liao
https://tva4.sinaimg.cn/large/63137227ly1gaqkn1nlykj20mm0wvgq8.jpg

Sorry, try this one.

Benchao Li wrote on Thu, Jan 9, 2020 at 8:13 PM:

> It opens as 403 Forbidden for me
>

Re: flink hits an NPE on the valueState itself

2020-01-09 Thread Benchao Li
It opens as 403 Forbidden for me

Kevin Liao wrote on Thu, Jan 9, 2020 at 8:09 PM:

>
> https://gm1.ggpht.com/FZGtbLggyPPZ_BoU0gt2SQTv7fyhNOKu03ZjsKq7G6DtqWJ5DY0NmL-2s64P-LUzedbTm8DE_FeggNtPAb4VEmypAoPfW8VFSFxOWxMGBvMi5G6xHoZ3THKPYHnAj8KydQ02OjvV-R3IEwBvwIDnZRmwCv3ohyjPF76gbvOOYrKzgaLb_pykWsQDpvROHr3lgU2rezH33Jt3xJEOjXGjHsUFUxiil0PYkQFdA0BP77lypYQLw4RL8BxMz3HfaCiNAGb_q5w8JNmckHLU3g9EuPgtqj6WP3XDv07PBuCXMvmfNcFbAciMeJuOOeE8VBqDCacjuiDtJzVrK1boxcBnzFvT_QazOwaJ27SSuJ_u5KCerTURen2vLBF1RN-x9eOVz9wg6w1oXyMAF7LMjGsYsVzUu3It5AyzLkm-_znosNtAJp2AW_qGmGo-k02fcrMjUoELiGvqn6W1kScnFI4gNWi_dpZe0Uoq1zF2m1crww1oNGOeRjFlCK_-iC19CPfsTVCtwN3tdKnaKdLe2TbfVdFA0DnBUz8NrhV-mvmZlEwi9-ngK-WOy8yjA4fin1zaE2SJCf2zfBSZwGR2eY_E_WZQiFRmSBI2A7vpoyFvTV3E99MIi0MC5PUAeRiu4v4JIVDkV_yUGIUvoa7pxdf7LpZN_DbikQVk7yES8kxxL5qG2Eae8vftWJuBVi5mWTxvElBgInyUntobXHdxfb2YR4JdBgVPN5QionJiIc9g5i0ClGECZbyHPbsQy4pEVw=s0-l75-ft-l75-ft
>
> Thanks, see if you can view this one
>

Re: flink hits an NPE on the valueState itself

2020-01-09 Thread Kevin Liao
https://gm1.ggpht.com/FZGtbLggyPPZ_BoU0gt2SQTv7fyhNOKu03ZjsKq7G6DtqWJ5DY0NmL-2s64P-LUzedbTm8DE_FeggNtPAb4VEmypAoPfW8VFSFxOWxMGBvMi5G6xHoZ3THKPYHnAj8KydQ02OjvV-R3IEwBvwIDnZRmwCv3ohyjPF76gbvOOYrKzgaLb_pykWsQDpvROHr3lgU2rezH33Jt3xJEOjXGjHsUFUxiil0PYkQFdA0BP77lypYQLw4RL8BxMz3HfaCiNAGb_q5w8JNmckHLU3g9EuPgtqj6WP3XDv07PBuCXMvmfNcFbAciMeJuOOeE8VBqDCacjuiDtJzVrK1boxcBnzFvT_QazOwaJ27SSuJ_u5KCerTURen2vLBF1RN-x9eOVz9wg6w1oXyMAF7LMjGsYsVzUu3It5AyzLkm-_znosNtAJp2AW_qGmGo-k02fcrMjUoELiGvqn6W1kScnFI4gNWi_dpZe0Uoq1zF2m1crww1oNGOeRjFlCK_-iC19CPfsTVCtwN3tdKnaKdLe2TbfVdFA0DnBUz8NrhV-mvmZlEwi9-ngK-WOy8yjA4fin1zaE2SJCf2zfBSZwGR2eY_E_WZQiFRmSBI2A7vpoyFvTV3E99MIi0MC5PUAeRiu4v4JIVDkV_yUGIUvoa7pxdf7LpZN_DbikQVk7yES8kxxL5qG2Eae8vftWJuBVi5mWTxvElBgInyUntobXHdxfb2YR4JdBgVPN5QionJiIc9g5i0ClGECZbyHPbsQy4pEVw=s0-l75-ft-l75-ft

Thanks, see if you can view this one.

Benchao Li wrote on Thu, Jan 9, 2020 at 8:07 PM:

> hi Kevin,
>
> Images don't come through on the mailing list; to share one you'd need a
> third-party image hosting service.
> Or you could just paste the text?
>

Re: flink hits an NPE on the valueState itself

2020-01-09 Thread Benchao Li
hi Kevin,

Images don't come through on the mailing list; to share one you'd need a
third-party image hosting service.
Or you could just paste the text?

Kevin Liao wrote on Thu, Jan 9, 2020 at 7:10 PM:

> [image: B40C260D-DCC3-4B7D-A024-3839803C2234.png]
>

Re: Flink Job cluster scalability

2020-01-09 Thread David Maddison
Hi KristoffSC,

As Zhu Zhu explained, Flink does not currently auto-scale a Job as new
resources become available. Instead the Job must be stopped via a savepoint
and restarted with a new parallelism (the old rescale CLI experiment use to
perform this).

Making Flink reactive to new resources and auto scaling jobs is something
I'm currently very interested in. An approach on how to change Flink to
support this has been previously outlined/discussed in FLINK-10407 (
https://issues.apache.org/jira/browse/FLINK-10407)

/David/

On Thu, Jan 9, 2020 at 7:38 AM Zhu Zhu  wrote:

> Hi KristoffSC,
>
> Each task needs a slot to run. However, Flink enables slot sharing[1] by
> default so that one slot can host one parallel instance of each task in a
> job. That's why your job can start with 6 slots.
> However, different parallel instances of the same task cannot share a
> slot. That's why you need at least 6 slots to run your job.
>
> You can set tasks to be in different slot sharing group via
> '.slotSharingGroup(xxx)' to force certain tasks to not share slots. This
> allows the tasks to not burden each other. However, in this way the job
> will need more slots to start.
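
A minimal sketch of the API mentioned above, with hypothetical
source/mapper/sink classes; everything from .slotSharingGroup("heavy")
downstream lands in its own group and no longer shares slots with the
source:

```java
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(new MySource())     // stays in the "default" sharing group
   .map(new HeavyMapper())
   .slotSharingGroup("heavy")     // this operator and everything downstream
   .addSink(new MySink());        // inherit the "heavy" group

env.execute("slot-sharing-example");
```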
>
> So for your questions:
> #1 yes
> #2 ATM, you will need to resubmit your job with the adjusted parallelism.
> The rescale cli was experimental and was temporarily removed [2]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#task-slots-and-resources
> [2]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-Temporarily-remove-support-for-job-rescaling-via-CLI-action-quot-modify-quot-td27447.html
>
> Thanks,
> Zhu Zhu
>
> KristoffSC wrote on Thu, Jan 9, 2020 at 1:05 AM:
>
>> Hi all,
>> I must say I'm very impressed by Flink and what it can do.
>>
>> I was trying to play around with Flink operator parallelism and
>> scalability
>> and I have few questions regarding this subject.
>>
>> My setup is:
>> 1. Flink 1.9.1
>> 2. Docker Job Cluster, where each Task manager has only one task slot. I'm
>> following [1]
>> 3. env setup:
>> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
>> env.setParallelism(1);
>> env.setMaxParallelism(128);
>> env.enableCheckpointing(10 * 60 * 1000);
>>
>> Please mind that I am using operator chaining here.
>>
>> My pipeline setup:
>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture2.png>
>>
>>
>>
>> As you can see I have 7 operators (a few of them were actually chained,
>> which is ok), with different parallelism levels. This all gives me 23
>> tasks total.
>>
>>
>> I've noticed that with the "one task manager = one task slot" approach I
>> have to have 6 task slots/task managers to be able to start this pipeline.
>>
>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture1.png>
>>
>>
>> If the number of task slots is lower than 6, the job is scheduled but not
>> started.
>>
>> With 6 task slots everything works fine, and I must say I'm very
>> impressed with the way Flink balanced data between task slots. Data was
>> distributed very evenly between operator instances/tasks.
>>
>> In this setup (7 operators, 23 tasks and 6 task slots), some task slots
>> have to be reused by more than one operator. While inspecting the UI I
>> found examples of such operators. This is what I was expecting, though.
>>
>> However, I was a little surprised after I added one additional task
>> manager (hence one new task slot).
>>
>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture3.png>
>>
>>
>> After adding new resources, Flink did not rebalance/redistribute the
>> graph. So this host was sitting there doing nothing. Even after putting
>> some load on the cluster, this node was still not used.
>>
>>
>> *After doing this exercise I have few questions:*
>>
>> 1. It seems that the number of task slots must be equal to or greater
>> than the maximum parallelism used in the pipeline. In my case it was 6.
>> When I changed the parallelism of one of the operators to 7, I had to
>> have 7 task slots (task managers in my setup) to be able to even start
>> the job. Is this the case?
>>
>> 2. What can I do to use the extra node that was spawned while the job
>> was running?
>> In other words, if I see that one of my nodes is under too much load,
>> what can I do? Please mind that I'm using a keyBy/hashing function in my
>> pipeline, and in my tests I had around 5000 unique keys.
>>
>> I've try to use REST API to call "rescale" but I got this response:
>> /302{"errors":["Rescaling is temporarily disabled. See FLINK-12312."]}/
>>
>> Thanks.
>>
>> [1]
>>
>> https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: flink hits an NPE on the valueState itself

2020-01-09 Thread Kevin Liao
[image: B40C260D-DCC3-4B7D-A024-3839803C2234.png]

Benchao Li wrote on Thu, Jan 9, 2020 at 6:42 PM:

> hi Kevin,
>
> Could you paste the code at MyMapFunction2.java:39? From the log above, I
> can't tell that it's the valueState that is null.
>

Re: How to catch up on earlier data after a streaming job fails

2020-01-09 Thread Px New
After a rollback, the TaskManager fetches the persisted snapshot, and the
Source replays from the point at which the checkpoint was taken,
regardless of which notion of time you use.


Dian Fu wrote on Thu, Nov 14, 2019 at 1:14 PM:

> If you're using event time, watermarks are computed from the events
> themselves and have nothing to do with system time, so restoring from the
> last checkpoint is enough. Why do you think there would be a problem?
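
A minimal sketch of event-time assignment with the 1.9-era API, assuming a
hypothetical Event type; timestamps and watermarks are derived from the
replayed records themselves, so they are simply recomputed after a
checkpoint restore:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<Event> withTimestamps = events.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(Event e) {
                return e.getEventTime(); // event time, not system time
            }
        });
```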
>
> > On Nov 13, 2019, at 8:29 PM, 柯桂强 wrote:
> >
> >
> 我现在有一个流处理任务失败了,并且保留了checkpoint或者savepoint,我希望从最后一次checkpoint恢复,但是任务使用的是事件时间,超过窗口的数据就会被丢弃,我想到一个方法是,重启之前的数据通过批处理完成然后跑流处理,想问问大家这个方案是否可行,但是感觉如何限定批处理的范围并且和之后的流处理完美拼接是一个比较难的问题
>
>


Re: flink hits an NPE on the valueState itself

2020-01-09 Thread Benchao Li
hi Kevin,

Could you paste the code at MyMapFunction2.java:39? From the log above, I
can't tell that it's the valueState that is null.

Kevin Liao wrote on Thu, Jan 9, 2020 at 5:57 PM:

> This morning I noticed the job misbehaving: tasks kept restarting. I
> checked the JM logs; the earliest error was this:
>
> ```
> 2020-01-09 05:14:04.087 [flink-akka.actor.default-dispatcher-28] INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Map -> Filter ->
> Sink: Unnamed (3/6) (d0e6c4a05d0274c18a4a3df41ab5ff1b) switched from
> RUNNING to FAILED.
> java.lang.NullPointerException: null
> at com.sogou.qidian.MyMapFunction2.map(MyMapFunction2.java:39)
> at com.sogou.qidian.MyMapFunction2.map(MyMapFunction2.java:25)
> at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> 2020-01-09 05:14:04.088 [flink-akka.actor.default-dispatcher-28] INFO
>  o.a.f.r.e.failover.flip1.RestartPipelinedRegionStrategy  - Calculating
> tasks to restart to recover the failed task
> 90bea66de1c231edf33913ecd54406c1_2.
> 2020-01-09 05:14:04.088 [flink-akka.actor.default-dispatcher-28] INFO
>  o.a.f.r.e.failover.flip1.RestartPipelinedRegionStrategy  - 12 tasks should
> be restarted to recover the failed task 90bea66de1c231edf33913ecd54406c1_2.
> 2020-01-09 05:14:04.089 [flink-akka.actor.default-dispatcher-28] INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom
> Source -> Filter (6/6) (ac52050e60236cd1efcd262c8240cd25) switched from
> RUNNING to CANCELING.
> 2020-01-09 05:14:04.089 [flink-akka.actor.default-dispatcher-28] INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom
> Source -> Filter (5/6) (cf4ff0c189315b27e7e2178d8c60e49f) switched from
> RUNNING to CANCELING.
> 2020-01-09 05:14:04.089 [flink-akka.actor.default-dispatcher-28] INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom
> Source -> Filter (4/6) (8c8b07cb39a3f682f41f102e614765e6) switched from
> RUNNING to CANCELING.
> 2020-01-09 05:14:04.089 [flink-akka.actor.default-dispatcher-28] INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom
> Source -> Filter (3/6) (34aadddbffe9f61b1916bcd1427ced96) switched from
> RUNNING to CANCELING.
> 2020-01-09 05:14:04.089 [flink-akka.actor.default-dispatcher-28] INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Map -> Filter ->
> Sink: Unnamed (6/6) (de624cf5c9d4dec6fe68d4800c701457) switched from
> RUNNING to CANCELING.
> 2020-01-09 05:14:04.089 [flink-akka.actor.default-dispatcher-28] INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Map -> Filter ->
> Sink: Unnamed (4/6) (e52c1e70884a6599205f9e0f5b092bc0) switched from
> RUNNING to CANCELING.
> 2020-01-09 05:14:04.089 [flink-akka.actor.default-dispatcher-28] INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Map -> Filter ->
> Sink: Unnamed (5/6) (60496dddb4bc885ee37a6025662080ad) switched from
> RUNNING to CANCELING.
> 2020-01-09 05:14:04.089 [flink-akka.actor.default-dispatcher-28] INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Map -> Filter ->
> Sink: Unnamed (2/6) (bf8515b4f9e852182a5519102fe4fdf3) switched from
> RUNNING to CANCELING.
> 2020-01-09 05:14:04.090 [flink-akka.actor.default-dispatcher-28] INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom
> Source -> Filter (2/6) (bb14d5776c53babcc57edd65bf7159b0) switched from
> RUNNING to CANCELING.
> 2020-01-09 05:14:04.090 [flink-akka.actor.default-dispatcher-28] INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom
> Source -> Filter (1/6) (4c7cd6eaf5c3ca9c2b0db73e7d230a9e) switched from
> RUNNING to CANCELING.
> 2020-01-09 05:14:04.090 [flink-akka.actor.default-dispatcher-28] INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Map -> Filter ->
> Sink: Unnamed (1/6) (4a157d98db2f2efad72035af279433ff) switched from
> RUNNING to CANCELING.
> 2020-01-09 05:14:04.096 [flink-akka.actor.default-dispatcher-28] INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom
> Source -> Filter (1/6) (4c7cd6eaf5c3ca9c2b0db73e7d230a9e) switched from
> CANCELING to CANCELED.
> 2020-01-09 05:14:04.101 [flink-akka.actor.default-dispatcher-28] INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom
> Source -> Filter (2/6) (bb14d5776c53babcc57edd65bf7159b0) switched from
> CANCELING to 

Flink hits an NPE from valueState itself

2020-01-09 Thread Kevin Liao
This morning I noticed the job misbehaving: the tasks kept restarting. I checked the JM log, and the earliest error was the following:

```
2020-01-09 05:14:04.087 [flink-akka.actor.default-dispatcher-28] INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph  - Map -> Filter ->
Sink: Unnamed (3/6) (d0e6c4a05d0274c18a4a3df41ab5ff1b) switched from
RUNNING to FAILED.
java.lang.NullPointerException: null
at com.sogou.qidian.MyMapFunction2.map(MyMapFunction2.java:39)
at com.sogou.qidian.MyMapFunction2.map(MyMapFunction2.java:25)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
2020-01-09 05:14:04.088 [flink-akka.actor.default-dispatcher-28] INFO
 o.a.f.r.e.failover.flip1.RestartPipelinedRegionStrategy  - Calculating
tasks to restart to recover the failed task
90bea66de1c231edf33913ecd54406c1_2.
2020-01-09 05:14:04.088 [flink-akka.actor.default-dispatcher-28] INFO
 o.a.f.r.e.failover.flip1.RestartPipelinedRegionStrategy  - 12 tasks should
be restarted to recover the failed task 90bea66de1c231edf33913ecd54406c1_2.
2020-01-09 05:14:04.089 [flink-akka.actor.default-dispatcher-28] INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom
Source -> Filter (6/6) (ac52050e60236cd1efcd262c8240cd25) switched from
RUNNING to CANCELING.
2020-01-09 05:14:04.089 [flink-akka.actor.default-dispatcher-28] INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom
Source -> Filter (5/6) (cf4ff0c189315b27e7e2178d8c60e49f) switched from
RUNNING to CANCELING.
2020-01-09 05:14:04.089 [flink-akka.actor.default-dispatcher-28] INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom
Source -> Filter (4/6) (8c8b07cb39a3f682f41f102e614765e6) switched from
RUNNING to CANCELING.
2020-01-09 05:14:04.089 [flink-akka.actor.default-dispatcher-28] INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom
Source -> Filter (3/6) (34aadddbffe9f61b1916bcd1427ced96) switched from
RUNNING to CANCELING.
2020-01-09 05:14:04.089 [flink-akka.actor.default-dispatcher-28] INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph  - Map -> Filter ->
Sink: Unnamed (6/6) (de624cf5c9d4dec6fe68d4800c701457) switched from
RUNNING to CANCELING.
2020-01-09 05:14:04.089 [flink-akka.actor.default-dispatcher-28] INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph  - Map -> Filter ->
Sink: Unnamed (4/6) (e52c1e70884a6599205f9e0f5b092bc0) switched from
RUNNING to CANCELING.
2020-01-09 05:14:04.089 [flink-akka.actor.default-dispatcher-28] INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph  - Map -> Filter ->
Sink: Unnamed (5/6) (60496dddb4bc885ee37a6025662080ad) switched from
RUNNING to CANCELING.
2020-01-09 05:14:04.089 [flink-akka.actor.default-dispatcher-28] INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph  - Map -> Filter ->
Sink: Unnamed (2/6) (bf8515b4f9e852182a5519102fe4fdf3) switched from
RUNNING to CANCELING.
2020-01-09 05:14:04.090 [flink-akka.actor.default-dispatcher-28] INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom
Source -> Filter (2/6) (bb14d5776c53babcc57edd65bf7159b0) switched from
RUNNING to CANCELING.
2020-01-09 05:14:04.090 [flink-akka.actor.default-dispatcher-28] INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom
Source -> Filter (1/6) (4c7cd6eaf5c3ca9c2b0db73e7d230a9e) switched from
RUNNING to CANCELING.
2020-01-09 05:14:04.090 [flink-akka.actor.default-dispatcher-28] INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph  - Map -> Filter ->
Sink: Unnamed (1/6) (4a157d98db2f2efad72035af279433ff) switched from
RUNNING to CANCELING.
2020-01-09 05:14:04.096 [flink-akka.actor.default-dispatcher-28] INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom
Source -> Filter (1/6) (4c7cd6eaf5c3ca9c2b0db73e7d230a9e) switched from
CANCELING to CANCELED.
2020-01-09 05:14:04.101 [flink-akka.actor.default-dispatcher-28] INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom
Source -> Filter (2/6) (bb14d5776c53babcc57edd65bf7159b0) switched from
CANCELING to CANCELED.
2020-01-09 05:14:04.103 [flink-akka.actor.default-dispatcher-28] INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom
Source -> Filter (5/6) (cf4ff0c189315b27e7e2178d8c60e49f) switched from
CANCELING to CANCELED.
2020-01-09 05:14:04.115 

Re: How can I find out which key group belongs to which subtask

2020-01-09 Thread Congxian Qiu
Hi

If you just want to make sure certain keys go to the same subtask, would a
custom key selector[1] help?

For the key-group and subtask mapping, you can refer to
KeyGroupRangeAssignment[2] for more info, and for the max-parallelism logic
you can refer to the docs[3]; a small sketch follows the links below.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/api_concepts.html#define-keys-using-key-selector-functions
[2]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
[3]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#setting-the-maximum-parallelism
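
As a concrete illustration of [2], the sketch below (with made-up values for maxParallelism and parallelism) computes which key group a key falls into and which subtask owns that key group, using the same assignment logic Flink applies at runtime:

```java
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupLookup {
    public static void main(String[] args) {
        int maxParallelism = 128;   // example value; see [3] for how it is set
        int parallelism = 6;        // example operator parallelism
        Object key = "user-42";     // whatever your KeySelector returns

        // Key -> key group: murmur hash of key.hashCode() modulo maxParallelism.
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);

        // Key group -> subtask index: keyGroup * parallelism / maxParallelism.
        int subtask = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                maxParallelism, parallelism, keyGroup);

        System.out.println("key " + key + " -> key group " + keyGroup
                + " -> subtask " + subtask);
    }
}
```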

Best,
Congxian


杨东晓 wrote on Thu, Jan 9, 2020 at 7:47 AM:

> Hi, I'm trying to optimize a Flink 'keyby' ProcessFunction.
> Is there any way I can find out which key group a key belongs to,
> and, in turn, which subtask a key group belongs to?
> The motivation is that we want to force data records from upstream to go
> to the same TaskManager's downstream subtask, meaning that even when we
> use a keyed stream we still want no cross-JVM communication at run time.
> And if we can achieve that, can we also avoid the expensive cost of
> record serialization, since data would only be transferred within the
> same TaskManager JVM instance?
>
> Thanks.
>


When I use Flink 1.9.1 and produce data to Kafka 1.1.1, the StreamTask checkpoint fails.

2020-01-09 Thread ouywl

Hi all:

When I use Flink 1.9.1 and produce data to Kafka 1.1.1, the error in Log-1 below occurs. The code is:

input.addSink(new FlinkKafkaProducer(
        parameterTool.getRequired("bootstrap.servers"),
        parameterTool.getRequired("output-topic"),
        new KafkaEventDeSchema()));

Log-1:

2020-01-09 09:13:44,476 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1578561224466 for job d8827b3f4165b6ba27c8b59c7aa1a400.
2020-01-09 09:15:33,069 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 1 by task f643244ff791dbd3fbfb88bfafdf1872 of job d8827b3f4165b6ba27c8b59c7aa1a400 at ee8e6d8e92f9a59f578b1de2edd73537 @ producedata-taskmanager-d59d5cb7c-pv27j (dataPort=33361).
2020-01-09 09:15:33,070 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1 of job d8827b3f4165b6ba27c8b59c7aa1a400.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator Sink: Unnamed (1/2). Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:113)
at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 58 record(s) for k8s-test-data-0:120018 ms has passed since batch creation
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:311)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
... 17 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 58 record(s) for k8s-test-data-0:120018 ms has passed since batch creation
2020-01-09 09:15:33,074 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job producer data frequece (d8827b3f4165b6ba27c8b59c7aa1a400) switched from state RUNNING to FAILING.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)
at
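
The root cause above is the producer-side TimeoutException: batches expired before they could be shipped to the broker, and the checkpoint's pre-commit flush surfaced the failure. As a hedged sketch (broker address, topic, and timeout values below are placeholders, and SimpleStringSchema stands in for the poster's KafkaEventDeSchema), one might give the producer more time before expiring batches:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class KafkaSinkSketch {
    public static void attachSink(DataStream<String> input) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092"); // placeholder
        // The "120018 ms has passed since batch creation" in the log is close
        // to the 120000 ms default of delivery.timeout.ms in newer Kafka
        // clients; raising it (and request.timeout.ms) gives slow or briefly
        // unreachable brokers more time before in-flight batches expire
        // mid-checkpoint.
        props.setProperty("delivery.timeout.ms", "300000");
        props.setProperty("request.timeout.ms", "60000");

        input.addSink(new FlinkKafkaProducer<>(
                "output-topic", new SimpleStringSchema(), props));
    }
}
```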

Data overflow in SpillingResettableMutableObjectIterator

2020-01-09 Thread Jian Cao
Hi all:
We are using Flink's iterations and found that
SpillingResettableMutableObjectIterator has a data overflow problem when
the number of elements in a single input exceeds Integer.MAX_VALUE.

The reason is that SpillingResettableMutableObjectIterator internally
tracks the total number of elements and the number of elements read so
far in two int fields (elementCount and currentElementNum); once the
element count exceeds Integer.MAX_VALUE, both counters overflow.

After an overflow, when the input is reset for the next iteration, the
data is either not read at all or only partially read.

Therefore, I suggest changing the type of these two fields of
SpillingResettableMutableObjectIterator
from int to long.
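
A minimal, self-contained sketch of the failure mode (plain Java, not the Flink source; the variable name just mirrors the field in the report):

```java
public class CounterOverflowSketch {
    public static void main(String[] args) {
        // An int counter, as in the current implementation:
        int elementCount = Integer.MAX_VALUE;
        elementCount++;                        // wraps around silently
        System.out.println(elementCount);      // -2147483648: any read loop
                                               // bounded by this now misbehaves

        // A long counter, as proposed:
        long fixedElementCount = (long) Integer.MAX_VALUE + 1;
        System.out.println(fixedElementCount); // 2147483648: still correct
    }
}
```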

Best regards.