Re: How to run the test cases in the Flink source code

2019-09-29 Post by Xintong Song
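First, change into the directory of the module that contains the test. In your example that is flink-connectors\flink-connector-kafka-base\
Then run mvn -Dtest=KafkaProducerTestBase#testExactlyOnceCustomOperator test
After -Dtest= you can pass <ClassName>#<methodName> to run a single test case, or just <ClassName> to run all test cases in that class.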


Thank you~

Xintong Song



On Sun, Sep 29, 2019 at 4:32 PM gaofeilong198...@163.com <
gaofeilong198...@163.com> wrote:

> Hi, in my Linux environment I can successfully compile the Flink source with
> mvn clean install -DskipTests -Dfast and mvn clean install -DskipTests.
>
> Now I would like to run test cases in Flink, for example
> flink-connectors\flink-connector-kafka-base\src\test\java\org\apache\flink\streaming\connectors\kafka\KafkaProducerTestBase.java#testExactlyOnceCustomOperator.
> How should I run it?
>
>
>
> gaofeilong198...@163.com
>


Re: Re: Can't map return a null value?

2019-09-29 Post by a****



OK, got it. I just wanted to confirm: I hadn't noticed this before and had traced through the code, so I asked. Thanks a lot!





On 2019-09-29 16:44:53, "Qi Luo" wrote:
>Hi Allan,
>
>map can only return non-null values; consider using flatMap instead.
>
>Qi


Re: Can't map return a null value?

2019-09-29 Post by Qi Luo
Hi Allan,

map can only return non-null values; consider using flatMap instead.

Qi
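To illustrate the flatMap suggestion: a flatMap emits zero or more records through a Collector, so a record that would otherwise have been "null" is simply not emitted. The sketch below runs without Flink on the classpath; the Collector and FlatMapFunction interfaces and the Pair type are simplified, hypothetical stand-ins for org.apache.flink.util.Collector, org.apache.flink.api.common.functions.FlatMapFunction and Tuple2, and the "key,count" input format is made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FlatMapInsteadOfNull {

    // Hypothetical stand-in for org.apache.flink.util.Collector (simplified).
    interface Collector<T> {
        void collect(T record);
    }

    // Hypothetical stand-in for Flink's FlatMapFunction<IN, OUT>.
    interface FlatMapFunction<IN, OUT> {
        void flatMap(IN value, Collector<OUT> out) throws Exception;
    }

    // Simple two-field type standing in for Tuple2<String, Integer>.
    static final class Pair {
        final String f0;
        final int f1;
        Pair(String f0, int f1) { this.f0 = f0; this.f1 = f1; }
        @Override public String toString() { return "(" + f0 + "," + f1 + ")"; }
    }

    // Parses "key,count" lines. Where a map function would be tempted to
    // return null for bad input, this simply emits nothing.
    static final FlatMapFunction<String, Pair> PARSE = (line, out) -> {
        String[] parts = line.split(",");
        if (parts.length != 2) {
            return; // drop the record: zero outputs instead of null
        }
        try {
            out.collect(new Pair(parts[0].trim(), Integer.parseInt(parts[1].trim())));
        } catch (NumberFormatException e) {
            // malformed count: drop this record as well
        }
    };

    // Feeds every input line through the flatMap and gathers the outputs.
    static List<Pair> run(List<String> input) {
        List<Pair> results = new ArrayList<>();
        Collector<Pair> collector = results::add;
        try {
            for (String line : input) {
                PARSE.flatMap(line, collector);
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        List<Pair> out = run(Arrays.asList("a,1", "garbage", "b,x", "c,3"));
        System.out.println(out); // prints [(a,1), (c,3)]
    }
}
```

In a real job the same body would go into a FlatMapFunction<String, Tuple2<String, Integer>> passed to DataStream#flatMap. If it is written as a Java lambda, Flink usually cannot infer the Tuple2 output type because of type erasure, so the output type has to be declared with .returns(...) (or an anonymous class used instead).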



How to run the test cases in the Flink source code

2019-09-29 Post by gaofeilong198...@163.com
Hi, in my Linux environment I can successfully compile the Flink source with mvn clean install -DskipTests -Dfast and mvn clean install -DskipTests.

Now I would like to run test cases in Flink, for example
flink-connectors\flink-connector-kafka-base\src\test\java\org\apache\flink\streaming\connectors\kafka\KafkaProducerTestBase.java#testExactlyOnceCustomOperator.
How should I run it?



gaofeilong198...@163.com


Could not forward element to next operator

2019-09-29 Post by allan

Hi,
Recently my job keeps throwing errors. It uses a one-minute window. What could be causing this? Can anyone help? The error is:

TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator}
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)




Can't map return a null value?

2019-09-29 Post by allan
Hi,
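I ran into a problem: my map function returns null (its return type is Tuple2), and the job throws an exception at the next step, where a filter is supposed to drop the null values.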

java.lang.NullPointerException
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)


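While debugging, I found that the exception is thrown in the pushToOperator method of the OperatorChain class when the record argument carries a null value.
So can't map return a null value?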


    @Override
    protected <X> void pushToOperator(StreamRecord<X> record) {
        try {
            // we know that the given outputTag matches our OutputTag so the record
            // must be of the type that our operator (and Serializer) expects.
            @SuppressWarnings("unchecked")
            StreamRecord<T> castRecord = (StreamRecord<T>) record;

            numRecordsIn.inc();
            StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
            operator.setKeyContextElement1(copy);
            operator.processElement(copy);
        } catch (ClassCastException e) {
            if (outputTag != null) {
                // Enrich error message
                ClassCastException replace = new ClassCastException(
                    String.format(
                        "%s. Failed to push OutputTag with id '%s' to operator. " +
                            "This can occur when multiple OutputTags with different types " +
                            "but identical names are being used.",
                        e.getMessage(),
                        outputTag.getId()));

                throw new ExceptionInChainedOperatorException(replace);
            } else {
                throw new ExceptionInChainedOperatorException(e);
            }
        } catch (Exception e) {
            throw new ExceptionInChainedOperatorException(e);
        }
    }
}
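The stack trace and the excerpt above show why the downstream filter never gets a chance to drop the null: CopyingChainingOutput (which, as far as I know, Flink uses between chained operators when object reuse is disabled, the default) copies every record with the serializer before handing it to the next operator, and TupleSerializer.copy has to read the tuple's fields. The following minimal sketch reproduces only that ordering; Pair, copy and pushToFilter are hypothetical stand-ins, not Flink code.

```java
import java.util.function.Predicate;

public class CopyBeforeFilter {

    // Hypothetical stand-in for a two-field tuple such as Flink's Tuple2.
    static final class Pair {
        final Object f0;
        final Object f1;
        Pair(Object f0, Object f1) { this.f0 = f0; this.f1 = f1; }
    }

    // Mimics a tuple serializer's copy(from): it must read the fields of
    // `from`, so a null argument fails with NullPointerException.
    static Pair copy(Pair from) {
        return new Pair(from.f0, from.f1);
    }

    // The downstream "drop nulls" filter from the question.
    static final Predicate<Pair> NOT_NULL = value -> value != null;

    // Mimics the chained hand-over in pushToOperator: copy first, then pass
    // the copy on to the next operator (here: the filter).
    static boolean pushToFilter(Pair mapOutput) {
        Pair copy = copy(mapOutput); // throws NullPointerException for a null map result
        return NOT_NULL.test(copy);  // never reached in the null case
    }

    public static void main(String[] args) {
        System.out.println(pushToFilter(new Pair("a", 1))); // prints true
        try {
            pushToFilter(null);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException before the filter ran");
        }
    }
}
```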




Re: Could not forward element to next operator

2019-09-29 Post by Biao Liu
The problem is probably in the part of the stack trace you left out. Check the bottom-most "Caused by".

Thanks,
Biao /'bɪ.aʊ/
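Following that advice, the root cause is the last "Caused by:" section of the full, untruncated trace in the TaskManager log. If the exception is caught in your own code, the same chain can be walked programmatically. A minimal sketch using only the JDK; the exception chain built in main is hypothetical and only mimics the shape of the trace in this thread.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

public class RootCause {

    // Walks getCause() links down to the innermost ("bottom-most") cause.
    // The identity set stops the walk if a cause chain ever loops back on itself.
    static Throwable rootCause(Throwable t) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<Throwable, Boolean>());
        Throwable current = t;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Hypothetical chain resembling the one reported in this thread.
        Exception bottom = new IllegalStateException("the real problem");
        Exception chained = new RuntimeException("Could not forward element to next operator", bottom);
        Exception timer = new RuntimeException("TimerException", chained);

        Throwable root = rootCause(timer);
        System.out.println(root.getClass().getSimpleName() + ": " + root.getMessage());
        // prints IllegalStateException: the real problem
    }
}
```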



On Sun, 29 Sep 2019 at 13:17, <18612537...@163.com> wrote:

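> I have read that, and it is not my problem. The job does not set watermarks and runs normally; recently it seems to throw this exception after running for a little over a day.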
>
> Sent from my iPhone
>
> > On Sep 29, 2019, at 11:49 AM, Wesley Peng wrote:
> >
> > Hello,
> >
> > Could this article match your issue?
> > https://blog.csdn.net/qq_41910230/article/details/90411237
> >
> > regards.
> >
> >> On Sun, Sep 29, 2019 at 10:33 AM allan <18612537...@163.com> wrote:
> >>
> >> Hi,
> >>
> >> Recently my job keeps throwing errors. It uses a one-minute window. What could be causing this? Can anyone help? Flink version is 1.6. The error is:
> >>
> >> TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator}
> >> at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
> >> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >> at java.lang.Thread.run(Thread.java:748)
> >> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> >> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
> >> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> >> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> >> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>
>
>


Re: Classloader isolation between task slots in a TaskManager

2019-09-29 Post by Biao Liu
Within the same TM (TaskManager), subtasks of the same job share one classloader.

Thanks,
Biao /'bɪ.aʊ/
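A sketch of what that answer means for the question below. Assuming the singleton class ships in the job jar (as far as I know, a class loaded from Flink's own lib/ directory comes from a parent classloader and would be shared more widely), its static state is initialized once per job per TaskManager, not once per subtask; a different job, or another TaskManager JVM, gets its own copy. Because subtasks run in separate threads, the data should be initialized thread-safely and kept read-only. The initialization-on-demand holder idiom below does both with plain JDK code; the country-code map and the threads standing in for subtasks are hypothetical placeholders.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedBaseData {

    // Counts how often the base data is loaded (demonstration only).
    static final AtomicInteger LOAD_COUNT = new AtomicInteger();

    // Initialization-on-demand holder: the JVM initializes Holder at most once
    // per classloader that loads it, and class initialization is thread-safe.
    private static final class Holder {
        static final Map<String, String> DATA = load();
    }

    // Hypothetical loader for the "base data" collection.
    private static Map<String, String> load() {
        LOAD_COUNT.incrementAndGet();
        Map<String, String> m = new HashMap<>();
        m.put("CN", "China");
        m.put("US", "United States");
        return Collections.unmodifiableMap(m); // read-only: shared across threads
    }

    static Map<String, String> data() {
        return Holder.DATA;
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulate several subtasks (threads) of one job in one TaskManager.
        Thread[] subtasks = new Thread[6];
        for (int i = 0; i < subtasks.length; i++) {
            subtasks[i] = new Thread(() -> data().get("CN"));
            subtasks[i].start();
        }
        for (Thread t : subtasks) {
            t.join();
        }
        System.out.println("loads: " + LOAD_COUNT.get()); // prints loads: 1
    }
}
```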



On Sat, 28 Sep 2019 at 09:30, Ever <439674...@qq.com> wrote:

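> One job has 2 tasks, each with 3 subtasks (parallelism 3), as shown in the figure below.
>
> Each subtask occupies one task slot, but subtasks of different tasks in the same job can share a task slot,
> so here each task slot should hold 2 subtasks.
> For the two subtasks that share one task slot, is their classloader the same one,
> or does each subtask have its own classloader?
>
> I ask because my job uses a static class (a Scala object or a Java singleton)
> with a collection member variable holding base data. I want to know whether this variable has to be initialized in every subtask, or only once per JVM.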
>
>