Re: How do I run the test cases in the Flink source code

First, change into the directory of the module that contains the test. In your example that is flink-connectors/flink-connector-kafka-base/. Then run:

    mvn -Dtest=KafkaProducerTestBase#testExactlyOnceCustomOperator test

-Dtest= can be followed by <ClassName>#<methodName> to run a single test case, or by <ClassName> alone to run every test case in that class.

Thank you~
Xintong Song

On Sun, Sep 29, 2019 at 4:32 PM gaofeilong198...@163.com <gaofeilong198...@163.com> wrote:
> hi,
> On my Linux environment I can build the Flink source successfully with both
> mvn clean install -DskipTests -Dfast and mvn clean install -DskipTests.
>
> Now I would like to run one of the test cases in Flink, for example
> flink-connectors\flink-connector-kafka-base\src\test\java\org\apache\flink\streaming\connectors\kafka\KafkaProducerTestBase.java#testExactlyOnceCustomOperator.
> How should I run it?
>
> gaofeilong198...@163.com
Re: Re: Can map not return null?

OK, got it. I just wanted to confirm: I had not noticed this before and only ran into it while stepping through the code, so I thought I would ask. Thanks a lot!

At 2019-09-29 16:44:53, "Qi Luo" wrote:
> Hi Allan,
>
> map can only return non-null values. You could consider using flatMap instead.
>
> Qi
>
> On Sun, Sep 29, 2019 at 4:31 PM allan <18612537...@163.com> wrote:
>
>> Hi,
>> I ran into a problem: my map function returns null (its return type is Tuple2), and the job throws an exception when the next step filters out the null values.
>>
>> After debugging, I found that the pushToOperator method in the OperatorChain class throws when the record argument is null.
>> So is map not allowed to return null?
Re: Can map not return null?

Hi Allan,

map can only return non-null values. You could consider using flatMap instead.

Qi

On Sun, Sep 29, 2019 at 4:31 PM allan <18612537...@163.com> wrote:
> Hi,
> I ran into a problem: my map function returns null (its return type is Tuple2), and the job throws an exception when the next step filters out the null values.
>
> After debugging, I found that the pushToOperator method in the OperatorChain class throws when the record argument is null.
> So is map not allowed to return null?
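
Qi's suggestion can be sketched roughly as follows. This is a plain-Java illustration, not code from the thread: the Collector and FlatMapFunction interfaces are declared locally as stand-ins that mirror the shape of Flink's org.apache.flink.util.Collector and org.apache.flink.api.common.functions.FlatMapFunction, Map.Entry stands in for Tuple2, and ParseWordCount and its "word,count" input format are hypothetical. The idea is that a flatMap-style function emits zero records for input it wants to drop, so no null ever reaches the next operator.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class FlatMapInsteadOfNull {

    // Local stand-in for Flink's Collector (assumption: only collect() is modelled).
    interface Collector<T> {
        void collect(T record);
    }

    // Local stand-in that mirrors the shape of Flink's FlatMapFunction.
    interface FlatMapFunction<T, O> {
        void flatMap(T value, Collector<O> out) throws Exception;
    }

    // Hypothetical user function: parses "word,count" and emits nothing for bad
    // input, where a map function would have been tempted to return null.
    static class ParseWordCount implements FlatMapFunction<String, Map.Entry<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Map.Entry<String, Integer>> out) {
            String[] parts = value.split(",");
            if (parts.length != 2) {
                return; // drop the record: zero outputs instead of a null output
            }
            try {
                int count = Integer.parseInt(parts[1].trim());
                out.collect(new SimpleImmutableEntry<>(parts[0], count));
            } catch (NumberFormatException e) {
                // malformed count: the record is dropped as well
            }
        }
    }

    // Feeds every input through the function and gathers whatever it emits.
    public static List<Map.Entry<String, Integer>> run(List<String> inputs) {
        List<Map.Entry<String, Integer>> results = new ArrayList<>();
        ParseWordCount fn = new ParseWordCount();
        for (String input : inputs) {
            fn.flatMap(input, results::add);
        }
        return results;
    }

    public static void main(String[] args) {
        // Only the two well-formed inputs produce output records.
        System.out.println(run(Arrays.asList("a,1", "broken", "b,x", "c,3")));
    }
}
```

In a real job the same function body would implement Flink's own FlatMapFunction and be passed to flatMap on the stream, which also makes the separate null-filtering step unnecessary.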
How do I run the test cases in the Flink source code

hi,
On my Linux environment I can build the Flink source successfully with both mvn clean install -DskipTests -Dfast and mvn clean install -DskipTests.

Now I would like to run one of the test cases in Flink, for example flink-connectors\flink-connector-kafka-base\src\test\java\org\apache\flink\streaming\connectors\kafka\KafkaProducerTestBase.java#testExactlyOnceCustomOperator.
How should I run it?

gaofeilong198...@163.com
Could not forward element to next operator
Hi,

My job has recently been failing repeatedly. My window is a one-minute window. What could be the cause? Can anyone help? The error is as follows:

TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator}
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
Can map not return null?

Hi,

I ran into a problem: my map function returns null (its return type is Tuple2), and the job throws an exception when the next step filters out the null values.

java.lang.NullPointerException
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

After debugging, I found that the pushToOperator method in the OperatorChain class throws when the record argument is null.
So is map not allowed to return null?

    @Override
    protected void pushToOperator(StreamRecord record) {
        try {
            // we know that the given outputTag matches our OutputTag so the record
            // must be of the type that our operator (and Serializer) expects.
            @SuppressWarnings("unchecked")
            StreamRecord castRecord = (StreamRecord) record;

            numRecordsIn.inc();
            StreamRecord copy = castRecord.copy(serializer.copy(castRecord.getValue()));
            operator.setKeyContextElement1(copy);
            operator.processElement(copy);
        } catch (ClassCastException e) {
            if (outputTag != null) {
                // Enrich error message
                ClassCastException replace = new ClassCastException(
                    String.format(
                        "%s. Failed to push OutputTag with id '%s' to operator. " +
                        "This can occur when multiple OutputTags with different types " +
                        "but identical names are being used.",
                        e.getMessage(),
                        outputTag.getId()));

                throw new ExceptionInChainedOperatorException(replace);
            } else {
                throw new ExceptionInChainedOperatorException(e);
            }
        } catch (Exception e) {
            throw new ExceptionInChainedOperatorException(e);
        }
    }
    }
Re: Could not forward element to next operator
The problem is probably in the part that was omitted. Check the bottom-most "Caused by".

Thanks,
Biao /'bɪ.aʊ/

On Sun, 29 Sep 2019 at 13:17, <18612537...@163.com> wrote:
> I have read that, but it is not my problem. The job does not set any watermarks and it runs normally; recently it seems to throw this exception after running for a little over a day.
>
> Sent from my iPhone
>
> > On Sep 29, 2019, at 11:49 AM, Wesley Peng wrote:
> >
> > Hello,
> >
> > Might this article match your issue?
> > https://blog.csdn.net/qq_41910230/article/details/90411237
> >
> > regards.
> >
> >> On Sun, Sep 29, 2019 at 10:33 AM allan <18612537...@163.com> wrote:
> >>
> >> Hi,
> >>
> >> My job has recently been failing repeatedly. My window is a one-minute window. What could be the cause? Can anyone help? The Flink version is 1.6, and the error is as follows:
> >>
> >> TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator}
> >>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
> >>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >>     at java.lang.Thread.run(Thread.java:748)
> >> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> >>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
> >>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> >>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> >>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
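
The bottom-most "Caused by" in a printed trace corresponds to the end of the exception's getCause() chain. If the full log is hard to get at, a small helper like the sketch below can surface that innermost exception, for example from inside a catch block. The class name RootCause and the exceptions built in main are made up for illustration; the real root cause of the job above is not known from this thread.

```java
public class RootCause {

    // Follows getCause() until the innermost throwable is reached.
    public static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Hypothetical chain shaped like the trace above; the innermost exception is invented.
        Throwable innermost = new IllegalStateException("hypothetical root cause");
        Throwable chained = new RuntimeException("Could not forward element to next operator", innermost);
        Throwable timer = new RuntimeException("TimerException", chained);

        // Prints the innermost exception rather than the wrappers around it.
        System.out.println(rootCause(timer));
    }
}
```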
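Re: Classloader isolation between task slots in a TaskManager

Within the same TM, subtasks of the same job share a single classloader.

Thanks,
Biao /'bɪ.aʊ/

On Sat, 28 Sep 2019 at 09:30, Ever <439674...@qq.com> wrote:
> A job has 2 tasks, and each task has 3 subtasks (parallelism 3), as shown in the figure below.
>
> Each subtask occupies one task slot, but subtasks of different tasks of the same job can share a task slot, so here each task slot should hold 2 subtasks.
> Do the two subtasks that share a task slot use the same classloader, or does each subtask have its own classloader?
>
> I ask because my job uses a static class (a Scala object or a Java singleton) that has a collection member holding base data. I would like to know whether this variable has to be initialized in every subtask, or only once per JVM.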
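
A plain-Java sketch of what that means for the singleton in the question, under stated assumptions: threads stand in for subtasks running in one TM, and SharedBaseData, its load() method and its contents are hypothetical. A static field is initialized once per classloader that loads the class, so subtasks that share a classloader see a single copy. Each TM is a separate JVM, however, so every TM still initializes its own copy.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedBaseData {

    // Counts how often the base data is actually loaded (for demonstration only).
    static final AtomicInteger LOAD_COUNT = new AtomicInteger();

    // Initialization-on-demand holder: the JVM runs this initializer once per
    // classloader that loads the class, and class initialization is thread-safe.
    private static final class Holder {
        static final List<String> DATA = load();
    }

    // Hypothetical loader; a real job would read its base data from a file or database.
    private static List<String> load() {
        LOAD_COUNT.incrementAndGet();
        return Collections.unmodifiableList(Arrays.asList("alpha", "beta", "gamma"));
    }

    public static List<String> get() {
        return Holder.DATA;
    }

    // Starts the given number of threads (stand-ins for subtasks) that all read
    // the shared data, then returns how many times the data was loaded.
    public static int simulate(int subtasks) {
        Thread[] threads = new Thread[subtasks];
        for (int i = 0; i < subtasks; i++) {
            threads[i] = new Thread(() -> get().size());
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for subtasks", e);
            }
        }
        return LOAD_COUNT.get();
    }

    public static void main(String[] args) {
        System.out.println("loads = " + simulate(6));
    }
}
```

Because all subtasks in the TM then read the same instance concurrently, the shared collection should be immutable or otherwise thread-safe.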