Re: About Flink checkpoints

2020-04-20 Thread Lee Sysuke
Shortening the checkpoint interval under heavy load: can this be understood as the checkpoint interval becoming small when traffic is high, which sounds like it could adaptively bound the amount of traffic within one checkpoint interval?
But one problem is that a shorter checkpoint interval means higher checkpointing overhead, which further raises the load on nodes that are already heavily loaded by the business traffic. I wonder whether this would be difficult at the system-design level.
I'd appreciate input from the other experts here.
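
(For reference, the non-adaptive knobs Flink exposes today for this trade-off are a fixed interval plus a minimum pause between checkpoints, which caps checkpointing overhead even when individual checkpoints become slow under load. A minimal sketch; the class name and all values below are placeholders.)

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointIntervalSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(5 * 60 * 1000L);                             // fixed 5-minute interval
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000L); // at least 1 minute between checkpoints
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);     // abort checkpoints that run too long
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);            // never overlap checkpoints

        // ... define sources/operators/sinks here and call env.execute(...) ...
    }
}
```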

coke half wrote on Sat, Apr 18, 2020 at 1:05 PM:

>
> Hi, I have learned that some models of the checkpoint overhead problem take recovery time and other costs into account, and shorten the checkpoint interval when the load is high. So the question really is: does automatically shortening the checkpoint interval under heavy load make sense in real business scenarios? Thanks
>
> 
> From: Lee Sysuke
> Sent: Friday, April 17, 2020 10:41:42 AM
> To: user-zh
> Subject: Re: About Flink checkpoints
>
> A bit of personal opinion:
>
> In typical business scenarios, people need a fairly deterministic idea of the error bound of a streaming job. For example, with a fixed 5-minute checkpoint interval, you can be fairly sure that even if the stream processing fails over, the error is bounded within five minutes.
> But if the interval is adaptive, the higher the load the longer the interval, while in practice a failover is far more likely to occur under high load than under low load, so such a setup may not have much practical value.
>
> half coke wrote on Wed, Apr 15, 2020 at 4:15 PM:
>
> > Yes, automatically adjusting the checkpoint interval according to changes in the job load, or adjusting checkpoints through user-written logic.
> > I have just started learning Flink and would like to ask for advice.
> >
> > Congxian Qiu wrote on Wed, Apr 15, 2020 at 12:33 PM:
> >
> > > hi
> > >
> > > What do you mean by an adaptive interval? Do you mean the checkpoint interval being adjusted automatically?
> > >
> > > Best,
> > > Congxian
> > >
> > >
> > > half coke wrote on Wed, Apr 15, 2020 at 12:24 PM:
> > >
> > > > May I ask why Flink does not support an adaptive checkpoint interval? Is there a particular consideration behind this?
> > > >
> > >
> >
>


Re:Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

2020-04-20 Thread forideal
Hi Kurt,
I ran into the same error.

   sql:
   insert into dw_access_log
   select get_json_value(query_nor, query_nor_counter) as `value`
   from ods_access_log_source
   group by tumble(time_key, interval '1' MINUTE), group_key


get_json_value:
public class GetJsonValue extends AggregateFunction<String, Map<String, Long>> {
    @Override
    public boolean isDeterministic() {
        return false;
    }

    @Override
    public Map<String, Long> createAccumulator() {
        return new HashMap<>();
    }

    @Override
    public void open(FunctionContext context) throws Exception {
    }

    public void accumulate(Map<String, Long> datas, String key, long value) {
        datas.put(key, value);
    }

    @Override
    public String getValue(Map<String, Long> acc) {
        return JSON.toJSONString(acc);
    }

    @Override
    public TypeInformation<String> getResultType() {
        return Types.STRING;
    }
}
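
For context, an aggregate UDF like this has to be registered on the table environment before the SQL above can reference it. A minimal sketch, assuming a blink-planner StreamTableEnvironment and that the GetJsonValue class above is on the classpath (variable names are placeholders):

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class RegisterGetJsonValue {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // register the aggregate function under the name used in the query above
        tEnv.registerFunction("get_json_value", new GetJsonValue());

        // ... register ods_access_log_source / dw_access_log and submit the INSERT INTO ...
    }
}
```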




Best,
forideal

At 2020-04-21 10:05:05, "Kurt Young"  wrote:

Thanks, once you can reproduce this issue locally, please open a jira with your 
testing program.


Best,

Kurt





On Tue, Apr 21, 2020 at 8:48 AM 刘建刚  wrote:

Thank you. It is an online job and my input is huge. I checked the trace and found
that the array is resized when it is not large enough. The code is as below:


public void add (int value) {
    int[] items = this.items;
    if (size == items.length) items = resize(Math.max(8, (int) (size * 1.75f)));
    items[size++] = value;
}


Only the blink planner has this error. Could it be a thread-safety problem or something
else? I will try to reproduce it locally.


On Apr 21, 2020 at 12:20 AM, Jark Wu-3 [via Apache Flink User Mailing List archive.]
 wrote:


Hi,


Are you using versions < 1.9.2? From the exception stack, it looks like caused 
by FLINK-13702, which is already fixed in 1.9.2 and 1.10.0.
Could you try it using 1.9.2?


Best,
Jark


On Mon, 20 Apr 2020 at 21:00, Kurt Young <[hidden email]> wrote:

Can you reproduce this in a local program with mini-cluster?


Best,

Kurt





On Mon, Apr 20, 2020 at 8:07 PM Zahid Rahman <[hidden email]> wrote:

You can read this for this type error.


https://stackoverflow.com/questions/28189446/i-always-get-this-error-exception-in-thread-main-java-lang-arrayindexoutofbou#comment44747327_28189446


I would suggest you set breakpoints in your code. Stepping through the code
should show you which array variable is being passed a null argument
when that variable is not nullable.








On Mon, 20 Apr 2020, 10:07 刘建刚, <[hidden email]> wrote:

   I am using Roaring64NavigableMap to compute uv. It is OK with the flink
planner but not OK with the blink planner. The SQL is as follows:
SELECT toLong(TUMBLE_START(eventTime, interval '1' minute)) as curTimestamp, A, 
B, C, D,
E, uv(bitmap(id)) as bmp
FROM person
GROUP BY TUMBLE(eventTime, interval '1' minute), A, B, C, D, E


  The udf is as follows:
public static class Bitmap extends AggregateFunction<Roaring64NavigableMap, Roaring64NavigableMap> {
@Override
public Roaring64NavigableMap createAccumulator() {
return new Roaring64NavigableMap();
}

@Override
public Roaring64NavigableMap getValue(Roaring64NavigableMap accumulator) {
return accumulator;
}

public void accumulate(Roaring64NavigableMap bitmap, long id) {
  bitmap.add(id);
}
}
public static class UV extends ScalarFunction {
public long eval(Roaring64NavigableMap bitmap) {
return bitmap.getLongCardinality();
}
}
  The error is as follows:


2020-04-20 16:37:13,868 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  
[flink-akka.actor.default-dispatcher-40]  - 
GroupWindowAggregate(groupBy=[brand, platform, channel, versionName, 
appMajorVersion], window=[TumblingGroupWindow('w$, eventTime, 6)], 
properties=[w$start, w$end, w$rowtime, w$proctime], select=[brand, platform, 
channel, versionName, appMajorVersion, bitmap(id) AS $f5, start('w$) AS 
w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS 
w$proctime]) -> Calc(select=[toLong(w$start) AS curTimestamp, brand, platform, 
channel, versionName, appMajorVersion, uv($f5) AS bmp]) -> 
SinkConversionToTuple2 -> (Flat Map, Flat Map -> Sink: Unnamed) (321/480) 
(8eb918b493ea26e2bb60f8307347dc1a) switched from RUNNING to FAILED.
java.lang.ArrayIndexOutOfBoundsException: -1
  at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
  at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
  at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
  at 
org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:62)
  at 
org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:37)
  at 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copyBaseRow(BaseRowSerializer.java:150)
  at 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:117)
  at 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:50)
  at 

Re: How to print log in flink-streaming-java module

2020-04-20 Thread Lee Sysuke
Please refer to conf/log4j.properties

Polarisary wrote on Tue, Apr 21, 2020 at 11:08 AM:

> Hi all,
> when I add some log statements in org.apache.flink.streaming.api.environment
> (flink-streaming-java module) and repackage flink-dist_2.11-1.10.0.jar,
> nothing is printed in the JM or TM logs.
>
> I did it like this:
>
> ```
> public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
> checkNotNull(streamGraph, "StreamGraph cannot be null.");
> checkNotNull(configuration.get(DeploymentOptions.TARGET), "No
> execution.target specified in your configuration file.");
>
> final PipelineExecutorFactory executorFactory =
> executorServiceLoader.getExecutorFactory(configuration);
>
> checkNotNull(
> executorFactory,
> "Cannot find compatible factory for specified execution.target (=%s)",
> configuration.get(DeploymentOptions.TARGET));
>
> LOG.error("# Note >>> DeploymentOptions.TARGET : {}", DeploymentOptions.
> TARGET);
> ```
>
> Does someone know how to add logging in this Java class?
> Waiting for a reply, many thanks
>
>
>
>
>
>


Re: Enrich streaming data with small lookup data that slowly changes over time

2020-04-20 Thread Mu Kong
Hi Jark Wu,

Thanks for your help!
I gave the documentation a quick glance; it seems method [1] fits my purpose
better.
Let me give it a deeper look.

Thank you very much!!

Best,
Mu


On Tue, Apr 21, 2020 at 12:06 PM Jark Wu  wrote:

> Hi Mu,
>
> Flink SQL does support dimension table join. There are two ways to join
> the dimension table.
> If the data is in your database (e.g. MySQL, HBase), you can use this way
> [1] to join the data in your database in realtime and enrich fresh data.
> If the data is in a log stream (change stream), you can use this way [2]
> to join the data.
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table-function
>
> On Tue, 21 Apr 2020 at 10:51, Mu Kong  wrote:
>
>> Hi community,
>>
>> I have a stream of traffic data with a service_id in it.
>> I'm enriching this data with a map of (service_id, service_name), which
>> only has 10 ~ 20 pairs and is read from config file.
>>
>> The problem I'm facing now is, this map changes from time to time, and I
>> don't want to redeploy the application to just change the map in the config
>> file.
>>
>> Is there an existing solution for solving this problem?
>>
>> Thanks in advance!
>>
>> Best regards,
>> Mu
>>
>


Re: Checkpoints for kafka source sometimes get 55 GB size (instead of 2 MB) and flink job fails during restoring from such checkpoint

2020-04-20 Thread Yun Tang
Hi Oleg

Have you ever tried loading the _metadata via
Checkpoints.loadCheckpointMetadata to see how many records are in the offsets metadata?
If there is only one record, as indicated by the logs, that would be really weird.
Moreover, I have several comments based on your description:

  *   state.backend.fs.memory-threshold would not take effect in 
MemoryStateBackend as that configuration is only available for FsStateBackend.

  *   If you switch the source from Kafka to Kinesis and also restore from your "bad"
checkpoint, that would not be allowed unless you provided the
--allowNonRestoredState [1] option. If so, the source data has been changed,
and the job would actually run from scratch.
  *   Would you please share the code of how to create the 
"dsp-producer-z-clickstream-web-raw" source?

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state
Best
Yun Tang

From: Oleg Vysotsky 
Sent: Tuesday, April 21, 2020 6:45
To: Jacob Sevart ; Timo Walther ; 
user@flink.apache.org 
Cc: Long Nguyen ; Gurpreet Singh 
Subject: Re: Checkpoints for kafka source sometimes get 55 GB size (instead of 
2 MB) and flink job fails during restoring from such checkpoint


Hi Jacob & Timo,

Thank you for checking!



I don’t use union list state in my app. FlinkKafkaConsumerBase (from kafka 
connector) uses it to store offsets per partition, but partitions are small 
(input topic has 32 partitions). The metadata file is large (about 1.5 Gb) in 
case of “abnormal” checkpoint. Usual checkpoints have metadata file around 
180-190 Mb. I use  MemoryStateBackend with state.backend.fs.memory-threshold = 
1024000.



In one of my experiments I modified FlinkKafkaConsumerBase to disable
initializing state in FlinkKafkaConsumerBase#initializeState and disabled
saving state in FlinkKafkaConsumerBase#snapshotState. I also forced it "to use
the partition discoverer to fetch the initial seed partitions" by changing the
FlinkKafkaConsumerBase#open method (please check the code below). The problem
is still there: when I restore from the "bad" checkpoint, the Flink job creates
"abnormal" checkpoints with 55 GB associated with the Kafka source. It looks like Flink
stores more than just partition-offset data in the part of the checkpoint
associated with the Kafka source. Any idea?



It looks like the problem is related to the Kafka source. E.g., switching the source from
Kafka to Kinesis and back temporarily fixes the problem:

I restore the Flink job from the "bad" checkpoint (the one that causes the problem, not
an "abnormal" checkpoint itself) while switching from the Kafka input to a Kinesis input (which has
identical data). After the "restored" Flink job creates its first checkpoint, I cancel
and restore the Flink job from this "new" checkpoint with the Kafka input starting from a specific
timestamp. The Flink job then creates correct checkpoints.



Just in case, I attached two “snapshots” from flink ui with example of abnormal 
checkpoint.



Thanks a lot!

Oleg



@Override
public final void initializeState(FunctionInitializationContext context) throws 
Exception {

   OperatorStateStore stateStore = context.getOperatorStateStore();
   if (false) {
   ListState<Tuple2<KafkaTopicPartition, Long>> oldRoundRobinListState =
      stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);

   this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
      OFFSETS_STATE_NAME,
      TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {
      })));

   if (context.isRestored() && !restoredFromOldState) {
  restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());

  // migrate from 1.2 state, if there is any
  for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : oldRoundRobinListState.get()) {
 restoredFromOldState = true;
 unionOffsetStates.add(kafkaOffset);
  }
  oldRoundRobinListState.clear();

  if (restoredFromOldState && discoveryIntervalMillis != 
PARTITION_DISCOVERY_DISABLED) {
 throw new IllegalArgumentException(
"Topic / partition discovery cannot be enabled if the job is 
restored from a savepoint from Flink 1.2.x.");
  }

  // populate actual holder for restored state
  for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
 restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
  }

  LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", 
restoredState);
   } else {
  LOG.info("No restore state for FlinkKafkaConsumer.");
   }

   } else {

  LOG.warn("initializeState skipped");

   }

}



@Override
public final void snapshotState(FunctionSnapshotContext context) throws 
Exception {
   if (!running) {
  LOG.debug("snapshotState() called on closed source");
   } else if (false) {

  unionOffsetStates.clear();

  final AbstractFetcher<T, ?> fetcher = this.kafkaFetcher;
  StringBuilder sb = new StringBuilder("snapshotState ");
  if (fetcher == null) {
 sb.append("null fetcher: ");
 // the fetcher has not yet been 

How to print log in flink-streaming-java module

2020-04-20 Thread Polarisary
Hi all, 
when I add some log statements in org.apache.flink.streaming.api.environment
(flink-streaming-java module) and repackage flink-dist_2.11-1.10.0.jar, nothing is
printed in the JM or TM logs.

I did it like this:

```
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
   checkNotNull(streamGraph, "StreamGraph cannot be null.");
   checkNotNull(configuration.get(DeploymentOptions.TARGET), "No 
execution.target specified in your configuration file.");

   final PipelineExecutorFactory executorFactory =
  executorServiceLoader.getExecutorFactory(configuration);

   checkNotNull(
  executorFactory,
  "Cannot find compatible factory for specified execution.target (=%s)",
  configuration.get(DeploymentOptions.TARGET));

   LOG.error("# Note >>> DeploymentOptions.TARGET : {}", 
DeploymentOptions.TARGET);
```

Does someone know how to add logging in this Java class?


Waiting for a reply, many thanks







Re: Enrich streaming data with small lookup data that slowly changes over time

2020-04-20 Thread Jark Wu
Hi Mu,

Flink SQL does support dimension table join. There are two ways to join the
dimension table.
If the data is in your database (e.g. MySQL, HBase), you can use this way
[1] to join the data in your database in realtime and enrich fresh data.
If the data is in a log stream (change stream), you can use this way [2] to
join the data.

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
[2]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table-function
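
For readers of the archive, a minimal sketch of way [2], a temporal table function over the changing lookup stream. The table names, field names and the processing-time attribute are placeholders, and the registration of the underlying streams is omitted:

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;

public class TemporalLookupSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // assumes a table "service_names" (service_id, service_name, proctime) is already registered,
        // fed by the changing (service_id, service_name) stream
        Table serviceNames = tEnv.sqlQuery("SELECT service_id, service_name, proctime FROM service_names");

        // versioned view of the lookup data, keyed by service_id and tracked on processing time
        TemporalTableFunction serviceLookup =
                serviceNames.createTemporalTableFunction("proctime", "service_id");
        tEnv.registerFunction("ServiceLookup", serviceLookup);

        // enrich the traffic stream (table "traffic" with a proctime attribute) with the latest service_name
        Table enriched = tEnv.sqlQuery(
                "SELECT t.*, s.service_name " +
                "FROM traffic AS t, LATERAL TABLE (ServiceLookup(t.proctime)) AS s " +
                "WHERE t.service_id = s.service_id");
        // ... write `enriched` to a sink ...
    }
}
```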

On Tue, 21 Apr 2020 at 10:51, Mu Kong  wrote:

> Hi community,
>
> I have a stream of traffic data with a service_id in it.
> I'm enriching this data with a map of (service_id, service_name), which
> only has 10 ~ 20 pairs and is read from config file.
>
> The problem I'm facing now is, this map changes from time to time, and I
> don't want to redeploy the application to just change the map in the config
> file.
>
> Is there an existing solution for solving this problem?
>
> Thanks in advance!
>
> Best regards,
> Mu
>


Enrich streaming data with small lookup data that slowly changes over time

2020-04-20 Thread Mu Kong
Hi community,

I have a stream of traffic data with a service_id in it.
I'm enriching this data with a map of (service_id, service_name), which
only has 10 ~ 20 pairs and is read from config file.

The problem I'm facing now is, this map changes from time to time, and I
don't want to redeploy the application to just change the map in the config
file.

Is there an existing solution for solving this problem?

Thanks in advance!

Best regards,
Mu


Re: Questions about Flink RichSinkFunction constructor VS open()

2020-04-20 Thread Jark Wu
Hi Jiawei,

Yes, you should initialize the connection in the open() method, because the constructor
is only called on the client side (which may not be able to connect to your database).
Besides, after construction, the RichSinkFunction instance will be
serialized into binary and shipped to the servers (TaskManagers) for
deserialization and execution.
However, connection instances are mostly not serializable (e.g. a JDBC
connection), so you can't and shouldn't initialize them in the constructor.
open() is the suggested way to do that.
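
A minimal sketch of that pattern with a plain JDBC connection; the connection URL and SQL statement are placeholders:

```
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class JdbcSinkSketch extends RichSinkFunction<String> {
    // captured in the constructor: simple, serializable configuration values only
    private final String jdbcUrl;
    // created in open(): runtime resources, marked transient so they are never serialized
    private transient Connection connection;
    private transient PreparedStatement statement;

    public JdbcSinkSketch(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // runs on the TaskManager, once per parallel sink instance
        connection = DriverManager.getConnection(jdbcUrl);
        statement = connection.prepareStatement("INSERT INTO events (payload) VALUES (?)");
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        statement.setString(1, value);
        statement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (statement != null) statement.close();
        if (connection != null) connection.close();
    }
}
```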

Best,
Jark

On Tue, 21 Apr 2020 at 09:26, Jiawei Wu  wrote:

> I have posted this question in StackOverflow:
> https://stackoverflow.com/questions/61334549/flink-richsinkfunction-constructor-vs-open
>
> The question is:
> > Let's say I need to implement a custom sink using RichSinkFunction, and I
> need some variables like a DBConnection in the sink. Where should I
> initialize the DBConnection? I see most of the articles initialize the
> DBConnection in the open() method; why not in the constructor?
>
> A follow-up question is what kind of variables should be initialized in the
> constructor and what should be initialized in open()?
>


Re:Re: flink-1.10 checkpoint occasionally throws NullPointerException

2020-04-20 Thread chenkaibit



This is not stably reproducible, but it has shown up in several jobs tested recently on 1.10, and there were no other errors when it was triggered. I have added some logging and will keep observing.




On 2020-04-21 01:12:48, "Yun Tang" wrote:
>Hi
>
>This NPE is a bit strange; from the executeCheckpointing method [1] it is actually hard to tell which variable, or which value of a variable, is null.
>One way to investigate is to enable DEBUG-level logging for org.apache.flink.streaming.runtime.tasks
>and use the debug logs to narrow down which variable is null.
>
>When this exception occurs, is there anything unusual in the logs of the related task? What are the conditions that trigger this NPE, and is it stably reproducible?
>
>[1] 
>https://github.com/apache/flink/blob/aa4eb8f0c9ce74e6b92c3d9be5dc8e8cb536239d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1349
>
>Best
>Yun Tang
>
>
>From: chenkaibit 
>Sent: Monday, April 20, 2020 18:39
>To: user-zh@flink.apache.org 
>Subject: flink-1.10 checkpoint occasionally throws NullPointerException
>
>Has anyone run into this error? A NullPointerException is thrown during CheckpointOperation.executeCheckpointing
>java.lang.Exception: Could not perform checkpoint 5505 for operator Source: 
>KafkaTableSource(xxx) -> SourceConversion(table=[xxx, source: 
>[KafkaTableSource(xxx)]], fields=[xxx]) -> Calc(select=[xxx) AS xxx]) -> 
>SinkConversionToTuple2 -> Sink: Elasticsearch6UpsertTableSink(xxx) (1/1).
>
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:802)
>
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
>
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$228/1024478318.call(UnknownSource)
>
>at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
>
>at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>
>at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
>
>at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>
>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>
>at java.lang.Thread.run(Thread.java:745)
>
>Caused by: java.lang.NullPointerException
>
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1411)
>
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
>
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
>
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$229/1010499540.run(UnknownSource)
>
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
>
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
>
>... 12 more


Re: StreamQueryConfig vs TemporalTableFunction

2020-04-20 Thread Jark Wu
Hi Dom,

The TemporalTableFunction join will also clean up expired state if
`setIdleStateRetentionTime()` is enabled in the StreamQueryConfig or
TableConfig.
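
For reference, a small sketch of enabling idle state retention on the TableConfig; the retention times are arbitrary:

```
import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class RetentionSketch {
    public static void configure(StreamTableEnvironment tEnv) {
        // state idle for more than 24h becomes eligible for cleanup; cleanup happens at the latest after 36h
        tEnv.getConfig().setIdleStateRetentionTime(Time.hours(24), Time.hours(36));
    }
}
```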

Best,
Jark

On Tue, 21 Apr 2020 at 04:47, Dominik Wosiński  wrote:

> Hey,
> I wanted to ask whether the TemporalTableFunctions are subject to
> StreamQueryConfig retention? I was pretty sure that they are not, but I
> have recently noticed weird behavior in one of my jobs that suggests that
> they indeed are.
>
>
> Thanks for answers,
> Best Regards,
> Dom.
>


Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

2020-04-20 Thread Kurt Young
Thanks, once you can reproduce this issue locally, please open a jira with
your testing program.

Best,
Kurt


On Tue, Apr 21, 2020 at 8:48 AM 刘建刚  wrote:

> Thank you. It is an online job and my input is huge. I checked the trace and
> found that the array is resized when it is not large enough. The code is as
> below:
>
> public void add (int value) {
>int[] items = this.items;
>if (size == items.length) items = resize(Math.max(8, (int)(size * 1.75f)));
>items[size++] = value;
> }
>
>
> Only blink planner has this error. Can it be a thread-safe problem or
> something else? I will try to reproduce it locally.
>
> 2020年4月21日 上午12:20,Jark Wu-3 [via Apache Flink User Mailing List archive.]
>  写道:
>
> Hi,
>
> Are you using versions < 1.9.2? From the exception stack, it looks like
> caused by FLINK-13702, which is already fixed in 1.9.2 and 1.10.0.
> Could you try it using 1.9.2?
>
> Best,
> Jark
>
> On Mon, 20 Apr 2020 at 21:00, Kurt Young <[hidden email]> wrote:
>
>> Can you reproduce this in a local program with mini-cluster?
>>
>> Best,
>> Kurt
>>
>>
>> On Mon, Apr 20, 2020 at 8:07 PM Zahid Rahman <[hidden email]> wrote:
>>
>>> You can read this for this type error.
>>>
>>>
>>> https://stackoverflow.com/questions/28189446/i-always-get-this-error-exception-in-thread-main-java-lang-arrayindexoutofbou#comment44747327_28189446
>>>
>>> I would suggest you set breakpoints in your code. Stepping through the
>>> code should show you which array variable is being passed a
>>> null argument when that variable is not nullable.
>>>
>>>
>>>
>>>
>>> On Mon, 20 Apr 2020, 10:07 刘建刚, <[hidden email]> wrote:
>>>
I am using Roaring64NavigableMap to compute uv. It is OK with the
 flink planner but not OK with the blink planner. The SQL is as follows:

 SELECT toLong(TUMBLE_START(eventTime, interval '1' minute)) as 
 curTimestamp, A, B, C, D,
 E, uv(bitmap(id)) as bmp
 FROM person
 GROUP BY TUMBLE(eventTime, interval '1' minute), A, B, C, D, E


   The udf is as following:

 public static class Bitmap extends
 AggregateFunction<Roaring64NavigableMap, Roaring64NavigableMap> {
@Override
public Roaring64NavigableMap createAccumulator() {
   return new Roaring64NavigableMap();
}

@Override
public Roaring64NavigableMap getValue(Roaring64NavigableMap 
 accumulator) {
   return accumulator;
}

public void accumulate(Roaring64NavigableMap bitmap, long id) {
   bitmap.add(id);
}
 }

 public static class UV extends ScalarFunction {
public long eval(Roaring64NavigableMap bitmap) {
   return bitmap.getLongCardinality();
}
 }

   The error is as following:

 2020-04-20 16:37:13,868 INFO
  org.apache.flink.runtime.executiongraph.ExecutionGraph
  [flink-akka.actor.default-dispatcher-40]  -
 GroupWindowAggregate(groupBy=[brand, platform, channel, versionName,
 appMajorVersion], window=[TumblingGroupWindow('w$, eventTime, 6)],
 properties=[w$start, w$end, w$rowtime, w$proctime], select=[brand,
 platform, channel, versionName, appMajorVersion, bitmap(id) AS $f5,
 start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
 proctime('w$) AS w$proctime]) -> Calc(select=[toLong(w$start) AS
 curTimestamp, brand, platform, channel, versionName, appMajorVersion,
 uv($f5) AS bmp]) -> SinkConversionToTuple2 -> (Flat Map, Flat Map -> Sink:
 Unnamed) (321/480) (8eb918b493ea26e2bb60f8307347dc1a) switched from RUNNING
 to FAILED.
 java.lang.ArrayIndexOutOfBoundsException: -1
   at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
   at
 org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
   at
 org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:62)
   at
 org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:37)
   at
 org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copyBaseRow(BaseRowSerializer.java:150)
   at
 org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:117)
   at
 org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:50)
   at
 org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:297)
   at
 org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:244)
   at
 org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:138)
   at
 org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73)
   at
 

Questions about Flink RichSinkFunction constructor VS open()

2020-04-20 Thread Jiawei Wu
I have posted this question in StackOverflow:
https://stackoverflow.com/questions/61334549/flink-richsinkfunction-constructor-vs-open

The question is:
> Let's say I need to implement a custom sink using RichSinkFunction, and I
need some variables like DBConnection in the sink. Where should I
initialize the DBConnection? I see most of the articles init the
DBConnection in the open() method, why not in the constructor?

A follow-up question is what kind of variables should be initialized in the
constructor and what should be initialized in open()?


Re: Changing number of partitions for a topic

2020-04-20 Thread Benchao Li
Hi Suraj,

There is a config option [1] to enable partition discovery, which is
disabled by default.
The community has discussed enabling it by default [2], but only for the
new Source API.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#partition-discovery
[2] https://issues.apache.org/jira/browse/FLINK-15703
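
A minimal sketch of turning on the option from [1] through the consumer properties; the topic, servers, group id and interval are placeholders:

```
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class PartitionDiscoverySketch {
    public static FlinkKafkaConsumer<String> buildConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");
        props.setProperty("group.id", "my-group");
        // check for newly created partitions every 30 seconds
        props.setProperty("flink.partition-discovery.interval-millis", "30000");
        return new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
    }
}
```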

Suraj Puvvada wrote on Tue, Apr 21, 2020 at 6:01 AM:

> Hello,
>
> I have a flink job that reads from a source topic that currently has 4
> partitions and I need to increase the partition count to 8.
>
> Do you need to restart the job for that to take effect ?
> How does it work in case there is persistent state (like a window
> operator) involved ?
>
> Any design documents on how partition mapping works would be very helpful.
>
> Thanks
> Suraj
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Reply: Question about the Kafka source in Flink SQL

2020-04-20 Thread Sun.Zhu
Yes, set them all to be less than or equal to the number of partitions.




Sun.Zhu
Email: 17626017...@163.com

Signature is customized by Netease Mail Master

On Apr 21, 2020 at 00:28, Jark Wu wrote:
Hi,

You can set the Kafka source parallelism equal to the number of Kafka partitions. This approach will definitely work and does not waste task resources.

Best,
Jark

On Mon, 20 Apr 2020 at 22:33, Benchao Li  wrote:

> I'm not very familiar with the checkpointing part, but a subtask being marked as idle should still be different from it being finished.
>
> 祝尚 <17626017...@163.com> wrote on Mon, Apr 20, 2020 at 10:29 PM:
>
> > We are on version 1.8, but this piece of source code should not have changed:
> > // check if all tasks that we need to trigger are running.
> > // if not, abort the checkpoint
> > Execution[] executions = new Execution[tasksToTrigger.length];
> > for (int i = 0; i < tasksToTrigger.length; i++) {
> >Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
> >if (ee == null) {
> >   LOG.info("Checkpoint triggering task {} of job {} is not being
> > executed at the moment. Aborting checkpoint.",
> > tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
> > job);
> >   throw new
> >
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
> >} else if (ee.getState() == ExecutionState.RUNNING) {
> >   executions[i] = ee;
> >} else {
> >   LOG.info("Checkpoint triggering task {} of job {} is not in state
> {}
> > but {} instead. Aborting checkpoint.",
> > tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
> > job,
> > ExecutionState.RUNNING,
> > ee.getState());
> >   throw new
> >
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
> >}
> > }
> > Or is my understanding wrong?
> >
> > > On Apr 20, 2020 at 6:21 PM, Benchao Li wrote:
> > >
> > > Which version are you using? We are on 1.9 and haven't run into this problem.
> > >
> > > Sun.Zhu <17626017...@163.com> wrote on Mon, Apr 20, 2020 at 5:43 PM:
> > >
> > >> We have hit this problem in production: a source subtask that gets no partition becomes FINISHED after a short while, and when triggering a checkpoint all executors are checked to be in RUNNING state, otherwise no checkpoint is taken;
> > >> the source code of CheckpointCoordinator#triggerCheckpoint also explains this.
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> Sun.Zhu
> > >> Email: 17626017...@163.com
> > >>
> > >> Signature is customized by Netease Mail Master
> > >>
> > >> On Apr 20, 2020 at 10:37, Benchao Li wrote:
> > >> It should not. A source that is not assigned any partition will be marked as idle.
> > >>
> > >> Sun.Zhu <17626017...@163.com> wrote on Mon, Apr 20, 2020 at 10:28 AM:
> > >>
> > >>> Hi Benchao, if the source parallelism is greater than the number of partitions, wouldn't that cause the problem of checkpoints not being taken?
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> Sun.Zhu
> > >>> Email: 17626017...@163.com
> > >>>
> > >>> Signature is customized by Netease Mail Master
> > >>>
> > >>> On Apr 19, 2020 at 22:43, 人生若只如初见 wrote:
> > >>> OK, thanks a lot
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> -- Original message --
> > >>> From: "Benchao Li"
> > >>> Sent: Sunday, Apr 19, 2020, 9:25 PM
> > >>> To: "user-zh"
> > >>> Subject: Re: Question about the Kafka source in Flink SQL
> > >>>
> > >>>
> > >>> If that's the case, you can avoid it by making your source parallelism greater than or equal to the number of Kafka partitions.
> > >>>
> > >>> Jark Wu  > >>>
> > >>>  Hi,
> > >>>
> > >>>  Based on the behavior you described and the code you provided, I think the cause should be out-of-order data.
> > >>>  According to your Java code, the event time of the data is not monotonically increasing, so there is some degree of disorder. This disorder has little impact while the job runs normally (the watermark can tolerate 5s of disorder).
> > >>>  But when catching up on data, since Flink does not yet do event-time alignment, some partitions will progress much faster than others while catching up,
> > >>>  which increases the degree of disorder (e.g. the most-late data used to be 4s late, now it may be 10s late), so more data gets dropped, and that is why the counts are lower while catching up.
> > >>>
> > >>>  The perfect solution still has to wait for FLIP-27 to be completed.
> > >>>  For now you can increase the watermark delay to tolerate more late data.
> > >>>
> > >>>  Best,
> > >>>  Jark
> > >>> 
> > >>> 
> > >>>  On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 wrote:
> > >>>   Hello
> > >>>
> > >>>   Thanks for the explanation. The first question is: when I use the insert statement, if Kafka already contains more than 3 hours of user data, it immediately computes the purchase counts for the first three hours, roughly only 140 per hour, but the counts for the following hours are all around 1100, so the two differ a lot.
> > >>>
> > >>>   Attachment:
> > >>>   CREATE TABLE statement for user_behavior:
> > >>>   CREATE TABLE user_behavior (
> > >>>       user_id BIGINT,
> > >>>       item_id BIGINT,
> > >>>       category_id BIGINT,
> > >>>       behavior STRING,
> > >>>       ts TIMESTAMP(3),
> > >>>       proctime as PROCTIME(),   -- generate a processing-time column via a computed column
> > >>>       WATERMARK FOR ts as ts - INTERVAL '5' SECOND   -- define a watermark on ts, making ts the event-time column
> > >>>   ) WITH (
> > >>>       'connector.type' = 'kafka',   -- use the kafka connector
> > >>>       'connector.version' = 'universal',   -- kafka version, universal supports 0.11 and above
> > >>>       'connector.topic' = 'user_behavior',   -- kafka topic
> > >>>       'connector.startup-mode' = 'earliest-offset',   -- read from the earliest offset
> > >>>       'connector.properties.zookeeper.connect' = '192.168.0.150:2181',   -- zookeeper address
> > >>>       'connector.properties.bootstrap.servers' = '192.168.0.150:9092',   -- kafka broker address
> > >>>       'format.type' = 'json'   -- source data format is json
> > >>>   )
> > >>>
> > >>>   CREATE TABLE statement for the hourly purchase count:
> > >>>   CREATE TABLE buy_cnt_per_hour (

Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

2020-04-20 Thread 刘建刚
Thank you. It is an online job and my input is huge. I checked the trace and found
that the array is resized when it is not large enough. The code is as below:

public void add (int value) {
   int[] items = this.items;
   if (size == items.length) items = resize(Math.max(8, (int)(size * 1.75f)));
   items[size++] = value;
}

Only the blink planner has this error. Could it be a thread-safety problem or something
else? I will try to reproduce it locally.

> On Apr 21, 2020 at 12:20 AM, Jark Wu-3 [via Apache Flink User Mailing List archive.]
> wrote:
> 
> Hi,
> 
> Are you using versions < 1.9.2? From the exception stack, it looks like 
> caused by FLINK-13702, which is already fixed in 1.9.2 and 1.10.0.
> Could you try it using 1.9.2?
> 
> Best,
> Jark
> 
> On Mon, 20 Apr 2020 at 21:00, Kurt Young <[hidden email] 
> > wrote:
> Can you reproduce this in a local program with mini-cluster?
> 
> Best,
> Kurt
> 
> 
> On Mon, Apr 20, 2020 at 8:07 PM Zahid Rahman <[hidden email] 
> > wrote:
> You can read this for this type error.
> 
> https://stackoverflow.com/questions/28189446/i-always-get-this-error-exception-in-thread-main-java-lang-arrayindexoutofbou#comment44747327_28189446
>  
> 
> 
> I would suggest you set breakpoints in your code. Stepping through the code
> should show you which array variable is being passed a null
> argument when that variable is not nullable.
> 
> 
> 
> 
> On Mon, 20 Apr 2020, 10:07 刘建刚, <[hidden email] 
> > wrote:
>   I am using Roaring64NavigableMap to compute uv. It is OK with the flink
> planner but not OK with the blink planner. The SQL is as follows:
> SELECT toLong(TUMBLE_START(eventTime, interval '1' minute)) as curTimestamp, 
> A, B, C, D,
> E, uv(bitmap(id)) as bmp
> FROM person
> GROUP BY TUMBLE(eventTime, interval '1' minute), A, B, C, D, E
> 
>   The udf is as following:
> public static class Bitmap extends AggregateFunction<Roaring64NavigableMap, Roaring64NavigableMap> {
>@Override
>public Roaring64NavigableMap createAccumulator() {
>   return new Roaring64NavigableMap();
>}
> 
>@Override
>public Roaring64NavigableMap getValue(Roaring64NavigableMap accumulator) {
>   return accumulator;
>}
> 
>public void accumulate(Roaring64NavigableMap bitmap, long id) {
>   bitmap.add(id);
>}
> }
> public static class UV extends ScalarFunction {
>public long eval(Roaring64NavigableMap bitmap) {
>   return bitmap.getLongCardinality();
>}
> }
>   The error is as following:
> 
> 2020-04-20 16:37:13,868 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph 
> [flink-akka.actor.default-dispatcher-40] - 
> GroupWindowAggregate(groupBy=[brand, platform, channel, versionName, 
> appMajorVersion], window=[TumblingGroupWindow('w$, eventTime, 6)], 
> properties=[w$start, w$end, w$rowtime, w$proctime], select=[brand, platform, 
> channel, versionName, appMajorVersion, bitmap(id) AS $f5, start('w$) AS 
> w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS 
> w$proctime]) -> Calc(select=[toLong(w$start) AS curTimestamp, brand, 
> platform, channel, versionName, appMajorVersion, uv($f5) AS bmp]) -> 
> SinkConversionToTuple2 -> (Flat Map, Flat Map -> Sink: Unnamed) (321/480) 
> (8eb918b493ea26e2bb60f8307347dc1a) switched from RUNNING to FAILED.
> java.lang.ArrayIndexOutOfBoundsException: -1
>   at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
>   at 
> org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:62)
>   at 
> org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:37)
>   at 
> org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copyBaseRow(BaseRowSerializer.java:150)
>   at 
> org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:117)
>   at 
> org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:50)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:297)
>   at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:244)
>   at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:138)
>   at 
> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73)
>   at 
> org.apache.flink.table.runtime.operators.window.WindowOperator.processElement(WindowOperator.java:337)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.processRecord(OneInputStreamTask.java:204)
>   at 
> 

Re: Change to StreamingFileSink in Flink 1.10

2020-04-20 Thread Averell
Hi,

I tried to add the following cast, and it works. Doesn't look nice though

StreamingFileSink
  .forRowFormat(new Path(path), myEncoder)
  .withRollingPolicy(DefaultRollingPolicy.create().build())
  .withBucketAssigner(myBucketAssigner).asInstanceOf[RowFormatBuilder[IN, String, _]]
  .build()

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Job manager URI rpc address:port

2020-04-20 Thread Som Lima
This is the code I was looking for, which will allow me to connect
programmatically to a remote jobmanager, the same as a Spark remote master.
The Spark master shares the compute load with the slaves; in the Flink case,
the jobmanager does so with the taskmanagers.


Configuration conf = new Configuration();
conf.setString("mykey", "myvalue");
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(conf);


I found it at the bottom of this page .

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/index.html
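
For completeness, the RemoteStreamEnvironment route mentioned earlier in the thread can also be used directly from the IDE; a minimal sketch where the host, port and jar path are placeholders (use whichever submission port your cluster is configured with):

```
import org.apache.flink.api.java.ExecutionEnvironment;

public class RemoteEnvSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
                "192.168.1.10",            // JobManager host of the remote cluster (placeholder)
                8081,                      // submission port of the remote cluster (placeholder)
                "/path/to/your-job.jar");  // jar containing the user code

        // ... define the program on env, then call env.execute("my job") ...
    }
}
```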




On Sun, 19 Apr 2020, 11:02 tison,  wrote:

> You can change flink-conf.yaml "jobmanager.address" or "jobmanager.port"
> options before run the program or take a look at RemoteStreamEnvironment
> which enables configuring host and port.
>
> Best,
> tison.
>
>
> Som Lima  于2020年4月19日周日 下午5:58写道:
>
>> Hi,
>>
>> After running
>>
>> $ ./bin/start-cluster.sh
>>
>> The following line of code defaults jobmanager  to localhost:6123
>>
>> final  ExecutionEnvironment env = Environment.getExecutionEnvironment();
>>
>> which is same on spark.
>>
>> val spark =
>> SparkSession.builder.master(local[*]).appname("anapp").getOrCreate
>>
>> However if I wish to run the servers on a different physical computer.
>> Then in Spark I can do it this way using the spark URI in my IDE.
>>
>> Conf =
>> SparkConf().setMaster("spark://:").setAppName("anapp")
>>
>> Can you please tell me the equivalent change to make so I can run my
>> servers and my IDE from different physical computers.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>


Suppressing illegal Access Warnings

2020-04-20 Thread Zahid Rahman
Hi,

I was getting these warnings; I think these are due to certain versions of
Maven libraries, which is impacting Java frameworks everywhere.

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by
org.jboss.netty.util.internal.ByteBufferUtil
(file:/home/kub19/.m2/repository/io/netty/netty/3.10.6.Final/netty-3.10.6.Final.jar)
to method java.nio.DirectByteBuffer.cleaner()
WARNING: Please consider reporting this to the maintainers of
org.jboss.netty.util.internal.ByteBufferUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal
reflective access operations
WARNING: All illegal access operations will be denied in a future release


I used the following code. Can you see any conflict or unwanted impact
on any Flink functionality?
    new DisableAccessWarning().disableAccessWarnings();

/**
 * Implements the "WordCount" program that computes a simple word occurrence histogram
 * over text files.
 *
 * The input is a plain text file with lines separated by newline characters.
 *
 * Usage: WordCount --input <path> --output <path>
 * If no parameters are provided, the program is run with default data from {@link WordCountData}.
 *
 * This example shows how to:
 *   - write a simple Flink program.
 *   - use Tuple data types.
 *   - write and use user-defined functions.
 */
public class WordCount {

   // *
   // PROGRAM
   // *

   public static void main(String[] args) throws Exception {

  // disable illegal access warnings
  new  DisableAccessWarning().disableAccessWarnings();


final class DisableAccessWarning {
   public static void disableAccessWarnings() {
  try {
 Class unsafeClass = Class.forName("sun.misc.Unsafe");
 Field field = unsafeClass.getDeclaredField("theUnsafe");
 field.setAccessible(true);
 Object unsafe = field.get(null);

 Method putObjectVolatile =
unsafeClass.getDeclaredMethod("putObjectVolatile", Object.class,
long.class, Object.class);
 Method staticFieldOffset =
unsafeClass.getDeclaredMethod("staticFieldOffset", Field.class);

 Class loggerClass =
Class.forName("jdk.internal.module.IllegalAccessLogger");
 Field loggerField = loggerClass.getDeclaredField("logger");
 Long offset = (Long) staticFieldOffset.invoke(unsafe, loggerField);
 putObjectVolatile.invoke(unsafe, loggerClass, offset, null);
  } catch (Exception ignored) {
  }
   }
}


Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



RocksDB default logging configuration

2020-04-20 Thread Bajaj, Abhinav
Hi,

Some of our teams ran into disk space issues because of the RocksDB default
logging configuration - FLINK-15068.
It seems the suggested workaround uses the OptionsFactory to set some of the
parameters from inside the job.

Since we provision the Flink cluster (version 1.7.1) for the teams, we control
the RocksDB statebackend configuration from flink-conf.yaml.
And it seems there isn't any related RocksDB configuration to set in flink-conf.yaml.

Is there a way for the job developer to retrieve the default statebackend 
information from the cluster in the job and set the DBOptions on top of it?

Appreciate the help!

~ Abhinav Bajaj

PS:  Sharing below snippet as desired option if possible -

StreamExecutionEnvironment streamExecEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
StateBackend stateBackend = streamExecEnv.getDefaultStateBackend();
stateBackend.setOptions(new OptionsFactory() {
@Override
public DBOptions createDBOptions(DBOptions dbOptions) {
  dbOptions.setInfoLogLevel(InfoLogLevel.WARN_LEVEL);
  dbOptions.setMaxLogFileSize(1024 * 1024);
  return dbOptions;
}

@Override
public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
columnFamilyOptions) {
  return columnFamilyOptions;
}
});
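
For what it's worth, a sketch of the workaround referenced above on Flink 1.7.x APIs: construct the RocksDB state backend explicitly inside the job and attach an OptionsFactory. The checkpoint URI and log limits are placeholders, and this does mean re-declaring the backend in the job rather than reading the cluster default.

```
import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;

public class RocksDbLoggingWorkaround {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        RocksDBStateBackend rocksDb = new RocksDBStateBackend("hdfs:///flink/checkpoints", true);
        rocksDb.setOptions(new OptionsFactory() {
            @Override
            public DBOptions createDBOptions(DBOptions currentOptions) {
                // tame the RocksDB LOG files that caused the disk space issue
                return currentOptions
                        .setInfoLogLevel(InfoLogLevel.WARN_LEVEL)
                        .setMaxLogFileSize(1024 * 1024)
                        .setKeepLogFileNum(4);
            }

            @Override
            public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
                return currentOptions;
            }
        });
        env.setStateBackend(rocksDb);

        // ... define and execute the job ...
    }
}
```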




Re: Problem getting watermark right with event time

2020-04-20 Thread Fabian Hueske
Hi Sudan,

I noticed a few issues with your code:

1) Please check the computation of timestamps. Your code

public long extractAscendingTimestamp(Eventi.Event element) {
  return element.getEventTime().getSeconds() * 1000;
}

only seems to look at the seconds of a timestamp. Typically, you would just
return the whole timestamp encoded as a long that represents the
milliseconds since epoch (1970-01-01 00:00:00.000).
Why do you multiply by 1000?

2) An AscendingTimestampExtractor assumes that records arrive with strictly
ascending timestamps.
If the timestamps in your data are slightly out of order, you probably want
another watermark assigner for example a
BoundedOutOfOrdernessTimestampExtractor [1].

3) You probably don't want to key on event time:

keyBy(Eventi.Event::getEventTime)

Usually, you choose a partitioning key here. If you cannot partition your
data and all records should be grouped in the single stream of windows you
should use DataStream.windowAll().
Note however, that this means that your code cannot run in parallel. See
[2] for details.

 Best, Fabian

[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#keyed-vs-non-keyed-windows

Am So., 19. Apr. 2020 um 21:37 Uhr schrieb Sudan S :

> Hi,
>
> I am having a problem getting watermark right. The setup is
> - I have a Flink Job which reads from a Kafka topic, uses Protobuf
> Deserialization, uses Sliding Window of (120seconds, 30 seconds), sums up
> the value and finally returns the result.
>
> The code is pasted below.
>
> The problem here is, I'm not able to reach the sink. I am able to reach
> the assignTimestamp when the timestamp arrives, but past that, neither
> process function nor the sink function is getting invoked in spite of
> pumping events regularly. I'm not able to figure out how to debug this
> issue.
> Plz help.
>
> public class StreamingJob {
>
> public static void main(String[] args) throws Exception {
>
> Properties kafkaConsumerProps = new Properties();
> kafkaConsumerProps.setProperty("bootstrap.servers",
> "{bootstrap_servers}");
> kafkaConsumerProps.setProperty("group.id", "{group_id}");
>
>
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new
> Configuration());
> env.enableCheckpointing(100);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.setMaxParallelism(5);
> env.setParallelism(5);
>
> SingleOutputStreamOperator texStream = env
> .addSource(new FlinkKafkaConsumer011<>("auth", new
> EventiSchema(), kafkaConsumerProps)).setParallelism(5).setMaxParallelism(5);
> SlidingEventTimeWindows window =
> SlidingEventTimeWindows.of(Time.seconds(120), Time.seconds(30));
> texStream.assignTimestampsAndWatermarks(new
> AscendingTimestampExtractor() {
> @Override
> public long extractAscendingTimestamp(Eventi.Event element) {
> return element.getEventTime().getSeconds() * 1000;
> }
> }).keyBy(Eventi.Event::getEventTime).window(window).process(new
> ProcessWindowFunction() {
> @Override
> public void process(Timestamp timestamp, Context context,
> Iterable elements, Collector out) throws Exception {
> int sum = 0;
> for (Eventi.Event element : elements) {
> sum++;
> }
> out.collect(sum);
> }
> }).print()
>
> env.execute();
> }
> }
>
> --
> *"The information contained in this e-mail and any accompanying documents
> may contain information that is confidential or otherwise protected from
> disclosure. If you are not the intended recipient of this message, or if
> this message has been addressed to you in error, please immediately alert
> the sender by replying to this e-mail and then delete this message,
> including any attachments. Any dissemination, distribution or other use of
> the contents of this message by anyone other than the intended recipient is
> strictly prohibited. All messages sent to and from this e-mail address may
> be monitored as permitted by applicable law and regulations to ensure
> compliance with our internal policies and to protect our business."*
> --
>


Re: FlinkKafakaProducer with Confluent SchemaRegistry and KafkaSerializationSchema

2020-04-20 Thread Fabian Hueske
Hi Anil,

Here's a pointer to Flink's end-2-end test that's checking the integration
with schema registry [1].
It was recently updated so I hope it works the same way in Flink 1.9.

Best,
Fabian

[1]
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
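
One possible way to bridge an existing SerializationSchema (such as the Confluent one below) to the KafkaSerializationSchema-based producer constructors is a small wrapper. This is a sketch rather than a Flink-provided class; it assumes Avro GenericRecord elements and leaves the Kafka key null:

```
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class WrappingKafkaSerializationSchema implements KafkaSerializationSchema<GenericRecord> {

    private final String topic;
    private final SerializationSchema<GenericRecord> valueSerializer;

    public WrappingKafkaSerializationSchema(String topic, SerializationSchema<GenericRecord> valueSerializer) {
        this.topic = topic;
        this.valueSerializer = valueSerializer;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(GenericRecord element, Long timestamp) {
        // key is left null here; a real job may want to derive it from the record
        return new ProducerRecord<>(topic, valueSerializer.serialize(element));
    }
}
```

It could then be handed to the FlinkKafkaProducer constructor that takes (defaultTopic, KafkaSerializationSchema, Properties, Semantic).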

On Sat, Apr 18, 2020 at 19:17, Anil K wrote:

> Hi,
>
> What is the best way to use Confluent SchemaRegistry with
> FlinkKafkaProducer?
>
> What I have right now is as follows.
>
> SerializationSchema serializationSchema =
> ConfluentRegistryAvroSerializationSchema.forGeneric(topic, schema, 
> schemaRegistryUrl);
>
> FlinkKafkaProducer kafkaProducer =
> new FlinkKafkaProducer<>(topic, serializationSchema, kafkaConfig);
> outputStream.addSink(producer);
>
> FlinkKafkaProducer with SerializationSchema is now deprecated.
>
> I am using flink 1.9.
>
> How can I use FlinkKafkaProducer with KafkaSerializationSchema and
> the Confluent SchemaRegistry?
>
> Is there some reference/documentation i could use?
>
> Thanks , Anil.
>
>


Running in LocalExecutionEnvironment in production

2020-04-20 Thread Suraj Puvvada
Hello,

We currently have a lot of jobs running in LocalExecutionEnvironment and
wanted to understand the limitations and whether it is recommended to run in
this mode.

Appreciate your thoughts on this.

Thanks
Suraj


Changing number of partitions for a topic

2020-04-20 Thread Suraj Puvvada
Hello,

I have a flink job that reads from a source topic that currently has 4
partitions and I need to increase the partition count to 8.

Do you need to restart the job for that to take effect ?
How does it work in case there is persistent state (like a window operator)
involved ?

Any design documents on how partition mapping works would be very helpful.

Thanks
Suraj


StreamQueryConfig vs TemporalTableFunction

2020-04-20 Thread Dominik Wosiński
Hey,
I wanted to ask whether the TemporalTableFunctions are subject to
StreamQueryConfig retention? I was pretty sure that they are not, but I
have recently noticed weird behavior in one of my jobs that suggests that
they indeed are.


Thanks for answers,
Best Regards,
Dom.


Distributed Incremental Streaming Graph Analytics: State Accessing/Message Passing Options

2020-04-20 Thread burgeraw
I'm working on a system to process streaming graphs in Flink. I am trying to
maintain the state of the graph within a time window, so I can then run
graph algorithms on it. The goal is to do this with incremental updates, so
the state does not have to be fully recomputed for each window. I figured
keying on source vertex and then storing the adjacent edges in the
ProcessWindowFunction state could be a potential way to achieve this.
However, for this scenario, I am looking for proper ways to distributively
access this streaming graph state from downstream operators (other than
those maintaining the state). So, essentially, how to access state that is
stored in another node than the one doing the processing. 
I also read about Stateful Functions, which I believe could be another
potential way to store the windowed graph state. What do you believe is the
better, more efficient option? Also, are there any other options I should
consider?
Thanks!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Cannot register for Flink Forward Conference

2020-04-20 Thread Eleanore Jin
Hi Seth,

Thanks for the prompt response! Yes all my colleagues were able to register.

Best,
Eleanore

On Mon, Apr 20, 2020 at 12:49 PM Seth Wiesman  wrote:

> Hi Eleanore,
>
> There was a misconfiguration on the website if you try again everything
> should work.
>
> Seth
>
> On Mon, Apr 20, 2020 at 1:39 PM Eleanore Jin 
> wrote:
>
>> Hi community,
>>
>> My colleagues tried to register for the Flink forward conference:
>> https://www.bigmarker.com/series/flink-forward-virtual-confer1/series_summit?__hssc=37051071.2.1587407738905&__hstc=37051071.db5b837955b42a71990541edc07d7beb.1587407738904.1587407738904.1587407738904.1=3382611e-f2c7-4bc8-a1f8-3ab5d3322f61%7Cbf3a04dd-df45-4ccf-83b0-8046dc8ab7d2
>>
>> But they saw the following message, and if they click continue, it goes
>> back to the page. I just wonder does the website still allow registration?
>> If so, can someone please help with this?
>>
>> Thanks a lot!
>> Eleanore
>>
>> [image: image.png]
>>
>


Re: Cannot register for Flink Forward Conference

2020-04-20 Thread Seth Wiesman
Hi Eleanore,

There was a misconfiguration on the website if you try again everything
should work.

Seth

On Mon, Apr 20, 2020 at 1:39 PM Eleanore Jin  wrote:

> Hi community,
>
> My colleagues tried to register for the Flink forward conference:
> https://www.bigmarker.com/series/flink-forward-virtual-confer1/series_summit?__hssc=37051071.2.1587407738905&__hstc=37051071.db5b837955b42a71990541edc07d7beb.1587407738904.1587407738904.1587407738904.1=3382611e-f2c7-4bc8-a1f8-3ab5d3322f61%7Cbf3a04dd-df45-4ccf-83b0-8046dc8ab7d2
>
> But they saw the following message, and if they click continue, it goes
> back to the page. I just wonder does the website still allow registration?
> If so, can someone please help with this?
>
> Thanks a lot!
> Eleanore
>
> [image: image.png]
>


Cannot register for Flink Forward Conference

2020-04-20 Thread Eleanore Jin
Hi community,

My colleagues tried to register for the Flink forward conference:
https://www.bigmarker.com/series/flink-forward-virtual-confer1/series_summit?__hssc=37051071.2.1587407738905&__hstc=37051071.db5b837955b42a71990541edc07d7beb.1587407738904.1587407738904.1587407738904.1=3382611e-f2c7-4bc8-a1f8-3ab5d3322f61%7Cbf3a04dd-df45-4ccf-83b0-8046dc8ab7d2

But they saw the following message, and if they click continue, it goes
back to the page. I just wonder does the website still allow registration?
If so, can someone please help with this?

Thanks a lot!
Eleanore

[image: image.png]


Re: flink-1.10 checkpoint occasionally throws NullPointerException

2020-04-20 Thread Yun Tang
Hi

This NPE is a bit strange; from the executeCheckpointing method [1] it is actually hard to tell
which variable, or which value of a variable, is null.
One way to investigate is to enable DEBUG-level logging for org.apache.flink.streaming.runtime.tasks
and use the debug logs to narrow down which variable is null.

When this exception occurs, is there anything unusual in the logs of the related task? What are the conditions that trigger this NPE, and is it stably reproducible?

[1] 
https://github.com/apache/flink/blob/aa4eb8f0c9ce74e6b92c3d9be5dc8e8cb536239d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1349

Best
Yun Tang


From: chenkaibit 
Sent: Monday, April 20, 2020 18:39
To: user-zh@flink.apache.org 
Subject: flink-1.10 checkpoint occasionally throws NullPointerException

Has anyone run into this error? A NullPointerException is thrown during CheckpointOperation.executeCheckpointing
java.lang.Exception: Could not perform checkpoint 5505 for operator Source: 
KafkaTableSource(xxx) -> SourceConversion(table=[xxx, source: 
[KafkaTableSource(xxx)]], fields=[xxx]) -> Calc(select=[xxx) AS xxx]) -> 
SinkConversionToTuple2 -> Sink: Elasticsearch6UpsertTableSink(xxx) (1/1).

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:802)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$228/1024478318.call(UnknownSource)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)

at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)

at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)

at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.NullPointerException

at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1411)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$229/1010499540.run(UnknownSource)

at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)

... 12 more


Re: FLINK JOB solved

2020-04-20 Thread Som Lima
I found the problem.

In the flink1.0.0/conf directory there are two files: masters and slaves.

The masters file contains localhost:8081, and the slaves file just localhost.

I changed both to the server's IP address.

Now the FLINK JOB link has the full server-address:8081 URL and displays the Apache Flink
Dashboard in the browser.
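For reference, a minimal sketch of the two files for a single-node standalone setup (the address is illustrative; replace it with your server's IP):

# conf/masters
192.168.1.10:8081

# conf/slaves
192.168.1.10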




On Mon, 20 Apr 2020, 12:07 Som Lima,  wrote:

> Yes exactly that is the change I am having to make.  Changing FLINK JOB
> default localhost to ip of server computer in the browser.
>
> I followed the instructions as per your
> link.
>
> https://medium.com/@zjffdu/flink-on-zeppelin-part-1-get-started-2591aaa6aa47
>
> i.e. 0.0.0.0  of zeppelin.server.addr. for remote access.
>
>
>
> On Mon, 20 Apr 2020, 10:30 Jeff Zhang,  wrote:
>
>> I see, so you are running flink interpreter in local mode. But you access
>> zeppelin from a remote machine, right ?  Do you mean you can access it
>> after changing localhost to ip ? If so, then I can add one configuration in
>> zeppelin side to replace the localhost to real ip.
>>
>> Som Lima  于2020年4月20日周一 下午4:44写道:
>>
>>> I am only running the zeppelin  word count example by clicking the
>>> zeppelin run arrow.
>>>
>>>
>>> On Mon, 20 Apr 2020, 09:42 Jeff Zhang,  wrote:
>>>
 How do you run flink job ? It should not always be localhost:8081

 Som Lima  于2020年4月20日周一 下午4:33写道:

> Hi,
>
> FLINK JOB  url  defaults to localhost
>
> i.e. localhost:8081.
>
> I have to manually change it to server :8081 to get Apache  flink
> Web Dashboard to display.
>
>
>
>
>

 --
 Best Regards

 Jeff Zhang

>>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>


Re: 问题请教-flinksql的kafkasource方面

2020-04-20 Thread Jark Wu
Hi,

You can set the Kafka source parallelism equal to the number of Kafka partitions. That approach definitely works, and it does not waste task resources.
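A minimal sketch of this in the DataStream API (class name, topic and broker address are illustrative; with the SQL connector the simplest equivalent is to submit the job with a default parallelism equal to the partition count):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class MatchSourceParallelism {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        int numPartitions = 4; // must equal the Kafka topic's partition count

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.0.150:9092");
        props.setProperty("group.id", "demo");

        // one source subtask per partition, so no subtask goes idle
        env.addSource(new FlinkKafkaConsumer<>("user_behavior", new SimpleStringSchema(), props))
           .setParallelism(numPartitions)
           .print();

        env.execute("source-parallelism-demo");
    }
}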

Best,
Jark

On Mon, 20 Apr 2020 at 22:33, Benchao Li  wrote:

> 我对checkpoint这块不是很了解。不过subtask标记为idle跟finish还是有区别的吧。
>
> 祝尚 <17626017...@163.com> 于2020年4月20日周一 下午10:29写道:
>
> > 我们是1.8版本,但是这段源码应该是没变把
> > // check if all tasks that we need to trigger are running.
> > // if not, abort the checkpoint
> > Execution[] executions = new Execution[tasksToTrigger.length];
> > for (int i = 0; i < tasksToTrigger.length; i++) {
> >Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
> >if (ee == null) {
> >   LOG.info("Checkpoint triggering task {} of job {} is not being
> > executed at the moment. Aborting checkpoint.",
> > tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
> > job);
> >   throw new
> >
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
> >} else if (ee.getState() == ExecutionState.RUNNING) {
> >   executions[i] = ee;
> >} else {
> >   LOG.info("Checkpoint triggering task {} of job {} is not in state
> {}
> > but {} instead. Aborting checkpoint.",
> > tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
> > job,
> > ExecutionState.RUNNING,
> > ee.getState());
> >   throw new
> >
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
> >}
> > }
> > 还是我理解的不对
> >
> > > 2020年4月20日 下午6:21,Benchao Li  写道:
> > >
> > > 你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。
> > >
> > > Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 下午5:43写道:
> > >
> > >>
> >
> 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint
> > >> ,源码CheckpointCoordinator#triggerCheckpoint也有说明
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> | |
> > >> Sun.Zhu
> > >> |
> > >> |
> > >> 邮箱:17626017...@163.com
> > >> |
> > >>
> > >> Signature is customized by Netease Mail Master
> > >>
> > >> 在2020年04月20日 10:37,Benchao Li 写道:
> > >> 应该是不会的。分配不到partition的source会标记为idle状态。
> > >>
> > >> Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道:
> > >>
> > >>> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> | |
> > >>> Sun.Zhu
> > >>> |
> > >>> |
> > >>> 邮箱:17626017...@163.com
> > >>> |
> > >>>
> > >>> Signature is customized by Netease Mail Master
> > >>>
> > >>> 在2020年04月19日 22:43,人生若只如初见 写道:
> > >>> 嗯嗯,十分感谢
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> --原始邮件--
> > >>> 发件人:"Benchao Li" > >>> 发送时间:2020年4月19日(星期天) 晚上9:25
> > >>> 收件人:"user-zh" > >>>
> > >>> 主题:Re: 问题请教-flinksql的kafkasource方面
> > >>>
> > >>>
> > >>>
> > >>> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。
> > >>>
> > >>> Jark Wu  > >>>
> > >>>  Hi,
> > >>> 
> > >>>  根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。
> > >>>  根据你的 Java 代码,数据的 event time
> > >>> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark
> > >>>  能容忍 5s 乱序).
> > >>>  但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition
> 进度比某些
> > >>> partition
> > >>>  进度快很多的现象,
> > >>> 
> 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。
> > >>> 
> > >>>  完美的解决方案还需要等 FLIP-27 的完成。
> > >>>  当前可以通过增加 watermark delay来增大迟到数据的容忍。
> > >>> 
> > >>>  Best,
> > >>>  Jark
> > >>> 
> > >>> 
> > >>>  On Sat, 18 Apr 2020 at 22:53, 人生若只如初见  > >>> 
> > >>>   你好
> > >>>  
> > >>>  
> > >>> 
> > >>>
> > >>
> >
> 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多
> > >>>  
> > >>>  
> > >>>  
> > >>>   附:
> > >>>   userbehavior建表语句
> > >>>   CREATE TABLE user_behavior (
> > >>>   nbsp; nbsp; user_id BIGINT,
> > >>>   nbsp; nbsp; item_id BIGINT,
> > >>>   nbsp; nbsp; category_id BIGINT,
> > >>>   nbsp; nbsp; behavior STRING,
> > >>>   nbsp; nbsp; ts TIMESTAMP(3),
> > >>>   nbsp; nbsp; proctime as PROCTIME(), nbsp; --
> > >>> 通过计算列产生一个处理时间列
> > >>>   nbsp; nbsp; WATERMARK FOR ts as ts - INTERVAL '5'
> > >>> SECOND nbsp;--
> > >>>   在ts上定义watermark,ts成为事件时间列
> > >>>   ) WITH (
> > >>>   nbsp; nbsp; 'connector.type' = 'kafka',
> > nbsp;--
> > >>> 使用 kafka connector
> > >>>   nbsp; nbsp; 'connector.version' = 'universal',
> > >>> nbsp;-- kafka
> > >>>   版本,universal 支持 0.11 以上的版本
> > >>>   nbsp; nbsp; 'connector.topic' = 'user_behavior',
> > >>> nbsp;-- kafka topic
> > >>>   nbsp; nbsp; 'connector.startup-mode' =
> > >>> 'earliest-offset', nbsp;-- 从起始
> > >>>   offset 开始读取
> > >>>   nbsp; nbsp;
> > 'connector.properties.zookeeper.connect'
> > >> =
> > >>> '
> > >>>   192.168.0.150:2181', nbsp;-- zookeeper 地址
> > >>>   nbsp; nbsp;
> > 'connector.properties.bootstrap.servers'
> > >> =
> > >>> '
> > >>>   192.168.0.150:9092', nbsp;-- kafka broker 地址
> > >>>   nbsp; nbsp; 'format.type' = 'json' nbsp;--
> > >> 数据源格式为
> > >>> json
> > >>>   )
> > >>>  
> > >>>   每小时购买数建表语句
> > >>>   CREATE TABLE buy_cnt_per_hour (nbsp;
> > >>>   nbsp; nbsp; hour_of_day BIGINT,
> > >>>   nbsp; nbsp; buy_cnt BIGINT
> > >>>   ) WITH (
> > >>>   nbsp; nbsp; 'connector.type' = 'elasticsearch',

Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

2020-04-20 Thread Jark Wu
Hi,

Are you using versions < 1.9.2? From the exception stack, it looks like
caused by FLINK-13702, which is already fixed in 1.9.2 and 1.10.0.
Could you try it using 1.9.2?

Best,
Jark

On Mon, 20 Apr 2020 at 21:00, Kurt Young  wrote:

> Can you reproduce this in a local program with mini-cluster?
>
> Best,
> Kurt
>
>
> On Mon, Apr 20, 2020 at 8:07 PM Zahid Rahman  wrote:
>
>> You can read this for this type error.
>>
>>
>> https://stackoverflow.com/questions/28189446/i-always-get-this-error-exception-in-thread-main-java-lang-arrayindexoutofbou#comment44747327_28189446
>>
>> I would suggest you set break points  in your code. Step through the
>> code, this  method should show you which array variable is being passed a
>> null argument when the array variable is not null able.
>>
>>
>>
>>
>> On Mon, 20 Apr 2020, 10:07 刘建刚,  wrote:
>>
>>>I am using Roaring64NavigableMap to compute uv. It is ok to us
>>> flink planner and not ok with blink planner. The SQL is as following:
>>>
>>> SELECT toLong(TUMBLE_START(eventTime, interval '1' minute)) as 
>>> curTimestamp, A, B, C, D,
>>> E, uv(bitmap(id)) as bmp
>>> FROM person
>>> GROUP BY TUMBLE(eventTime, interval '1' minute), A, B, C, D, E
>>>
>>>
>>>   The udf is as following:
>>>
>>> public static class Bitmap extends AggregateFunction<Roaring64NavigableMap, Roaring64NavigableMap> {
>>>@Override
>>>public Roaring64NavigableMap createAccumulator() {
>>>   return new Roaring64NavigableMap();
>>>}
>>>
>>>@Override
>>>public Roaring64NavigableMap getValue(Roaring64NavigableMap accumulator) 
>>> {
>>>   return accumulator;
>>>}
>>>
>>>public void accumulate(Roaring64NavigableMap bitmap, long id) {
>>>   bitmap.add(id);
>>>}
>>> }
>>>
>>> public static class UV extends ScalarFunction {
>>>public long eval(Roaring64NavigableMap bitmap) {
>>>   return bitmap.getLongCardinality();
>>>}
>>> }
>>>
>>>   The error is as following:
>>>
>>> 2020-04-20 16:37:13,868 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>  [flink-akka.actor.default-dispatcher-40]  -
>>> GroupWindowAggregate(groupBy=[brand, platform, channel, versionName,
>>> appMajorVersion], window=[TumblingGroupWindow('w$, eventTime, 6)],
>>> properties=[w$start, w$end, w$rowtime, w$proctime], select=[brand,
>>> platform, channel, versionName, appMajorVersion, bitmap(id) AS $f5,
>>> start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
>>> proctime('w$) AS w$proctime]) -> Calc(select=[toLong(w$start) AS
>>> curTimestamp, brand, platform, channel, versionName, appMajorVersion,
>>> uv($f5) AS bmp]) -> SinkConversionToTuple2 -> (Flat Map, Flat Map -> Sink:
>>> Unnamed) (321/480) (8eb918b493ea26e2bb60f8307347dc1a) switched from RUNNING
>>> to FAILED.
>>> java.lang.ArrayIndexOutOfBoundsException: -1
>>>   at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>>>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>>>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>>>   at
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
>>>   at
>>> org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:62)
>>>   at
>>> org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:37)
>>>   at
>>> org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copyBaseRow(BaseRowSerializer.java:150)
>>>   at
>>> org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:117)
>>>   at
>>> org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:50)
>>>   at
>>> org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:297)
>>>   at
>>> org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:244)
>>>   at
>>> org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:138)
>>>   at
>>> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73)
>>>   at
>>> org.apache.flink.table.runtime.operators.window.WindowOperator.processElement(WindowOperator.java:337)
>>>   at
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.processRecord(OneInputStreamTask.java:204)
>>>   at
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:196)
>>>   at
>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>>>   at
>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>>>   at
>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>>>   at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>>>   at
>>> 

Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread Gyula Fóra
Thanks Timo,
I can see why this is pretty complicated to solve nicely at the moment (and
in general).
We will work around this for now, and we look forward to helping make this
better in the future!

Gyula


On Mon, Apr 20, 2020 at 4:37 PM Timo Walther  wrote:

> Hi Gyula,
>
> first of all the exception
>
> ```
> org.apache.flink.table.api.TableException: Rowtime attributes must not
> be in the input rows of a regular join. As a workaround you can cast the
> time attributes of input tables to TIMESTAMP before.
> ```
>
> is IMHO one of the biggest shortcomings that we currently have in the
> planners due to internals around time interval joins [0]. But this is a
> different topic.
>
> I think in theory Gyula is right, however, we would need to store the
> static table somewhere in order to perform lookups while the stream is
> passing by. And while checking the time attributes we would need to know
> which table is bounded and what kind of changes are coming into the
> streaming table.
>
> There is still a lot of work in the future to make the concepts smoother.
>
> Regards,
> Timo
>
>
> [0] https://issues.apache.org/jira/browse/FLINK-10211
>
>
>
>
>
> On 20.04.20 16:09, Gyula Fóra wrote:
> > The HiveTableSource (and many others) return isBounded() -> true.
> > In this case it is not even possible for it to change over time, so I am
> > a bit confused.
> >
> > To me it sounds like you should always be able to join a stream against
> > a bounded table, temporal or not it is pretty well defined.
> > Maybe there is some fundamental concept that I dont understand, I don't
> > have much experience with this to be fair.
> >
> > Gyula
> >
> > On Mon, Apr 20, 2020 at 4:03 PM Kurt Young  > > wrote:
> >
> > The reason here is Flink doesn't know the hive table is static.
> > After you create these two tables and
> > trying to join them, Flink will assume both table will be changing
> > with time.
> >
> > Best,
> > Kurt
> >
> >
> > On Mon, Apr 20, 2020 at 9:48 PM Gyula Fóra  > > wrote:
> >
> > Hi!
> >
> > The problem here is that I dont have a temporal table.
> >
> > I have a regular stream from kafka (with even time attribute)
> > and a static table in hive.
> > The Hive table is static, it doesn't change. It doesn't have any
> > time attribute, it's not temporal.
> >
> > Gyula
> >
> > On Mon, Apr 20, 2020 at 3:43 PM godfrey he  > > wrote:
> >
> > Hi Gyual,
> >
> > Can you convert the regular join to lookup join (temporal
> > join) [1],
> > and then you can use window aggregate.
> >
> >  >  I understand that the problem is that we cannot join
> > with the Hive table and still maintain the watermark/even
> > time column. But why is this?
> > Regular join can't maintain the time attribute as increasing
> > trend (one record may be joined with a very old record),
> > that means the watermark does not also been guaranteed to
> > increase.
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table
> >
> > Best,
> > Godfrey
> >
> > Gyula Fóra  > > 于2020年4月20日周一 下午4:46
> > 写道:
> >
> > Hi All!
> >
> > We hit a the following problem with SQL and trying to
> > understand if there is a valid workaround.
> >
> > We have 2 tables:
> >
> > _Kafka_
> > timestamp (ROWTIME)
> > item
> > quantity
> >
> > _Hive_
> > item
> > price
> >
> > So we basically have incoming (ts, id, quantity) and we
> > want to join it with the hive table to get the total
> > price (price * quantity) got the current item.
> >
> > After this we want to create window aggregate on
> > quantity*price windowed on timestamp (event time
> attribute).
> >
> > In any way we formulate this query we hit the following
> > error:
> > org.apache.flink.table.api.TableException: Rowtime
> > attributes must not be in the input rows of a regular
> > join. As a workaround you can cast the time attributes
> > of input tables to TIMESTAMP before.
> >
> >   I understand that the problem is that we cannot join
> > with the Hive table and still maintain the
> > watermark/even time column. But why is this?
> >
> > In datastream world I would just simply assign Max
> > watermark to my enrichment 

Re: Flink Serialization as stable (kafka) output format?

2020-04-20 Thread Theo Diefenthal
Hi Robert, 

Thank you very much for pointing me to the nice blog post. 

It aligns with my reading that the Flink serializer is fast, outperforms Avro 
(especially reflect) and still supports schema evolution well. So nicely done, 
@flink :) 

But as Arvid says, Avro is compatible with many more languages than just the JVM. 
Who knows how my project will grow in the future and whether there will be 
people wanting to connect to Kafka with Python/Go/whatever, ... As long as you 
don't recommend it, I won't jump into using Flink as an external serializer but 
will keep the internal usage of the PojoSerializer everywhere :) 

I'll definitely also keep in mind that Avro reflect performs much worse 
compared to Avro specific/generic than I expected. 
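For reference, a minimal sketch of a class that Flink's PojoSerializer accepts (the type itself is made up for illustration): a public class with a public no-argument constructor whose fields are either public or exposed via getters/setters.

// Illustrative POJO that Flink serializes with the PojoSerializer instead of
// falling back to Kryo.
public class SensorReading {
    public String sensorId;
    public long timestamp;
    public double value;

    public SensorReading() {} // the public no-arg constructor is required

    public SensorReading(String sensorId, long timestamp, double value) {
        this.sensorId = sensorId;
        this.timestamp = timestamp;
        this.value = value;
    }
}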

Best regards 
Theo 



Von: "Robert Metzger"  
An: "Arvid Heise"  
CC: "Theo Diefenthal" , "user" 
 
Gesendet: Sonntag, 19. April 2020 08:23:42 
Betreff: Re: Flink Serialization as stable (kafka) output format? 

Hey Theo, 
we recently published a blog post that answers your request for a comparison 
between Kryo and Avro in Flink: [ 
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html 
| 
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html 
] 

On Tue, Mar 10, 2020 at 9:27 AM Arvid Heise < [ mailto:ar...@ververica.com | 
ar...@ververica.com ] > wrote: 



Hi Theo, 

I strongly discourage the use of flink serialization for persistent storage of 
data. It was never intended to work in this way and does not offer the benefits 
of Avro of lazy schema evolution and maturity. 

Unless you can explicitly measure that Avro is a bottleneck in your setup, 
stick with it. It's the preferred way to store data in Kafka for a reason. It's 
mature, supports plenty of languages, and the schema evolution feature will 
save you so many headaches in the future. 

If it turns out to be a bottleneck, the most logical alternative is protobuf. 
Kryo is even worse than Flink serializer for Kafka. In general, realistically 
speaking, it's so much more cost-effective to just add another node to your 
Flink cluster and use Avro than coming up with any clever solution (just assume 
that you need at least one man month to implement and do the math). 

And btw, you should always use generated Java/scala classes if possible for 
Avro. It's faster and offers a much nicer development experience. 

On Mon, Mar 9, 2020 at 3:57 PM Robert Metzger < [ mailto:rmetz...@apache.org | 
rmetz...@apache.org ] > wrote: 

BQ_BEGIN

Hi Theo, 


However, in most benchmarks, avro turns out to be rather slow in terms of CPU 
cycles ( e.g. [ https://github.com/eishay/jvm-serializers/wiki | [1] ] ) 


Avro is slower compared to what? 
You should not only benchmark the CPU cycles for serializing the data. If you 
are sending JSON strings across the network, you'll probably have a lot more 
bytes to send across the network, making everything slower (usually network is 
slower than CPU) 

One of the reasons why people use Avro it supports schema evolution. 

Regarding your questions: 
1. For this use case, you can use the Flink data format as an internal message 
format (between the star architecture jobs) 
2. Generally speaking no 
3. You will at leave have a dependency to flink-core. And this is a somewhat 
custom setup, so you might be facing breaking API changes. 
4. I'm not aware of any benchmarks. The Flink serializers are mostly for 
internal use (between our operators), Kryo is our fallback (to not suffer to 
much from the not invented here syndrome), while Avro is meant for cross-system 
serialization. 

I have the feeling that you can move ahead with using Flink's Pojo serializer 
everywhere :) 

Best, 
Robert 



On Wed, Mar 4, 2020 at 1:04 PM Theo Diefenthal < [ 
mailto:theo.diefent...@scoop-software.de | theo.diefent...@scoop-software.de ] 
> wrote: 


Hi, 

Without knowing too much about flink serialization, I know that Flinks states 
that it serializes POJOtypes much faster than even the fast Kryo for Java. I 
further know that it supports schema evolution in the same way as avro. 

In our project, we have a star architecture, where one flink job produces 
results into a kafka topic and where we have multiple downstream consumers from 
that kafka topic (Mostly other flink jobs). 
For fast development cycles, we currently use JSON as output format for the 
kafka topic due to easy debugging capabilities and best migration 
possibilities. However, when scaling up, we need to switch to a more efficient 
format. Most often, Avro is mentioned in combination with a schema registry, as 
its much more efficient then JSON where essentially, each message contains the 
schema as well. However, in most benchmarks, avro turns out to be rather slow 
in terms of CPU cycles ( e.g. [ https://github.com/eishay/jvm-serializers/wiki 
| [1] ] ) 

My question(s) now: 
1. Is it reasonable to use flink serializers as message format in Kafka? 
2. Are there any 

Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread Timo Walther

Hi Gyula,

first of all the exception

```
org.apache.flink.table.api.TableException: Rowtime attributes must not 
be in the input rows of a regular join. As a workaround you can cast the 
time attributes of input tables to TIMESTAMP before.

```

is IMHO one of the biggest shortcomings that we currently have in the 
planners due to internals around time interval joins [0]. But this is a 
different topic.
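For completeness, a sketch of that cast workaround with assumed table and column names; note that the cast strips the time attribute, so an event-time window on the join result is no longer possible afterwards:

SELECT
  CAST(k.`timestamp` AS TIMESTAMP(3)) AS ts, -- plain TIMESTAMP, no longer a rowtime attribute
  k.item,
  k.quantity * h.price AS total_price
FROM kafka_orders AS k
JOIN hive_prices AS h
  ON k.item = h.item;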


I think in theory Gyula is right, however, we would need to store the 
static table somewhere in order to perform lookups while the stream is 
passing by. And while checking the time attributes we would need to know 
which table is bounded and what kind of changes are coming into the 
streaming table.


There is still a lot of work in the future to make the concepts smoother.

Regards,
Timo


[0] https://issues.apache.org/jira/browse/FLINK-10211





On 20.04.20 16:09, Gyula Fóra wrote:

The HiveTableSource (and many others) return isBounded() -> true.
In this case it is not even possible for it to change over time, so I am 
a bit confused.


To me it sounds like you should always be able to join a stream against 
a bounded table, temporal or not it is pretty well defined.
Maybe there is some fundamental concept that I dont understand, I don't 
have much experience with this to be fair.


Gyula

On Mon, Apr 20, 2020 at 4:03 PM Kurt Young > wrote:


The reason here is Flink doesn't know the hive table is static.
After you create these two tables and
trying to join them, Flink will assume both table will be changing
with time.

Best,
Kurt


On Mon, Apr 20, 2020 at 9:48 PM Gyula Fóra mailto:gyula.f...@gmail.com>> wrote:

Hi!

The problem here is that I dont have a temporal table.

I have a regular stream from kafka (with even time attribute)
and a static table in hive.
The Hive table is static, it doesn't change. It doesn't have any
time attribute, it's not temporal.

Gyula

On Mon, Apr 20, 2020 at 3:43 PM godfrey he mailto:godfre...@gmail.com>> wrote:

Hi Gyual,

Can you convert the regular join to lookup join (temporal
join) [1],
and then you can use window aggregate.

 >  I understand that the problem is that we cannot join
with the Hive table and still maintain the watermark/even
time column. But why is this?
Regular join can't maintain the time attribute as increasing
trend (one record may be joined with a very old record),
that means the watermark does not also been guaranteed to
increase.


https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table

Best,
Godfrey

Gyula Fóra mailto:gyula.f...@gmail.com>> 于2020年4月20日周一 下午4:46
写道:

Hi All!

We hit a the following problem with SQL and trying to
understand if there is a valid workaround.

We have 2 tables:

_Kafka_
timestamp (ROWTIME)
item
quantity

_Hive_
item
price

So we basically have incoming (ts, id, quantity) and we
want to join it with the hive table to get the total
price (price * quantity) got the current item.

After this we want to create window aggregate on
quantity*price windowed on timestamp (event time attribute).

In any way we formulate this query we hit the following
error:
org.apache.flink.table.api.TableException: Rowtime
attributes must not be in the input rows of a regular
join. As a workaround you can cast the time attributes
of input tables to TIMESTAMP before.

  I understand that the problem is that we cannot join
with the Hive table and still maintain the
watermark/even time column. But why is this?

In datastream world I would just simply assign Max
watermark to my enrichment input and join outputs will
get the ts of the input record. Can I achieve something
similar in SQL/Table api?

Thank you!
Gyula





Re: 问题请教-flinksql的kafkasource方面

2020-04-20 Thread Benchao Li
I'm not very familiar with the checkpointing part, but marking a subtask as idle should still be different from it being in the FINISHED state.

祝尚 <17626017...@163.com> 于2020年4月20日周一 下午10:29写道:

> 我们是1.8版本,但是这段源码应该是没变把
> // check if all tasks that we need to trigger are running.
> // if not, abort the checkpoint
> Execution[] executions = new Execution[tasksToTrigger.length];
> for (int i = 0; i < tasksToTrigger.length; i++) {
>Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
>if (ee == null) {
>   LOG.info("Checkpoint triggering task {} of job {} is not being
> executed at the moment. Aborting checkpoint.",
> tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
> job);
>   throw new
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
>} else if (ee.getState() == ExecutionState.RUNNING) {
>   executions[i] = ee;
>} else {
>   LOG.info("Checkpoint triggering task {} of job {} is not in state {}
> but {} instead. Aborting checkpoint.",
> tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
> job,
> ExecutionState.RUNNING,
> ee.getState());
>   throw new
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
>}
> }
> 还是我理解的不对
>
> > 2020年4月20日 下午6:21,Benchao Li  写道:
> >
> > 你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。
> >
> > Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 下午5:43写道:
> >
> >>
> 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint
> >> ,源码CheckpointCoordinator#triggerCheckpoint也有说明
> >>
> >>
> >>
> >>
> >>
> >> | |
> >> Sun.Zhu
> >> |
> >> |
> >> 邮箱:17626017...@163.com
> >> |
> >>
> >> Signature is customized by Netease Mail Master
> >>
> >> 在2020年04月20日 10:37,Benchao Li 写道:
> >> 应该是不会的。分配不到partition的source会标记为idle状态。
> >>
> >> Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道:
> >>
> >>> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧
> >>>
> >>>
> >>>
> >>>
> >>> | |
> >>> Sun.Zhu
> >>> |
> >>> |
> >>> 邮箱:17626017...@163.com
> >>> |
> >>>
> >>> Signature is customized by Netease Mail Master
> >>>
> >>> 在2020年04月19日 22:43,人生若只如初见 写道:
> >>> 嗯嗯,十分感谢
> >>>
> >>>
> >>>
> >>>
> >>> --原始邮件--
> >>> 发件人:"Benchao Li" >>> 发送时间:2020年4月19日(星期天) 晚上9:25
> >>> 收件人:"user-zh" >>>
> >>> 主题:Re: 问题请教-flinksql的kafkasource方面
> >>>
> >>>
> >>>
> >>> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。
> >>>
> >>> Jark Wu  >>>
> >>>  Hi,
> >>> 
> >>>  根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。
> >>>  根据你的 Java 代码,数据的 event time
> >>> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark
> >>>  能容忍 5s 乱序).
> >>>  但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些
> >>> partition
> >>>  进度快很多的现象,
> >>>  导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。
> >>> 
> >>>  完美的解决方案还需要等 FLIP-27 的完成。
> >>>  当前可以通过增加 watermark delay来增大迟到数据的容忍。
> >>> 
> >>>  Best,
> >>>  Jark
> >>> 
> >>> 
> >>>  On Sat, 18 Apr 2020 at 22:53, 人生若只如初见  >>> 
> >>>   你好
> >>>  
> >>>  
> >>> 
> >>>
> >>
> 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多
> >>>  
> >>>  
> >>>  
> >>>   附:
> >>>   userbehavior建表语句
> >>>   CREATE TABLE user_behavior (
> >>>   nbsp; nbsp; user_id BIGINT,
> >>>   nbsp; nbsp; item_id BIGINT,
> >>>   nbsp; nbsp; category_id BIGINT,
> >>>   nbsp; nbsp; behavior STRING,
> >>>   nbsp; nbsp; ts TIMESTAMP(3),
> >>>   nbsp; nbsp; proctime as PROCTIME(), nbsp; --
> >>> 通过计算列产生一个处理时间列
> >>>   nbsp; nbsp; WATERMARK FOR ts as ts - INTERVAL '5'
> >>> SECOND nbsp;--
> >>>   在ts上定义watermark,ts成为事件时间列
> >>>   ) WITH (
> >>>   nbsp; nbsp; 'connector.type' = 'kafka',
> nbsp;--
> >>> 使用 kafka connector
> >>>   nbsp; nbsp; 'connector.version' = 'universal',
> >>> nbsp;-- kafka
> >>>   版本,universal 支持 0.11 以上的版本
> >>>   nbsp; nbsp; 'connector.topic' = 'user_behavior',
> >>> nbsp;-- kafka topic
> >>>   nbsp; nbsp; 'connector.startup-mode' =
> >>> 'earliest-offset', nbsp;-- 从起始
> >>>   offset 开始读取
> >>>   nbsp; nbsp;
> 'connector.properties.zookeeper.connect'
> >> =
> >>> '
> >>>   192.168.0.150:2181', nbsp;-- zookeeper 地址
> >>>   nbsp; nbsp;
> 'connector.properties.bootstrap.servers'
> >> =
> >>> '
> >>>   192.168.0.150:9092', nbsp;-- kafka broker 地址
> >>>   nbsp; nbsp; 'format.type' = 'json' nbsp;--
> >> 数据源格式为
> >>> json
> >>>   )
> >>>  
> >>>   每小时购买数建表语句
> >>>   CREATE TABLE buy_cnt_per_hour (nbsp;
> >>>   nbsp; nbsp; hour_of_day BIGINT,
> >>>   nbsp; nbsp; buy_cnt BIGINT
> >>>   ) WITH (
> >>>   nbsp; nbsp; 'connector.type' = 'elasticsearch', --
> 使用
> >>> elasticsearch
> >>>   connector
> >>>   nbsp; nbsp; 'connector.version' = '6', nbsp;--
> >>> elasticsearch 版本,6 能支持
> >>>   es 6+ 以及 7+ 的版本
> >>>   nbsp; nbsp; 'connector.hosts' = '
> >>> http://192.168.0.150:9200', nbsp;--
> >>>   elasticsearch 地址
> >>>   nbsp; nbsp; 'connector.index' = 'buy_cnt_per_hour',
> >>> nbsp;--
> >>>   elasticsearch 索引名,相当于数据库的表名
> >>>   nbsp; nbsp; 'connector.document-type' =
> >>> 'user_behavior', --
> >>>   elasticsearch 的 

Re: Flink Conf "yarn.flink-dist-jar" Question

2020-04-20 Thread Yang Wang
Hi Till, thanks for the feedback and suggestion.

I think it makes sense to only support flink-dist-*.jar in the first step.
Just as you suggested,
the config option could be "yarn.submission.automatic-flink-dist-upload",
defaulting to true. Users
could use "-yt/--yarnship" to specify an HDFS path that contains
flink-dist-*.jar and set the above
config option to "false" to disable the flink-dist-*.jar uploading.

Best,
Yang

Till Rohrmann  于2020年4月20日周一 下午8:07写道:

> Thanks for the clarification Yang. Now it makes sense to me.
>
> If it makes things easier, then I would still go first with the dead
> simple solution to turn automatic upload of local dist off via a
> configuration option before trying to implement a smart solution
> which relies on pattern matching or something else. For example, users
> might specify a remote location which is not accessible from the client.
> Then one could not figure out which files are already uploaded. The smart
> solution could be a follow up step then.
>
> Cheers,
> Till
>
> On Mon, Apr 20, 2020 at 1:09 PM Yang Wang  wrote:
>
>> Hi till,
>>
>> Sorry for that i do not giving a detailed explanation of the
>> optimization. Actually, the optimization contains
>> the following two parts.
>> * Use remote uploaded jars to avoid unnecessary uploading(e.g.
>> flink-dist-*.jar, user jars, dependencies).
>> this could be done via enriching "-yt/--yarnship" to support remote ship
>> files.
>> * Use the "PUBLIC" or "PRIVATE" visibility of YARN local resource to
>> avoid unnecessary downloading. When
>> a local resource is public, once it is download by YARN NodeManager, it
>> could be reused by all the application
>> in the same NodeManager.
>>
>> >> Why do we need to specify the visibility of the remote files? Won't
>>> the visibility be specified when uploading these files?
>>
>> It is mostly for the users who want to eliminate the unnecessary
>> downloading so that the container could be
>> launched faster. "PRIVATE" means the remote jars could be shared by the
>> applications submitted by the current user.
>> "PUBLIC" means the remote jars could be shared by all the Flink
>> applications. And "APPLICATION" means they
>> could only be shared by the containers of the current application in a
>> same NodeManager.
>>
>>
>> For the implementation, i think we could do it step by step.
>> * Enrich "-yt/--yarnship" to support HDFS directory
>> * Add a new config option to control whether to avoid the unnecessary
>> uploading
>> * Enrich "-yt/--yarnship" to specify local resource visibility
>>
>>
>> Best,
>> Yang
>>
>>
>>
>> Till Rohrmann  于2020年4月20日周一 下午5:26写道:
>>
>>> Shall we say for the first version we only can deactivate the upload of
>>> local files instead of doing some optimizations? I guess my problem is that
>>> I don't fully understand the optimizations yet. Maybe we introduce a power
>>> user config option `yarn.submission.automatic-flink-dist-upload` or so.
>>>
>>> Why do we need to specify the visibility of the remote files? Won't the
>>> visibility be specified when uploading these files?
>>>
>>> Apart from that, the proposal looks good to me.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Apr 20, 2020 at 5:38 AM Yang Wang  wrote:
>>>
 Hi tison,

 I think i get your concerns and points.

 Take both FLINK-13938[1] and FLINK-14964[2] into account, i will do in
 the following steps.
 * Enrich "-yt/--yarnship" to support HDFS directory
 * Enrich "-yt/--yarnship" to specify local resource visibility. It is
 "APPLICATION" by default. It could be also configured to "PUBLIC",
 which means shared by all applications, or "PRIVATE" which means shared
 by a same user.
 * Add a new config option to control whether to optimize the
 submission(default is false). When configured to true, Flink client will
 try to filter the jars and files by name and size to avoid unnecessary
 uploading.

 A very rough submission command could be issued as following.
 *./bin/flink run -m yarn-cluster -d -yt
 hdfs://myhdfs/flink/release/flink-1.11:PUBLIC,hdfs://myhdfs/user/someone/mylib
 \*
 *-yD yarn.submission-optimization.enable=true
 examples/streaming/WindowJoin.jar*

 cc @Rong Rong , since you also help to review the
 old PR of FLINK-13938, maybe you could also share some thoughts.


 [1]. https://issues.apache.org/jira/browse/FLINK-13938
 [2]. https://issues.apache.org/jira/browse/FLINK-14964


 Best,
 Yang



 tison  于2020年4月18日周六 下午12:12写道:

> Hi Yang,
>
> Name filtering & schema special handling makes sense for me. We can
> enrich later if there is requirement without breaking interface.
>
> For #1, from my perspective your first proposal is
>
>   having an option specifies remote flink/lib, then we turn off auto
> uploading local flink/lib and register that path as local resources
>
> It seems we here add 

Re: 问题请教-flinksql的kafkasource方面

2020-04-20 Thread 祝尚
We are on version 1.8, but this piece of the source code should not have changed:
// check if all tasks that we need to trigger are running.
// if not, abort the checkpoint
Execution[] executions = new Execution[tasksToTrigger.length];
for (int i = 0; i < tasksToTrigger.length; i++) {
   Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
   if (ee == null) {
  LOG.info("Checkpoint triggering task {} of job {} is not being executed 
at the moment. Aborting checkpoint.",
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
job);
  throw new 
CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
   } else if (ee.getState() == ExecutionState.RUNNING) {
  executions[i] = ee;
   } else {
  LOG.info("Checkpoint triggering task {} of job {} is not in state {} but 
{} instead. Aborting checkpoint.",
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
job,
ExecutionState.RUNNING,
ee.getState());
  throw new 
CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
   }
}
Or is my understanding wrong?

> 2020年4月20日 下午6:21,Benchao Li  写道:
> 
> 你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。
> 
> Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 下午5:43写道:
> 
>> 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint
>> ,源码CheckpointCoordinator#triggerCheckpoint也有说明
>> 
>> 
>> 
>> 
>> 
>> | |
>> Sun.Zhu
>> |
>> |
>> 邮箱:17626017...@163.com
>> |
>> 
>> Signature is customized by Netease Mail Master
>> 
>> 在2020年04月20日 10:37,Benchao Li 写道:
>> 应该是不会的。分配不到partition的source会标记为idle状态。
>> 
>> Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道:
>> 
>>> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧
>>> 
>>> 
>>> 
>>> 
>>> | |
>>> Sun.Zhu
>>> |
>>> |
>>> 邮箱:17626017...@163.com
>>> |
>>> 
>>> Signature is customized by Netease Mail Master
>>> 
>>> 在2020年04月19日 22:43,人生若只如初见 写道:
>>> 嗯嗯,十分感谢
>>> 
>>> 
>>> 
>>> 
>>> --原始邮件--
>>> 发件人:"Benchao Li">> 发送时间:2020年4月19日(星期天) 晚上9:25
>>> 收件人:"user-zh">> 
>>> 主题:Re: 问题请教-flinksql的kafkasource方面
>>> 
>>> 
>>> 
>>> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。
>>> 
>>> Jark Wu >> 
>>>  Hi,
>>> 
>>>  根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。
>>>  根据你的 Java 代码,数据的 event time
>>> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark
>>>  能容忍 5s 乱序).
>>>  但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些
>>> partition
>>>  进度快很多的现象,
>>>  导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。
>>> 
>>>  完美的解决方案还需要等 FLIP-27 的完成。
>>>  当前可以通过增加 watermark delay来增大迟到数据的容忍。
>>> 
>>>  Best,
>>>  Jark
>>> 
>>> 
>>>  On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 >> 
>>>   你好
>>>  
>>>  
>>> 
>>> 
>> 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多
>>>  
>>>  
>>>  
>>>   附:
>>>   userbehavior建表语句
>>>   CREATE TABLE user_behavior (
>>>   nbsp; nbsp; user_id BIGINT,
>>>   nbsp; nbsp; item_id BIGINT,
>>>   nbsp; nbsp; category_id BIGINT,
>>>   nbsp; nbsp; behavior STRING,
>>>   nbsp; nbsp; ts TIMESTAMP(3),
>>>   nbsp; nbsp; proctime as PROCTIME(), nbsp; --
>>> 通过计算列产生一个处理时间列
>>>   nbsp; nbsp; WATERMARK FOR ts as ts - INTERVAL '5'
>>> SECOND nbsp;--
>>>   在ts上定义watermark,ts成为事件时间列
>>>   ) WITH (
>>>   nbsp; nbsp; 'connector.type' = 'kafka', nbsp;--
>>> 使用 kafka connector
>>>   nbsp; nbsp; 'connector.version' = 'universal',
>>> nbsp;-- kafka
>>>   版本,universal 支持 0.11 以上的版本
>>>   nbsp; nbsp; 'connector.topic' = 'user_behavior',
>>> nbsp;-- kafka topic
>>>   nbsp; nbsp; 'connector.startup-mode' =
>>> 'earliest-offset', nbsp;-- 从起始
>>>   offset 开始读取
>>>   nbsp; nbsp; 'connector.properties.zookeeper.connect'
>> =
>>> '
>>>   192.168.0.150:2181', nbsp;-- zookeeper 地址
>>>   nbsp; nbsp; 'connector.properties.bootstrap.servers'
>> =
>>> '
>>>   192.168.0.150:9092', nbsp;-- kafka broker 地址
>>>   nbsp; nbsp; 'format.type' = 'json' nbsp;--
>> 数据源格式为
>>> json
>>>   )
>>>  
>>>   每小时购买数建表语句
>>>   CREATE TABLE buy_cnt_per_hour (nbsp;
>>>   nbsp; nbsp; hour_of_day BIGINT,
>>>   nbsp; nbsp; buy_cnt BIGINT
>>>   ) WITH (
>>>   nbsp; nbsp; 'connector.type' = 'elasticsearch', -- 使用
>>> elasticsearch
>>>   connector
>>>   nbsp; nbsp; 'connector.version' = '6', nbsp;--
>>> elasticsearch 版本,6 能支持
>>>   es 6+ 以及 7+ 的版本
>>>   nbsp; nbsp; 'connector.hosts' = '
>>> http://192.168.0.150:9200', nbsp;--
>>>   elasticsearch 地址
>>>   nbsp; nbsp; 'connector.index' = 'buy_cnt_per_hour',
>>> nbsp;--
>>>   elasticsearch 索引名,相当于数据库的表名
>>>   nbsp; nbsp; 'connector.document-type' =
>>> 'user_behavior', --
>>>   elasticsearch 的 type,相当于数据库的库名
>>>   nbsp; nbsp; 'connector.bulk-flush.max-actions' = '1',
>>> nbsp;-- 每条数据都刷新
>>>   nbsp; nbsp; 'format.type' = 'json', nbsp;--
>>> 输出数据格式 json
>>>   nbsp; nbsp; 'update-mode' = 'append'
>>>   )
>>>  
>>>   插入语句
>>>   INSERT INTO buy_cnt_per_hournbsp;
>>>   SELECT HOUR(TUMBLE_START(ts,INTERVAL '1'
>>> HOUR)),COUNT(*)nbsp;
>>>   FROM user_behavior
>>>   WHERE behavior = 'buy'
>>>   GROUP BY TUMBLE(ts,INTERVAL '1' HOUR)
>>>  

Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread Gyula Fóra
Thanks for the clarification, we can live with this restriction. I
just wanted to make sure that I fully understand why we are getting
these errors and whether there is any reasonable workaround.

Thanks again :)
Gyula

On Mon, Apr 20, 2020 at 4:21 PM Kurt Young  wrote:

> According to the current implementation, yes you are right hive table
> source will always be bounded.
> But conceptually, we can't do this assumption. For example, we
> might further improve hive table source
> to also support unbounded cases, .e.g. monitoring hive tables and always
> read newly appeared data.
> So right now, Flink relies on the "global flag" to distinguish whether the
> table should be treated as static
> or dynamically changing.
>
> The "global flag" is whether you are using `BatchTableEnvironment` or
> `StreamTableEnvironment` (old versions)
> and EnvironmentSettings's batchMode or streamingMode (newer versions).
>
> But we should admit that Flink hasn't finish the unification work. Your
> case will also be considered in the
> future when we want to further unify and simplify these concepts and
> usages.
>
> Best,
> Kurt
>
>
> On Mon, Apr 20, 2020 at 10:09 PM Gyula Fóra  wrote:
>
>> The HiveTableSource (and many others) return isBounded() -> true.
>> In this case it is not even possible for it to change over time, so I am
>> a bit confused.
>>
>> To me it sounds like you should always be able to join a stream against a
>> bounded table, temporal or not it is pretty well defined.
>> Maybe there is some fundamental concept that I dont understand, I don't
>> have much experience with this to be fair.
>>
>> Gyula
>>
>> On Mon, Apr 20, 2020 at 4:03 PM Kurt Young  wrote:
>>
>>> The reason here is Flink doesn't know the hive table is static. After
>>> you create these two tables and
>>> trying to join them, Flink will assume both table will be changing with
>>> time.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Mon, Apr 20, 2020 at 9:48 PM Gyula Fóra  wrote:
>>>
 Hi!

 The problem here is that I dont have a temporal table.

 I have a regular stream from kafka (with even time attribute) and a
 static table in hive.
 The Hive table is static, it doesn't change. It doesn't have any time
 attribute, it's not temporal.

 Gyula

 On Mon, Apr 20, 2020 at 3:43 PM godfrey he  wrote:

> Hi Gyual,
>
> Can you convert the regular join to lookup join (temporal join) [1],
> and then you can use window aggregate.
>
> >  I understand that the problem is that we cannot join with the Hive
> table and still maintain the watermark/even time column. But why is this?
> Regular join can't maintain the time attribute as increasing trend
> (one record may be joined with a very old record),
> that means the watermark does not also been guaranteed to increase.
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table
>
> Best,
> Godfrey
>
> Gyula Fóra  于2020年4月20日周一 下午4:46写道:
>
>> Hi All!
>>
>> We hit a the following problem with SQL and trying to understand if
>> there is a valid workaround.
>>
>> We have 2 tables:
>>
>> *Kafka*
>> timestamp (ROWTIME)
>> item
>> quantity
>>
>> *Hive*
>> item
>> price
>>
>> So we basically have incoming (ts, id, quantity) and we want to join
>> it with the hive table to get the total price (price * quantity) got the
>> current item.
>>
>> After this we want to create window aggregate on quantity*price
>> windowed on timestamp (event time attribute).
>>
>> In any way we formulate this query we hit the following error:
>> org.apache.flink.table.api.TableException: Rowtime attributes must
>> not be in the input rows of a regular join. As a workaround you can cast
>> the time attributes of input tables to TIMESTAMP before.
>>
>>  I understand that the problem is that we cannot join with the Hive
>> table and still maintain the watermark/even time column. But why is this?
>>
>> In datastream world I would just simply assign Max watermark to my
>> enrichment input and join outputs will get the ts of the input record. 
>> Can
>> I achieve something similar in SQL/Table api?
>>
>> Thank you!
>> Gyula
>>
>>


Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread Kurt Young
According to the current implementation, yes, you are right that the Hive table
source will always be bounded.
But conceptually, we can't make this assumption. For example, we
might further improve the Hive table source
to also support unbounded cases, e.g. monitoring Hive tables and always
reading newly appeared data.
So right now, Flink relies on the "global flag" to distinguish whether the
table should be treated as static
or dynamically changing.

The "global flag" is whether you are using `BatchTableEnvironment` or
`StreamTableEnvironment` (old versions)
and EnvironmentSettings's batchMode or streamingMode (newer versions).

But we should admit that Flink hasn't finished the unification work. Your
case will also be considered in the
future when we want to further unify and simplify these concepts and
usages.

Best,
Kurt


On Mon, Apr 20, 2020 at 10:09 PM Gyula Fóra  wrote:

> The HiveTableSource (and many others) return isBounded() -> true.
> In this case it is not even possible for it to change over time, so I am a
> bit confused.
>
> To me it sounds like you should always be able to join a stream against a
> bounded table, temporal or not it is pretty well defined.
> Maybe there is some fundamental concept that I dont understand, I don't
> have much experience with this to be fair.
>
> Gyula
>
> On Mon, Apr 20, 2020 at 4:03 PM Kurt Young  wrote:
>
>> The reason here is Flink doesn't know the hive table is static. After you
>> create these two tables and
>> trying to join them, Flink will assume both table will be changing with
>> time.
>>
>> Best,
>> Kurt
>>
>>
>> On Mon, Apr 20, 2020 at 9:48 PM Gyula Fóra  wrote:
>>
>>> Hi!
>>>
>>> The problem here is that I dont have a temporal table.
>>>
>>> I have a regular stream from kafka (with even time attribute) and a
>>> static table in hive.
>>> The Hive table is static, it doesn't change. It doesn't have any time
>>> attribute, it's not temporal.
>>>
>>> Gyula
>>>
>>> On Mon, Apr 20, 2020 at 3:43 PM godfrey he  wrote:
>>>
 Hi Gyual,

 Can you convert the regular join to lookup join (temporal join) [1],
 and then you can use window aggregate.

 >  I understand that the problem is that we cannot join with the Hive
 table and still maintain the watermark/even time column. But why is this?
 Regular join can't maintain the time attribute as increasing trend (one
 record may be joined with a very old record),
 that means the watermark does not also been guaranteed to increase.


 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table

 Best,
 Godfrey

 Gyula Fóra  于2020年4月20日周一 下午4:46写道:

> Hi All!
>
> We hit a the following problem with SQL and trying to understand if
> there is a valid workaround.
>
> We have 2 tables:
>
> *Kafka*
> timestamp (ROWTIME)
> item
> quantity
>
> *Hive*
> item
> price
>
> So we basically have incoming (ts, id, quantity) and we want to join
> it with the hive table to get the total price (price * quantity) got the
> current item.
>
> After this we want to create window aggregate on quantity*price
> windowed on timestamp (event time attribute).
>
> In any way we formulate this query we hit the following error:
> org.apache.flink.table.api.TableException: Rowtime attributes must not
> be in the input rows of a regular join. As a workaround you can cast the
> time attributes of input tables to TIMESTAMP before.
>
>  I understand that the problem is that we cannot join with the Hive
> table and still maintain the watermark/even time column. But why is this?
>
> In datastream world I would just simply assign Max watermark to my
> enrichment input and join outputs will get the ts of the input record. Can
> I achieve something similar in SQL/Table api?
>
> Thank you!
> Gyula
>
>


Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread Gyula Fóra
The HiveTableSource (and many others) return isBounded() -> true.
In this case it is not even possible for it to change over time, so I am a
bit confused.

To me it sounds like you should always be able to join a stream against a
bounded table; temporal or not, it is pretty well defined.
Maybe there is some fundamental concept that I don't understand; I don't
have much experience with this, to be fair.

Gyula

On Mon, Apr 20, 2020 at 4:03 PM Kurt Young  wrote:

> The reason here is Flink doesn't know the hive table is static. After you
> create these two tables and
> trying to join them, Flink will assume both table will be changing with
> time.
>
> Best,
> Kurt
>
>
> On Mon, Apr 20, 2020 at 9:48 PM Gyula Fóra  wrote:
>
>> Hi!
>>
>> The problem here is that I dont have a temporal table.
>>
>> I have a regular stream from kafka (with even time attribute) and a
>> static table in hive.
>> The Hive table is static, it doesn't change. It doesn't have any time
>> attribute, it's not temporal.
>>
>> Gyula
>>
>> On Mon, Apr 20, 2020 at 3:43 PM godfrey he  wrote:
>>
>>> Hi Gyual,
>>>
>>> Can you convert the regular join to lookup join (temporal join) [1],
>>> and then you can use window aggregate.
>>>
>>> >  I understand that the problem is that we cannot join with the Hive
>>> table and still maintain the watermark/even time column. But why is this?
>>> Regular join can't maintain the time attribute as increasing trend (one
>>> record may be joined with a very old record),
>>> that means the watermark does not also been guaranteed to increase.
>>>
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table
>>>
>>> Best,
>>> Godfrey
>>>
>>> Gyula Fóra  于2020年4月20日周一 下午4:46写道:
>>>
 Hi All!

 We hit a the following problem with SQL and trying to understand if
 there is a valid workaround.

 We have 2 tables:

 *Kafka*
 timestamp (ROWTIME)
 item
 quantity

 *Hive*
 item
 price

 So we basically have incoming (ts, id, quantity) and we want to join it
 with the hive table to get the total price (price * quantity) got the
 current item.

 After this we want to create window aggregate on quantity*price
 windowed on timestamp (event time attribute).

 In any way we formulate this query we hit the following error:
 org.apache.flink.table.api.TableException: Rowtime attributes must not
 be in the input rows of a regular join. As a workaround you can cast the
 time attributes of input tables to TIMESTAMP before.

  I understand that the problem is that we cannot join with the Hive
 table and still maintain the watermark/even time column. But why is this?

 In datastream world I would just simply assign Max watermark to my
 enrichment input and join outputs will get the ts of the input record. Can
 I achieve something similar in SQL/Table api?

 Thank you!
 Gyula




Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread Kurt Young
The reason here is that Flink doesn't know the Hive table is static. After you
create these two tables and
try to join them, Flink will assume both tables will be changing over
time.

Best,
Kurt


On Mon, Apr 20, 2020 at 9:48 PM Gyula Fóra  wrote:

> Hi!
>
> The problem here is that I dont have a temporal table.
>
> I have a regular stream from kafka (with even time attribute) and a static
> table in hive.
> The Hive table is static, it doesn't change. It doesn't have any time
> attribute, it's not temporal.
>
> Gyula
>
> On Mon, Apr 20, 2020 at 3:43 PM godfrey he  wrote:
>
>> Hi Gyual,
>>
>> Can you convert the regular join to lookup join (temporal join) [1],
>> and then you can use window aggregate.
>>
>> >  I understand that the problem is that we cannot join with the Hive
>> table and still maintain the watermark/even time column. But why is this?
>> Regular join can't maintain the time attribute as increasing trend (one
>> record may be joined with a very old record),
>> that means the watermark does not also been guaranteed to increase.
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table
>>
>> Best,
>> Godfrey
>>
>> Gyula Fóra  于2020年4月20日周一 下午4:46写道:
>>
>>> Hi All!
>>>
>>> We hit a the following problem with SQL and trying to understand if
>>> there is a valid workaround.
>>>
>>> We have 2 tables:
>>>
>>> *Kafka*
>>> timestamp (ROWTIME)
>>> item
>>> quantity
>>>
>>> *Hive*
>>> item
>>> price
>>>
>>> So we basically have incoming (ts, id, quantity) and we want to join it
>>> with the hive table to get the total price (price * quantity) got the
>>> current item.
>>>
>>> After this we want to create window aggregate on quantity*price windowed
>>> on timestamp (event time attribute).
>>>
>>> In any way we formulate this query we hit the following error:
>>> org.apache.flink.table.api.TableException: Rowtime attributes must not
>>> be in the input rows of a regular join. As a workaround you can cast the
>>> time attributes of input tables to TIMESTAMP before.
>>>
>>>  I understand that the problem is that we cannot join with the Hive
>>> table and still maintain the watermark/even time column. But why is this?
>>>
>>> In datastream world I would just simply assign Max watermark to my
>>> enrichment input and join outputs will get the ts of the input record. Can
>>> I achieve something similar in SQL/Table api?
>>>
>>> Thank you!
>>> Gyula
>>>
>>>


Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread Gyula Fóra
Hi!

The problem here is that I don't have a temporal table.

I have a regular stream from Kafka (with an event time attribute) and a static
table in Hive.
The Hive table is static, it doesn't change. It doesn't have any time
attribute, and it's not temporal.

Gyula

On Mon, Apr 20, 2020 at 3:43 PM godfrey he  wrote:

> Hi Gyual,
>
> Can you convert the regular join to lookup join (temporal join) [1],
> and then you can use window aggregate.
>
> >  I understand that the problem is that we cannot join with the Hive
> table and still maintain the watermark/even time column. But why is this?
> Regular join can't maintain the time attribute as increasing trend (one
> record may be joined with a very old record),
> that means the watermark does not also been guaranteed to increase.
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table
>
> Best,
> Godfrey
>
> Gyula Fóra  于2020年4月20日周一 下午4:46写道:
>
>> Hi All!
>>
>> We hit a the following problem with SQL and trying to understand if there
>> is a valid workaround.
>>
>> We have 2 tables:
>>
>> *Kafka*
>> timestamp (ROWTIME)
>> item
>> quantity
>>
>> *Hive*
>> item
>> price
>>
>> So we basically have incoming (ts, id, quantity) and we want to join it
>> with the hive table to get the total price (price * quantity) got the
>> current item.
>>
>> After this we want to create window aggregate on quantity*price windowed
>> on timestamp (event time attribute).
>>
>> In any way we formulate this query we hit the following error:
>> org.apache.flink.table.api.TableException: Rowtime attributes must not be
>> in the input rows of a regular join. As a workaround you can cast the time
>> attributes of input tables to TIMESTAMP before.
>>
>>  I understand that the problem is that we cannot join with the Hive table
>> and still maintain the watermark/even time column. But why is this?
>>
>> In datastream world I would just simply assign Max watermark to my
>> enrichment input and join outputs will get the ts of the input record. Can
>> I achieve something similar in SQL/Table api?
>>
>> Thank you!
>> Gyula
>>
>>


Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread godfrey he
Hi Gyula,

Can you convert the regular join to a lookup join (temporal join) [1]?
Then you can use the window aggregate.

>  I understand that the problem is that we cannot join with the Hive table
and still maintain the watermark/even time column. But why is this?
A regular join can't maintain the time attribute as an increasing trend (one
record may be joined with a very old record),
which means the watermark is also not guaranteed to increase.

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table
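For reference, a rough sketch of the lookup-join syntax with illustrative table and column names; it needs a processing-time attribute on the stream side and a connector on the dimension side that supports lookups, so whether it works against a Hive table depends on the connector and Flink version:

SELECT
  o.ts,
  o.item,
  o.quantity * p.price AS total_price
FROM kafka_orders AS o
JOIN item_prices FOR SYSTEM_TIME AS OF o.proctime AS p
  ON o.item = p.item;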

Best,
Godfrey

Gyula Fóra  于2020年4月20日周一 下午4:46写道:

> Hi All!
>
> We hit a the following problem with SQL and trying to understand if there
> is a valid workaround.
>
> We have 2 tables:
>
> *Kafka*
> timestamp (ROWTIME)
> item
> quantity
>
> *Hive*
> item
> price
>
> So we basically have incoming (ts, id, quantity) and we want to join it
> with the hive table to get the total price (price * quantity) got the
> current item.
>
> After this we want to create window aggregate on quantity*price windowed
> on timestamp (event time attribute).
>
> In any way we formulate this query we hit the following error:
> org.apache.flink.table.api.TableException: Rowtime attributes must not be
> in the input rows of a regular join. As a workaround you can cast the time
> attributes of input tables to TIMESTAMP before.
>
>  I understand that the problem is that we cannot join with the Hive table
> and still maintain the watermark/even time column. But why is this?
>
> In datastream world I would just simply assign Max watermark to my
> enrichment input and join outputs will get the ts of the input record. Can
> I achieve something similar in SQL/Table api?
>
> Thank you!
> Gyula
>
>


StreamingFileSink to a S3 Bucket on a remote account using STS

2020-04-20 Thread orionemail
Hi,

I'm new to both AWS and Flink but currently need to write incoming data into
an S3 bucket managed via AWS temporary credentials.

I am unable to get this to work, but I am not entirely sure of the steps
needed.  I can write to S3 buckets that are not 'remote' and managed by STS
temporary credentials fine.

I am using flink 1.9.1, as this is the version it will run on when deployed live in EMR.

My flink-conf.yml contains the following entries:

fs.s3a.bucket.sky-rdk-telemetry.aws.credentials.provider:
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider
fs.s3a.bucket.sky-rdk-telemetry.assumed.role.credentials.provider: 
org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
fs.s3a.bucket.sky-rdk-telemetry.access-key: x
fs.s3a.bucket.sky-rdk-telemetry.secret-key: 
fs.s3a.bucket.sky-rdk-telemetry.assumed.role.arn: 
fs.s3a.bucket.sky-rdk-telemetry.assumed.role.session.name: 

And my POM contains




com.amazonaws
aws-java-sdk-bom
1.11.700
pom
import





com.amazonaws
aws-java-sdk-sts
1.11.700


I have put the jar flink-s3-fs-hadoop-1.9.1.jar into the plugins directory.

Running my test Jar I am getting exceptions related to Class not found for 
org/apache/flink/fs/s3base/shaded/com/amazonaws/services/securitytoken/model/AWSSecurityTokenServiceException

and poking around I see this is shaded into a package in Kinesis.  I have added 
some rules to maven shade to rewrite the package as needed but this still 
doesn't help.

Am I heading in the correct direction?  Searching has not turned up much 
information that I have been able to make use of.

Thanks for your time,

J
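
For what it is worth, the sink side itself is independent of the credential question. A minimal
row-format sink to an s3a path with the Flink 1.9 DataStream API looks roughly like the sketch
below; the bucket/prefix, the placeholder source and the job name are made up, and the
assumed-role settings stay in flink-conf.yaml as above:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class S3SinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // placeholder source; replace with the real incoming telemetry stream
        DataStream<String> events = env.fromElements("record-1", "record-2");

        // writes row-encoded records to the (illustrative) bucket and prefix
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("s3a://my-remote-bucket/telemetry/"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .build();

        events.addSink(sink);
        env.execute("s3-sink-sketch");
    }
}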

Sent with [ProtonMail](https://protonmail.com) Secure Email.

Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

2020-04-20 Thread Kurt Young
Can you reproduce this in a local program with mini-cluster?

Best,
Kurt


On Mon, Apr 20, 2020 at 8:07 PM Zahid Rahman  wrote:

> You can read this for this type error.
>
>
> https://stackoverflow.com/questions/28189446/i-always-get-this-error-exception-in-thread-main-java-lang-arrayindexoutofbou#comment44747327_28189446
>
> I would suggest you set break points  in your code. Step through the code,
> this  method should show you which array variable is being passed a null
> argument when the array variable is not null able.
>
>
>
>
> On Mon, 20 Apr 2020, 10:07 刘建刚,  wrote:
>
>>I am using Roaring64NavigableMap to compute uv. It is ok to us
>> flink planner and not ok with blink planner. The SQL is as following:
>>
>> SELECT toLong(TUMBLE_START(eventTime, interval '1' minute)) as curTimestamp, 
>> A, B, C, D,
>> E, uv(bitmap(id)) as bmp
>> FROM person
>> GROUP BY TUMBLE(eventTime, interval '1' minute), A, B, C, D, E
>>
>>
>>   The udf is as following:
>>
>> public static class Bitmap extends AggregateFunction> Roaring64NavigableMap> {
>>@Override
>>public Roaring64NavigableMap createAccumulator() {
>>   return new Roaring64NavigableMap();
>>}
>>
>>@Override
>>public Roaring64NavigableMap getValue(Roaring64NavigableMap accumulator) {
>>   return accumulator;
>>}
>>
>>public void accumulate(Roaring64NavigableMap bitmap, long id) {
>>   bitmap.add(id);
>>}
>> }
>>
>> public static class UV extends ScalarFunction {
>>public long eval(Roaring64NavigableMap bitmap) {
>>   return bitmap.getLongCardinality();
>>}
>> }
>>
>>   The error is as following:
>>
>> 2020-04-20 16:37:13,868 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>  [flink-akka.actor.default-dispatcher-40]  -
>> GroupWindowAggregate(groupBy=[brand, platform, channel, versionName,
>> appMajorVersion], window=[TumblingGroupWindow('w$, eventTime, 6)],
>> properties=[w$start, w$end, w$rowtime, w$proctime], select=[brand,
>> platform, channel, versionName, appMajorVersion, bitmap(id) AS $f5,
>> start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
>> proctime('w$) AS w$proctime]) -> Calc(select=[toLong(w$start) AS
>> curTimestamp, brand, platform, channel, versionName, appMajorVersion,
>> uv($f5) AS bmp]) -> SinkConversionToTuple2 -> (Flat Map, Flat Map -> Sink:
>> Unnamed) (321/480) (8eb918b493ea26e2bb60f8307347dc1a) switched from RUNNING
>> to FAILED.
>> java.lang.ArrayIndexOutOfBoundsException: -1
>>   at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>>   at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
>>   at
>> org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:62)
>>   at
>> org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:37)
>>   at
>> org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copyBaseRow(BaseRowSerializer.java:150)
>>   at
>> org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:117)
>>   at
>> org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:50)
>>   at
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:297)
>>   at
>> org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:244)
>>   at
>> org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:138)
>>   at
>> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73)
>>   at
>> org.apache.flink.table.runtime.operators.window.WindowOperator.processElement(WindowOperator.java:337)
>>   at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.processRecord(OneInputStreamTask.java:204)
>>   at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:196)
>>   at
>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>>   at
>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>>   at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>>   at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>>   at 

Re: Flink incremental checkpointing - how long does data is kept in the share folder

2020-04-20 Thread Yun Tang
Hi Shachar

You can refer to [1] for the directory structure. The files (usually
ByteStreamStateHandle) which are not in the shared folder are exclusive state,
like operator state or exclusive files uploaded during each incremental
checkpoint. Actually, I don't understand why you would say some files are
not mentioned in the metadata file but are still related to the checkpoint. How do you
judge that they are related to a specific checkpoint?

BTW, my name is "Yun" which means cloud in Chinese, not the delicious "Yum" 

Best
Yun Tang
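
For reference, a self-contained sketch of that approach, using the same API calls as the snippet
you posted (the _metadata path is a placeholder, it reads from the local file system, and the
handle class names can differ slightly between Flink versions): entries that come back as
FileStateHandle point at files under the shared folder, while ByteStreamStateHandle entries are
small pieces of state whose bytes were stored inline in the _metadata file (typically because
they were below state.backend.fs.memory-threshold), which is why they have no file in shared.

import java.io.DataInputStream;
import java.io.FileInputStream;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;

public class ListCheckpointSharedFiles {
    public static void main(String[] args) throws Exception {
        // args[0]: path to one checkpoint's _metadata file, e.g. .../chk-3509/_metadata
        try (DataInputStream in = new DataInputStream(new FileInputStream(args[0]))) {
            Savepoint savepoint = Checkpoints.loadCheckpointMetadata(
                    in, ListCheckpointSharedFiles.class.getClassLoader());
            for (OperatorState operatorState : savepoint.getOperatorStates()) {
                for (OperatorSubtaskState subtask : operatorState.getSubtaskStates().values()) {
                    for (KeyedStateHandle keyed : subtask.getManagedKeyedState()) {
                        if (!(keyed instanceof IncrementalKeyedStateHandle)) {
                            continue;
                        }
                        for (StreamStateHandle handle :
                                ((IncrementalKeyedStateHandle) keyed).getSharedState().values()) {
                            if (handle instanceof FileStateHandle) {
                                // still-referenced file under the shared folder
                                System.out.println(((FileStateHandle) handle).getFilePath());
                            } else {
                                // state inlined into _metadata (e.g. ByteStreamStateHandle)
                                System.out.println("inlined: " + handle.getClass().getSimpleName());
                            }
                        }
                    }
                }
            }
        }
    }
}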

From: Shachar Carmeli 
Sent: Monday, April 20, 2020 15:36
To: user@flink.apache.org 
Subject: Re: Flink incremental checkpointing - how long does data is kept in 
the share folder

Hi Yum
I noticed that some files are related to the checkpoint but are not
mentioned in the metadata file,
and some of the files that are referenced in the metadata file (usually
ByteStreamStateHandle) are not in the shared folder.
Can you explain this behaviour?

see code I was using
final Savepoint savepoint = Checkpoints.loadCheckpointMetadata(in, 
CheckpointTool.class.getClassLoader());

final Set pathSharedFromMetadata = 
savepoint.getOperatorStates().stream()
.flatMap(operatorState -> 
operatorState.getSubtaskStates().values().stream()

.flatMap(operatorSubtaskState -> 
operatorSubtaskState.getManagedKeyedState().stream()

.flatMap(keyedStateHandle -> ((IncrementalKeyedStateHandle) 
keyedStateHandle).getSharedState().values().stream()

.map(streamStateHandle -> {

totalSize[0] += streamStateHandle.getStateSize();

String name = null;

if (streamStateHandle instanceof FileStateHandle) {

name = ((FileStateHandle) 
streamStateHandle).getFilePath().getName();

} else {

final String handleName = ((ByteStreamStateHandle) 
streamStateHandle).getHandleName();

name = new File(handleName).getName();

}

return name;


}
.collect(Collectors.toSet());

Thanks in advance
Shachar

On 2020/04/13 14:30:40, Yun Tang  wrote:
> Hi Shachar
>
> I think you could refer to [1] to know the directory structure of 
> checkpoints. The '_metadata' file contains all information of which  
> checkpointed data file belongs, e.g. file paths under 'shared' folder. As I 
> said before, you need to call Checkpoints#loadCheckpointMetadata to load 
> '_metadata' to know which files belonging to that checkpoint.
>
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#directory-structure
>
> Best
> Yun Tang
>
> 
> From: Shachar Carmeli 
> Sent: Sunday, April 12, 2020 15:32
> To: user@flink.apache.org 
> Subject: Re: Flink incremental checkpointing - how long does data is kept in 
> the share folder
>
> Thank you for the quick response
> Your answer related to the checkpoint folder that contains the _metadata file 
> e.g. chk-1829
> What about the "shared" folder , how do I know which  files in that folder 
> are still relevant and which are left over from a failed checkpoint , they 
> are not directly related to the _metadata checkpoint or am I missing 
> something?
>
>
> On 2020/04/07 18:37:57, Yun Tang  wrote:
> > Hi Shachar
> >
> > Why do we see data that is older from lateness configuration
> > There might existed three reasons:
> >
> >   1.  RocksDB really still need that file in current checkpoint. If we 
> > upload one file named as 42.sst at 2/4 at some old checkpoint, current 
> > checkpoint could still include that 42.sst file again if that file is never 
> > be compacted since then. This is possible in theory.
> >   2.  Your checkpoint size is large and checkpoint coordinator could not 
> > remove as fast as possible before exit.
> >   3.  

Re: Flink Conf "yarn.flink-dist-jar" Question

2020-04-20 Thread Till Rohrmann
Thanks for the clarification Yang. Now it makes sense to me.

If it makes things easier, then I would still go first with the dead simple
solution to turn automatic upload of local dist off via a configuration
option before trying to implement a smart solution which relies on pattern
matching or something else. For example, users might specify a remote
location which is not accessible from the client. Then one could not figure
out which files are already uploaded. The smart solution could be a follow
up step then.

Cheers,
Till

On Mon, Apr 20, 2020 at 1:09 PM Yang Wang  wrote:

> Hi till,
>
> Sorry for that i do not giving a detailed explanation of the optimization.
> Actually, the optimization contains
> the following two parts.
> * Use remote uploaded jars to avoid unnecessary uploading(e.g.
> flink-dist-*.jar, user jars, dependencies).
> this could be done via enriching "-yt/--yarnship" to support remote ship
> files.
> * Use the "PUBLIC" or "PRIVATE" visibility of YARN local resource to avoid
> unnecessary downloading. When
> a local resource is public, once it is download by YARN NodeManager, it
> could be reused by all the application
> in the same NodeManager.
>
> >> Why do we need to specify the visibility of the remote files? Won't the
>> visibility be specified when uploading these files?
>
> It is mostly for the users who want to eliminate the unnecessary
> downloading so that the container could be
> launched faster. "PRIVATE" means the remote jars could be shared by the
> applications submitted by the current user.
> "PUBLIC" means the remote jars could be shared by all the Flink
> applications. And "APPLICATION" means they
> could only be shared by the containers of the current application in a
> same NodeManager.
>
>
> For the implementation, i think we could do it step by step.
> * Enrich "-yt/--yarnship" to support HDFS directory
> * Add a new config option to control whether to avoid the unnecessary
> uploading
> * Enrich "-yt/--yarnship" to specify local resource visibility
>
>
> Best,
> Yang
>
>
>
> Till Rohrmann  于2020年4月20日周一 下午5:26写道:
>
>> Shall we say for the first version we only can deactivate the upload of
>> local files instead of doing some optimizations? I guess my problem is that
>> I don't fully understand the optimizations yet. Maybe we introduce a power
>> user config option `yarn.submission.automatic-flink-dist-upload` or so.
>>
>> Why do we need to specify the visibility of the remote files? Won't the
>> visibility be specified when uploading these files?
>>
>> Apart from that, the proposal looks good to me.
>>
>> Cheers,
>> Till
>>
>> On Mon, Apr 20, 2020 at 5:38 AM Yang Wang  wrote:
>>
>>> Hi tison,
>>>
>>> I think i get your concerns and points.
>>>
>>> Take both FLINK-13938[1] and FLINK-14964[2] into account, i will do in
>>> the following steps.
>>> * Enrich "-yt/--yarnship" to support HDFS directory
>>> * Enrich "-yt/--yarnship" to specify local resource visibility. It is
>>> "APPLICATION" by default. It could be also configured to "PUBLIC",
>>> which means shared by all applications, or "PRIVATE" which means shared
>>> by a same user.
>>> * Add a new config option to control whether to optimize the
>>> submission(default is false). When configured to true, Flink client will
>>> try to filter the jars and files by name and size to avoid unnecessary
>>> uploading.
>>>
>>> A very rough submission command could be issued as following.
>>> *./bin/flink run -m yarn-cluster -d -yt
>>> hdfs://myhdfs/flink/release/flink-1.11:PUBLIC,hdfs://myhdfs/user/someone/mylib
>>> \*
>>> *-yD yarn.submission-optimization.enable=true
>>> examples/streaming/WindowJoin.jar*
>>>
>>> cc @Rong Rong , since you also help to review the
>>> old PR of FLINK-13938, maybe you could also share some thoughts.
>>>
>>>
>>> [1]. https://issues.apache.org/jira/browse/FLINK-13938
>>> [2]. https://issues.apache.org/jira/browse/FLINK-14964
>>>
>>>
>>> Best,
>>> Yang
>>>
>>>
>>>
>>> tison  于2020年4月18日周六 下午12:12写道:
>>>
 Hi Yang,

 Name filtering & schema special handling makes sense for me. We can
 enrich later if there is requirement without breaking interface.

 For #1, from my perspective your first proposal is

   having an option specifies remote flink/lib, then we turn off auto
 uploading local flink/lib and register that path as local resources

 It seems we here add another special logic for handling one kind of
 things...what I propose is we do these two steps explicitly separated:

 1. an option turns off auto uploading local flink/lib
 2. a general option register remote files as local resources

 The rest thing here is that you propose we handle flink/lib as PUBLIC
 visibility while other files as APPLICATION visibility, whether a
 composite configuration or name filtering to special handle libs makes
 sense though.

 YarnClusterDescriptor already has a lot of special handling logics
 which introduce a 

Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

2020-04-20 Thread Zahid Rahman
You can read this for this type error.

https://stackoverflow.com/questions/28189446/i-always-get-this-error-exception-in-thread-main-java-lang-arrayindexoutofbou#comment44747327_28189446

I would suggest you set breakpoints in your code. Step through the code;
this method should show you which array variable is being passed a null
argument when the array variable is not nullable.




On Mon, 20 Apr 2020, 10:07 刘建刚,  wrote:

>I am using Roaring64NavigableMap to compute uv. It is ok to us
> flink planner and not ok with blink planner. The SQL is as following:
>
> SELECT toLong(TUMBLE_START(eventTime, interval '1' minute)) as curTimestamp, 
> A, B, C, D,
> E, uv(bitmap(id)) as bmp
> FROM person
> GROUP BY TUMBLE(eventTime, interval '1' minute), A, B, C, D, E
>
>
>   The udf is as following:
>
> public static class Bitmap extends AggregateFunction Roaring64NavigableMap> {
>@Override
>public Roaring64NavigableMap createAccumulator() {
>   return new Roaring64NavigableMap();
>}
>
>@Override
>public Roaring64NavigableMap getValue(Roaring64NavigableMap accumulator) {
>   return accumulator;
>}
>
>public void accumulate(Roaring64NavigableMap bitmap, long id) {
>   bitmap.add(id);
>}
> }
>
> public static class UV extends ScalarFunction {
>public long eval(Roaring64NavigableMap bitmap) {
>   return bitmap.getLongCardinality();
>}
> }
>
>   The error is as following:
>
> 2020-04-20 16:37:13,868 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph
>  [flink-akka.actor.default-dispatcher-40]  -
> GroupWindowAggregate(groupBy=[brand, platform, channel, versionName,
> appMajorVersion], window=[TumblingGroupWindow('w$, eventTime, 6)],
> properties=[w$start, w$end, w$rowtime, w$proctime], select=[brand,
> platform, channel, versionName, appMajorVersion, bitmap(id) AS $f5,
> start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
> proctime('w$) AS w$proctime]) -> Calc(select=[toLong(w$start) AS
> curTimestamp, brand, platform, channel, versionName, appMajorVersion,
> uv($f5) AS bmp]) -> SinkConversionToTuple2 -> (Flat Map, Flat Map -> Sink:
> Unnamed) (321/480) (8eb918b493ea26e2bb60f8307347dc1a) switched from RUNNING
> to FAILED.
> java.lang.ArrayIndexOutOfBoundsException: -1
>   at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>   at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
>   at
> org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:62)
>   at
> org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:37)
>   at
> org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copyBaseRow(BaseRowSerializer.java:150)
>   at
> org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:117)
>   at
> org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:50)
>   at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:297)
>   at
> org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:244)
>   at
> org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:138)
>   at
> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73)
>   at
> org.apache.flink.table.runtime.operators.window.WindowOperator.processElement(WindowOperator.java:337)
>   at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.processRecord(OneInputStreamTask.java:204)
>   at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:196)
>   at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>   at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>   at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>   at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>   at java.lang.Thread.run(Thread.java:745)
>
>   Do I need register Roaring64NavigableMap somewhere? Anyone can help
> me? Thank you.
>
>


Re: Change to StreamingFileSink in Flink 1.10

2020-04-20 Thread Averell
Hi Sivaprasanna,

That is a compile-time error, not a runtime error.

/value build is not a member of ?0
possible cause: maybe a semicolon is missing before `value build'?/. 

There is no issue when using either *withRollingPolicy*() or
/withBucketAssigner/() on its own; the error only appears when both are chained.

Thanks and regards,
Averell
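
For anyone hitting the same thing: the compile error comes from scalac failing to infer the
self-recursive generic that the 1.10 builder uses (the RowFormatBuilder[_ <: RowFormatBuilder[_]]
hint above). One workaround that is sometimes suggested (not verified here; the RowFormatBuilder
type parameters below are an assumption about the 1.10.0 signature, with String assumed as the
bucket id type) is to pin the builder type explicitly before calling build():

// MyEvent, path, myEncoder and myBucketAssigner stand for the types and values
// from the snippet above
val sink: StreamingFileSink[MyEvent] = StreamingFileSink
  .forRowFormat(new Path(path), myEncoder)
  .withRollingPolicy(DefaultRollingPolicy.create().build())
  .withBucketAssigner(myBucketAssigner)
  .asInstanceOf[StreamingFileSink.RowFormatBuilder[MyEvent, String, _]]
  .build()

If that still fights the type checker, moving the builder chain into a small Java helper class
and calling it from Scala is a blunter but reliable alternative.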





Is there a good benchmark for Flink Stream API?

2020-04-20 Thread Felipe Gutierrez
Hi community,

Is there a benchmark for the Flink Stream API to test a varying workload
using a real data set?
I was reading [2], but it does not say whether I can vary the workload
on the fly, nor whether I can use a real data set like the Taxi Ride
data from NYC [1].

[1] https://training.ververica.com/setup/taxiData.html
[2] https://github.com/dataArtisans/flink-benchmarks

Thanks,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


Re: Change to StreamingFileSink in Flink 1.10

2020-04-20 Thread Sivaprasanna
Hi Averell,

Can you please share the complete stacktrace of the error?

On Mon, Apr 20, 2020 at 4:48 PM Averell  wrote:

> Hi,
>
> I have the following code:
>  /   StreamingFileSink
>   .forRowFormat(new Path(path), myEncoder)
>   .withRollingPolicy(DefaultRollingPolicy.create().build())
>   .withBucketAssigner(myBucketAssigner)
>   .build()/
> This is working fine in Flink 1.8.3. However, when I try to compile with
> Flink 1.10.0, I got the following error:
> / value build is not a member of ?0
> possible cause: maybe a semicolon is missing before `value build'?/
>
> As per the hint from IntelliJ,
> /.forRowFormat returns a RowFormatBuilder[_ <: RowFormatBuilder[_]]
> .withRollingPolicy(...) returns a RowFormatBuilder[_]
> .withBucketAssigner(...) returns Any/
>
> I'm using Maven 3.6.0, Java 1.8.0_242, and Scala 2.11.12. Tried
> with/without
> IntelliJ, no difference.
>
> Not sure/understand what's wrong
>
> Thanks!
> Averell
>
>
>
>
>


Change to StreamingFileSink in Flink 1.10

2020-04-20 Thread Averell
Hi,

I have the following code:
 /   StreamingFileSink
  .forRowFormat(new Path(path), myEncoder)
  .withRollingPolicy(DefaultRollingPolicy.create().build())
  .withBucketAssigner(myBucketAssigner)
  .build()/
This is working fine in Flink 1.8.3. However, when I try to compile with
Flink 1.10.0, I got the following error:
/ value build is not a member of ?0
possible cause: maybe a semicolon is missing before `value build'?/

As per the hint from IntelliJ,
/.forRowFormat returns a RowFormatBuilder[_ <: RowFormatBuilder[_]]
.withRollingPolicy(...) returns a RowFormatBuilder[_]
.withBucketAssigner(...) returns Any/

I'm using Maven 3.6.0, Java 1.8.0_242, and Scala 2.11.12. Tried with/without
IntelliJ, no difference.

Not sure/understand what's wrong

Thanks!
Averell






Re: Flink Conf "yarn.flink-dist-jar" Question

2020-04-20 Thread Yang Wang
Hi till,

Sorry that I did not give a detailed explanation of the optimization.
Actually, the optimization contains
the following two parts.
* Use remote uploaded jars to avoid unnecessary uploading(e.g.
flink-dist-*.jar, user jars, dependencies).
this could be done via enriching "-yt/--yarnship" to support remote ship
files.
* Use the "PUBLIC" or "PRIVATE" visibility of YARN local resource to avoid
unnecessary downloading. When
a local resource is public, once it is download by YARN NodeManager, it
could be reused by all the application
in the same NodeManager.

>> Why do we need to specify the visibility of the remote files? Won't the
> visibility be specified when uploading these files?

It is mostly for the users who want to eliminate the unnecessary
downloading so that the container could be
launched faster. "PRIVATE" means the remote jars could be shared by the
applications submitted by the current user.
"PUBLIC" means the remote jars could be shared by all the Flink
applications. And "APPLICATION" means they
could only be shared by the containers of the current application in a same
NodeManager.


For the implementation, i think we could do it step by step.
* Enrich "-yt/--yarnship" to support HDFS directory
* Add a new config option to control whether to avoid the unnecessary
uploading
* Enrich "-yt/--yarnship" to specify local resource visibility


Best,
Yang



Till Rohrmann  于2020年4月20日周一 下午5:26写道:

> Shall we say for the first version we only can deactivate the upload of
> local files instead of doing some optimizations? I guess my problem is that
> I don't fully understand the optimizations yet. Maybe we introduce a power
> user config option `yarn.submission.automatic-flink-dist-upload` or so.
>
> Why do we need to specify the visibility of the remote files? Won't the
> visibility be specified when uploading these files?
>
> Apart from that, the proposal looks good to me.
>
> Cheers,
> Till
>
> On Mon, Apr 20, 2020 at 5:38 AM Yang Wang  wrote:
>
>> Hi tison,
>>
>> I think i get your concerns and points.
>>
>> Take both FLINK-13938[1] and FLINK-14964[2] into account, i will do in
>> the following steps.
>> * Enrich "-yt/--yarnship" to support HDFS directory
>> * Enrich "-yt/--yarnship" to specify local resource visibility. It is
>> "APPLICATION" by default. It could be also configured to "PUBLIC",
>> which means shared by all applications, or "PRIVATE" which means shared
>> by a same user.
>> * Add a new config option to control whether to optimize the
>> submission(default is false). When configured to true, Flink client will
>> try to filter the jars and files by name and size to avoid unnecessary
>> uploading.
>>
>> A very rough submission command could be issued as following.
>> *./bin/flink run -m yarn-cluster -d -yt
>> hdfs://myhdfs/flink/release/flink-1.11:PUBLIC,hdfs://myhdfs/user/someone/mylib
>> \*
>> *-yD yarn.submission-optimization.enable=true
>> examples/streaming/WindowJoin.jar*
>>
>> cc @Rong Rong , since you also help to review the
>> old PR of FLINK-13938, maybe you could also share some thoughts.
>>
>>
>> [1]. https://issues.apache.org/jira/browse/FLINK-13938
>> [2]. https://issues.apache.org/jira/browse/FLINK-14964
>>
>>
>> Best,
>> Yang
>>
>>
>>
>> tison  于2020年4月18日周六 下午12:12写道:
>>
>>> Hi Yang,
>>>
>>> Name filtering & schema special handling makes sense for me. We can
>>> enrich later if there is requirement without breaking interface.
>>>
>>> For #1, from my perspective your first proposal is
>>>
>>>   having an option specifies remote flink/lib, then we turn off auto
>>> uploading local flink/lib and register that path as local resources
>>>
>>> It seems we here add another special logic for handling one kind of
>>> things...what I propose is we do these two steps explicitly separated:
>>>
>>> 1. an option turns off auto uploading local flink/lib
>>> 2. a general option register remote files as local resources
>>>
>>> The rest thing here is that you propose we handle flink/lib as PUBLIC
>>> visibility while other files as APPLICATION visibility, whether a
>>> composite configuration or name filtering to special handle libs makes
>>> sense though.
>>>
>>> YarnClusterDescriptor already has a lot of special handling logics which
>>> introduce a number of config options and keys, which should
>>> have been configured in few of common options and validated at the
>>> runtime.
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> Yang Wang  于2020年4月17日周五 下午11:42写道:
>>>
 Hi tison,

 For #3, if you mean registering remote HDFS file as local resource, we
 should make the "-yt/--yarnship"
 to support remote directory. I think it is the right direction.

 For #1, if the users could ship remote directory, then they could also
 specify like this
 "-yt hdfs://hdpdev/flink/release/flink-1.x,
 hdfs://hdpdev/user/someone/mylib". Do you mean we add an
 option for whether trying to avoid unnecessary uploading? Maybe we
 could filter by names and file 

Re: FLINK JOB

2020-04-20 Thread Som Lima
Yes, exactly, that is the change I am having to make: changing the FLINK JOB
default localhost to the IP of the server computer in the browser.

I followed the instructions as per your
link.
https://medium.com/@zjffdu/flink-on-zeppelin-part-1-get-started-2591aaa6aa47

i.e. setting zeppelin.server.addr to 0.0.0.0 for remote access.
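
In case it helps while the Zeppelin-side option is being discussed: if the problem is that the
Flink web endpoint itself only listens on localhost, the usual flink-conf.yaml knobs are the
REST settings below. Whether they apply here depends on how Zeppelin launches its local Flink
cluster, so treat this as a hint rather than a confirmed fix:

# bind the web/REST endpoint on all interfaces instead of only the loopback interface
rest.bind-address: 0.0.0.0
# port of the web dashboard (default 8081)
rest.port: 8081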



On Mon, 20 Apr 2020, 10:30 Jeff Zhang,  wrote:

> I see, so you are running flink interpreter in local mode. But you access
> zeppelin from a remote machine, right ?  Do you mean you can access it
> after changing localhost to ip ? If so, then I can add one configuration in
> zeppelin side to replace the localhost to real ip.
>
> Som Lima  于2020年4月20日周一 下午4:44写道:
>
>> I am only running the zeppelin  word count example by clicking the
>> zeppelin run arrow.
>>
>>
>> On Mon, 20 Apr 2020, 09:42 Jeff Zhang,  wrote:
>>
>>> How do you run flink job ? It should not always be localhost:8081
>>>
>>> Som Lima  于2020年4月20日周一 下午4:33写道:
>>>
 Hi,

 FLINK JOB  url  defaults to localhost

 i.e. localhost:8081.

 I have to manually change it to server :8081 to get Apache  flink
 Web Dashboard to display.





>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>
> --
> Best Regards
>
> Jeff Zhang
>


flink-1.10 checkpoint 偶尔报 NullPointerException

2020-04-20 Thread chenkaibit
Has anyone run into this error? A NullPointerException is thrown during CheckpointOperation.executeCheckpointing:
java.lang.Exception: Could not perform checkpoint 5505 for operator Source: 
KafkaTableSource(xxx) -> SourceConversion(table=[xxx, source: 
[KafkaTableSource(xxx)]], fields=[xxx]) -> Calc(select=[xxx) AS xxx]) -> 
SinkConversionToTuple2 -> Sink: Elasticsearch6UpsertTableSink(xxx) (1/1).

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:802)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$228/1024478318.call(Unknown Source)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)

at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)

at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)

at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.NullPointerException

at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1411)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$229/1010499540.run(Unknown Source)

at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)

... 12 more

Re: 问题请教-flinksql的kafkasource方面

2020-04-20 Thread Benchao Li
Which version are you using? We are on 1.9 and have not run into this problem yet.

Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 下午5:43写道:

> We have hit this problem in production: sources that are not assigned any partition switch to
> FINISHED after a short while. When triggering a checkpoint, all executors are checked to be in
> RUNNING state, otherwise the checkpoint is not performed; the source code of
> CheckpointCoordinator#triggerCheckpoint also notes this.
>
>
>
>
>
> | |
> Sun.Zhu
> |
> |
> 邮箱:17626017...@163.com
> |
>
> Signature is customized by Netease Mail Master
>
> 在2020年04月20日 10:37,Benchao Li 写道:
> It should not. Sources that are not assigned any partition are marked as idle.
>
> Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道:
>
> > Hi Benchao, if the source parallelism is greater than the number of partitions, won't that cause checkpoints to not be performed?
> >
> >
> >
> >
> > | |
> > Sun.Zhu
> > |
> > |
> > 邮箱:17626017...@163.com
> > |
> >
> > Signature is customized by Netease Mail Master
> >
> > 在2020年04月19日 22:43,人生若只如初见 写道:
> > 嗯嗯,十分感谢
> >
> >
> >
> >
> > --原始邮件--
> >  发件人:"Benchao Li" > 发送时间:2020年4月19日(星期天) 晚上9:25
> > 收件人:"user-zh" >
> > 主题:Re: 问题请教-flinksql的kafkasource方面
> >
> >
> >
> > If that is the case, you can avoid it by making the source parallelism greater than or equal to the number of Kafka partitions.
> >
> > Jark Wu  >
> >  Hi,
> > 
> >  根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。
> >  根据你的 Java 代码,数据的 event time
> > 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark
> >  能容忍 5s 乱序).
> >  但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些
> > partition
> >  进度快很多的现象,
> >  导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。
> > 
> >  完美的解决方案还需要等 FLIP-27 的完成。
> >  当前可以通过增加 watermark delay来增大迟到数据的容忍。
> > 
> >  Best,
> >  Jark
> > 
> > 
> >  On Sat, 18 Apr 2020 at 22:53, 人生若只如初见  > 
> >   你好
> >  
> >  
> > 
> >
> 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多
> >  
> >  
> >  
> >   附:
> >   userbehavior建表语句
> >   CREATE TABLE user_behavior (
> >   nbsp; nbsp; user_id BIGINT,
> >   nbsp; nbsp; item_id BIGINT,
> >   nbsp; nbsp; category_id BIGINT,
> >   nbsp; nbsp; behavior STRING,
> >   nbsp; nbsp; ts TIMESTAMP(3),
> >   nbsp; nbsp; proctime as PROCTIME(), nbsp; --
> > 通过计算列产生一个处理时间列
> >   nbsp; nbsp; WATERMARK FOR ts as ts - INTERVAL '5'
> > SECOND nbsp;--
> >   在ts上定义watermark,ts成为事件时间列
> >   ) WITH (
> >   nbsp; nbsp; 'connector.type' = 'kafka', nbsp;--
> > 使用 kafka connector
> >   nbsp; nbsp; 'connector.version' = 'universal',
> > nbsp;-- kafka
> >   版本,universal 支持 0.11 以上的版本
> >   nbsp; nbsp; 'connector.topic' = 'user_behavior',
> > nbsp;-- kafka topic
> >   nbsp; nbsp; 'connector.startup-mode' =
> > 'earliest-offset', nbsp;-- 从起始
> >   offset 开始读取
> >   nbsp; nbsp; 'connector.properties.zookeeper.connect'
> =
> > '
> >   192.168.0.150:2181', nbsp;-- zookeeper 地址
> >   nbsp; nbsp; 'connector.properties.bootstrap.servers'
> =
> > '
> >   192.168.0.150:9092', nbsp;-- kafka broker 地址
> >   nbsp; nbsp; 'format.type' = 'json' nbsp;--
> 数据源格式为
> > json
> >   )
> >  
> >   每小时购买数建表语句
> >   CREATE TABLE buy_cnt_per_hour (nbsp;
> >   nbsp; nbsp; hour_of_day BIGINT,
> >   nbsp; nbsp; buy_cnt BIGINT
> >   ) WITH (
> >   nbsp; nbsp; 'connector.type' = 'elasticsearch', -- 使用
> > elasticsearch
> >   connector
> >   nbsp; nbsp; 'connector.version' = '6', nbsp;--
> > elasticsearch 版本,6 能支持
> >   es 6+ 以及 7+ 的版本
> >   nbsp; nbsp; 'connector.hosts' = '
> > http://192.168.0.150:9200', nbsp;--
> >   elasticsearch 地址
> >   nbsp; nbsp; 'connector.index' = 'buy_cnt_per_hour',
> > nbsp;--
> >   elasticsearch 索引名,相当于数据库的表名
> >   nbsp; nbsp; 'connector.document-type' =
> > 'user_behavior', --
> >   elasticsearch 的 type,相当于数据库的库名
> >   nbsp; nbsp; 'connector.bulk-flush.max-actions' = '1',
> > nbsp;-- 每条数据都刷新
> >   nbsp; nbsp; 'format.type' = 'json', nbsp;--
> > 输出数据格式 json
> >   nbsp; nbsp; 'update-mode' = 'append'
> >   )
> >  
> >   插入语句
> >   INSERT INTO buy_cnt_per_hournbsp;
> >   SELECT HOUR(TUMBLE_START(ts,INTERVAL '1'
> > HOUR)),COUNT(*)nbsp;
> >   FROM user_behavior
> >   WHERE behavior = 'buy'
> >   GROUP BY TUMBLE(ts,INTERVAL '1' HOUR)
> >  
> >   kafka数据发送代码
> >  
> >   import com.alibaba.fastjson.JSONObject;
> >   import org.apache.kafka.clients.producer.KafkaProducer;
> >   import org.apache.kafka.clients.producer.ProducerRecord;
> >  
> >   import java.text.SimpleDateFormat;
> >   import java.util.*;
> >  
> >  
> >   public class UserBehaviorProducer {
> >   public static final String brokerList
> = "
> > 192.168.0.150:9092";
> >  
> >   // public static
> final
> > String topic="user_behavior";
> >   public static final String topic =
> > "user_behavior";
> >  
> >   public static void main(String args[])
> {
> >  
> >   //配置生产者客户端参数
> >   //将配置序列化
> >   Properties
> > properties = new Properties();
> >  
> > properties.put("key.serializer",
> >   "org.apache.kafka.common.serialization.StringSerializer");
> >  
> > properties.put("value.serializer",
> >   "org.apache.kafka.common.serialization.StringSerializer");
> >  
> > properties.put("bootstrap.servers", brokerList);
> >  
> > //创建KafkaProducer 实例
> >  
> > KafkaProducer >   KafkaProducer >   //构建待发送的消息
> >   //{"user_id":
> > "952483", "item_id":"310884", "category_id":
> >   "4580532", "behavior": 

回复:问题请教-flinksql的kafkasource方面

2020-04-20 Thread Sun.Zhu
We have hit this problem in production: sources that are not assigned any partition switch to
FINISHED after a short while. When triggering a checkpoint, all executors are checked to be in
RUNNING state, otherwise the checkpoint is not performed; the source code of
CheckpointCoordinator#triggerCheckpoint also notes this.





| |
Sun.Zhu
|
|
邮箱:17626017...@163.com
|

Signature is customized by Netease Mail Master

在2020年04月20日 10:37,Benchao Li 写道:
It should not. Sources that are not assigned any partition are marked as idle.

Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道:

> Hi Benchao, if the source parallelism is greater than the number of partitions, won't that cause checkpoints to not be performed?
>
>
>
>
> | |
> Sun.Zhu
> |
> |
> 邮箱:17626017...@163.com
> |
>
> Signature is customized by Netease Mail Master
>
> 在2020年04月19日 22:43,人生若只如初见 写道:
> 嗯嗯,十分感谢
>
>
>
>
> --原始邮件--
>  发件人:"Benchao Li" 发送时间:2020年4月19日(星期天) 晚上9:25
> 收件人:"user-zh"
> 主题:Re: 问题请教-flinksql的kafkasource方面
>
>
>
> If that is the case, you can avoid it by making the source parallelism greater than or equal to the number of Kafka partitions.
>
> Jark Wu 
>  Hi,
> 
>  根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。
>  根据你的 Java 代码,数据的 event time
> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark
>  能容忍 5s 乱序).
>  但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些
> partition
>  进度快很多的现象,
>  导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。
> 
>  完美的解决方案还需要等 FLIP-27 的完成。
>  当前可以通过增加 watermark delay来增大迟到数据的容忍。
> 
>  Best,
>  Jark
> 
> 
>  On Sat, 18 Apr 2020 at 22:53, 人生若只如初见  
>   你好
>  
>  
> 
> 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多
>  
>  
>  
>   附:
>   userbehavior建表语句
>   CREATE TABLE user_behavior (
>   nbsp; nbsp; user_id BIGINT,
>   nbsp; nbsp; item_id BIGINT,
>   nbsp; nbsp; category_id BIGINT,
>   nbsp; nbsp; behavior STRING,
>   nbsp; nbsp; ts TIMESTAMP(3),
>   nbsp; nbsp; proctime as PROCTIME(), nbsp; --
> 通过计算列产生一个处理时间列
>   nbsp; nbsp; WATERMARK FOR ts as ts - INTERVAL '5'
> SECOND nbsp;--
>   在ts上定义watermark,ts成为事件时间列
>   ) WITH (
>   nbsp; nbsp; 'connector.type' = 'kafka', nbsp;--
> 使用 kafka connector
>   nbsp; nbsp; 'connector.version' = 'universal',
> nbsp;-- kafka
>   版本,universal 支持 0.11 以上的版本
>   nbsp; nbsp; 'connector.topic' = 'user_behavior',
> nbsp;-- kafka topic
>   nbsp; nbsp; 'connector.startup-mode' =
> 'earliest-offset', nbsp;-- 从起始
>   offset 开始读取
>   nbsp; nbsp; 'connector.properties.zookeeper.connect' =
> '
>   192.168.0.150:2181', nbsp;-- zookeeper 地址
>   nbsp; nbsp; 'connector.properties.bootstrap.servers' =
> '
>   192.168.0.150:9092', nbsp;-- kafka broker 地址
>   nbsp; nbsp; 'format.type' = 'json' nbsp;-- 数据源格式为
> json
>   )
>  
>   每小时购买数建表语句
>   CREATE TABLE buy_cnt_per_hour (nbsp;
>   nbsp; nbsp; hour_of_day BIGINT,
>   nbsp; nbsp; buy_cnt BIGINT
>   ) WITH (
>   nbsp; nbsp; 'connector.type' = 'elasticsearch', -- 使用
> elasticsearch
>   connector
>   nbsp; nbsp; 'connector.version' = '6', nbsp;--
> elasticsearch 版本,6 能支持
>   es 6+ 以及 7+ 的版本
>   nbsp; nbsp; 'connector.hosts' = '
> http://192.168.0.150:9200', nbsp;--
>   elasticsearch 地址
>   nbsp; nbsp; 'connector.index' = 'buy_cnt_per_hour',
> nbsp;--
>   elasticsearch 索引名,相当于数据库的表名
>   nbsp; nbsp; 'connector.document-type' =
> 'user_behavior', --
>   elasticsearch 的 type,相当于数据库的库名
>   nbsp; nbsp; 'connector.bulk-flush.max-actions' = '1',
> nbsp;-- 每条数据都刷新
>   nbsp; nbsp; 'format.type' = 'json', nbsp;--
> 输出数据格式 json
>   nbsp; nbsp; 'update-mode' = 'append'
>   )
>  
>   插入语句
>   INSERT INTO buy_cnt_per_hournbsp;
>   SELECT HOUR(TUMBLE_START(ts,INTERVAL '1'
> HOUR)),COUNT(*)nbsp;
>   FROM user_behavior
>   WHERE behavior = 'buy'
>   GROUP BY TUMBLE(ts,INTERVAL '1' HOUR)
>  
>   kafka数据发送代码
>  
>   import com.alibaba.fastjson.JSONObject;
>   import org.apache.kafka.clients.producer.KafkaProducer;
>   import org.apache.kafka.clients.producer.ProducerRecord;
>  
>   import java.text.SimpleDateFormat;
>   import java.util.*;
>  
>  
>   public class UserBehaviorProducer {
>   public static final String brokerList = "
> 192.168.0.150:9092";
>  
>   // public static final
> String topic="user_behavior";
>   public static final String topic =
> "user_behavior";
>  
>   public static void main(String args[]) {
>  
>   //配置生产者客户端参数
>   //将配置序列化
>   Properties
> properties = new Properties();
>  
> properties.put("key.serializer",
>   "org.apache.kafka.common.serialization.StringSerializer");
>  
> properties.put("value.serializer",
>   "org.apache.kafka.common.serialization.StringSerializer");
>  
> properties.put("bootstrap.servers", brokerList);
>  
> //创建KafkaProducer 实例
>  
> KafkaProducer   KafkaProducer   //构建待发送的消息
>   //{"user_id":
> "952483", "item_id":"310884", "category_id":
>   "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
>   //{"user_id":
> "794777", "item_id":"5119439", "category_id":
>   "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
>   String[]
> behaviors = {"pv", "buy", "coll",
>  "cart"};//浏览,购买,收藏,加入购物车
>   JSONObject
> jsonObject = new JSONObject();
>   HashMap Stringgt; info = new HashMap   Random random =
> new Random();
>   SimpleDateFormat
> format = new
>   SimpleDateFormat("-MM-dd'T'HH:mm:ss'Z'");
>   long
> 

Re: FLINK JOB

2020-04-20 Thread Jeff Zhang
I see, so you are running the flink interpreter in local mode, but you access
zeppelin from a remote machine, right? Do you mean you can access it
after changing localhost to the server IP? If so, I can add a configuration on the
zeppelin side to replace localhost with the real IP.

Som Lima  于2020年4月20日周一 下午4:44写道:

> I am only running the zeppelin  word count example by clicking the
> zeppelin run arrow.
>
>
> On Mon, 20 Apr 2020, 09:42 Jeff Zhang,  wrote:
>
>> How do you run flink job ? It should not always be localhost:8081
>>
>> Som Lima  于2020年4月20日周一 下午4:33写道:
>>
>>> Hi,
>>>
>>> FLINK JOB  url  defaults to localhost
>>>
>>> i.e. localhost:8081.
>>>
>>> I have to manually change it to server :8081 to get Apache  flink
>>> Web Dashboard to display.
>>>
>>>
>>>
>>>
>>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>

-- 
Best Regards

Jeff Zhang


Re: Flink Conf "yarn.flink-dist-jar" Question

2020-04-20 Thread Till Rohrmann
Shall we say for the first version we only can deactivate the upload of
local files instead of doing some optimizations? I guess my problem is that
I don't fully understand the optimizations yet. Maybe we introduce a power
user config option `yarn.submission.automatic-flink-dist-upload` or so.

Why do we need to specify the visibility of the remote files? Won't the
visibility be specified when uploading these files?

Apart from that, the proposal looks good to me.

Cheers,
Till

On Mon, Apr 20, 2020 at 5:38 AM Yang Wang  wrote:

> Hi tison,
>
> I think i get your concerns and points.
>
> Take both FLINK-13938[1] and FLINK-14964[2] into account, i will do in the
> following steps.
> * Enrich "-yt/--yarnship" to support HDFS directory
> * Enrich "-yt/--yarnship" to specify local resource visibility. It is
> "APPLICATION" by default. It could be also configured to "PUBLIC",
> which means shared by all applications, or "PRIVATE" which means shared by
> a same user.
> * Add a new config option to control whether to optimize the
> submission(default is false). When configured to true, Flink client will
> try to filter the jars and files by name and size to avoid unnecessary
> uploading.
>
> A very rough submission command could be issued as following.
> *./bin/flink run -m yarn-cluster -d -yt
> hdfs://myhdfs/flink/release/flink-1.11:PUBLIC,hdfs://myhdfs/user/someone/mylib
> \*
> *-yD yarn.submission-optimization.enable=true
> examples/streaming/WindowJoin.jar*
>
> cc @Rong Rong , since you also help to review the
> old PR of FLINK-13938, maybe you could also share some thoughts.
>
>
> [1]. https://issues.apache.org/jira/browse/FLINK-13938
> [2]. https://issues.apache.org/jira/browse/FLINK-14964
>
>
> Best,
> Yang
>
>
>
> tison  于2020年4月18日周六 下午12:12写道:
>
>> Hi Yang,
>>
>> Name filtering & schema special handling makes sense for me. We can
>> enrich later if there is requirement without breaking interface.
>>
>> For #1, from my perspective your first proposal is
>>
>>   having an option specifies remote flink/lib, then we turn off auto
>> uploading local flink/lib and register that path as local resources
>>
>> It seems we here add another special logic for handling one kind of
>> things...what I propose is we do these two steps explicitly separated:
>>
>> 1. an option turns off auto uploading local flink/lib
>> 2. a general option register remote files as local resources
>>
>> The rest thing here is that you propose we handle flink/lib as PUBLIC
>> visibility while other files as APPLICATION visibility, whether a
>> composite configuration or name filtering to special handle libs makes
>> sense though.
>>
>> YarnClusterDescriptor already has a lot of special handling logics which
>> introduce a number of config options and keys, which should
>> have been configured in few of common options and validated at the
>> runtime.
>>
>> Best,
>> tison.
>>
>>
>> Yang Wang  于2020年4月17日周五 下午11:42写道:
>>
>>> Hi tison,
>>>
>>> For #3, if you mean registering remote HDFS file as local resource, we
>>> should make the "-yt/--yarnship"
>>> to support remote directory. I think it is the right direction.
>>>
>>> For #1, if the users could ship remote directory, then they could also
>>> specify like this
>>> "-yt hdfs://hdpdev/flink/release/flink-1.x,
>>> hdfs://hdpdev/user/someone/mylib". Do you mean we add an
>>> option for whether trying to avoid unnecessary uploading? Maybe we could
>>> filter by names and file size.
>>> I think this is a good suggestion, and we do not need to introduce a new
>>> config option "-ypl".
>>>
>>> For #2, for flink-dist, the #1 could already solve the problem. We do
>>> not need to support remote schema.
>>> It will confuse the users when we only support HDFS, not S3, OSS, etc.
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> tison  于2020年4月17日周五 下午8:05写道:
>>>
 Hi Yang,

 I agree that these two of works would benefit from single assignee. My
 concern is as below

 1. Both share libs & remote flink dist/libs are remote ship files. I
 don't think we have to implement multiple codepath/configuration.
 2. So, for concept clarification, there are
   (1) an option to disable shipping local libs
   (2) flink-dist supports multiple schema at least we said "hdfs://"
   (3) an option for registering remote shipfiles with path &
 visibility. I think new configuration system helps.

 the reason we have to special handling (2) instead of including it in
 (3) is because when shipping flink-dist to TM container, we specially
 detect flink-dist. Of course we can merge it into general ship files and
 validate shipfiles finally contain flink-dist, which is an alternative.

 The *most important* difference is (1) and (3) which we don't have an
 option for only remote libs. Is this clarification satisfy your proposal?

 Best,
 tison.


 Till Rohrmann  于2020年4月17日周五 下午7:49写道:

> Hi Yang,
>
> from what I understand 

Blink SQL java.lang.ArrayIndexOutOfBoundsException

2020-04-20 Thread 刘建刚
   I am using Roaring64NavigableMap to compute uv. It is OK with the flink
planner but not OK with the blink planner. The SQL is as follows:
SELECT toLong(TUMBLE_START(eventTime, interval '1' minute)) as curTimestamp, A, 
B, C, D,
E, uv(bitmap(id)) as bmp
FROM person
GROUP BY TUMBLE(eventTime, interval '1' minute), A, B, C, D, E

  The udf is as following:
public static class Bitmap extends AggregateFunction {
   @Override
   public Roaring64NavigableMap createAccumulator() {
  return new Roaring64NavigableMap();
   }

   @Override
   public Roaring64NavigableMap getValue(Roaring64NavigableMap accumulator) {
  return accumulator;
   }

   public void accumulate(Roaring64NavigableMap bitmap, long id) {
  bitmap.add(id);
   }
}
public static class UV extends ScalarFunction {
   public long eval(Roaring64NavigableMap bitmap) {
  return bitmap.getLongCardinality();
   }
}
  The error is as following:

2020-04-20 16:37:13,868 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  
[flink-akka.actor.default-dispatcher-40]  - 
GroupWindowAggregate(groupBy=[brand, platform, channel, versionName, 
appMajorVersion], window=[TumblingGroupWindow('w$, eventTime, 6)], 
properties=[w$start, w$end, w$rowtime, w$proctime], select=[brand, platform, 
channel, versionName, appMajorVersion, bitmap(id) AS $f5, start('w$) AS 
w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS 
w$proctime]) -> Calc(select=[toLong(w$start) AS curTimestamp, brand, platform, 
channel, versionName, appMajorVersion, uv($f5) AS bmp]) -> 
SinkConversionToTuple2 -> (Flat Map, Flat Map -> Sink: Unnamed) (321/480) 
(8eb918b493ea26e2bb60f8307347dc1a) switched from RUNNING to FAILED.
java.lang.ArrayIndexOutOfBoundsException: -1
  at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
  at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
  at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
  at 
org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:62)
  at 
org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:37)
  at 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copyBaseRow(BaseRowSerializer.java:150)
  at 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:117)
  at 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:50)
  at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:297)
  at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:244)
  at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:138)
  at 
org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73)
  at 
org.apache.flink.table.runtime.operators.window.WindowOperator.processElement(WindowOperator.java:337)
  at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.processRecord(OneInputStreamTask.java:204)
  at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:196)
  at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
  at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
  at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
  at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
  at java.lang.Thread.run(Thread.java:745)

  Do I need to register Roaring64NavigableMap somewhere? Can anyone help me?
Thank you.
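
On the literal question of registering the type: custom Kryo handling is registered on the
ExecutionConfig, roughly as in the sketch below (it routes the bitmap through Flink's
JavaSerializer, which works because Roaring64NavigableMap is Externalizable). This only shows
where such a registration goes; it is not certain that serializer registration is the cause of
the exception above, so reproducing the job locally first, as suggested in this thread, still
makes sense.

import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

public class RegisterBitmapType {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // have Kryo (de)serialize the bitmap via Java serialization instead of its
        // default field-by-field serializer
        env.getConfig().registerTypeWithKryoSerializer(
                Roaring64NavigableMap.class, JavaSerializer.class);

        // ... register the table environment, UDFs and SQL as before, then execute
    }
}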



Re: Flink 1.10 Out of memory

2020-04-20 Thread Zahid Rahman
As you can see from the task manager tab of flink web dashboard

Physical Memory:3.80 GB
JVM Heap Size:1.78 GB
Flink Managed Memory:128 MB

*Flink is only using 128 MB of managed memory, which can easily cause an OOM*
*error.*

*These are the DEFAULT settings.*

*I dusted off an old laptop, so it has only 3.8 GB RAM.*

What do your job metrics say?
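
For concreteness, these are the flink-conf.yaml knobs that matter here in 1.10; the values are
placeholders to show the shape, not recommendations:

# total memory of one TaskManager process
taskmanager.memory.process.size: 2200m
# head-room for native allocations outside Flink's accounting (forked processes, JVM internals)
taskmanager.memory.jvm-overhead.min: 256m
taskmanager.memory.jvm-overhead.max: 512m
taskmanager.memory.jvm-overhead.fraction: 0.2
# share of managed memory (used e.g. by RocksDB in 1.10)
taskmanager.memory.managed.fraction: 0.4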

On Mon, 20 Apr 2020, 07:26 Xintong Song,  wrote:

> Hi Lasse,
>
> From what I understand, your problem is that JVM tries to fork some native
> process (if you look at the exception stack the root exception is thrown
> from a native method) but there's no enough memory for doing that. This
> could happen when either Mesos is using cgroup strict mode for memory
> control, or there's no more memory on the machine. Flink cannot prevent
> native processes from using more memory. It can only reserve certain amount
> of memory for such native usage when requesting worker memory from the
> deployment environment (in your case Mesos) and allocating Java heap /
> direct memory.
>
> My suggestion is to try increasing the JVM overhead configuration. You can
> leverage the configuration options
> 'taskmanager.memory.jvm-overhead.[min|max|fraction]'. See more details in
> the documentation[1].
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#taskmanager-memory-jvm-overhead-max
>
> On Sat, Apr 18, 2020 at 4:02 AM Zahid Rahman  wrote:
>
>> https://betsol.com/java-memory-management-for-java-virtual-machine-jvm/
>>
>> Backbutton.co.uk
>> ¯\_(ツ)_/¯
>> ♡۶Java♡۶RMI ♡۶
>> Make Use Method {MUM}
>> makeuse.org
>> 
>>
>>
>> On Fri, 17 Apr 2020 at 14:07, Lasse Nedergaard <
>> lassenedergaardfl...@gmail.com> wrote:
>>
>>> Hi.
>>>
>>> We have migrated to Flink 1.10 and face out of memory exception and
>>> hopeful can someone point us in the right direction.
>>>
>>> We have a job that use broadcast state, and we sometimes get out memory
>>> when it creates a savepoint. See stacktrack below.
>>> We have assigned 2.2 GB/task manager and
>>> configured  taskmanager.memory.process.size : 2200m
>>> In Flink 1.9 our container was terminated because OOM, so 1.10 do a
>>> better job, but it still not working and the task manager is leaking mem
>>> for each OOM and finial kill by Mesos
>>>
>>>
>>> Any idea what we can do to figure out what settings we need to change?
>>>
>>> Thanks in advance
>>>
>>> Lasse Nedergaard
>>>
>>>
>>> WARN o.a.flink.runtime.state.filesystem.FsCheckpointStreamFactory -
>>> Could not close the state stream for
>>> s3://flinkstate/dcos-prod/checkpoints/fc9318cc236d09f0bfd994f138896d6c/chk-3509/cf0714dc-ad7c-4946-b44c-96d4a131a4fa.
>>> java.io.IOException: Cannot allocate memory at
>>> java.io.FileOutputStream.writeBytes(Native Method) at
>>> java.io.FileOutputStream.write(FileOutputStream.java:326) at
>>> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at
>>> java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) at
>>> java.io.FilterOutputStream.flush(FilterOutputStream.java:140) at
>>> java.io.FilterOutputStream.close(FilterOutputStream.java:158) at
>>> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:995)
>>> at
>>> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>>> at
>>> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
>>> at
>>> org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
>>> at
>>> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>>> at
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:277)
>>> at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263) at
>>> org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:250) at
>>> org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:122)
>>> at
>>> org.apache.flink.runtime.state.AsyncSnapshotCallable.closeSnapshotIO(AsyncSnapshotCallable.java:167)
>>> at
>>> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:83)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
>>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
>>> at
>>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>> INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
>>> Discarding checkpoint 3509 of job 

Re: FLINK JOB

2020-04-20 Thread Som Lima
I am only running the Zeppelin word count example by clicking the Zeppelin
run arrow.


On Mon, 20 Apr 2020, 09:42 Jeff Zhang,  wrote:

> How do you run the Flink job? It should not always be localhost:8081.
>
> Som Lima  于2020年4月20日周一 下午4:33写道:
>
>> Hi,
>>
>> The FLINK JOB url defaults to localhost,
>>
>> i.e. localhost:8081.
>>
>> I have to manually change it to <server>:8081 to get the Apache Flink
>> Web Dashboard to display.
>>
>>
>>
>>
>>
>
> --
> Best Regards
>
> Jeff Zhang
>


Joining table with row attribute against an enrichment table

2020-04-20 Thread Gyula Fóra
Hi All!

We hit the following problem with SQL and are trying to understand whether
there is a valid workaround.

We have 2 tables:

*Kafka*
timestamp (ROWTIME)
item
quantity

*Hive*
item
price

So we basically have incoming (ts, item, quantity) records and we want to join
them with the Hive table to get the total price (price * quantity) for the
current item.

After this we want to create a window aggregate on quantity*price, windowed
on timestamp (the event time attribute).
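
For concreteness, a minimal sketch of the shape of the query we are trying
(names are simplified, KafkaTable / HiveTable are assumed to be registered
already, and the Table API classes come from org.apache.flink.table.api):

    TableEnvironment tableEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

    Table result = tableEnv.sqlQuery(
            "SELECT " +
            "  TUMBLE_START(k.`timestamp`, INTERVAL '1' HOUR) AS window_start, " +
            "  SUM(k.quantity * h.price) AS total_price " +
            "FROM KafkaTable k " +
            "JOIN HiveTable h ON k.item = h.item " +
            "GROUP BY TUMBLE(k.`timestamp`, INTERVAL '1' HOUR)");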

No matter how we formulate this query, we hit the following error:
org.apache.flink.table.api.TableException: Rowtime attributes must not be
in the input rows of a regular join. As a workaround you can cast the time
attributes of input tables to TIMESTAMP before.

I understand that the problem is that we cannot join with the Hive table
and still maintain the watermark/event time column. But why is this?

In the DataStream world I would simply assign the max watermark to my
enrichment input, and the join output would get the ts of the input record.
Can I achieve something similar in SQL / the Table API?

Thank you!
Gyula


Re: FLINK JOB

2020-04-20 Thread Jeff Zhang
How do you run the Flink job? It should not always be localhost:8081.

Som Lima  于2020年4月20日周一 下午4:33写道:

> Hi,
>
> The FLINK JOB url defaults to localhost,
>
> i.e. localhost:8081.
>
> I have to manually change it to <server>:8081 to get the Apache Flink Web
> Dashboard to display.
>
>
>
>
>

-- 
Best Regards

Jeff Zhang


FLINK JOB

2020-04-20 Thread Som Lima
Hi,

The FLINK JOB url defaults to localhost,

i.e. localhost:8081.

I have to manually change it to <server>:8081 to get the Apache Flink Web
Dashboard to display.


Re: Flink incremental checkpointing - how long does data is kept in the share folder

2020-04-20 Thread Shachar Carmeli
Hi Yun,
I noticed that some files are related to the checkpoint but are not mentioned
in the metadata file, and some of the handles that are referenced in the
metadata file (usually ByteStreamStateHandle) have no corresponding file in
the shared folder.
Can you explain this behaviour?

See the code I was using:
final long[] totalSize = {0L}; // accumulator referenced below, declared here for completeness

final Savepoint savepoint =
        Checkpoints.loadCheckpointMetadata(in, CheckpointTool.class.getClassLoader());

final Set<String> pathSharedFromMetadata = savepoint.getOperatorStates().stream()
        .flatMap(operatorState -> operatorState.getSubtaskStates().values().stream())
        .flatMap(operatorSubtaskState -> operatorSubtaskState.getManagedKeyedState().stream())
        .flatMap(keyedStateHandle ->
                ((IncrementalKeyedStateHandle) keyedStateHandle).getSharedState().values().stream())
        .map(streamStateHandle -> {
            totalSize[0] += streamStateHandle.getStateSize();
            String name;
            if (streamStateHandle instanceof FileStateHandle) {
                // a real file under the checkpoint's 'shared' folder
                name = ((FileStateHandle) streamStateHandle).getFilePath().getName();
            } else {
                // ByteStreamStateHandle keeps its bytes inline in the metadata,
                // so there is no separate file for it under 'shared'
                final String handleName =
                        ((ByteStreamStateHandle) streamStateHandle).getHandleName();
                name = new File(handleName).getName();
            }
            return name;
        })
        .collect(Collectors.toSet());
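
A possible follow-up to the snippet above, comparing the names referenced by
the metadata with what actually sits on disk under 'shared' (a sketch only:
'checkpointDir' and the listing calls via org.apache.flink.core.fs.FileSystem
and Path are my assumptions):

final Path sharedDir = new Path(checkpointDir, "shared");
final FileSystem fs = sharedDir.getFileSystem();

final Set<String> namesOnDisk = Arrays.stream(fs.listStatus(sharedDir))
        .map(status -> status.getPath().getName())
        .collect(Collectors.toSet());

// files present under shared/ but not referenced by this checkpoint's metadata
final Set<String> notReferenced = new HashSet<>(namesOnDisk);
notReferenced.removeAll(pathSharedFromMetadata);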

Thanks in advance 
Shachar

On 2020/04/13 14:30:40, Yun Tang  wrote: 
> Hi Shachar
> 
> I think you could refer to [1] to learn the directory structure of
> checkpoints. The '_metadata' file contains the information about which
> checkpointed data files belong to the checkpoint, e.g. file paths under the
> 'shared' folder. As I said before, you need to call
> Checkpoints#loadCheckpointMetadata to load '_metadata' to know which files
> belong to that checkpoint.
> 
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#directory-structure
> 
> Best
> Yun Tang
> 
> 
> From: Shachar Carmeli 
> Sent: Sunday, April 12, 2020 15:32
> To: user@flink.apache.org 
> Subject: Re: Flink incremental checkpointing - how long does data is kept in 
> the share folder
> 
> Thank you for the quick response
> Your answer relates to the checkpoint folder that contains the _metadata file,
> e.g. chk-1829.
> What about the "shared" folder: how do I know which files in that folder
> are still relevant and which are left over from a failed checkpoint? They
> are not directly related to the _metadata checkpoint, or am I missing
> something?
> 
> 
> On 2020/04/07 18:37:57, Yun Tang  wrote:
> > Hi Shachar
> >
> > Why do we see data that is older than the lateness configuration?
> > There might exist three reasons:
> >
> >   1.  RocksDB really still needs that file in the current checkpoint. If we
> > uploaded a file named 42.sst on 2/4 for some old checkpoint, the current
> > checkpoint could still include that 42.sst file again if it has never been
> > compacted since then. This is possible in theory.
> >   2.  Your checkpoint size is large and the checkpoint coordinator could not
> > remove files fast enough before exiting.
> >   3.  That file was created by a crashed task manager and is not known to the
> > checkpoint coordinator.
> >
> > How do I know that the files belong to a valid checkpoint and not to a
> > checkpoint of a crashed job, so that we can delete those files?
> > You have to call Checkpoints#loadCheckpointMetadata[1] to load the latest
> > _metadata in the checkpoint directory and compare the file paths with the
> > files currently in the checkpoint directory. The ones that are not in the
> > checkpoint meta and are older than the latest checkpoint can be removed. You
> > could follow this to debug, or maybe I could write a tool later to help
> > identify which files can be deleted.
> >
> > [1] 
> > 

Re: Flink 1.10 Out of memory

2020-04-20 Thread Lasse Nedergaard
Hi

Thanks for the reply. We will try it out and let everybody know.

Med venlig hilsen / Best regards
Lasse Nedergaard


> Den 20. apr. 2020 kl. 08.26 skrev Xintong Song :
> 
> 
> Hi Lasse,
> 
> From what I understand, your problem is that JVM tries to fork some native 
> process (if you look at the exception stack the root exception is thrown from 
> a native method) but there's no enough memory for doing that. This could 
> happen when either Mesos is using cgroup strict mode for memory control, or 
> there's no more memory on the machine. Flink cannot prevent native processes 
> from using more memory. It can only reserve certain amount of memory for such 
> native usage when requesting worker memory from the deployment environment 
> (in your case Mesos) and allocating Java heap / direct memory.
> 
> My suggestion is to try increasing the JVM overhead configuration. You can 
> leverage the configuration options 
> 'taskmanager.memory.jvm-overhead.[min|max|fraction]'. See more details in the 
> documentation[1].
> 
> Thank you~
> Xintong Song
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#taskmanager-memory-jvm-overhead-max
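> 
> For reference, a minimal sketch of the corresponding flink-conf.yaml entries
> (the values below are only illustrative and have to be tuned for your setup):
> 
>     taskmanager.memory.process.size: 2200m
>     taskmanager.memory.jvm-overhead.min: 192mb
>     taskmanager.memory.jvm-overhead.max: 512mb
>     taskmanager.memory.jvm-overhead.fraction: 0.15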
> 
>> On Sat, Apr 18, 2020 at 4:02 AM Zahid Rahman  wrote:
>> https://betsol.com/java-memory-management-for-java-virtual-machine-jvm/
>> 
>> Backbutton.co.uk
>> ¯\_(ツ)_/¯ 
>> ♡۶Java♡۶RMI ♡۶
>> Make Use Method {MUM}
>> makeuse.org
>> 
>> 
>>> On Fri, 17 Apr 2020 at 14:07, Lasse Nedergaard 
>>>  wrote:
>>> Hi.
>>> 
>>> We have migrated to Flink 1.10 and face an out of memory exception, and
>>> hopefully someone can point us in the right direction.
>>> 
>>> We have a job that uses broadcast state, and we sometimes get out of memory
>>> when it creates a savepoint. See the stack trace below.
>>> We have assigned 2.2 GB/task manager and configured
>>> taskmanager.memory.process.size: 2200m
>>> In Flink 1.9 our container was terminated because of OOM, so 1.10 does a
>>> better job, but it is still not working and the task manager leaks memory
>>> with each OOM and is finally killed by Mesos
>>> 
>>> 
>>> Any idea what we can do to figure out what settings we need to change?
>>> 
>>> Thanks in advance
>>> 
>>> Lasse Nedergaard 
>>> 
>>> 
>>> WARN  o.a.flink.runtime.state.filesystem.FsCheckpointStreamFactory  - Could 
>>> not close the state stream for 
>>> s3://flinkstate/dcos-prod/checkpoints/fc9318cc236d09f0bfd994f138896d6c/chk-3509/cf0714dc-ad7c-4946-b44c-96d4a131a4fa.
>>> java.io.IOException: Cannot allocate memory
>>> at java.io.FileOutputStream.writeBytes(Native Method)
>>> at java.io.FileOutputStream.write(FileOutputStream.java:326)
>>> at 
>>> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>>> at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
>>> at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
>>> at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
>>> at 
>>> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:995)
>>> at 
>>> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>>> at 
>>> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
>>> at 
>>> org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
>>> at 
>>> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>>> at 
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:277)
>>> at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263)
>>> at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:250)
>>> at 
>>> org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:122)
>>> at 
>>> org.apache.flink.runtime.state.AsyncSnapshotCallable.closeSnapshotIO(AsyncSnapshotCallable.java:167)
>>> at 
>>> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:83)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at 
>>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
>>> at 
>>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> 
>>> INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - 
>>> Discarding checkpoint 3509 of job fc9318cc236d09f0bfd994f138896d6c.
>>> org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint 
>>> 

A list of things in Flink that need optimization, hoping the flink PMC can address them

2020-04-20 Thread xue...@outlook.com
1. The recommended Maven packaging causes long-term project maintenance problems

The Maven packaging approach currently recommended on the official website unpacks
third-party dependencies and stores them as plain directories, instead of keeping the
standard Maven style of versioned jar dependencies. This is bad for long-term project
management: after a project has been running for months, staff turnover and version
upgrades lead to many engineering problems with version compatibility and identification.

I hope Flink improves this in the releases after 1.10.0; it may require changes to the runtime classloader.



2. Dimension data is loaded repeatedly through RichXXXFunction#open, wasting memory

Suppose a job is started with a parallelism of 1K and is spread evenly over 10 compute
hosts, so each TaskManager has 100 slots and each slot runs the open method of a
RichXXXFunction. The same compute host then opens and loads the same dimension data over
and over, wasting CPU and memory. I would like the data to be preloaded once per
TaskManager, keyed by job groupName or jobID, when the slots start, instead of being
loaded again by every slot thread or every RichXXXFunction. open() would then obtain the
preloaded data locally through a synchronous, blocking call with a timeout. (A rough
user-side sketch of this idea is attached at the end of this mail.)



3. Soft resource isolation between different jobs inside a TaskManager

Business code may be buggy or run into OOM. Flink cannot provide Docker-like resource
isolation, and because several different jobs share slots or even a TaskManager, one
broken job can take down a whole compute node. It would help if RuntimeExceptions could
be caught and logged per job, at the jobID level, without affecting other jobs, and then
turned into a control-style XXXEvent that ends that job's lifecycle and reports metrics.



4. Exception information, especially business exceptions, is often masked by the Flink
framework, or, with checkpoint-based failover, the job just keeps retrying. I hope
business exceptions can be surfaced directly, just as in a standalone program, with
metrics reporting for some of them and limits on retries in some cases.



5. Logic tested on small data volumes during development often behaves differently from the same logic on the very large data volumes in production

For example dual-stream joins, and even the simplest official WordCount example, suffer
from this. Stronger mocking/simulation of real production conditions is needed;
otherwise the logic has to be changed later, which makes the previous savepoint unusable.



6. Open interfaces to the business side: observe the processing via callbacks, and let the business side intervene in checkpoints and in job completion

Example: my data is consumed, cleaned and normalized by an independent JOB1 and sinked
to HDFS, with files rolled by size and time. Another job, JOB2, continuously watches the
HDFS files and directories written by JOB1's sink, uses them as its source, processes
them in real time, and then sinks the results to HDFS or another sink.

Missing hooks in the sourceFunction:

A. I cannot influence in which order JOB2 consumes the HDFS files

B. I cannot know whether a given file has been fully consumed

C. I cannot know the consumption progress within a file

D. I cannot intervene when the job fails

Missing hooks in the sinkFunction:

A. I cannot know which data has already been checkpointed

B. When a job fails and restores, Flink only guarantees exactly-once for state inside
the cluster; there is currently no effective way to intervene for sinks and sources.
If Flink cannot do this itself because sinks and sources differ too much, why not open
it up to the business code?

Missing hooks on the Job level:

A. A job runs indefinitely, but the business processing is bounded by time or by a
business-level completion point. I need a callback so the business side can decide that
a stage of the work is complete and the sinked data can be used, or terminate the job at
a stage boundary, instead of having only the one blunt option of cancelling the job.

Right now I can only refresh the HDFS sink over and over to check whether the data has
been fully processed with no intermediate state left, and for windowed operators I also
watch the web UI's Records Received and Records Sent. Because that data is buffered in
state it is not visible in the web UI, so I have to wait for the StreamingFileSink's
RolloverInterval and InactivityInterval to pass before I can tell whether the business
data has been fully processed.



7. TODO

All of the above are problems I ran into when using the DataStream API directly, where I could not find observation points or hooks to intervene.
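
For item 2, a rough user-side sketch of the workaround I have in mind (the class below
is made up for illustration and is not an existing Flink API): keep one copy of the
dimension data per TaskManager JVM in a static cache, so that every slot's
RichFunction#open only looks it up instead of loading it again.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Made-up illustration, not an existing Flink API: because all subtasks of one job
// inside a TaskManager typically share the same user classloader, a static cache is
// initialized only once per TaskManager JVM and reused by every slot / RichFunction.
public class SharedDimensionCache {

    private static final Map<String, Map<String, String>> CACHE = new ConcurrentHashMap<>();

    // Loads the dimension data for the given name once per JVM; concurrent callers
    // block inside computeIfAbsent until the first load has finished.
    public static Map<String, String> getOrLoad(String dimensionName) {
        return CACHE.computeIfAbsent(dimensionName, SharedDimensionCache::loadFromExternalStore);
    }

    private static Map<String, String> loadFromExternalStore(String name) {
        // placeholder: read the dimension table from HDFS, a database, etc.
        return new ConcurrentHashMap<>();
    }
}

// Usage in a RichFunction: open() only looks the data up, it does not reload it.
class EnrichFunction extends RichMapFunction<String, String> {

    private transient Map<String, String> dim;

    @Override
    public void open(Configuration parameters) {
        dim = SharedDimensionCache.getOrLoad("item_price");
    }

    @Override
    public String map(String value) {
        return value + "," + dim.getOrDefault(value, "unknown");
    }
}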




Sent from Mail for Windows 10