Re: Flink ML feature

2019-12-09 Thread vino yang
Hi Chandu,

AFAIK, there is a project named Alink [1], a machine learning algorithm
platform based on Flink, developed by the PAI team of the Alibaba computing
platform. FYI

Best,
Vino

[1]: https://github.com/alibaba/Alink

Tom Blackwood wrote on Tuesday, December 10, 2019 at 2:07 PM:

> You may try Spark ML, which is a production-ready library for ML.
>
> regards.
>
> On Tue, Dec 10, 2019 at 1:04 PM chandu soa  wrote:
>
>> Hello Community,
>>
>> Can you please give me some pointers for implementing machine learning
>> using Flink?
>>
>> I see the Flink ML libraries were dropped in v1.9. It looks like the ML
>> feature in Flink is going to be enhanced.
>>
>> What is the recommended approach for implementing production-grade,
>> ML-based apps using Flink? Is v1.9 OK, or should I wait for 1.10?
>>
>> Thanks,
>> Chandu
>>
>


Re: Flink ML feature

2019-12-09 Thread Tom Blackwood
You may try Spark ML, which is a production-ready library for ML.

regards.

On Tue, Dec 10, 2019 at 1:04 PM chandu soa  wrote:

> Hello Community,
>
> Can you please give me some pointers for implementing Machine Learning
> using Flink.
>
> I see Flink ML libraries were dropped in v1.9. It looks like ML feature in
> Flink going to be enhanced.
>
> What is the recommended approach for implementing production grade ML
> based apps using Flink? v1.9 is ok?or should wait for 1.10?
>
> Thanks,
> Chandu
>


Flink SQL Kafka topic DDL: a Kafka JSON field conflicts with Flink SQL keywords

2019-12-09 Thread LakeShen
Hi community, when I use Flink SQL DDL, a field in the Kafka JSON conflicts with
a Flink SQL keyword. My thought is to use a UDTF to work around it. Is there a
more graceful way to solve this problem?
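
For reference, a minimal sketch of the usual alternative to a UDTF: quote the
conflicting JSON field name with backticks in the DDL, since Flink SQL accepts
reserved keywords as quoted identifiers. The field names, topic and connector
properties below are made up and follow the 1.9 descriptor style, so they may
need adjusting for your setup.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class EscapedKeywordDdlSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // `timestamp` and `table` are reserved words; backticks let them be used as column names.
        tEnv.sqlUpdate(
            "CREATE TABLE events (" +
            "  id BIGINT," +
            "  `timestamp` VARCHAR," +
            "  `table` VARCHAR" +
            ") WITH (" +
            "  'connector.type' = 'kafka'," +
            "  'connector.version' = 'universal'," +
            "  'connector.topic' = 'events'," +
            "  'connector.properties.0.key' = 'bootstrap.servers'," +
            "  'connector.properties.0.value' = 'localhost:9092'," +
            "  'format.type' = 'json'," +
            "  'update-mode' = 'append'" +
            ")");
    }
}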


Re: SQL for Avro GenericRecords on Parquet

2019-12-09 Thread Peter Huang
Hi Hanan,

I created a fix for the problem. Would you please try it from your side?
https://github.com/apache/flink/pull/10371


Best Regards
Peter Huang

On Tue, Nov 26, 2019 at 8:07 AM Peter Huang 
wrote:

> Hi Hanan,
>
> After investigating the issue by using the test case you provided, I think
> there is a bug in it. Currently, the Parquet predicate push-down uses the
> predicate literal type to construct the FilterPredicate.
> The issue happens when the data type of the predicate value inferred from
> SQL doesn't match the Parquet schema. For example, foo is a long type and
> foo < 1 is the predicate. The literal will be recognized as an integer, which
> causes the Parquet FilterPredicate to be mistakenly created for a column of
> Integer type. I created a ticket for the issue:
> https://issues.apache.org/jira/browse/FLINK-14953. Please also add more
> insight by commenting directly on it.
>
>
> Best Regards
> Peter Huang
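
For illustration only (this is not the code of the fix in the pull request above), the
type mismatch can be reproduced directly with Parquet's FilterApi: the predicate has
to be built against the column type declared in the Parquet schema, otherwise
record-level filtering fails as in the stack trace below.

import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;

public class PredicateTypeSketch {
    public static void main(String[] args) {
        // "foo" is declared as int64 (long) in the Parquet schema.

        // Correct: the literal and the column are both long.
        FilterPredicate ok = FilterApi.lt(FilterApi.longColumn("foo"), 1L);

        // What currently happens: the SQL literal 1 is inferred as INT, so the predicate
        // is built against an int column; the later addLong() call then fails with
        // UnsupportedOperationException in IncrementallyUpdatedFilterPredicate$ValueInspector.
        FilterPredicate mismatched = FilterApi.lt(FilterApi.intColumn("foo"), 1);

        System.out.println(ok + " vs " + mismatched);
    }
}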
>
> On Mon, Nov 18, 2019 at 12:40 PM Hanan Yehudai 
> wrote:
>
>> Hi Peter, thanks.
>>
>> This is my code. I used one of the Parquet/Avro tests as a reference.
>>
>>
>>
>> The code will fail on
>>
>> Test testScan(ParquetTestCase) failed with:
>>
>> java.lang.UnsupportedOperationException
>>    at org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate$ValueInspector.update(IncrementallyUpdatedFilterPredicate.java:71)
>>    at org.apache.parquet.filter2.recordlevel.FilteringPrimitiveConverter.addLong(FilteringPrimitiveConverter.java:105)
>>    at org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:268)
>>
>>
>>
>>
>>
>> CODE :
>>
>>
>>
>> import org.apache.avro.Schema;
>>
>> import org.apache.avro.generic.GenericRecord;
>>
>> import org.apache.avro.generic.GenericRecordBuilder;
>>
>> import org.apache.avro.specific.SpecificRecord;
>>
>> import org.apache.avro.specific.SpecificRecordBuilderBase;
>>
>> import org.apache.flink.api.common.typeinfo.Types;
>>
>> import org.apache.flink.api.java.DataSet;
>>
>> import org.apache.flink.api.java.ExecutionEnvironment;
>>
>> import org.apache.flink.api.java.io.ParallelIteratorInputFormat;
>>
>> import org.apache.flink.api.java.io.TupleCsvInputFormat;
>>
>> import org.apache.flink.api.java.tuple.Tuple;
>>
>> import org.apache.flink.core.fs.FileSystem;
>>
>> import org.apache.flink.core.fs.Path;
>>
>>
>>
>> import org.apache.flink.formats.parquet.ParquetTableSource;
>>
>> import org.apache.flink.streaming.api.datastream.DataStream;
>>
>> import
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>> import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
>>
>> import org.apache.flink.table.api.Table;
>>
>> import org.apache.flink.table.api.TableEnvironment;
>>
>> import org.apache.flink.table.api.java.BatchTableEnvironment;
>>
>>
>>
>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>
>> import org.apache.flink.table.sinks.CsvTableSink;
>>
>> import org.apache.flink.table.sinks.TableSink;
>>
>> import org.apache.flink.test.util.MultipleProgramsTestBase;
>>
>> import org.apache.flink.types.Row;
>>
>>
>>
>> import org.apache.avro.generic.IndexedRecord;
>>
>> import org.apache.parquet.avro.AvroSchemaConverter;
>>
>> import org.apache.parquet.schema.MessageType;
>>
>> import org.junit.BeforeClass;
>>
>> import org.junit.ClassRule;
>>
>> import org.junit.Test;
>>
>> import org.junit.rules.TemporaryFolder;
>>
>>
>>
>> import java.io.IOException;
>>
>> import java.util.ArrayList;
>>
>> import java.util.List;
>>
>> import java.util.UUID;
>>
>>
>>
>> import static org.junit.Assert.assertEquals;
>>
>>
>>
>> import org.apache.parquet.avro.AvroParquetWriter;
>>
>> import org.apache.parquet.hadoop.ParquetWriter;
>>
>>
>>
>>
>>
>> public class  ParquetTestCase extends MultipleProgramsTestBase {
>>
>>
>>
>> private static String avroSchema = "{\n" +
>>
>> "  \"name\": \"SimpleRecord\",\n" +
>>
>> "  \"type\": \"record\",\n" +
>>
>> "  \"fields\": [\n" +
>>
>> "{ \"default\": null, \"name\": \"timestamp_edr\",
>> \"type\": [ \"null\", \"long\" ]},\n" +
>>
>> "{ \"default\": null, \"name\": \"id\", \"type\": [
>> \"null\", \"long\" ]},\n" +
>>
>> "{ \"default\": null, \"name\": \"recordType_\",
>> \"type\": [ \"null\", \"string\"]}\n" +
>>
>> "  ],\n" +
>>
>> "  \"schema_id\": 1,\n" +
>>
>> "  \"type\": \"record\"\n" +
>>
>> "}";
>>
>>
>>
>> private static final AvroSchemaConverter SCHEMA_CONVERTER = new
>> AvroSchemaConverter();
>>
>> private static Schema schm = new Schema.Parser().parse(avroSchema);
>>
>> private static Path testPath;
>>
>>
>>
>>
>>
>> public ParquetTestCase() {
>>
>> super(TestExecutionMode.COLLECTION);
>>
>> }
>>
>>
>>
>>
>>
>> @BeforeClass
>>
>> public static void setup() throws Exception {
>>
>>
>>
>> 

Flink ML feature

2019-12-09 Thread chandu soa
Hello Community,

Can you please give me some pointers for implementing machine learning
using Flink?

I see the Flink ML libraries were dropped in v1.9. It looks like the ML feature in
Flink is going to be enhanced.

What is the recommended approach for implementing production-grade, ML-based
apps using Flink? Is v1.9 OK, or should I wait for 1.10?

Thanks,
Chandu


Re: Emit intermediate accumulator state of a session window

2019-12-09 Thread chandu soa
Thank you all for your responses.

I've created a custom trigger similar to Flink's provided EventTimeTrigger,
with a few changes: fire on onElement(), and do not fire on onEventTime(),
to satisfy my requirement that whenever a new event arrives the incremental
result (the result of AggregateFunction#add()) is fired immediately. The
changed code block is below.

@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
        TriggerContext ctx) throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        return TriggerResult.FIRE;
    } else {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.FIRE; // instead of CONTINUE
    }
}

@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
    return time == window.maxTimestamp()
            ? TriggerResult.CONTINUE // instead of FIRE
            : TriggerResult.CONTINUE;
}

Thanks,
Chandu


On Sun, Dec 8, 2019 at 8:22 AM Rafi Aroch  wrote:

> Hi Chandu,
>
> Maybe you can use a custom trigger:
> .trigger(ContinuousEventTimeTrigger.of(Time.minutes(15)))
>
> This would continuously trigger your aggregate every period of time.
>
> Thanks,
> Rafi
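
A sketch of wiring this suggestion into the pipeline from the original question;
VideoEvent, the key selector and EventAggregator stand in for the poster's own
types, so this is only an outline of the idea, not a drop-in implementation.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;

public class EarlyFiringSessionWindowSketch {
    // Emits the running aggregate every 15 minutes of event time while the 20-minute session is still open.
    public static SingleOutputStreamOperator<Double> apply(DataStream<VideoEvent> events) {
        return events
                .keyBy(VideoEvent::getUserId)
                .window(EventTimeSessionWindows.withGap(Time.minutes(20)))
                .trigger(ContinuousEventTimeTrigger.of(Time.minutes(15)))
                .aggregate(new EventAggregator());
    }
}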
>
>
> On Thu, Dec 5, 2019 at 1:09 PM Andrey Zagrebin 
> wrote:
>
>> Hi Chandu,
>>
>> I am not sure whether using the windowing API is helpful in this case at
>> all.
>>
>> At least, you could try to consume the data not only by windowing but
>> also by a custom stateful function.
>> You could look into AggregatingState [1]. Then you could do whatever you
>> want with the current aggregated value.
>> If you still need to do something with the result of windowing, you could
>> do it as now or simulate it with timers [2] in that same stateful function.
>>
>> Best,
>> Andrey
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#example
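
A sketch of the stateful-function alternative described above: keep the running
aggregate in AggregatingState and emit it on every element instead of waiting for
the window to close. VideoEvent and WatchAccumulator are placeholders;
EventAggregator is assumed to be the AggregateFunction from the original post.

import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class EmitOnEveryEventSketch extends KeyedProcessFunction<String, VideoEvent, Double> {

    private transient AggregatingState<VideoEvent, Double> watchedSoFar;

    @Override
    public void open(Configuration parameters) {
        watchedSoFar = getRuntimeContext().getAggregatingState(
                new AggregatingStateDescriptor<>("watched", new EventAggregator(), WatchAccumulator.class));
    }

    @Override
    public void processElement(VideoEvent event, Context ctx, Collector<Double> out) throws Exception {
        watchedSoFar.add(event);          // runs AggregateFunction#add() under the hood
        out.collect(watchedSoFar.get());  // emit the intermediate value immediately
    }
}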
>>
>> On Tue, Dec 3, 2019 at 12:21 AM chandu soa  wrote:
>>
>>> Emit intermediate accumulator (AggregateFunction ACC value) state of a
>>> session window when a new event arrives
>>>
>>>
>>>
>>> AggregateFunction#getResult() is called only when the window completes. I
>>> need to emit intermediate accumulator values (the result of
>>> AggregateFunction#add()) as well and write them to the sink. Both
>>> AggregateFunction#getResult() and ProcessWindowFunction provide the
>>> aggregated result only when the window is closed.
>>>
>>> Any thoughts on how to emit or stream intermediate accumulator state as
>>> soon as a new event arrives while the window is open? Do I need to
>>> implement a custom trigger or assigner?
>>>
>>>
>>>
>>> To give you some background: when a user watches a video we get events
>>> when it is clicked, thereafter every ~15 minutes, and finally when the user
>>> closes the video.
>>>
>>> I need to aggregate them as soon as they arrive and post the result to the
>>> destination. For example, if a user is watching a two-hour movie I get events
>>> at 15-minute intervals (0, 15, 30, ..., 120); whenever I get an event I need to
>>> aggregate the watched percentage so far and write it to the sink (0%, 12.5%,
>>> 25%, ..., 100%). The implementation below emits (via getResult()) a single
>>> event 20 minutes after the video is watched.
>>>
>>>
>>>
>>>
>>>
>>> .window(EventTimeSessionWindows.withGap(Time.minutes(20)))
>>> .aggregate(new EventAggregator())
>>> .filter(new FinalFilter())
>>> .addSink(...)
>>>
>>>
>>> Appreciate your help.
>>>
>>>
>>> Thanks,
>>>
>>> chandu
>>>
>>


Re: Re: What is the recommended way to land a Flink real-time data warehouse into Hive?

2019-12-09 Thread JingsongLee
Hi hjxhainan,

If you want to unsubscribe,
please send an email to user-zh-unsubscr...@flink.apache.org

Best,
Jingsong Lee


--
From:hjxhai...@163.com 
Send Time: Tuesday, December 10, 2019 10:52
To:user-zh ; JingsongLee ; 
陈帅 
Subject: Re: Re: What is the recommended way to land a Flink real-time data warehouse into Hive?


How do I unsubscribe from the mailing list?




hjxhai...@163.com 
From: JingsongLee
Sent: 2019-12-10 10:48
To: 陈帅; user-zh@flink.apache.org
Subject: Re: What is the recommended way to land a Flink real-time data warehouse into Hive?
Hi 陈帅,
1. The BulkWriter.Factory interface is not a good fit for ORC; as yue ma said, you need some changes.
2. The whole StreamingFileSink mechanism only really moves files when a checkpoint is taken. What exactly do you mean by "streaming write", and what does your business scenario require?
Best,
Jingsong Lee
--
From:陈帅 
Send Time: Tuesday, December 10, 2019 08:21
To:user-zh@flink.apache.org ; JingsongLee 

Subject: Re: What is the recommended way to land a Flink real-time data warehouse into Hive?
1. Compared with Parquet, what makes ORC support in StreamingFileSink difficult at the moment?
2. Does BulkWriter accumulate mini-batches before writing files?
JingsongLee wrote on Monday, December 9, 2019 at 3:24 PM:
Hi 帅,
 - Parquet can currently be supported by adapting StreamingFileSink.
 (Supporting ORC in StreamingFileSink is still hard at the moment.)
 - BulkWriter has nothing to do with batch processing; it is just a concept inside StreamingFileSink.
 - Syncing Hive partitions has to be custom-built; StreamingFileSink offers nothing ready-made for it.
 In 1.11 the Table layer will keep digging into this area; for landing a real-time data
 warehouse into Hive, data skew, partition visibility and related issues will be addressed one by one. [1]
 [1] https://issues.apache.org/jira/browse/FLINK-14249
 Best,
 Jingsong Lee
 --
 From:陈帅 
 Send Time: Sunday, December 8, 2019 10:04
To:user-zh@flink.apache.org 
 Subject: What is the recommended way to land a Flink real-time data warehouse into Hive?
 Some people say to write directly to HBase and then map the HBase table in Hive.
 But I would rather write files directly to HDFS and load them through Hive external tables. A few things are unclear to me:
 1. Writing in a row format in real time is no problem and StreamingFileSink supports it, but if I want to write in a
 columnar format (such as Parquet or ORC), there seems to be no ready-made streaming
 writer; what is provided officially is
 BulkWriter, which only supports batch-style writing. Are streaming writes in a columnar format not recommended?
 If they are possible, do I need to define my own StreamingWriter? And for
 business-level Update and Delete operations, how is the data usually synced into Hive?
 2. After the files are written to HDFS, how do I make Hive automatically discover the newly added partitions? Can that be done inside the Flink ETL program?

Re: What is the recommended way to land a Flink real-time data warehouse into Hive?

2019-12-09 Thread hiliuxg
You can take a look at https://github.com/hiliuxg/flink-orc-sink, which writes ORC to HDFS exactly
once.




----
From: "jingwen jingwen"
> [1] https://issues.apache.org/jira/browse/FLINK-14249
>
>  Best,
>  Jingsong Lee
>
>
>  --
>  From:

Re: What is the recommended way to land a Flink real-time data warehouse into Hive?

2019-12-09 Thread jingwen jingwen
HBase has very good performance for random, real-time reads and writes, so combining it with a stream computing engine is a fairly good choice.
Writing to HDFS directly from real-time computation will run into many performance problems.

陈帅 wrote on Tuesday, December 10, 2019 at 8:21 AM:

> 1. Compared with Parquet, what makes ORC support in StreamingFileSink difficult at the moment?
> 2. Does BulkWriter accumulate mini-batches before writing files?
>
> JingsongLee wrote on Monday, December 9, 2019 at 3:24 PM:
>
> > Hi 帅,
> > - Parquet can currently be supported by adapting StreamingFileSink.
> > (Supporting ORC in StreamingFileSink is still hard at the moment.)
> > - BulkWriter has nothing to do with batch processing; it is just a concept inside StreamingFileSink.
> > - Syncing Hive partitions has to be custom-built; StreamingFileSink offers nothing ready-made for it.
> >
> > In 1.11 the Table layer will keep digging into this area; for landing a real-time data warehouse into Hive, data skew, partition visibility and related issues will be addressed one by one. [1]
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-14249
> >
> > Best,
> > Jingsong Lee
> >
> >
> > --
> > From:陈帅 
> > Send Time: Sunday, December 8, 2019 10:04
> > To:user-zh@flink.apache.org 
> > Subject: What is the recommended way to land a Flink real-time data warehouse into Hive?
> >
> > Some people say to write directly to HBase and then map the HBase table in Hive.
> > But I would rather write files directly to HDFS and load them through Hive external tables. A few things are unclear to me:
> >
> > 1. Writing in a row format in real time is no problem and StreamingFileSink supports it, but if I want to write in a columnar format (such as Parquet or ORC),
> > there seems to be no ready-made streaming writer; what is provided officially is
> > BulkWriter, which only supports batch-style writing. Are streaming writes in a columnar format not recommended? If they are possible, do I need to define my own StreamingWriter? And for
> > business-level Update and Delete operations, how is the data usually synced into Hive?
> >
> > 2. After the files are written to HDFS, how do I make Hive automatically discover the newly added partitions? Can that be done inside the Flink ETL program?
> >
>


Re: Flink on Kubernetes seems to ignore log4j.properties

2019-12-09 Thread Yang Wang
Hi Li Peng,

Are you running a standalone session cluster or a per-job cluster on
Kubernetes? If so, I think you need to check the log4j.properties inside the
image, not the local one. The log is stored to /opt/flink/log/jobmanager.log
by default.

If you are running the active Kubernetes integration for a fresh taste, the
following CLI option could be used to remove the redirect:
-Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem%
%jvmopts% %logging% %class% %args%"

Best,
Yang

vino yang wrote on Tuesday, December 10, 2019 at 10:55 AM:

> Hi Li,
>
> A potential reason could be conflicting logging frameworks. Can you share
> the log in your .out file and let us know whether the print format of the log
> matches the configuration file you gave?
>
> Best,
> Vino
>
> Li Peng wrote on Tuesday, December 10, 2019 at 10:09 AM:
>
>> Hey folks, I noticed that my Kubernetes Flink logs (reached via kubectl
>> logs) completely ignore any of the configurations I put into
>> /flink/conf/. I set the logger level to WARN, yet I still see INFO level
>> logging from Flink loggers
>> like org.apache.flink.runtime.checkpoint.CheckpointCoordinator. I even
>> copied the same properties to /flink/conf/log4j-console.properties
>> and log4j-cli.properties.
>>
>> From what I can tell, kubernetes just listens to stdout and stderr, so
>> shouldn't the log4j.properties control output to them? Anyone seen this
>> issue before?
>>
>> Here is my log4j.properties:
>>
>>
>> # This affects logging for both user code and Flink
>> log4j.rootLogger=WARN, file, console, stdout
>>
>> # Uncomment this if you want to _only_ change Flink's logging
>> log4j.logger.org.apache.flink=WARN
>>
>> # The following lines keep the log level of common libraries/connectors on
>> # log level INFO. The root logger does not override this. You have to 
>> manually
>> # change the log levels here.
>> log4j.logger.akka=INFO
>> log4j.logger.org.apache.kafka=INFO
>> log4j.logger.org.apache.hadoop=INFO
>> log4j.logger.org.apache.zookeeper=INFO
>>
>> # Log all infos in the given file
>> log4j.appender.file=org.apache.log4j.FileAppender
>> log4j.appender.file.file=${log.file}
>> log4j.appender.file.append=false
>> log4j.appender.file.layout=org.apache.log4j.PatternLayout
>> log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} 
>> %-5p %-60c %x - %m%n
>>
>> # Log all infos to the console
>> log4j.appender.console=org.apache.log4j.ConsoleAppender
>> log4j.appender.console.layout=org.apache.log4j.PatternLayout
>> log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} 
>> %-5p %-60c %x - %m%n
>>
>> # Suppress the irrelevant (wrong) warnings from the Netty channel handler
>> log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,
>>  file, console
>> log4j.logger.org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction=WARN
>> log4j.logger.org.apache.flink.runtime.checkpoint=WARN
>>
>> Thanks,
>> Li
>>
>


Re: Apache Flink - Retries for async processing

2019-12-09 Thread Jingsong Li
Hi M Singh,

We have this scenario internally too; as far as I know, Flink still does not
have such a built-in mechanism in 1.9.
I can share my solution:
- In the async function, start a thread pool via a thread factory.
- When a call fails, resubmit it to that thread pool, and refresh the
security token there as well.
Actually, you can deal with almost anything inside the function, as long as
the relevant methods of ResultFuture are called in the end.

Best,
Jingsong Lee
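
A sketch of the pattern described above; HttpClient and TokenProvider are
hypothetical helpers standing in for the real HTTP call and token refresh, and
the retry count is arbitrary. The only hard requirement is that the ResultFuture
is completed exactly once in the end.

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class RetryingAsyncFunctionSketch extends RichAsyncFunction<String, String> {

    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) {
        executor = Executors.newFixedThreadPool(4);
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        executor.submit(() -> callWithRetry(input, resultFuture, 3));
    }

    private void callWithRetry(String input, ResultFuture<String> resultFuture, int retriesLeft) {
        try {
            // Hypothetical HTTP call using the current token.
            resultFuture.complete(Collections.singleton(HttpClient.call(input, TokenProvider.token())));
        } catch (Exception e) {
            if (retriesLeft > 0) {
                TokenProvider.refresh();  // e.g. the token expired, refresh before retrying
                executor.submit(() -> callWithRetry(input, resultFuture, retriesLeft - 1));
            } else {
                resultFuture.completeExceptionally(e);  // give up after the last retry
            }
        }
    }

    @Override
    public void close() {
        executor.shutdown();
    }
}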

On Tue, Dec 10, 2019 at 3:25 AM M Singh  wrote:

> Hi Folks:
>
> I am working on a project where I will be using Flink's async processing
> capabilities.  The job has to make HTTP requests using a token.  The token
> expires periodically and needs to be refreshed.
>
> So, I was looking for patterns for handling async call failures and
> retries when the token expires.  I found the earlier thread "Re: Backoff
> strategies for async IO functions?" and it appears that Flink supports
> neither retries nor periodically refreshing a security token.  I am using
> 1.6 at the moment but am planning to migrate to 1.9 soon.
>
> If there are any patterns on how to deal with this scenario, please let me
> know.
>
> Thanks
>
> Mans
>
>

-- 
Best, Jingsong Lee


HDFS_DELEGATION_TOKEN expiration problem

2019-12-09 Thread hss



Our Hadoop cluster has Kerberos enabled, and Flink runs on YARN in per-job mode
(checkpointing is enabled). After the HDFS_DELEGATION_TOKEN expires, the job
runs into the following error:


2019-12-02 00:00:00.283 ERROR org.apache.flink.yarn.YarnResourceManager  
 - Could not 
start TaskManager in container container_e39_1563434037485_0606_01_552751.
 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
 token (token for BDATA_UME_ADM: HDFS_DELEGATION_TOKEN 
owner=bdata_ume_...@travelsky.bdp.com, renewer=yarn, realUser=, 
issueDate=1574414126899, maxDate=1575018926899, sequenceNumber=800, 
masterKeyId=225) can't be found in cache

Re: Flink on Kubernetes seems to ignore log4j.properties

2019-12-09 Thread Yun Tang
Hi Peng

What kind of Kubernetes deployment from the Flink docs [1] did you try? If you are
using session mode, you can control your log4j configuration via a ConfigMap [2].
From my experience, this controls log4j well.

If you did not override the command of the Flink docker image, it will run the
taskmanager with start-foreground.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html#session-cluster-resource-definitions

Best
Yun Tang

From: Li Peng 
Date: Tuesday, December 10, 2019 at 10:09 AM
To: user 
Subject: Flink on Kubernetes seems to ignore log4j.properties

Hey folks, I noticed that my kubernetes flink logs (reached via kubectl logs 
) completely ignore any of the configurations I put into 
/flink/conf/. I set the logger level to WARN, yet I still see INFO level 
logging from flink loggers like 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator. I even copied 
the same properties to /flink/conf/log4j-console.properties and 
log4j-cli.properties.

From what I can tell, kubernetes just listens to stdout and stderr, so 
shouldn't the log4j.properties control output to them? Anyone seen this issue 
before?

Here is my log4j.properties:

# This affects logging for both user code and Flink
log4j.rootLogger=WARN, file, console, stdout

# Uncomment this if you want to _only_ change Flink's logging
log4j.logger.org.apache.flink=WARN

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p 
%-60c %x - %m%n

# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} 
%-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,
 file, console
log4j.logger.org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction=WARN
log4j.logger.org.apache.flink.runtime.checkpoint=WARN
Thanks,
Li


Re: StreamingFileSink doesn't close multipart uploads to s3?

2019-12-09 Thread Jingsong Li
Hi Kostas,

I took a look at StreamingFileSink.close; it just deletes all temporary
files. I understand that is for failover: when the job fails, it should just
delete the temp files for the next restart.
But for testing purposes we just want to run a bounded streaming job. If no
checkpoint is triggered, nothing moves the final temp files to the output path,
so the result of the job is wrong.
Do you have any idea about this? Can we distinguish a "fail close" from a
"successful finish close" in StreamingFileSink?

Best,
Jingsong Lee

On Mon, Dec 9, 2019 at 10:32 PM Kostas Kloudas  wrote:

> Hi Li,
>
> This is the expected behavior. All the "exactly-once" sinks in Flink
> require checkpointing to be enabled.
> We will update the documentation to be clearer in the upcoming release.
>
> Thanks a lot,
> Kostas
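
A minimal sketch of that point (the bucket path and interval are made up):
StreamingFileSink, including forRowFormat, only finalizes its part files on
checkpoints, so checkpointing has to be enabled.

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class RowFormatS3Sketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Without this, in-progress multipart uploads are never committed.
        env.enableCheckpointing(60_000);

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("s3://my-bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
                .build();

        env.fromElements("a", "b", "c").addSink(sink);
        env.execute("row-format-s3-sketch");
    }
}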
>
> On Sat, Dec 7, 2019 at 3:47 AM Li Peng  wrote:
> >
> > Ok I seem to have solved the issue by enabling checkpointing. Based on
> the docs (I'm using 1.9.0), it seemed like only
> StreamingFileSink.forBulkFormat() should've required checkpointing, but
> based on this experience, StreamingFileSink.forRowFormat() requires it too!
> Is this the intended behavior? If so, the docs should probably be updated.
> >
> > Thanks,
> > Li
> >
> > On Fri, Dec 6, 2019 at 2:01 PM Li Peng  wrote:
> >>
> >> Hey folks, I'm trying to get StreamingFileSink to write to s3 every
> minute, with flink-s3-fs-hadoop, and based on the default rolling policy,
> which is configured to "roll" every 60 seconds, I thought that would be
> automatic (I interpreted rolling to mean actually close a multipart upload
> to s3).
> >>
> >> But I'm not actually seeing files written to s3 at all, instead I see a
> bunch of open multipart uploads when I check the AWS s3 console, for
> example:
> >>
> >>  "Uploads": [
> >> {
> >> "Initiated": "2019-12-06T20:57:47.000Z",
> >> "Key": "2019-12-06--20/part-0-0"
> >> },
> >> {
> >> "Initiated": "2019-12-06T20:57:47.000Z",
> >> "Key": "2019-12-06--20/part-1-0"
> >> },
> >> {
> >> "Initiated": "2019-12-06T21:03:12.000Z",
> >> "Key": "2019-12-06--21/part-0-1"
> >> },
> >> {
> >> "Initiated": "2019-12-06T21:04:15.000Z",
> >> "Key": "2019-12-06--21/part-0-2"
> >> },
> >> {
> >> "Initiated": "2019-12-06T21:22:23.000Z"
> >> "Key": "2019-12-06--21/part-0-3"
> >> }
> >> ]
> >>
> >> And these uploads are being open for a long time. So far after an hour,
> none of the uploads have been closed. Is this the expected behavior? If I
> wanted to get these uploads to actually write to s3 quickly, do I need to
> configure the hadoop stuff to get that done, like setting a smaller
> buffer/partition size to force it to upload?
> >>
> >> Thanks,
> >> Li
>


-- 
Best, Jingsong Lee


Re: Re: What is the recommended way to land a Flink real-time data warehouse into Hive?

2019-12-09 Thread hjxhai...@163.com
How do I unsubscribe from the mailing list?




hjxhai...@163.com
 
From: JingsongLee
Sent: 2019-12-10 10:48
To: 陈帅; user-zh@flink.apache.org
Subject: Re: What is the recommended way to land a Flink real-time data warehouse into Hive?
Hi 陈帅,
 
1. The BulkWriter.Factory interface is not a good fit for ORC; as yue ma said, you need some changes.
2. The whole StreamingFileSink mechanism only really moves files when a checkpoint is taken. What exactly do you mean by "streaming write", and what does your business scenario require?
 
Best,
Jingsong Lee
 
 
--
From:陈帅 
Send Time: Tuesday, December 10, 2019 08:21
To:user-zh@flink.apache.org ; JingsongLee 

Subject: Re: What is the recommended way to land a Flink real-time data warehouse into Hive?
 
1. Compared with Parquet, what makes ORC support in StreamingFileSink difficult at the moment?
2. Does BulkWriter accumulate mini-batches before writing files?
JingsongLee wrote on Monday, December 9, 2019 at 3:24 PM:
Hi 帅,
- Parquet can currently be supported by adapting StreamingFileSink.
(Supporting ORC in StreamingFileSink is still hard at the moment.)
- BulkWriter has nothing to do with batch processing; it is just a concept inside StreamingFileSink.
- Syncing Hive partitions has to be custom-built; StreamingFileSink offers nothing ready-made for it.
 
In 1.11 the Table layer will keep digging into this area; for landing a real-time data warehouse into Hive, data skew, partition visibility and related issues will be addressed one by one. [1]
 
[1] https://issues.apache.org/jira/browse/FLINK-14249
 
Best,
Jingsong Lee
 
 
--
From:陈帅 
Send Time: Sunday, December 8, 2019 10:04
To:user-zh@flink.apache.org 
Subject: What is the recommended way to land a Flink real-time data warehouse into Hive?
 
Some people say to write directly to HBase and then map the HBase table in Hive.
But I would rather write files directly to HDFS and load them through Hive external tables. A few things are unclear to me:
 
1. Writing in a row format in real time is no problem and StreamingFileSink supports it, but if I want to write in a columnar format (such as Parquet or ORC),
there seems to be no ready-made streaming writer; what is provided officially is
BulkWriter, which only supports batch-style writing. Are streaming writes in a columnar format not recommended? If they are possible, do I need to define my own StreamingWriter? And for
business-level Update and Delete operations, how is the data usually synced into Hive?
 
2. After the files are written to HDFS, how do I make Hive automatically discover the newly added partitions? Can that be done inside the Flink ETL program?


Re: How to convert a Flink RetractStream into an AppendStream?

2019-12-09 Thread JingsongLee
Maybe take a look at lucas.wu's example?

Best,
Jingsong Lee


--
From:陈帅 
Send Time: Tuesday, December 10, 2019 08:25
To:user-zh@flink.apache.org ; JingsongLee 

Subject: Re: How to convert a Flink RetractStream into an AppendStream?

"You can first convert the RetractStream into a DataStream, which gives you a stream of Tuples; then write a MapFunction to filter it, and finally write the DataStream into Kafka."
==> What I would like to know is how to achieve this conversion using Flink SQL.
JingsongLee wrote on Monday, December 9, 2019 at 3:17 PM:
Hi 帅,

 You can first convert the RetractStream into a DataStream, which gives you a stream of Tuple2<Boolean, Row>; then write a MapFunction to filter it, and finally write the DataStream into Kafka.

 Best,
 Jingsong Lee


 --
 From:Jark Wu 
 Send Time: Sunday, December 8, 2019 11:54
 To:user-zh 
 Subject: Re: How to convert a Flink RetractStream into an AppendStream?

 Hi,

 At the moment the Kafka sink only supports append mode, so it cannot consume a retract stream. In Flink 1.11 the
 community plans to support converting a RetractStream into an AppendStream, emitted as a changelog.

 Best,
 Jark

 On Sun, 8 Dec 2019 at 10:08, 陈帅  wrote:

 > When building a real-time data warehouse with Flink, I need to send the result of a GROUP BY aggregation to Kafka, but the current Kafka
 >
 > sink does not support RetractStream, so it throws an error. Is there any way to convert a RetractStream into an AppendStream (for example, ignoring retraction messages and only sending the updated ones) so that the result can be sent to Kafka?
 >
 >

Re: What is the recommended way to land a Flink real-time data warehouse into Hive?

2019-12-09 Thread JingsongLee
Hi 陈帅,

1. The BulkWriter.Factory interface is not a good fit for ORC; as yue ma said, you need some changes.
2. The whole StreamingFileSink mechanism only really moves files when a checkpoint is taken. What exactly do you mean by "streaming write", and what does your business scenario require?

Best,
Jingsong Lee
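
A sketch of the Parquet route mentioned above, using the bulk format that ships
with Flink 1.9 (the output path and the way the Avro schema is obtained are
illustrative). As noted, files only become visible when a checkpoint completes,
so checkpointing must be enabled on the job.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class ParquetSinkSketch {
    public static void attachSink(DataStream<GenericRecord> records, Schema schema) {
        StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(new Path("hdfs:///warehouse/events"),
                               ParquetAvroWriters.forGenericRecord(schema))
                .build();
        records.addSink(sink);
    }
}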


--
From:陈帅 
Send Time: Tuesday, December 10, 2019 08:21
To:user-zh@flink.apache.org ; JingsongLee 

Subject: Re: What is the recommended way to land a Flink real-time data warehouse into Hive?

1. Compared with Parquet, what makes ORC support in StreamingFileSink difficult at the moment?
2. Does BulkWriter accumulate mini-batches before writing files?
JingsongLee wrote on Monday, December 9, 2019 at 3:24 PM:
Hi 帅,
 - Parquet can currently be supported by adapting StreamingFileSink.
 (Supporting ORC in StreamingFileSink is still hard at the moment.)
 - BulkWriter has nothing to do with batch processing; it is just a concept inside StreamingFileSink.
 - Syncing Hive partitions has to be custom-built; StreamingFileSink offers nothing ready-made for it.

 In 1.11 the Table layer will keep digging into this area; for landing a real-time data warehouse into Hive, data skew, partition visibility and related issues will be addressed one by one. [1]

 [1] https://issues.apache.org/jira/browse/FLINK-14249

 Best,
 Jingsong Lee


 --
 From:陈帅 
 Send Time: Sunday, December 8, 2019 10:04
To:user-zh@flink.apache.org 
 Subject: What is the recommended way to land a Flink real-time data warehouse into Hive?

 Some people say to write directly to HBase and then map the HBase table in Hive.
 But I would rather write files directly to HDFS and load them through Hive external tables. A few things are unclear to me:

 1. Writing in a row format in real time is no problem and StreamingFileSink supports it, but if I want to write in a columnar format (such as Parquet or ORC),
 there seems to be no ready-made streaming writer; what is provided officially is
 BulkWriter, which only supports batch-style writing. Are streaming writes in a columnar format not recommended? If they are possible, do I need to define my own StreamingWriter? And for
 business-level Update and Delete operations, how is the data usually synced into Hive?

 2. After the files are written to HDFS, how do I make Hive automatically discover the newly added partitions? Can that be done inside the Flink ETL program?


Re: Error when consuming Kafka data through the Python API in yarn-session mode

2019-12-09 Thread 改改
Hi Wei Zhong,
Thank you very much, it finally runs. Thank you for guiding a beginner so patiently.
I had copied flink-sql-connector-kafka from my compiled source tree
instead of flink-sql-connector-kafka-0.11, so the versions did not match.
Thanks again, and best wishes for your work.
--
From: Wei Zhong 
Sent: Tuesday, December 10, 2019 10:23
To: 改改 
Cc: user-zh 
Subject: Re: Error when consuming Kafka data through the Python API in yarn-session mode

Hi 改改,

Judging from the current error, the Kafka versions probably do not match. The Kafka connector jar you put into the lib directory
needs to be the 0.11 version, i.e. flink-sql-connector-kafka-0.11_2.11-1.9.1.jar


On December 10, 2019, at 10:06, 改改 wrote:
Hi Wei Zhong,
 Thanks for your reply. The Kafka connector jar is already in Flink's lib directory; the contents of my flink/lib directory are as follows:

 <5600791664319709.png>

My cluster environment is as follows:
 java :1.8.0_231
 flink: 1.9.1
 Python 3.6.9
 Hadoop 3.1.1.3.1.4.0-315

Yesterday I tried running it with python3.6 and it still fails, with the following error:

[root@hdp02 data_team_workspace]# /opt/flink-1.9.1/bin/flink run -py 
tumble_window.py
Starting execution of program
Traceback (most recent call last):
  File 
"/tmp/pyflink/3fb6ccfd-482f-4426-859a-ebe003e14769/pyflink.zip/pyflink/util/exceptions.py",
 line 147, in deco
  File 
"/tmp/pyflink/3fb6ccfd-482f-4426-859a-ebe003e14769/py4j-0.10.8.1-src.zip/py4j/protocol.py",
 line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling 
o42.registerTableSource.
: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
 at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
 at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
 at 
org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
 at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
 at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Reason: No context matches.
The following properties are requested:
connector.properties.0.key=zookeeper.connect
connector.properties.0.value=hdp03:2181
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=hdp02:6667
connector.property-version=1
connector.startup-mode=earliest-offset
connector.topic=user_01
connector.type=kafka
connector.version=0.11
format.fail-on-missing-field=true
format.json-schema={  type: 'object',  properties: {col1: {  type: 
'string'},col2: {  type: 'string'},col3: {  type: 
'string'},time: {  type: 'string',  format: 'date-time'}  }}
format.property-version=1
format.type=json
schema.0.name=rowtime
schema.0.rowtime.timestamps.from=time
schema.0.rowtime.timestamps.type=from-field
schema.0.rowtime.watermarks.delay=6
schema.0.rowtime.watermarks.type=periodic-bounded
schema.0.type=TIMESTAMP
schema.1.name=col1
schema.1.type=VARCHAR
schema.2.name=col2
schema.2.type=VARCHAR
schema.3.name=col3
schema.3.type=VARCHAR
update-mode=append
The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.formats.csv.CsvRowFormatFactory
org.apache.flink.addons.hbase.HBaseTableFactory
org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
 at 
org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
 at 
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
 at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
▽
 at 

Re: Error when consuming Kafka data through the Python API in yarn-session mode

2019-12-09 Thread Wei Zhong
Hi 改改,

Judging from the current error, the Kafka versions probably do not match. The Kafka connector jar you put into the lib directory
needs to be the 0.11 version, i.e. flink-sql-connector-kafka-0.11_2.11-1.9.1.jar

> On December 10, 2019, at 10:06, 改改 wrote:
> 
> Hi Wei Zhong,
>  Thanks for your reply. The Kafka connector jar is already in Flink's lib directory; the contents of my flink/lib directory are as follows:
>  
>  <5600791664319709.png>
> 
> My cluster environment is as follows:
>  java :1.8.0_231
>  flink: 1.9.1
>  Python 3.6.9
>  Hadoop 3.1.1.3.1.4.0-315
> 
> Yesterday I tried running it with python3.6 and it still fails, with the following error:
> 
> [root@hdp02 data_team_workspace]# /opt/flink-1.9.1/bin/flink run -py 
> tumble_window.py
> Starting execution of program
> Traceback (most recent call last):
>   File 
> "/tmp/pyflink/3fb6ccfd-482f-4426-859a-ebe003e14769/pyflink.zip/pyflink/util/exceptions.py",
>  line 147, in deco
>   File 
> "/tmp/pyflink/3fb6ccfd-482f-4426-859a-ebe003e14769/py4j-0.10.8.1-src.zip/py4j/protocol.py",
>  line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> o42.registerTableSource.
> : org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
>  at 
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
>  at 
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
>  at 
> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>  at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>  at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>  at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>  at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>  at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
> not find a suitable table factory for 
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
> Reason: No context matches.
> The following properties are requested:
> connector.properties.0.key=zookeeper.connect
> connector.properties.0.value=hdp03:2181
> connector.properties.1.key=bootstrap.servers
> connector.properties.1.value=hdp02:6667
> connector.property-version=1
> connector.startup-mode=earliest-offset
> connector.topic=user_01
> connector.type=kafka
> connector.version=0.11
> format.fail-on-missing-field=true
> format.json-schema={  type: 'object',  properties: {col1: {  type: 
> 'string'},col2: {  type: 'string'},col3: {  type: 
> 'string'},time: {  type: 'string',  format: 'date-time'}  
> }}
> format.property-version=1
> format.type=json
> schema.0.name=rowtime
> schema.0.rowtime.timestamps.from=time
> schema.0.rowtime.timestamps.type=from-field
> schema.0.rowtime.watermarks.delay=6
> schema.0.rowtime.watermarks.type=periodic-bounded
> schema.0.type=TIMESTAMP
> schema.1.name=col1
> schema.1.type=VARCHAR
> schema.2.name=col2
> schema.2.type=VARCHAR
> schema.3.name=col3
> schema.3.type=VARCHAR
> update-mode=append
> The following factories have been considered:
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> org.apache.flink.formats.csv.CsvRowFormatFactory
> org.apache.flink.addons.hbase.HBaseTableFactory
> org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
> org.apache.flink.formats.json.JsonRowFormatFactory
> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
> org.apache.flink.table.planner.StreamPlannerFactory
> org.apache.flink.table.executor.StreamExecutorFactory
> org.apache.flink.table.planner.delegation.BlinkPlannerFactory
> org.apache.flink.table.planner.delegation.BlinkExecutorFactory
>  at 
> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
>  at 
> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
>  at 
> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
> ▽
>  at 
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
>  at 
> 

Re: Re: Flink state TTL cleanup of expired state

2019-12-09 Thread wangl...@geekplus.com.cn
Hi 唐云,

My cluster has been upgraded to 1.8.2, and I have tried both cleanupFullSnapshot and cleanupInRocksdbCompactFilter.
But after stopping with cancel -s, the generated savepoint directory still does not get smaller. The process is:

1. Stop with cancel -s; the savepoint directory is about 100 MB.
2. Change the code: replace the original setUpdateType-only configuration with cleanupFullSnapshot.
3. Restore the new code from the savepoint directory of step 1.
4. After the new code runs for about a day, cancel -s again; the new savepoint directory has grown even larger.

Could it be that every flink run -s resets the updateTime of all entries restored from the existing savepoint directory to the current time?

Thanks,
王磊



wangl...@geekplus.com.cn
 
Sender: Yun Tang
Send Time: 2019-11-01 01:38
Receiver: user-zh@flink.apache.org
Subject: Re: Flink state TTL cleanup of expired state
Hi 王磊,
 
Judging from your configuration and the fact that you are on Flink 1.7, active cleanup of expired state is not enabled [1]. I recommend configuring
cleanupFullSnapshot on the StateTtlConfig, so that when a full
snapshot (i.e. a savepoint) is taken, expired entries are not written into the savepoint. If you do not enable active cleanup, expired entries are only cleaned up when they are read again, which is probably why your savepoint keeps growing.
 
Also, I recommend upgrading to Flink 1.8+, which has better support for the state TTL feature; see the (Chinese) article [2].
 
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html#cleanup-of-expired-state
[2] https://www.infoq.cn/article/L_66ga2G9oLWypvOLtIv
 
Best regards,
唐云
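
For reference, a sketch of the recommended configuration on Flink 1.8+, keeping
the original settings from the message below and adding the full-snapshot cleanup
(OrderState is the poster's own class):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class TtlConfigSketch {
    public static ValueStateDescriptor<OrderState> orderStateDescriptor() {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.days(3))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .cleanupFullSnapshot()  // drop expired entries whenever a full snapshot (savepoint) is taken
                .build();

        ValueStateDescriptor<OrderState> descriptor =
                new ValueStateDescriptor<>("OrderState", TypeInformation.of(OrderState.class));
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}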
 
 
On 10/31/19, 10:25 AM, "wangl...@geekplus.com.cn"  
wrote:
 
flink-1.7.2, with the following cleanup policy:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(3))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
ValueStateDescriptor descriptor =
    new ValueStateDescriptor("OrderState", TypeInformation.of(OrderState.class));
descriptor.enableTimeToLive(ttlConfig);

When updating the program I use a savepoint: flink cancel -s saves to the savepoint directory, and the job is then restored from that directory.
The program has been running for quite a while now, far longer than 3 days, and the savepoint directory produced by each flink cancel -s keeps growing. Is the expiration cleanup policy not taking effect?

Thanks,
王磊



wangl...@geekplus.com.cn

 


Re: Error when consuming Kafka data through the Python API in yarn-session mode

2019-12-09 Thread 改改
Hi Wei Zhong,
 Thanks for your reply. The Kafka connector jar is already in Flink's lib directory; the contents of my flink/lib directory are as follows:

 

My cluster environment is as follows:
 java :1.8.0_231
 flink: 1.9.1
 Python 3.6.9
 Hadoop 3.1.1.3.1.4.0-315

Yesterday I tried running it with python3.6 and it still fails, with the following error:

[root@hdp02 data_team_workspace]# /opt/flink-1.9.1/bin/flink run -py 
tumble_window.py
Starting execution of program
Traceback (most recent call last):
  File 
"/tmp/pyflink/3fb6ccfd-482f-4426-859a-ebe003e14769/pyflink.zip/pyflink/util/exceptions.py",
 line 147, in deco
  File 
"/tmp/pyflink/3fb6ccfd-482f-4426-859a-ebe003e14769/py4j-0.10.8.1-src.zip/py4j/protocol.py",
 line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling 
o42.registerTableSource.
: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
 at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
 at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
 at 
org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
 at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
 at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Reason: No context matches.
The following properties are requested:
connector.properties.0.key=zookeeper.connect
connector.properties.0.value=hdp03:2181
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=hdp02:6667
connector.property-version=1
connector.startup-mode=earliest-offset
connector.topic=user_01
connector.type=kafka
connector.version=0.11
format.fail-on-missing-field=true
format.json-schema={  type: 'object',  properties: {col1: {  type: 
'string'},col2: {  type: 'string'},col3: {  type: 
'string'},time: {  type: 'string',  format: 'date-time'}  }}
format.property-version=1
format.type=json
schema.0.name=rowtime
schema.0.rowtime.timestamps.from=time
schema.0.rowtime.timestamps.type=from-field
schema.0.rowtime.watermarks.delay=6
schema.0.rowtime.watermarks.type=periodic-bounded
schema.0.type=TIMESTAMP
schema.1.name=col1
schema.1.type=VARCHAR
schema.2.name=col2
schema.2.type=VARCHAR
schema.3.name=col3
schema.3.type=VARCHAR
update-mode=append
The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.formats.csv.CsvRowFormatFactory
org.apache.flink.addons.hbase.HBaseTableFactory
org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
 at 
org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
 at 
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
 at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
▽
 at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
 at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)
 ... 13 more
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/python3.6/lib/python3.6/runpy.py", line 193, in _run_module_as_main
"__main__", mod_spec)
  File "/usr/python3.6/lib/python3.6/runpy.py", line 85, in _run_code
exec(code, run_globals)
  File 

Re: Error when consuming Kafka data through the Python API in yarn-session mode

2019-12-09 Thread Wei Zhong
Hi 改改,

This error alone carries too little information to be sure, but a fairly likely cause is that the Kafka
connector jar has not been put into the lib directory. Could you check whether the Kafka connector jar exists under your Flink lib directory?

> On December 6, 2019, at 14:36, 改改 wrote:
> 
> 
> [root@hdp02 bin]# ./flink run -yid application_1575352295616_0014 -py 
> /opt/tumble_window.py
> 2019-12-06 14:15:48,262 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - Found Yarn properties file under /tmp/.yarn-properties-root.
> 2019-12-06 14:15:48,262 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - Found Yarn properties file under /tmp/.yarn-properties-root.
> 2019-12-06 14:15:48,816 INFO  org.apache.hadoop.yarn.client.RMProxy   
>   - Connecting to ResourceManager at 
> hdp02.wuagecluster/10.2.19.32:8050
> 2019-12-06 14:15:48,964 INFO  org.apache.hadoop.yarn.client.AHSProxy  
>   - Connecting to Application History server at 
> hdp03.wuagecluster/10.2.19.33:10200
> 2019-12-06 14:15:48,973 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2019-12-06 14:15:48,973 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2019-12-06 14:15:49,101 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Found 
> application JobManager host name 'hdp07.wuagecluster' and port '46376' from 
> supplied application id 'application_1575352295616_0014'
> Starting execution of program
> Traceback (most recent call last):
>  File "/usr/lib64/python2.7/runpy.py", line 162, in _run_module_as_main
>"__main__", fname, loader, pkg_name)
>  File "/usr/lib64/python2.7/runpy.py", line 72, in _run_code
>exec code in run_globals
>  File "/tmp/pyflink/b9a29ae4-89ac-4289-9111-5f77ad90d386/tumble_window.py", 
> line 62, in 
>.register_table_source("source")
>  File 
> "/tmp/pyflink/b9a29ae4-89ac-4289-9111-5f77ad90d386/pyflink.zip/pyflink/table/descriptors.py",
>  line 1293, in register_table_source
>  File 
> "/tmp/pyflink/b9a29ae4-89ac-4289-9111-5f77ad90d386/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>  line 1286, in __call__
>  File 
> "/tmp/pyflink/b9a29ae4-89ac-4289-9111-5f77ad90d386/pyflink.zip/pyflink/util/exceptions.py",
>  line 154, in deco
> pyflink.util.exceptions.TableException: u'findAndCreateTableSource failed.'
> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
> at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:83)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)



Re: How to convert a Flink RetractStream into an AppendStream?

2019-12-09 Thread 陈帅
"You can first convert the RetractStream into a DataStream, which gives you a stream of Tuples; then write a MapFunction to filter it, and finally write the DataStream into Kafka."
==>
What I would like to know is how to achieve this conversion using Flink SQL.

JingsongLee wrote on Monday, December 9, 2019 at 3:17 PM:

> Hi 帅,
>
> You can first convert the RetractStream into a DataStream, which gives you a stream of Tuple2<Boolean, Row>; then write a MapFunction to filter it, and finally write the DataStream into Kafka.
>
> Best,
> Jingsong Lee
>
>
> --
> From:Jark Wu 
> Send Time: Sunday, December 8, 2019 11:54
> To:user-zh 
> Subject: Re: How to convert a Flink RetractStream into an AppendStream?
>
> Hi,
>
> At the moment the Kafka sink only supports append mode, so it cannot consume a retract stream. In Flink 1.11 the
> community plans to support converting a RetractStream into an AppendStream, emitted as a changelog.
>
> Best,
> Jark
>
> On Sun, 8 Dec 2019 at 10:08, 陈帅  wrote:
>
> > When building a real-time data warehouse with Flink, I need to send the result of a GROUP BY aggregation to Kafka, but the current Kafka
> >
> > sink does not support RetractStream, so it throws an error. Is there any way to convert a RetractStream into an AppendStream (for example, ignoring retraction messages and only sending the updated ones) so that the result can be sent to Kafka?
> >
>


Re: What is the recommended way to land a Flink real-time data warehouse into Hive?

2019-12-09 Thread 陈帅
1. Compared with Parquet, what makes ORC support in StreamingFileSink difficult at the moment?
2. Does BulkWriter accumulate mini-batches before writing files?

JingsongLee wrote on Monday, December 9, 2019 at 3:24 PM:

> Hi 帅,
> - Parquet can currently be supported by adapting StreamingFileSink.
> (Supporting ORC in StreamingFileSink is still hard at the moment.)
> - BulkWriter has nothing to do with batch processing; it is just a concept inside StreamingFileSink.
> - Syncing Hive partitions has to be custom-built; StreamingFileSink offers nothing ready-made for it.
>
> In 1.11 the Table layer will keep digging into this area; for landing a real-time data warehouse into Hive, data skew, partition visibility and related issues will be addressed one by one. [1]
>
> [1] https://issues.apache.org/jira/browse/FLINK-14249
>
> Best,
> Jingsong Lee
>
>
> --
> From:陈帅 
> Send Time: Sunday, December 8, 2019 10:04
> To:user-zh@flink.apache.org 
> Subject: What is the recommended way to land a Flink real-time data warehouse into Hive?
>
> Some people say to write directly to HBase and then map the HBase table in Hive.
> But I would rather write files directly to HDFS and load them through Hive external tables. A few things are unclear to me:
>
> 1. Writing in a row format in real time is no problem and StreamingFileSink supports it, but if I want to write in a columnar format (such as Parquet or ORC),
> there seems to be no ready-made streaming writer; what is provided officially is
> BulkWriter, which only supports batch-style writing. Are streaming writes in a columnar format not recommended? If they are possible, do I need to define my own StreamingWriter? And for
> business-level Update and Delete operations, how is the data usually synced into Hive?
>
> 2. After the files are written to HDFS, how do I make Hive automatically discover the newly added partitions? Can that be done inside the Flink ETL program?
>


Apache Flink - Retries for async processing

2019-12-09 Thread M Singh
Hi Folks:
I am working on a project where I will be using Flink's async processing
capabilities. The job has to make HTTP requests using a token. The token
expires periodically and needs to be refreshed.
So, I was looking for patterns for handling async call failures and retries
when the token expires. I found the earlier thread "Re: Backoff strategies for
async IO functions?" and it appears that Flink supports neither retries nor
periodically refreshing a security token. I am using 1.6 at the moment but am
planning to migrate to 1.9 soon.

 
If there are any patterns on how to deal with this scenario, please let me know.
Thanks
Mans


Re: User program failures cause JobManager to be shutdown

2019-12-09 Thread 김동원
Hi Robert,

Yeah, I know. For the moment, I warned my colleagues not to call System.exit() 
:-) But it needs to be implemented for the sake of Flink usability as you 
described in the issue.
Thanks a lot for taking care of this issue.

Best,

Dongwon
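
For reference, a minimal sketch of the SecurityManager approach from the
StackOverflow link quoted further down in this thread. It is only an illustration
of intercepting System.exit() before handing control to user code; it is not what
FLINK-15156 proposes, and installing a SecurityManager affects the whole JVM.

public final class ExitTrappingSecurityManager extends SecurityManager {
    @Override
    public void checkExit(int status) {
        // Turn the exit attempt into an exception the caller can log and handle.
        throw new SecurityException("System.exit(" + status + ") blocked inside user code");
    }

    @Override
    public void checkPermission(java.security.Permission perm) {
        // Allow everything else.
    }
}

// Usage (e.g. before invoking the user's main() method):
// System.setSecurityManager(new ExitTrappingSecurityManager());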

> On December 9, 2019 at 9:55 PM, Robert Metzger wrote:
> 
> 
> Hey Dongwon,
> I filed a ticket: https://issues.apache.org/jira/browse/FLINK-15156
> This does not mean it will be implemented anytime soon :)
> 
>> On Mon, Dec 9, 2019 at 2:25 AM Dongwon Kim  wrote:
>> Hi Robert and Roman, 
>> Yeah, letting users know System.exit() is called would be much more 
>> appropriate than just intercepting and ignoring.
>> 
>> Best,
>> Dongwon
>> 
>>> On Sat, Dec 7, 2019 at 11:29 PM Robert Metzger  wrote:
>>> I guess we could manage the security only when calling the user's main() 
>>> method.
>>> 
>>> This problem actually exists for all usercode in Flink: You can also kill 
>>> TaskManagers like this.
>>> If we are going to add something like this to Flink, I would only log that 
>>> System.exit() has been called by the user code, not intercept and ignore 
>>> the call.
>>> 
 On Fri, Dec 6, 2019 at 10:31 AM Khachatryan Roman 
  wrote:
 Hi Dongwon,
 
 This should work but it could also interfere with Flink itself exiting in 
 case of a fatal error.
 
 Regards,
 Roman
 
 
> On Fri, Dec 6, 2019 at 2:54 AM Dongwon Kim  wrote:
> FYI, we've launched a session cluster where multiple jobs are managed by 
> a job manager. If that happens, all the other jobs also fail because the 
> job manager is shut down and all the task managers get into chaos 
> (failing to connect to the job manager).
> 
> I just searched for a way to prevent System.exit() calls from terminating
> JVMs and found [1]. Could it be a possible solution to the problem?
> 
> [1] 
> https://stackoverflow.com/questions/5549720/how-to-prevent-calls-to-system-exit-from-terminating-the-jvm
> 
> Best,
> - Dongwon
> 
>> On Fri, Dec 6, 2019 at 10:39 AM Dongwon Kim  
>> wrote:
>> Hi Robert and Roman,
>> 
>> Thank you for taking a look at this.
>> 
>>> what is your main() method / client doing when it's receiving wrong 
>>> program parameters? Does it call System.exit(), or something like that?
>> 
>> I just found that our HTTP client is programmed to call System.exit(1). 
>> I should guide not to call System.exit() in Flink applications. 
>> 
>> p.s. Just out of curiosity, is there no way for the web app to intercept 
>> System.exit() and prevent the job manager from being shutting down?
>> 
>> Best,
>> 
>> - Dongwon
>> 
>>> On Fri, Dec 6, 2019 at 3:59 AM Robert Metzger  
>>> wrote:
>>> Hi Dongwon,
>>> 
>>> what is your main() method / client doing when it's receiving wrong 
>>> program parameters? Does it call System.exit(), or something like that?
>>> 
>>> By the way, the http address from the error message is publicly 
>>> available. Not sure if this is internal data or not.
>>> 
 On Thu, Dec 5, 2019 at 6:32 PM Khachatryan Roman 
  wrote:
 Hi Dongwon,
 
 I wasn't able to reproduce your problem with Flink JobManager 1.9.1 
 with various kinds of errors in the job.
 I suggest you try it on a fresh Flink installation without any other 
 jobs submitted.
 
 Regards,
 Roman
 
 
> On Thu, Dec 5, 2019 at 3:48 PM Dongwon Kim  
> wrote:
> Hi Roman,
> 
> We're using the latest version 1.9.1 and those two lines are all I've 
> seen after executing the job on the web ui.
> 
> Best,
> 
> Dongwon
> 
>> On Thu, Dec 5, 2019 at 11:36 PM r_khachatryan 
>>  wrote:
>> Hi Dongwon,
>> 
>> Could you please provide Flink version you are running and the job 
>> manager
>> logs?
>> 
>> Regards,
>> Roman
>> 
>> 
>> eastcirclek wrote
>> > Hi,
>> > 
>> > I tried to run a program by uploading a jar on Flink UI. When I
>> > intentionally enter a wrong parameter to my program, JobManager 
>> > dies.
>> > Below
>> > is all log messages I can get from JobManager; JobManager dies as 
>> > soon as
>> > spitting the second line:
>> > 
>> > 2019-12-05 04:47:58,623 WARN
>> >>  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler-
>> >> Configuring the job submission via query parameters is 
>> >> deprecated. Please
>> >> migrate to submitting a JSON request instead.
>> >>
>> >>
>> >> *2019-12-05 04:47:59,133 ERROR com.skt.apm.http.HTTPClient
>> >>   - Cannot
>> >> 

Re: User program failures cause JobManager to be shutdown

2019-12-09 Thread Robert Metzger
Hey Dongwon,
I filed a ticket: https://issues.apache.org/jira/browse/FLINK-15156
This does not mean it will be implemented anytime soon :)

On Mon, Dec 9, 2019 at 2:25 AM Dongwon Kim  wrote:

> Hi Robert and Roman,
> Yeah, letting users know System.exit() is called would be much more
> appropriate than just intercepting and ignoring.
>
> Best,
> Dongwon
>
> On Sat, Dec 7, 2019 at 11:29 PM Robert Metzger 
> wrote:
>
>> I guess we could manage the security only when calling the user's main()
>> method.
>>
>> This problem actually exists for all usercode in Flink: You can also kill
>> TaskManagers like this.
>> If we are going to add something like this to Flink, I would only log
>> that System.exit() has been called by the user code, not intercept and
>> ignore the call.
>>
>> On Fri, Dec 6, 2019 at 10:31 AM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi Dongwon,
>>>
>>> This should work but it could also interfere with Flink itself exiting
>>> in case of a fatal error.
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Fri, Dec 6, 2019 at 2:54 AM Dongwon Kim 
>>> wrote:
>>>
 FYI, we've launched a session cluster where multiple jobs are managed
 by a job manager. If that happens, all the other jobs also fail because the
 job manager is shut down and all the task managers get into chaos (failing
 to connect to the job manager).

 I just searched for a way to prevent System.exit() calls from terminating
 JVMs and found [1]. Could it be a possible solution to the problem?

 [1]
 https://stackoverflow.com/questions/5549720/how-to-prevent-calls-to-system-exit-from-terminating-the-jvm

 Best,
 - Dongwon

 On Fri, Dec 6, 2019 at 10:39 AM Dongwon Kim 
 wrote:

> Hi Robert and Roman,
>
> Thank you for taking a look at this.
>
> what is your main() method / client doing when it's receiving wrong
>> program parameters? Does it call System.exit(), or something like that?
>>
>
> I just found that our HTTP client is programmed to call
> System.exit(1). I should guide not to call System.exit() in Flink
> applications.
>
> p.s. Just out of curiosity, is there no way for the web app to
> intercept System.exit() and prevent the job manager from being shut
> down?
>
> Best,
>
> - Dongwon
>
> On Fri, Dec 6, 2019 at 3:59 AM Robert Metzger 
> wrote:
>
>> Hi Dongwon,
>>
>> what is your main() method / client doing when it's receiving wrong
>> program parameters? Does it call System.exit(), or something like that?
>>
>> By the way, the http address from the error message is
>> publicly available. Not sure if this is internal data or not.
>>
>> On Thu, Dec 5, 2019 at 6:32 PM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi Dongwon,
>>>
>>> I wasn't able to reproduce your problem with Flink JobManager 1.9.1
>>> with various kinds of errors in the job.
>>> I suggest you try it on a fresh Flink installation without any other
>>> jobs submitted.
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Thu, Dec 5, 2019 at 3:48 PM Dongwon Kim 
>>> wrote:
>>>
 Hi Roman,

 We're using the latest version 1.9.1 and those two lines are all
 I've seen after executing the job on the web ui.

 Best,

 Dongwon

 On Thu, Dec 5, 2019 at 11:36 PM r_khachatryan <
 khachatryan.ro...@gmail.com> wrote:

> Hi Dongwon,
>
> Could you please provide Flink version you are running and the job
> manager
> logs?
>
> Regards,
> Roman
>
>
> eastcirclek wrote
> > Hi,
> >
> > I tried to run a program by uploading a jar on Flink UI. When I
> > intentionally enter a wrong parameter to my program, JobManager
> dies.
> > Below
> > is all log messages I can get from JobManager; JobManager dies
> as soon as
> > spitting the second line:
> >
> > 2019-12-05 04:47:58,623 WARN
> >>  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler-
> >> Configuring the job submission via query parameters is
> deprecated. Please
> >> migrate to submitting a JSON request instead.
> >>
> >>
> >> *2019-12-05 04:47:59,133 ERROR com.skt.apm.http.HTTPClient
> >>   - Cannot
> >> connect:
> http://52.141.38.11:8380/api/spec/poc_asset_model_01/model/imbalance/models
> >> 
> http://52.141.38.11:8380/api/spec/poc_asset_model_01/model/imbalance/models
> ;:
> >> com.fasterxml.jackson.databind.exc.MismatchedInputException:
> Cannot
> >> deserialize instance of `java.util.ArrayList` out of
> START_OBJECT 

Re: Change Flink binding address in local mode

2019-12-09 Thread Andrea Cardaci
On Mon, 9 Dec 2019 at 12:54, Chesnay Schepler  wrote:
> At this point I would suggest filing a ticket

Here it is: https://issues.apache.org/jira/browse/FLINK-15154


Re: Change Flink binding address in local mode

2019-12-09 Thread Chesnay Schepler
At this point I would suggest filing a ticket; these are the options 
that _should_ control the behavior but apparently don't in all cases.


On 08/12/2019 12:23, Andrea Cardaci wrote:

Hi,

Flink (or some of its services) listens on three random TCP ports
during the local[1] execution, e.g., 39951, 41009 and 42849.

[1]: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/local_execution.html#local-environment

The sockets listen on `0.0.0.0`, and since I need to run some
long-running tests on an Internet-facing machine I was wondering how
to make them listen on `localhost` instead or if there is anything
else I can do to improve the security in this scenario.

Here's what I tried (with little luck):


Configuration config = new Configuration();
config.setString("taskmanager.host", "127.0.0.1");
config.setString("rest.bind-address", "127.0.0.1"); // OK
config.setString("jobmanager.rpc.address", "127.0.0.1");
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
        StreamExecutionEnvironment.getDefaultLocalParallelism(), config);

Only the `rest.bind-address` configuration actually changes the
binding address of one of those ports. Are there other parameters that
I'm not aware of, or is this not the right approach in local mode?


Best,
Andrea





Re: KeyBy/Rebalance overhead?

2019-12-09 Thread Arvid Heise
Hi Komal,

as a general rule of thumb, you want to avoid network shuffles as much as
possible. As vino pointed out, you need to reshuffle if you need to group
by key. Another frequent use case is rebalancing data in case of heavy
skew. Since neither applies to you, removing the keyBy is the best
option.

If you want to retain it, because you may experience skew in the future,
there are only a couple of things you can do. You may tinker with
networking settings to have smaller/larger network buffers (smaller = less
latency, larger = more throughput) [1]. Of course, you get better results
if you have a faster network (running in the cloud, you can play around
with different adapters). You could also test whether fewer or more machines
are actually faster (fewer machines = less network traffic, more machines =
more compute power).

In any case, your data volume is so low that I would probably not optimize
too much. We are talking about seconds, and the times may vary widely from
run to run because of the low data volume. If you want to test the
throughput as a POC for a larger volume, I'd either generate a larger
sample or replicate the data to get more reliable numbers. In any case, keep
your final use case in mind when deciding on an option.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#configuring-the-network-buffers

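As a rough illustration of the settings [1] refers to, here is a sketch assuming the Flink 1.9 option keys; the values are placeholders to tune, and on a real cluster they belong in flink-conf.yaml rather than in code:

```
// Sketch only: Flink 1.9 network-buffer option keys from [1]; the values are
// illustrative placeholders. On a cluster, set them in flink-conf.yaml instead.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NetworkBufferTuning {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Smaller segments -> lower latency; larger segments -> higher throughput.
        conf.setString("taskmanager.memory.segment-size", "32kb");
        // Total memory reserved for network buffers (fraction of TM memory, bounded).
        conf.setString("taskmanager.network.memory.fraction", "0.1");
        conf.setString("taskmanager.network.memory.min", "64mb");
        conf.setString("taskmanager.network.memory.max", "1gb");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(4, conf);
        // ... build the pipeline on env and call env.execute() as usual ...
    }
}
```
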
On Mon, Dec 9, 2019 at 10:25 AM vino yang  wrote:

> Hi Komal,
>
> Actually, the main factor in choosing the type of partitioning is your
> business logic. If you want to do some aggregation logic based on a group,
> you must choose KeyBy to guarantee correct semantics.
>
> Best,
> Vino
>
> Komal Mariam  于2019年12月9日周一 下午5:07写道:
>
>> Thank you @vino yang for the reply. I suspect keyBy will be beneficial in
>> those cases where my subsequent operators are computationally intensive,
>> i.e., when their computation time is greater than the network reshuffling cost.
>>
>> Regards,
>> Komal
>>
>> On Mon, 9 Dec 2019 at 15:23, vino yang  wrote:
>>
>>> Hi Komal,
>>>
>>> KeyBy (hash partitioning, a logical partitioning) and rebalance (a physical
>>> partitioning) are both partitioning strategies supported by Flink.[1]
>>>
>>> Generally speaking, partitioning may cause network communication (network
>>> shuffle) costs, which may add processing time. The example you provided may
>>> benefit from operator chaining[2] if you remove the keyBy operation.
>>>
>>> Best,
>>> Vino
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#datastream-transformations
>>> [2]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#tasks-and-operator-chains
>>>
>>> Komal Mariam  于2019年12月9日周一 上午9:11写道:
>>>
 Anyone?

 On Fri, 6 Dec 2019 at 19:07, Komal Mariam 
 wrote:

> Hello everyone,
>
> I want to get some insights on the KeyBy (and Rebalance) operations as
> according to my understanding they partition our tasks over the defined
> parallelism and thus should make our pipeline faster.
>
> I am reading a topic which contains 170,000,000 pre-stored records
> with 11 Kafka partitions and replication factor of 1.   Hence I use
> .setStartFromEarliest() to read the stream.
> My Flink is a 4 node cluster with 3 taskmanagers, each having 10 cores
> and 1 job manager with 6 cores. (10 task slots per TM hence I set
> environment parallelism to 30).
>
> There are about 10,000 object IDs hence 10,000 keys.  Right now I'm
> keeping the number of records fixed to get a handle on how fast they're
> being processed.
>
> When I remove keyBy, I get the same results in 39 secs as opposed to
> 52 secs with KeyBy. In fact, even when I vary the parallelism down to 10 or
> below I still get the same extra overhead of 9 to 13 secs. My data is mostly
> uniformly distributed on its key so I can rule out skew.  Rebalance
> likewise has the same latency as keyBy.
>
>  What I want to know is what may be causing this overhead? And is
> there any way to decrease it?
>
> Here's the script I'm running for testing purposes:
> --
> DataStream JSONStream  = env.addSource(new
> FlinkKafkaConsumer<>("data", new
> JSONKeyValueDeserializationSchema(false),properties).setStartFromEarliest())
>
> DataStream myPoints = JSONStream.map(new jsonToPoint());
>
> mypoints.keyBy("oID").filter(new findDistancefromPOI());
>
> public class findDistancefromPOI extends RichFilterFunction {
> public boolean filter(Point input) throws Exception {
> Double distance = computeEuclideanDist(
> 16.4199  , 89.974  ,input.X(),input.Y);
>  return distance > 0;
> }
> }
>
> Best Regards,
> Komal
>



Job manager is failing to start with an S3 no key specified exception [1.7.2]

2019-12-09 Thread Kumar Bolar, Harshith
Hi all,

I'm running a standalone Flink cluster with Zookeeper and S3 for high 
availability storage. All of a sudden, the job managers started failing with an 
S3 `UnrecoverableS3OperationException` error. Here is the full error trace -

```
java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobExecutionException: Could not set up 
JobManager
at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set 
up JobManager
at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:176)
at 
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$UnrecoverableS3OperationException:
 
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
 The specified key does not exist. (Service: Amazon S3; Status Code: 404; Error 
Code: NoSuchKey; Request ID: 1769066EBD605AB5; S3 Extended Request ID: 
K8jjbsE4DPAsZJDVJKBq3Nh0E0o+feafefavbvbaae+nbUTphHHw73/eafafefa+dsVMR0=), S3 
Extended Request ID: 
lklalkioe+eae2234+nbUTphHHw73/gVSclc1o1YH7M0MeNjmXl+dsVMR0= (Path: 
s3://abc-staging/flink/jobmanagerha/flink-2/blob/job_3e16166a1122885eb6e9b2437929b266/blob_p-3b687174148e9e1dd951f2a9fbec83f4fcd5281e-b85417f69b354c83b270bf01dcf389e0)
at 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$1(PrestoS3FileSystem.java:908)
at 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138)
at 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:893)
at 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:878)
at 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:871)
at 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3InputStream.lambda$read$0(PrestoS3FileSystem.java:810)
at 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138)
at 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3InputStream.read(PrestoS3FileSystem.java:809)
... 10 more
Caused by: 
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
 The specified key does not exist. (Service: Amazon S3; Status Code: 404; Error 
Code: NoSuchKey; Request ID: 1769066EBaD6aefB5; S3 Extended Request ID: 
fealloga+4rVwsF+nbUTphHHw73/gVSclc1o1YH7M0MeNjmXl+dsVMR0=), S3 Extended Request 
ID: K8jjbsE4DPAsZJDVJKBq3Nh0E0o+4rVwsF+nbUTphHHweafga/lc1o1YH7M0MeNjmXl+dsVMR0=
at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
... 30 more
```

I could fix this by changing the `high-availability.cluster-id` property (which 
is currently set to `flink-2`) but with that I would lose all the existing jobs 
and state. Is there any way I can tell Flink to ignore this particular key in 
S3 and start the job managers?

Thanks,
Harshith


Re: KeyBy/Rebalance overhead?

2019-12-09 Thread vino yang
Hi Komal,

Actually, the main factor in choosing the type of partitioning is your
business logic. If you want to do some aggregation logic based on a group,
you must choose KeyBy to guarantee correct semantics.

Best,
Vino

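To illustrate, a minimal, self-contained sketch of the kind of per-key aggregation where keyBy is required for correctness (here a running count per object id, standing in for the Point/oID pipeline quoted below):

```
// Sketch: keyBy guarantees that all records with the same key reach the same
// parallel instance, so a per-key aggregate such as this running count is correct.
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByAggregationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (objectId, 1) pairs stand in for the keyed records of the real job.
        DataStream<Tuple2<String, Integer>> events = env.fromElements(
                Tuple2.of("obj-1", 1), Tuple2.of("obj-2", 1), Tuple2.of("obj-1", 1));

        events.keyBy(0)   // hash-partition by object id
              .sum(1)     // running count per key
              .print();

        env.execute("per-key aggregation sketch");
    }
}
```
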
Komal Mariam  于2019年12月9日周一 下午5:07写道:

> Thank you @vino yang for the reply. I suspect keyBy will be beneficial in
> those cases where my subsequent operators are computationally intensive,
> i.e., when their computation time is greater than the network reshuffling cost.
>
> Regards,
> Komal
>
> On Mon, 9 Dec 2019 at 15:23, vino yang  wrote:
>
>> Hi Komal,
>>
>> KeyBy (hash partitioning, a logical partitioning) and rebalance (a physical
>> partitioning) are both partitioning strategies supported by Flink.[1]
>>
>> Generally speaking, partitioning may cause network communication (network
>> shuffle) costs, which may add processing time. The example you provided may
>> benefit from operator chaining[2] if you remove the keyBy operation.
>>
>> Best,
>> Vino
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#datastream-transformations
>> [2]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#tasks-and-operator-chains
>>
>> Komal Mariam  于2019年12月9日周一 上午9:11写道:
>>
>>> Anyone?
>>>
>>> On Fri, 6 Dec 2019 at 19:07, Komal Mariam 
>>> wrote:
>>>
 Hello everyone,

 I want to get some insights on the KeyBy (and Rebalance) operations as
 according to my understanding they partition our tasks over the defined
 parallelism and thus should make our pipeline faster.

 I am reading a topic which contains 170,000,000 pre-stored records with
 11 Kafka partitions and replication factor of 1.   Hence I use
 .setStartFromEarliest() to read the stream.
 My Flink is a 4 node cluster with 3 taskmanagers, each having 10 cores
 and 1 job manager with 6 cores. (10 task slots per TM hence I set
 environment parallelism to 30).

 There are about 10,000 object IDs hence 10,000 keys.  Right now I'm
 keeping the number of records fixed to get a handle on how fast they're
 being processed.

 When I remove keyBy, I get the same results in 39 secs as opposed to 52
 secs with KeyBy. In fact, even when I vary the parallelism down to 10 or
 below I still get the same extra overhead of 9 to 13 secs. My data is mostly
 uniformly distributed on its key so I can rule out skew.  Rebalance
 likewise has the same latency as keyBy.

  What I want to know is what may be causing this overhead? And is there
 any way to decrease it?

 Here's the script I'm running for testing purposes:
 --
 DataStream JSONStream  = env.addSource(new FlinkKafkaConsumer<>("data",
 new
 JSONKeyValueDeserializationSchema(false),properties).setStartFromEarliest())

 DataStream myPoints = JSONStream.map(new jsonToPoint());

 mypoints.keyBy("oID").filter(new findDistancefromPOI());

 public class findDistancefromPOI extends RichFilterFunction {
 public boolean filter(Point input) throws Exception {
 Double distance = computeEuclideanDist(
 16.4199  , 89.974  ,input.X(),input.Y);
  return distance > 0;
 }
 }

 Best Regards,
 Komal

>>>


Re: Sample Code for querying Flink's default metrics

2019-12-09 Thread vino yang
Hi Pankaj,

> Is there any sample code for how to read such default metrics?  Is there
any way to query the default metrics, such as CPU usage and Memory, without
using REST API or Reporters?

What is your actual requirement? Can you call the REST API from code? Why does
that not match your requirements?

> Additionally, how do I query Backpressure using code, or is it still only
visually available via the dashboard UI? Consequently, is there any way to
infer Backpressure by querying one (or more) of the Memory metrics of the
TaskManager?

Backpressure is related not only to memory metrics but also to IO and
network metrics. For more details about measuring backpressure, please see
these blog posts.[1][2]

[1]: https://flink.apache.org/2019/06/05/flink-network-stack.html
[2]: https://flink.apache.org/2019/07/23/flink-network-stack-2.html

Best,
Vino

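For illustration, a small sketch of reading the default metrics and the back-pressure sample through the monitoring REST API with plain JDK HTTP calls; host, port, job id, and the TaskManager/vertex ids are placeholders you have to fill in:

```
// Sketch only: querying Flink's monitoring REST API for default metrics and
// the back-pressure sample. <tm-id>, <job-id> and <vertex-id> are placeholders.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class RestMetricsClient {

    static String get(String url) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        StringBuilder body = new StringBuilder();
        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                body.append(line);
            }
        }
        return body.toString();
    }

    public static void main(String[] args) throws Exception {
        String base = "http://localhost:8081";   // JobManager REST endpoint

        // CPU / memory of one TaskManager (ids come from GET /taskmanagers).
        System.out.println(get(base + "/taskmanagers/<tm-id>/metrics"
                + "?get=Status.JVM.CPU.Load,Status.JVM.Memory.Heap.Used"));

        // Per-operator throughput (vertex ids come from GET /jobs/<job-id>).
        System.out.println(get(base + "/jobs/<job-id>/vertices/<vertex-id>/metrics"
                + "?get=0.numRecordsOutPerSecond"));

        // Back-pressure sample for one job vertex (same data the web UI shows).
        System.out.println(get(base + "/jobs/<job-id>/vertices/<vertex-id>/backpressure"));
    }
}
```
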
Pankaj Chand  于2019年12月9日周一 下午12:07写道:

> Hello,
>
> Using Flink on Yarn, I could not understand the documentation for how to
> read the default metrics via code. In particular, I want to read
> throughput, i.e. CPU usage, Task/Operator's numRecordsOutPerSecond, and
> Memory.
>
> Is there any sample code for how to read such default metrics?  Is there
> any way to query the default metrics, such as CPU usage and Memory, without
> using REST API or Reporters?
>
> Additionally, how do I query Backpressure using code, or is it still only
> visually available via the dashboard UI? Consequently, is there any way to
> infer Backpressure by querying one (or more) of the Memory metrics of the
> TaskManager?
>
> Thank you,
>
> Pankaj
>


Re: KeyBy/Rebalance overhead?

2019-12-09 Thread Komal Mariam
Thank you @vino yang for the reply. I suspect keyBy will be beneficial in
those cases where my subsequent operators are computationally intensive,
i.e., when their computation time is greater than the network reshuffling cost.

Regards,
Komal

On Mon, 9 Dec 2019 at 15:23, vino yang  wrote:

> Hi Komal,
>
> KeyBy (hash partitioning, a logical partitioning) and rebalance (a physical
> partitioning) are both partitioning strategies supported by Flink.[1]
>
> Generally speaking, partitioning may cause network communication (network
> shuffle) costs, which may add processing time. The example you provided may
> benefit from operator chaining[2] if you remove the keyBy operation.
>
> Best,
> Vino
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#datastream-transformations
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#tasks-and-operator-chains
>
> Komal Mariam  于2019年12月9日周一 上午9:11写道:
>
>> Anyone?
>>
>> On Fri, 6 Dec 2019 at 19:07, Komal Mariam  wrote:
>>
>>> Hello everyone,
>>>
>>> I want to get some insights on the KeyBy (and Rebalance) operations as
>>> according to my understanding they partition our tasks over the defined
>>> parallelism and thus should make our pipeline faster.
>>>
>>> I am reading a topic which contains 170,000,000 pre-stored records with
>>> 11 Kafka partitions and replication factor of 1.   Hence I use
>>> .setStartFromEarliest() to read the stream.
>>> My Flink is a 4 node cluster with 3 taskmanagers, each having 10 cores
>>> and 1 job manager with 6 cores. (10 task slots per TM hence I set
>>> environment parallelism to 30).
>>>
>>> There are about 10,000 object IDs hence 10,000 keys.  Right now I'm
>>> keeping the number of records fixed to get a handle on how fast they're
>>> being processed.
>>>
>>> When I remove keyBy, I get the same results in 39 secs as opposed to 52
>>> secs with KeyBy. In fact, even when I vary the parallelism down to 10 or
>>> below I still get the same extra overhead of 9 to 13 secs. My data is mostly
>>> uniformly distributed on its key so I can rule out skew.  Rebalance
>>> likewise has the same latency as keyBy.
>>>
>>>  What I want to know is what may be causing this overhead? And is there
>>> any way to decrease it?
>>>
>>> Here's the script I'm running for testing purposes:
>>> --
>>> DataStream JSONStream  = env.addSource(new FlinkKafkaConsumer<>("data",
>>> new
>>> JSONKeyValueDeserializationSchema(false),properties).setStartFromEarliest())
>>>
>>> DataStream myPoints = JSONStream.map(new jsonToPoint());
>>>
>>> mypoints.keyBy("oID").filter(new findDistancefromPOI());
>>>
>>> public class findDistancefromPOI extends RichFilterFunction {
>>> public boolean filter(Point input) throws Exception {
>>> Double distance = computeEuclideanDist(
>>> 16.4199  , 89.974  ,input.X(),input.Y);
>>>  return distance > 0;
>>> }
>>> }
>>>
>>> Best Regards,
>>> Komal
>>>
>>


Re: What is the recommended way to land a Flink real-time data warehouse into Hive?

2019-12-09 Thread yue ma
You can write ORC files directly by customizing the StreamingFileSink.
However, with this approach the partitions need to be added manually.

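For the Parquet case mentioned in the quoted question, a rough sketch of the bulk-format route in Flink 1.9 is shown below. The event type is made up, checkpointing must be enabled because bulk formats roll files only on checkpoints, and the resulting daily directories still have to be registered in Hive separately (e.g. with ALTER TABLE ... ADD PARTITION):

```
// Sketch: writing columnar (Parquet) files to HDFS with StreamingFileSink in
// Flink 1.9. "SensorEvent" is a made-up POJO; the daily bucket directories
// (e.g. .../2019-12-09/) then have to be added as Hive partitions manually.
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

public class ParquetToHdfsSketch {

    // Made-up event type for the sketch.
    public static class SensorEvent {
        public String id;
        public double value;
        public long ts;
    }

    public static StreamingFileSink<SensorEvent> buildSink() {
        return StreamingFileSink
                .forBulkFormat(
                        new Path("hdfs:///warehouse/sensor_events"),
                        ParquetAvroWriters.forReflectRecord(SensorEvent.class))
                // one directory per day, which maps to a Hive partition directory
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd"))
                .build();
    }

    public static void attach(DataStream<SensorEvent> events) {
        // Checkpointing must be enabled: bulk-format files are rolled on checkpoints.
        events.addSink(buildSink());
    }
}
```
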
陈帅 wrote on Sun, Dec 8, 2019 at 10:04 AM:

> Some people suggest writing directly to HBase and then mapping an HBase-backed table in Hive.
> But I would like to write files directly to HDFS and load them through a Hive external table. A few things are unclear to me:
>
> 1. Writing in a row-based format in real time is no problem, and StreamingFileSink supports it. But if I want to write in a
> columnar format (such as Parquet or ORC), there currently seems to be no ready-made streaming
> writer; what is provided officially are
> BulkWriters, which only support batch-style writing. Is streaming writing in a columnar format not recommended? If it is possible, do I need to implement my own StreamingWriter?
> And for business-level Update and Delete operations, how is the data usually synced into Hive?
>
> 2. After the files are written to HDFS, how do I make Hive automatically discover the newly added partitions? Can this be done within the Flink ETL job?
>