[jira] [Created] (FLINK-2720) Add Storm-CountMetric in flink-stormcompatibility
Huang Wei created FLINK-2720:
Summary: Add Storm-CountMetric in flink-stormcompatibility
Key: FLINK-2720
URL: https://issues.apache.org/jira/browse/FLINK-2720
Project: Flink
Issue Type: New Feature
Components: Storm Compatibility
Reporter: Huang Wei
Assignee: Huang Wei
Fix For: 0.10

Add CountMetric as the first step of the Storm metrics support:
1. Wrap Storm's CountMetric in a FlinkCountMetric class.
2. Expose the RuntimeContext held by FlinkTopologyContext and use its `addAccumulator` method to register the metric.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
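The wrapper described in step 1 might look roughly like the following. This is a minimal sketch only: Storm's `IMetric` interface is mirrored locally so the snippet is self-contained (the real one lives in `backtype.storm.metric.api`), and the exact shape of `FlinkCountMetric` is an assumption, not the committed implementation.

```java
// Minimal mirror of Storm's IMetric interface, included here only so the
// sketch compiles on its own; the real interface is in backtype.storm.metric.api.
interface IMetric {
    Object getValueAndReset();
}

// Hypothetical wrapper for Storm's CountMetric, following the issue
// description: count increments locally, hand the value out on harvest.
class FlinkCountMetric implements IMetric {
    private long value = 0;

    public void incr() {
        value++;
    }

    public void incrBy(long delta) {
        value += delta;
    }

    @Override
    public Object getValueAndReset() {
        // Storm harvests metrics periodically; the counter restarts at zero.
        long ret = value;
        value = 0;
        return ret;
    }
}
```

In step 2, an instance like this would then be registered through the RuntimeContext's `addAccumulator` so Flink's accumulator machinery transports the value.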
[jira] [Commented] (FLINK-2600) Failing ElasticsearchSinkITCase.testNodeClient test case
[ https://issues.apache.org/jira/browse/FLINK-2600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14734764#comment-14734764 ]

Huang Wei commented on FLINK-2600:
--

Hi,
https://travis-ci.org/apache/flink/builds/79250039
Is this the same issue? The log follows:

{code}
Running org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkITCase
Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 7.352 sec <<< FAILURE! - in org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkITCase
testNodeClient(org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkITCase)  Time elapsed: 1.372 sec  <<< ERROR!
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:414)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.testingUtils.TestingJobManager$$anonfun$handleTestingMessage$1.applyOrElse(TestingJobManager.scala:285)
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:104)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: An error occured in ElasticsearchSink.
	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink.close(ElasticsearchSink.java:307)
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:40)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:75)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:243)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:185)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: IndexMissingException[[my-index] missing]
	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink$1.afterBulk(ElasticsearchSink.java:240)
	at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:316)
	at org.elasticsearch.action.bulk.BulkProcessor.executeIfNeeded(BulkProcessor.java:299)
	at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:281)
	at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:264)
	at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:260)
	at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:246)
	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink.invoke(ElasticsearchSink.java:286)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:37)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:163)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:56)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
	at java.lang.Thread.run(Thread.java:745)

Results :

Tests in error:
  ElasticsearchSinkITCase.testNodeClient » JobExecution Job execution failed.

Tests run: 3, Failures: 0, Errors: 1, Skipped: 0
{code}
[jira] [Commented] (FLINK-2600) Failing ElasticsearchSinkITCase.testNodeClient test case
[ https://issues.apache.org/jira/browse/FLINK-2600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14734782#comment-14734782 ]

Huang Wei commented on FLINK-2600:
--

If we add a retry, I think it would be better to record the failure somewhere even when the test eventually passes. After all, if there is a problem, it still needs to be solved.

> Failing ElasticsearchSinkITCase.testNodeClient test case
> --------------------------------------------------------
> Key: FLINK-2600
> URL: https://issues.apache.org/jira/browse/FLINK-2600
> Project: Flink
> Issue Type: Bug
> Reporter: Till Rohrmann
> Assignee: Aljoscha Krettek
> Labels: test-stability
>
> I observed that the {{ElasticsearchSinkITCase.testNodeClient}} test case fails on Travis.
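The idea in the comment above — retry a flaky test, but keep a record of every intermediate failure so it is not silently lost when a later attempt passes — can be sketched as follows. All names here (`RetryingRunner`, `recordedFailures`) are illustrative assumptions; an actual fix would more likely be a JUnit rule.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical retry helper: reruns a flaky check up to maxAttempts times,
// but records every failure so intermittent problems stay visible even
// when the check eventually succeeds.
class RetryingRunner {
    final List<Throwable> recordedFailures = new ArrayList<>();

    boolean runWithRetry(Runnable test, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                test.run();
                return true; // passed, but recordedFailures keeps the history
            } catch (AssertionError | RuntimeException e) {
                recordedFailures.add(e); // record instead of discarding
            }
        }
        return false; // exhausted all attempts
    }
}
```

A test harness could then log or assert on `recordedFailures` after the run, so "passed after 3 retries" is distinguishable from "passed cleanly".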
[jira] [Created] (FLINK-2626) Add a AverageAccumulator for FLINK-Accumulator
Huang Wei created FLINK-2626:
Summary: Add a AverageAccumulator for FLINK-Accumulator
Key: FLINK-2626
URL: https://issues.apache.org/jira/browse/FLINK-2626
Project: Flink
Issue Type: New Feature
Components: Core
Affects Versions: 0.10
Reporter: Huang Wei
Fix For: 0.10

Add an AverageAccumulator to the Flink accumulators that calculates the average of the values it receives. The feature is similar to Storm's MultiReducedMetric.

The feature in detail:
1. The class is named AverageAccumulator (better name suggestions are welcome) and implements SimpleAccumulator.
2. It supports long, integer, and double input.
3. It returns a double value as the average when getLocalValue() is called.
4. Method descriptions:
* add(): a set of overloads supporting the different input types (Double, double, Long, long, Integer, int). Each call adds the input to the running sum and increments the count.
* public Double getLocalValue(): returns the sum divided by the count; the result is a double.
* public void resetLocal(): resets the sum and the count. It is not called automatically, so it is best to call it each time after getLocalValue() is called.
* public void merge(Accumulator other): merges in another accumulator (only AverageAccumulator is supported).
* public AverageAccumulator clone(): returns a copy of the current AverageAccumulator.
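A minimal sketch of the accumulator described above, using the method names from the issue. `SimpleAccumulator` is mirrored locally so the snippet compiles on its own (the real interface is `org.apache.flink.api.common.accumulators.SimpleAccumulator`), and `merge` takes the concrete type here to keep the sketch short.

```java
// Local stand-in for Flink's SimpleAccumulator so this sketch is self-contained.
interface SimpleAccumulator<T> {
    void add(T value);
    T getLocalValue();
    void resetLocal();
}

// Sketch of the proposed AverageAccumulator: keeps a running sum and count,
// reports their ratio as the average. Only a subset of the add() overloads
// from the issue is shown.
class AverageAccumulator implements SimpleAccumulator<Double> {
    private double sum;
    private long count;

    @Override
    public void add(Double value) { sum += value; count++; }

    public void add(long value) { sum += value; count++; }

    public void add(int value) { sum += value; count++; }

    @Override
    public Double getLocalValue() {
        // Average of all values added so far; 0 if nothing was added.
        return count == 0 ? 0.0 : sum / count;
    }

    @Override
    public void resetLocal() { sum = 0; count = 0; }

    // Simplified signature: the issue proposes merge(Accumulator other).
    public void merge(AverageAccumulator other) {
        this.sum += other.sum;
        this.count += other.count;
    }

    @Override
    public AverageAccumulator clone() {
        AverageAccumulator copy = new AverageAccumulator();
        copy.sum = this.sum;
        copy.count = this.count;
        return copy;
    }
}
```

Keeping sum and count separately (rather than a running average) makes `merge` exact, which matters when parallel subtask results are combined.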
[jira] [Commented] (FLINK-2480) Improving tests coverage for org.apache.flink.streaming.api
[ https://issues.apache.org/jira/browse/FLINK-2480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14718110#comment-14718110 ]

Huang Wei commented on FLINK-2480:
--

Sorry, I didn't see this before. Yes, I plan to. I will run a coverage check first.

> Improving tests coverage for org.apache.flink.streaming.api
> -----------------------------------------------------------
> Key: FLINK-2480
> URL: https://issues.apache.org/jira/browse/FLINK-2480
> Project: Flink
> Issue Type: Test
> Components: Streaming
> Affects Versions: 0.10
> Reporter: Huang Wei
> Fix For: 0.10
> Original Estimate: 504h
> Remaining Estimate: 504h
>
> The streaming API is quite a bit newer than the other code, so it is not that well covered with tests.
[jira] [Created] (FLINK-2536) Add a retry for SocketClientSink
Huang Wei created FLINK-2536:
Summary: Add a retry for SocketClientSink
Key: FLINK-2536
URL: https://issues.apache.org/jira/browse/FLINK-2536
Project: Flink
Issue Type: Improvement
Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
Fix For: 0.10

I found that SocketClientSink does not attempt to reconnect when it is disconnected from the socket server or hits an exception. I would like to add reconnect logic to the socket sink, similar to what the socket source already does.
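A reconnect loop in the spirit of this proposal might look like the following. This is a generic sketch under stated assumptions: the helper name `Reconnector` and its parameters are illustrative, and the real SocketClientSink API may differ; the `Callable` would typically be something like `() -> new Socket(host, port)`.

```java
import java.util.concurrent.Callable;

// Illustrative reconnect helper: retry a connection attempt a bounded number
// of times with exponential backoff, rethrowing the last failure if all
// attempts are exhausted.
class Reconnector {
    static <T> T connectWithRetry(Callable<T> connect, int maxRetries,
                                  long initialBackoffMillis) throws Exception {
        long backoff = initialBackoffMillis;
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return connect.call(); // e.g. () -> new Socket(host, port)
            } catch (Exception e) {
                last = e;
                Thread.sleep(backoff); // wait before the next attempt
                backoff *= 2;          // simple exponential backoff
            }
        }
        throw last; // all attempts failed
    }
}
```

Bounding the retries (and surfacing the last exception) keeps a permanently-down server from stalling the sink forever, which a bare reconnect loop would.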
[jira] [Updated] (FLINK-2529) fix on some unused code for flink-runtime
[ https://issues.apache.org/jira/browse/FLINK-2529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Huang Wei updated FLINK-2529:
--
Description:

In BlobServer.java, as far as I know, Thread.currentThread() never returns null. So I think the `shutdownHook != null` check is unnecessary in the code `if (shutdownHook != null && shutdownHook != Thread.currentThread())`. I am not completely sure; maybe I am wrong.

> fix on some unused code for flink-runtime
> -----------------------------------------
> Key: FLINK-2529
> URL: https://issues.apache.org/jira/browse/FLINK-2529
> Project: Flink
> Issue Type: Improvement
> Components: Local Runtime
> Affects Versions: 0.10
> Reporter: Huang Wei
> Priority: Minor
> Fix For: 0.10
> Original Estimate: 168h
> Remaining Estimate: 168h
[jira] [Created] (FLINK-2534) Improve execution code in CompactingHashTable.java
Huang Wei created FLINK-2534:
Summary: Improve execution code in CompactingHashTable.java
Key: FLINK-2534
URL: https://issues.apache.org/jira/browse/FLINK-2534
Project: Flink
Issue Type: Improvement
Components: Local Runtime
Affects Versions: 0.10
Reporter: Huang Wei
Fix For: 0.10

I found some code in CompactingHashTable.java that can be improved, since this code executes many times while Flink runs. In my opinion, some of the code inside for and while loops can be optimized to reduce the number of iterations, which should help performance. For example, the loop

while (numBuckets % numPartitions != 0) {
    numBuckets++;
}

can be replaced by a closed-form increment (guarding the case where numBuckets is already a multiple of numPartitions):

if (numBuckets % numPartitions != 0) {
    numBuckets += numPartitions - (numBuckets % numPartitions);
}
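As a sanity check on the rewrite, the loop and the closed-form version can be compared directly. Note the guard matters: without it, the formula would add a full `numPartitions` when `numBuckets` is already an exact multiple, which the original loop does not. `BucketRounding` and its method names are illustrative, not code from CompactingHashTable.

```java
// Side-by-side comparison of the two ways to round numBuckets up to the
// next multiple of numPartitions.
class BucketRounding {
    // Original loop form: step up one bucket at a time.
    static int roundUpLoop(int numBuckets, int numPartitions) {
        while (numBuckets % numPartitions != 0) {
            numBuckets++;
        }
        return numBuckets;
    }

    // Closed-form equivalent: the rem == 0 guard keeps an exact multiple
    // unchanged, matching the loop's behavior.
    static int roundUpFormula(int numBuckets, int numPartitions) {
        int rem = numBuckets % numPartitions;
        return rem == 0 ? numBuckets : numBuckets + numPartitions - rem;
    }
}
```

The closed form turns an O(numPartitions) loop into constant-time arithmetic, which is what makes it attractive on a hot path.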
[jira] [Created] (FLINK-2529) fix on some unused code for flink-runtime
Huang Wei created FLINK-2529:
Summary: fix on some unused code for flink-runtime
Key: FLINK-2529
URL: https://issues.apache.org/jira/browse/FLINK-2529
Project: Flink
Issue Type: Improvement
Components: Local Runtime
Affects Versions: 0.10
Reporter: Huang Wei
Priority: Minor
Fix For: 0.10
[jira] [Commented] (FLINK-2495) Add a null point check in API DataStream.union
[ https://issues.apache.org/jira/browse/FLINK-2495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661555#comment-14661555 ]

Huang Wei commented on FLINK-2495:
--

[~fhueske] It does not matter; I think the same as you. Would it be better to print a log message and throw an exception when there is a null input in a union? And should I push this test code into Flink?

[~Zentol] Yes, there may be more than one such issue. Should we discuss how best to deal with it?

> Add a null point check in API DataStream.union
> ----------------------------------------------
> Key: FLINK-2495
> URL: https://issues.apache.org/jira/browse/FLINK-2495
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 0.10
> Reporter: Huang Wei
> Fix For: 0.10
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> The API (public DataStream<OUT> union(DataStream<OUT>... streams)) is an external interface for users. The streams parameter may be null, which leads to a NullPointerException.
[jira] [Created] (FLINK-2495) Add a null point check in API DataStream.union
Huang Wei created FLINK-2495:
Summary: Add a null point check in API DataStream.union
Key: FLINK-2495
URL: https://issues.apache.org/jira/browse/FLINK-2495
Project: Flink
Issue Type: Bug
Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
Fix For: 0.10

The API (public DataStream<OUT> union(DataStream<OUT>... streams)) is an external interface for users. The streams parameter may be null, which leads to a NullPointerException. The test below illustrates the problem:

{code}
package org.apache.flink.streaming.api;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.junit.Test;

/**
 * Created by HuangWHWHW on 2015/8/7.
 */
public class test {

	public static class sourceFunction extends RichParallelSourceFunction<String> {

		public sourceFunction() {
		}

		@Override
		public void run(SourceContext<String> sourceContext) throws Exception {
			sourceContext.collect("a");
		}

		@Override
		public void cancel() {
		}
	}

	@Test
	public void testUnion() {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(1);
		DataStream<String> source = env.addSource(new sourceFunction());
		DataStream<String> temp1 = null;
		DataStream<String> temp2 = source.map(new MapFunction<String, String>() {
			@Override
			public String map(String value) throws Exception {
				if (value.equals("a")) {
					return "This is for test temp2.";
				}
				return null;
			}
		});
		DataStream<String> sink = temp2.union(temp1);
		sink.print();
		try {
			env.execute();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}
{code}
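A fail-fast check along the lines discussed could look like this. The helper name `checkUnionInputs` and its error messages are hypothetical; the point is only to show the shape of the validation, which rejects null inputs with a descriptive error instead of letting a NullPointerException surface deep inside the runtime.

```java
import java.util.Objects;

// Hypothetical validation for union()-style varargs: the whole array and
// every element must be non-null, and the error says exactly which input
// was the problem.
class UnionChecks {
    static <T> void checkUnionInputs(T[] streams) {
        Objects.requireNonNull(streams, "union() called with a null stream array");
        for (int i = 0; i < streams.length; i++) {
            if (streams[i] == null) {
                throw new IllegalArgumentException(
                        "union() input stream at position " + i + " is null");
            }
        }
    }
}
```

Positioned at the top of `union(...)`, a check like this turns the opaque NPE from the test case above into an immediate, self-explanatory failure.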
[jira] [Created] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket
Huang Wei created FLINK-2490:
Summary: Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket
Key: FLINK-2490
URL: https://issues.apache.org/jira/browse/FLINK-2490
Project: Flink
Issue Type: Bug
Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
Priority: Minor
Fix For: 0.10
[jira] [Created] (FLINK-2480) Improving tests coverage for org.apache.flink.streaming.api
Huang Wei created FLINK-2480:
Summary: Improving tests coverage for org.apache.flink.streaming.api
Key: FLINK-2480
URL: https://issues.apache.org/jira/browse/FLINK-2480
Project: Flink
Issue Type: Test
Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
Fix For: 0.10

The streaming API is quite a bit newer than the other code, so it is not that well covered with tests.
[jira] [Created] (FLINK-2477) Add test for SocketClientSink
Huang Wei created FLINK-2477:
Summary: Add test for SocketClientSink
Key: FLINK-2477
URL: https://issues.apache.org/jira/browse/FLINK-2477
Project: Flink
Issue Type: Test
Components: Streaming
Affects Versions: 0.10
Environment: win7 32bit; linux
Reporter: Huang Wei
Priority: Minor
Fix For: 0.10

Add some tests for SocketClientSink.
[jira] [Created] (FLINK-2456) The flink-hbase module dependencies hadoop-2 specifies a repository ID
Huang Wei created FLINK-2456:
Summary: The flink-hbase module dependencies hadoop-2 specifies a repository ID
Key: FLINK-2456
URL: https://issues.apache.org/jira/browse/FLINK-2456
Project: Flink
Issue Type: Wish
Components: Build System
Affects Versions: 0.10
Environment: win7 32bit; linux
Reporter: Huang Wei
Fix For: 0.10

In some restricted network environments, we can only use Maven mirrors to download the dependencies for mvn package, and the build cannot complete using a single mirror. The error looks like:

[ERROR] Failed to execute goal on project flink-hbase: Could not resolve dependencies for project org.apache.flink:flink-hbase:jar:0.10-SNAPSHOT: Failure to find org.apache.hbase:hbase-server:jar:0.98.11-hadoop2 in http://mirrors.ibiblio.org/pub/mirrors/maven2 was cached in the local repository, resolution will not be reattempted until the update interval of ibiblio.org has elapsed or updates are forced -> [Help 1]

We need to specify a repository ID for the hadoop-2 profile so that the dependency can be downloaded from another mirror. This does not affect normal network environments.
[jira] [Created] (FLINK-2391) Storm-compatibility: method FlinkTopologyBuilder.createTopology() throws java.lang.NullPointerException
Huang Wei created FLINK-2391:
Summary: Storm-compatibility: method FlinkTopologyBuilder.createTopology() throws java.lang.NullPointerException
Key: FLINK-2391
URL: https://issues.apache.org/jira/browse/FLINK-2391
Project: Flink
Issue Type: Bug
Components: flink-contrib
Affects Versions: 0.10
Environment: win7 32bit; linux
Reporter: Huang Wei
Fix For: 0.10

The NullPointerException is thrown at FlinkOutputFieldsDeclarer.java:160 (package FlinkOutputFieldsDeclarer), in the line

fieldIndexes[i] = this.outputSchema.fieldIndex(groupingFields.get(i));

where this.outputSchema may be null.
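One possible defensive shape for that lookup is sketched below. This is not the FlinkOutputFieldsDeclarer code: the class, messages, and schema representation here are assumptions made so the snippet is self-contained. It only illustrates the idea of guarding the nullable `outputSchema` so a component that never declared output fields fails with a clear message instead of an NPE.

```java
import java.util.List;

// Minimal stand-in for the field lookup at FlinkOutputFieldsDeclarer.java:160.
// outputSchema may legitimately be null when no output fields were declared,
// so the lookup fails fast with a descriptive error in that case.
class OutputFieldsLookup {
    private final List<String> outputSchema; // null if nothing was declared

    OutputFieldsLookup(List<String> outputSchema) {
        this.outputSchema = outputSchema;
    }

    int fieldIndex(String field) {
        if (outputSchema == null) {
            throw new IllegalStateException(
                    "No output fields declared; cannot resolve field '" + field + "'");
        }
        int idx = outputSchema.indexOf(field);
        if (idx < 0) {
            throw new IllegalArgumentException("Unknown field: " + field);
        }
        return idx;
    }
}
```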