[jira] [Updated] (FLINK-17921) RpcGlobalAggregateManager#updateGlobalAggregate would cause unexpected akka.timeout
[ https://issues.apache.org/jira/browse/FLINK-17921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-17921: - Description: As described in the summary, {{RpcGlobalAggregateManager#updateGlobalAggregate}} can cause an akka.timeout, but that timeout message is misleading. If {{org.apache.flink.api.common.functions.AggregateFunction#getResult}} returns {{null}} and its result is used in {{RpcGlobalAggregateManager#updateGlobalAggregate}}, the following exception occurs, which is not expected to happen there. Increasing {{akka.ask.timeout}} to a larger value does not make the exception go away.
{code:java}
java.io.IOException: Error updating global aggregate.
	at org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:47)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_1#-1046052429]] after [1 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45)
	... 8 more
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_1#-1046052429]] after [1 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
	at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
	at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
	at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
	at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
{code}
The following stack trace shows the root cause. {{CompletableFuture.waitingGet}} is the key point: the {{CompletableFuture}} parks the current thread until the future completes, so the thread stays blocked and the akka communication in Flink times out. Therefore, even a timeout of 1 hour would not solve the problem.
{code:java}
java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x0007b76617a8> (a java.util.concurrent.CompletableFuture$Signaller)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
	at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
	at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
{code}
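The blocking behaviour in the trace above can be reproduced with plain Java, without Flink or akka. Below is a minimal sketch (the class name {{WaitingGetDemo}} and the 100 ms timeout are made up for illustration) of waiting on a {{CompletableFuture}} whose reply never arrives, mirroring an ask whose recipient never sends a response:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class WaitingGetDemo {

    // Wait for a reply and report how the wait ended.
    static String awaitReply(CompletableFuture<String> reply, long millis) {
        try {
            // get() parks the calling thread in CompletableFuture.waitingGet
            // until the future is completed; if nothing ever completes it,
            // only the timeout (or an interrupt) can unpark the thread.
            return reply.get(millis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timed out";
        } catch (Exception e) {
            return "failed: " + e;
        }
    }

    public static void main(String[] args) {
        // Like an RPC ask whose recipient never replies:
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        System.out.println(awaitReply(neverCompleted, 100));  // prints "timed out"
        // Raising the timeout only delays the same outcome, because nothing
        // will ever complete the future - which is why a 1-hour
        // akka.ask.timeout does not help in the reported scenario.
    }
}
```

A future that is completed normally returns immediately from the same call, which is why the issue only surfaces on the code path where no reply is produced.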
[jira] [Commented] (FLINK-17921) RpcGlobalAggregateManager#updateGlobalAggregate would cause unexpected akka.timeout
[ https://issues.apache.org/jira/browse/FLINK-17921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17115882#comment-17115882 ] zhangminglei commented on FLINK-17921: -- cc [~jgrier] Could you please take a look at this? Thank you very much.
> RpcGlobalAggregateManager#updateGlobalAggregate would cause unexpected akka.timeout
> ---
>
> Key: FLINK-17921
> URL: https://issues.apache.org/jira/browse/FLINK-17921
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Affects Versions: 1.8.1, 1.10.1
> Reporter: zhangminglei
> Priority: Major
[jira] [Updated] (FLINK-17921) RpcGlobalAggregateManager#updateGlobalAggregate would cause unexpected akka.timeout
[ https://issues.apache.org/jira/browse/FLINK-17921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-17921: - Affects Version/s: 1.8.1, 1.10.1
[jira] [Updated] (FLINK-17921) RpcGlobalAggregateManager#updateGlobalAggregate would cause unexpected akka.timeout
[ https://issues.apache.org/jira/browse/FLINK-17921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-17921: - Summary: RpcGlobalAggregateManager#updateGlobalAggregate would cause unexpected akka.timeout (was: RpcGlobalAggregateManager#updateGlobalAggregate would cause akka.timeout)
[jira] [Updated] (FLINK-17921) RpcGlobalAggregateManager#updateGlobalAggregate would cause akka.timeout
[ https://issues.apache.org/jira/browse/FLINK-17921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-17921: - Description: As described in summary, {{RpcGlobalAggregateManager#updateGlobalAggregate}} would cause akka.timeout. If {{org.apache.flink.api.common.functions.AggregateFunction#getResult}} return {{null}} and used it in {{RpcGlobalAggregateManager#updateGlobalAggregate}} , this would cause following exception, this is not expected to happen from there. If we increase the {{akka.ask.timeout}} to another value, the exception is still in there. {code:java} java.io.IOException: Error updating global aggregate. at org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:47) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.util.concurrent.ExecutionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_1#-1046052429]] after [1 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply. 
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45) ... 8 more Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_1#-1046052429]] after [1 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply. at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635) at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635) at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648) at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205) at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328) at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279) at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283) at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235) {code} The following stacktrace would describe the root cause. We can see that {{CompletableFuture.waitingGet}}, it imply that the {{Completabilefuture}} will give the current thread to waiting, which will lead to the timeout of the akka communication of Flink. Therefore, even if the timeout is 1 hour, the problem cannot be solved. 
{code:java} java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x0007b76617a8> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.ja
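The timeout-independence described above can be reproduced without Flink at all. The following is a minimal sketch (hypothetical class name, plain JDK {{CompletableFuture}}, no Akka involved) showing that when the reply future is never completed, analogous to the JobMaster never answering the {{updateGlobalAggregate}} ask, a blocking {{get}} times out no matter how large the timeout is:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class NeverAnsweredAsk {
    public static void main(String[] args) throws Exception {
        // A reply future that is never completed, analogous to the
        // JobMaster never sending a reply for the aggregate update.
        CompletableFuture<String> reply = new CompletableFuture<>();
        try {
            // No matter how large this timeout is, it always fires,
            // because nothing will ever complete the future.
            reply.get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("timed out as expected");
        }
    }
}
```

This is why raising {{akka.ask.timeout}} cannot help here: the waiting thread is parked until the reply arrives, and the reply never arrives.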
[jira] [Updated] (FLINK-17921) RpcGlobalAggregateManager#updateGlobalAggregate would cause akka.timeout
[ https://issues.apache.org/jira/browse/FLINK-17921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-17921: - Component/s: Runtime / Task > RpcGlobalAggregateManager#updateGlobalAggregate would cause akka.timeout > > > Key: FLINK-17921 > URL: https://issues.apache.org/jira/browse/FLINK-17921 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task > Reporter: zhangminglei >Priority: Major > > As described in summary, {{RpcGlobalAggregateManager#updateGlobalAggregate}} > would cause akka.timeout. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17921) RpcGlobalAggregateManager#updateGlobalAggregate would cause akka.timeout
zhangminglei created FLINK-17921: Summary: RpcGlobalAggregateManager#updateGlobalAggregate would cause akka.timeout Key: FLINK-17921 URL: https://issues.apache.org/jira/browse/FLINK-17921 Project: Flink Issue Type: Improvement Reporter: zhangminglei As described in summary, {{RpcGlobalAggregateManager#updateGlobalAggregate}} would cause akka.timeout. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17919) KafkaConsumerThread should add ratelimiter functionality as well
[ https://issues.apache.org/jira/browse/FLINK-17919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17115860#comment-17115860 ] zhangminglei commented on FLINK-17919: -- Hi, [~tzulitai] Could you please take a look at this Jira? If possible, I would like to take this ticket. Thank you. > KafkaConsumerThread should add ratelimiter functionality as well > > > Key: FLINK-17919 > URL: https://issues.apache.org/jira/browse/FLINK-17919 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka > Reporter: zhangminglei >Priority: Major > > Currently, {{KafkaConsumerThread}} within {{flink-connector-kafka-09}} has > rate-limiting functionality via a rateLimiter. However, {{flink-connector-kafka}} > does not have it. I suggest we add it there as well. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17919) KafkaConsumerThread should add ratelimiter functionality as well
zhangminglei created FLINK-17919: Summary: KafkaConsumerThread should add ratelimiter functionality as well Key: FLINK-17919 URL: https://issues.apache.org/jira/browse/FLINK-17919 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Reporter: zhangminglei Currently, {{KafkaConsumerThread}} within {{flink-connector-kafka-09}} has rate-limiting functionality via a rateLimiter. However, {{flink-connector-kafka}} does not have it. I suggest we add it there as well. -- This message was sent by Atlassian Jira (v8.3.4#803005)
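For illustration, rate limiting a consumer thread amounts to acquiring a permit before each fetch. Below is a minimal, hand-rolled sketch under that assumption (hypothetical class name; the kafka-09 connector's actual implementation is Guava-based, this is just the pacing idea):

```java
public class PollRateLimiterSketch {
    private final long permitIntervalNanos;
    private long nextFreeNanos = System.nanoTime();

    PollRateLimiterSketch(double permitsPerSecond) {
        this.permitIntervalNanos = (long) (1_000_000_000L / permitsPerSecond);
    }

    // Block until the next permit is available, similar in spirit to
    // a RateLimiter.acquire() call placed before each record fetch.
    synchronized void acquire() throws InterruptedException {
        long now = System.nanoTime();
        long waitNanos = nextFreeNanos - now;
        if (waitNanos > 0) {
            Thread.sleep(waitNanos / 1_000_000, (int) (waitNanos % 1_000_000));
        }
        nextFreeNanos = Math.max(nextFreeNanos, now) + permitIntervalNanos;
    }

    public static void main(String[] args) throws Exception {
        PollRateLimiterSketch limiter = new PollRateLimiterSketch(100); // 100 permits/s
        long start = System.nanoTime();
        for (int i = 0; i < 20; i++) {
            limiter.acquire(); // e.g. called before each poll of records
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // 20 permits at 100/s need roughly 190 ms of pacing in total.
        System.out.println(elapsedMs >= 150);
    }
}
```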
[jira] [Updated] (FLINK-17878) StreamingFileWriter watermark attribute is transient, this might be different with origin value
[ https://issues.apache.org/jira/browse/FLINK-17878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-17878: - Component/s: (was: Table SQL / API) Connectors / FileSystem > StreamingFileWriter watermark attribute is transient, this might be different > with origin value > --- > > Key: FLINK-17878 > URL: https://issues.apache.org/jira/browse/FLINK-17878 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Affects Versions: 1.11.0 >Reporter: xiaogang zhou >Priority: Major > Labels: pull-request-available > > StreamingFileWriter has a > private transient long currentWatermark = Long.MIN_VALUE; > > If a developer wants to create a custom bucket assigner, it will receive a > currentWatermark of 0, which conflicts with the original Flink > approach of handling it as min_long. > > Should we remove the transient keyword? > -- This message was sent by Atlassian Jira (v8.3.4#803005)
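The 0 value mentioned above follows from Java serialization semantics, which can be shown with a self-contained sketch (hypothetical class names, plain JDK serialization): a {{transient}} field is skipped during serialization, and its field initializer is not re-run on deserialization, so the restored object holds the type default {{0L}} rather than {{Long.MIN_VALUE}}:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDefaultDemo {
    static class WriterLike implements Serializable {
        private transient long currentWatermark = Long.MIN_VALUE;
    }

    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(new WriterLike());
        oos.flush();

        ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()));
        WriterLike copy = (WriterLike) ois.readObject();

        // The transient field is not written out, and field initializers
        // are not re-executed on deserialization, so the copy holds the
        // type default 0L, not Long.MIN_VALUE.
        System.out.println(copy.currentWatermark);
    }
}
```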
[jira] [Assigned] (FLINK-9410) Replace NMClient with NMClientAsync in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-9410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei reassigned FLINK-9410: --- Assignee: zhangminglei (was: lihongli) > Replace NMClient with NMClientAsync in YarnResourceManager > -- > > Key: FLINK-9410 > URL: https://issues.apache.org/jira/browse/FLINK-9410 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.5.0, 1.6.0, 1.7.0 >Reporter: Till Rohrmann > Assignee: zhangminglei >Priority: Critical > > Currently, the {{YarnResourceManager}} uses the synchronous {{NMClient}} > which is called from within the main thread of the {{ResourceManager}}. Since > these operations are blocking, we should replace the client with the > {{NMClientAsync}} and make the calls non blocking. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10114) Support Orc for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei reassigned FLINK-10114: Assignee: lihongli (was: vinoyang) > Support Orc for StreamingFileSink > - > > Key: FLINK-10114 > URL: https://issues.apache.org/jira/browse/FLINK-10114 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors > Reporter: zhangminglei >Assignee: lihongli >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9410) Replace NMClient with NMClientAsync in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-9410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei reassigned FLINK-9410: --- Assignee: lihongli > Replace NMClient with NMClientAsync in YarnResourceManager > -- > > Key: FLINK-9410 > URL: https://issues.apache.org/jira/browse/FLINK-9410 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: lihongli >Priority: Critical > Fix For: 1.8.0 > > > Currently, the {{YarnResourceManager}} uses the synchronous {{NMClient}} > which is called from within the main thread of the {{ResourceManager}}. Since > these operations are blocking, we should replace the client with the > {{NMClientAsync}} and make the calls non blocking. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9410) Replace NMClient with NMClientAsync in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-9410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei reassigned FLINK-9410: --- Assignee: (was: zhangminglei) > Replace NMClient with NMClientAsync in YarnResourceManager > -- > > Key: FLINK-9410 > URL: https://issues.apache.org/jira/browse/FLINK-9410 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Priority: Critical > Fix For: 1.8.0 > > > Currently, the {{YarnResourceManager}} uses the synchronous {{NMClient}} > which is called from within the main thread of the {{ResourceManager}}. Since > these operations are blocking, we should replace the client with the > {{NMClientAsync}} and make the calls non blocking. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Issue Comment Deleted] (FLINK-10355) The order of the column should start from 1.
[ https://issues.apache.org/jira/browse/FLINK-10355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-10355: - Comment: was deleted (was: Hi, [~lihongli] Thanks for opening this jira. I would not think so since second column means the number of 1 there. It should be start from 0 to added up. And then one by one. ) > The order of the column should start from 1. > > > Key: FLINK-10355 > URL: https://issues.apache.org/jira/browse/FLINK-10355 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.6.0 >Reporter: lihongli >Priority: Major > Labels: easyfix > Attachments: B0C32FD9-47FE-4F63-921F-A9E49C0CB5CD.png > > > When I register an external Table using a CsvTableSource.It throws an > exception :"Parsing error for column 1".But I finally found that the second > column is the error column.I think that the order of the column should start > from 1. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10355) The order of the column should start from 1.
[ https://issues.apache.org/jira/browse/FLINK-10355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-10355: - Component/s: (was: Java API) Table API & SQL > The order of the column should start from 1. > > > Key: FLINK-10355 > URL: https://issues.apache.org/jira/browse/FLINK-10355 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.6.0 >Reporter: lihongli >Priority: Major > Labels: easyfix > Attachments: B0C32FD9-47FE-4F63-921F-A9E49C0CB5CD.png > > > When I register an external Table using a CsvTableSource.It throws an > exception :"Parsing error for column 1".But I finally found that the second > column is the error column.I think that the order of the column should start > from 1. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10355) The order of the column should start from 1.
[ https://issues.apache.org/jira/browse/FLINK-10355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-10355: - Labels: easyfix (was: ) > The order of the column should start from 1. > > > Key: FLINK-10355 > URL: https://issues.apache.org/jira/browse/FLINK-10355 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.6.0 >Reporter: lihongli >Priority: Major > Labels: easyfix > Attachments: B0C32FD9-47FE-4F63-921F-A9E49C0CB5CD.png > > > When I register an external Table using a CsvTableSource.It throws an > exception :"Parsing error for column 1".But I finally found that the second > column is the error column.I think that the order of the column should start > from 1. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10355) The order of the column should start from 1.
[ https://issues.apache.org/jira/browse/FLINK-10355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617741#comment-16617741 ] zhangminglei commented on FLINK-10355: -- Hi, [~lihongli] Thanks for opening this Jira. I don't think so: the second column corresponds to index 1 there, since column indices start from 0 and increase one by one. > The order of the column should start from 1. > > > Key: FLINK-10355 > URL: https://issues.apache.org/jira/browse/FLINK-10355 > Project: Flink > Issue Type: Improvement > Components: Java API >Affects Versions: 1.6.0 >Reporter: lihongli >Priority: Major > Attachments: B0C32FD9-47FE-4F63-921F-A9E49C0CB5CD.png > > > When I register an external Table using a CsvTableSource, it throws an > exception: "Parsing error for column 1". But I finally found that the second > column is the error column. I think that the order of the column should start > from 1. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10114) Support Orc for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16602721#comment-16602721 ] zhangminglei commented on FLINK-10114: -- Thank you so much @vinoyang. Lots of users need this functionality indeed. > Support Orc for StreamingFileSink > - > > Key: FLINK-10114 > URL: https://issues.apache.org/jira/browse/FLINK-10114 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors > Reporter: zhangminglei >Assignee: vinoyang >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10114) Support Orc for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16599433#comment-16599433 ] zhangminglei commented on FLINK-10114: -- I can't continue to work on this JIRA because I've just joined Alibaba for some job reasons. Anyone who wants to do this can finish this pr. Thank you very much. > Support Orc for StreamingFileSink > - > > Key: FLINK-10114 > URL: https://issues.apache.org/jira/browse/FLINK-10114 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors > Reporter: zhangminglei >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10114) Support Orc for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei reassigned FLINK-10114: Assignee: (was: zhangminglei) > Support Orc for StreamingFileSink > - > > Key: FLINK-10114 > URL: https://issues.apache.org/jira/browse/FLINK-10114 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors > Reporter: zhangminglei >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10114) Support Orc for StreamingFileSink
zhangminglei created FLINK-10114: Summary: Support Orc for StreamingFileSink Key: FLINK-10114 URL: https://issues.apache.org/jira/browse/FLINK-10114 Project: Flink Issue Type: Sub-task Components: Streaming Connectors Reporter: zhangminglei Assignee: zhangminglei -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-9411) Support parquet rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei resolved FLINK-9411. - Resolution: Won't Fix > Support parquet rolling sink writer > --- > > Key: FLINK-9411 > URL: https://issues.apache.org/jira/browse/FLINK-9411 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector > Reporter: zhangminglei >Assignee: Triones Deng >Priority: Major > > Like support orc rolling sink writer in FLINK-9407 , we should also support > parquet rolling sink writer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-7829) Remove (or at least deprecate) DataStream.writeToFile/Csv
[ https://issues.apache.org/jira/browse/FLINK-7829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-7829: Description: These methods are seductive for users but they should never actually use them in a production streaming job. For those cases the {{StreamingFileSink}} should be used. (was: These methods are seductive for users but they should never actually use them in a production streaming job. For those cases the {{BucketingSink}} should be used.) > Remove (or at least deprecate) DataStream.writeToFile/Csv > - > > Key: FLINK-7829 > URL: https://issues.apache.org/jira/browse/FLINK-7829 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Aljoscha Krettek >Priority: Major > Fix For: 1.7.0 > > > These methods are seductive for users but they should never actually use them > in a production streaming job. For those cases the {{StreamingFileSink}} > should be used. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9675) Avoid FileInputStream/FileOutputStream
[ https://issues.apache.org/jira/browse/FLINK-9675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-9675: Affects Version/s: 1.4.1 1.4.2 1.5.1 > Avoid FileInputStream/FileOutputStream > -- > > Key: FLINK-9675 > URL: https://issues.apache.org/jira/browse/FLINK-9675 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.1, 1.4.2, 1.5.1 >Reporter: Ted Yu >Assignee: zhangminglei >Priority: Minor > Labels: filesystem, pull-request-available > Fix For: 1.7.0 > > > They rely on finalizers (before Java 11), which create unnecessary GC load. > The alternatives, Files.newInputStream, are as easy to use and don't have > this issue. > And here is a benchmark > https://arnaudroger.github.io/blog/2017/03/20/faster-reader-inpustream-in-java.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9675) Avoid FileInputStream/FileOutputStream
[ https://issues.apache.org/jira/browse/FLINK-9675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-9675: Fix Version/s: 1.7.0 > Avoid FileInputStream/FileOutputStream > -- > > Key: FLINK-9675 > URL: https://issues.apache.org/jira/browse/FLINK-9675 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.1, 1.4.2, 1.5.1 >Reporter: Ted Yu >Assignee: zhangminglei >Priority: Minor > Labels: filesystem, pull-request-available > Fix For: 1.7.0 > > > They rely on finalizers (before Java 11), which create unnecessary GC load. > The alternatives, Files.newInputStream, are as easy to use and don't have > this issue. > And here is a benchmark > https://arnaudroger.github.io/blog/2017/03/20/faster-reader-inpustream-in-java.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
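As a quick illustration of the alternative suggested in FLINK-9675 (self-contained sketch, hypothetical class name): {{Files.newInputStream}} and {{Files.newOutputStream}} are drop-in replacements for {{new FileInputStream(...)}} / {{new FileOutputStream(...)}} and do not rely on finalizers:

```java
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class NewInputStreamDemo {
    public static void main(String[] args) throws Exception {
        Path tmp = Files.createTempFile("demo", ".txt");
        // Channel-backed stream with no finalizer, unlike FileOutputStream.
        try (OutputStream out = Files.newOutputStream(tmp)) {
            out.write("hello".getBytes());
        }
        // Likewise a finalizer-free replacement for new FileInputStream(file).
        try (InputStream in = Files.newInputStream(tmp)) {
            byte[] buf = new byte[16];
            int n = in.read(buf);
            System.out.println(new String(buf, 0, n));
        }
        Files.delete(tmp);
    }
}
```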
[jira] [Commented] (FLINK-7829) Remove (or at least deprecate) DataStream.writeToFile/Csv
[ https://issues.apache.org/jira/browse/FLINK-7829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16574080#comment-16574080 ] zhangminglei commented on FLINK-7829: - [~aljoscha] We should replace the use of {{BucketingSink}} with {{StreamingFileSink}}, since {{BucketingSink}} cannot support exactly-once semantics. > Remove (or at least deprecate) DataStream.writeToFile/Csv > - > > Key: FLINK-7829 > URL: https://issues.apache.org/jira/browse/FLINK-7829 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Aljoscha Krettek >Priority: Major > Fix For: 1.7.0 > > > These methods are seductive for users but they should never actually use them > in a production streaming job. For those cases the {{BucketingSink}} should > be used. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5789) Make Bucketing Sink independent of Hadoop's FileSystem
[ https://issues.apache.org/jira/browse/FLINK-5789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16571483#comment-16571483 ] zhangminglei commented on FLINK-5789: - Thank you [~till.rohrmann] > Make Bucketing Sink independent of Hadoop's FileSystem > -- > > Key: FLINK-5789 > URL: https://issues.apache.org/jira/browse/FLINK-5789 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.1.4, 1.2.0 >Reporter: Stephan Ewen >Priority: Major > > The {{BucketingSink}} is hard wired to Hadoop's FileSystem, bypassing Flink's > file system abstraction. > This causes several issues: > - The bucketing sink will behave different than other file sinks with > respect to configuration > - Directly supported file systems (not through hadoop) like the MapR File > System does not work in the same way with the BuketingSink as other file > systems > - The previous point is all the more problematic in the effort to make > Hadoop an optional dependency and with in other stacks (Mesos, Kubernetes, > AWS, GCE, Azure) with ideally no Hadoop dependency. > We should port the {{BucketingSink}} to use Flink's FileSystem classes. > To support the *truncate* functionality that is needed for the exactly-once > semantics of the Bucketing Sink, we should extend Flink's FileSystem > abstraction to have the methods > - {{boolean supportsTruncate()}} > - {{void truncate(Path, long)}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5789) Make Bucketing Sink independent of Hadoop's FileSystem
[ https://issues.apache.org/jira/browse/FLINK-5789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16571065#comment-16571065 ] zhangminglei commented on FLINK-5789: - I would like to ask: in the future, will we no longer recommend {{BucketingSink}} to users, and point them to {{StreamingFileSink}} instead? > Make Bucketing Sink independent of Hadoop's FileSystem > -- > > Key: FLINK-5789 > URL: https://issues.apache.org/jira/browse/FLINK-5789 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.1.4, 1.2.0 >Reporter: Stephan Ewen >Priority: Major > > The {{BucketingSink}} is hard wired to Hadoop's FileSystem, bypassing Flink's > file system abstraction. > This causes several issues: > - The bucketing sink will behave different than other file sinks with > respect to configuration > - Directly supported file systems (not through hadoop) like the MapR File > System does not work in the same way with the BuketingSink as other file > systems > - The previous point is all the more problematic in the effort to make > Hadoop an optional dependency and with in other stacks (Mesos, Kubernetes, > AWS, GCE, Azure) with ideally no Hadoop dependency. > We should port the {{BucketingSink}} to use Flink's FileSystem classes. > To support the *truncate* functionality that is needed for the exactly-once > semantics of the Bucketing Sink, we should extend Flink's FileSystem > abstraction to have the methods > - {{boolean supportsTruncate()}} > - {{void truncate(Path, long)}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9982) NPE in EnumValueSerializer#copy
[ https://issues.apache.org/jira/browse/FLINK-9982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei reassigned FLINK-9982: --- Assignee: dalongliu (was: zhangminglei) > NPE in EnumValueSerializer#copy > --- > > Key: FLINK-9982 > URL: https://issues.apache.org/jira/browse/FLINK-9982 > Project: Flink > Issue Type: Bug >Affects Versions: 1.3.3, 1.4.1, 1.4.2, 1.5.1 > Reporter: zhangminglei >Assignee: dalongliu >Priority: Major > > When execute the flink job in flink 1.3.2 version. We met the below error. > {code:java} > java.lang.NullPointerException > at > org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:50) > at > org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:36) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:92) > at > org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:42) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) > at > 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:304) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:393) > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:233) > at > org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:385) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9982) NPE in EnumValueSerializer#copy
[ https://issues.apache.org/jira/browse/FLINK-9982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16563648#comment-16563648 ] zhangminglei commented on FLINK-9982: - [~lsy] Could you tell [~Zentol] which enum field in your case class was null? I no longer have the environment or the program, since I left that job. > NPE in EnumValueSerializer#copy > --- > > Key: FLINK-9982 > URL: https://issues.apache.org/jira/browse/FLINK-9982 > Project: Flink > Issue Type: Bug >Affects Versions: 1.3.3, 1.4.1, 1.4.2, 1.5.1 > Reporter: zhangminglei >Assignee: zhangminglei >Priority: Major > > When execute the flink job in flink 1.3.2 version. We met the below error. > {code:java} > java.lang.NullPointerException > at > org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:50) > at > org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:36) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:92) > at > org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:42) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) > at > 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:304) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:393) > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:233) > at > org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:385) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-9614) Improve the error message for Compiler#compile
[ https://issues.apache.org/jira/browse/FLINK-9614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei resolved FLINK-9614. - Resolution: Won't Fix > Improve the error message for Compiler#compile > -- > > Key: FLINK-9614 > URL: https://issues.apache.org/jira/browse/FLINK-9614 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.5.0 >Reporter: zhangminglei > Assignee: zhangminglei >Priority: Major > Labels: pull-request-available > > When the SQL below is too long, like > case when case when . > when host in > ('114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247') > then 'condition' > it causes a {{StackOverflowError}}. The current code is below; we can > solve this by setting -Xss to 20m, and the error message should suggest that instead of {{This is a bug..}} > {code:java} > trait Compiler[T] { > @throws(classOf[CompileException]) > def compile(cl: ClassLoader, name: String, code: String): Class[T] = { > require(cl != null, "Classloader must not be null.") > val compiler = new SimpleCompiler() > compiler.setParentClassLoader(cl) > try { > compiler.cook(code) > } catch { > case t: Throwable => > throw new InvalidProgramException("Table program cannot be compiled. > " + > "This is a bug. Please file an issue.", t) > } > compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]] > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
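The workaround quoted above can be sketched without Flink or Janino (hypothetical class name; the deep recursion stands in for the compiler's deeply nested parse of a very long CASE WHEN expression): a thread created with an explicit, larger stack size, the programmatic analogue of running the JVM with a bigger -Xss, can complete recursion depths that might overflow a default-sized stack:

```java
public class BigStackDemo {
    static int depth(int n) {
        return n == 0 ? 0 : 1 + depth(n - 1);
    }

    public static void main(String[] args) throws Exception {
        // A thread with an explicit 64 MB stack, analogous to -Xss64m.
        // The recursion depth of 100,000 frames fits comfortably here,
        // but might overflow a default-sized thread stack.
        Thread t = new Thread(null, () -> System.out.println(depth(100_000)),
                "big-stack", 64L * 1024 * 1024);
        t.start();
        t.join();
    }
}
```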
[jira] [Assigned] (FLINK-9982) NPE in EnumValueSerializer#copy
[ https://issues.apache.org/jira/browse/FLINK-9982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei reassigned FLINK-9982: --- Assignee: zhangminglei (was: dalongliu) > NPE in EnumValueSerializer#copy > --- > > Key: FLINK-9982 > URL: https://issues.apache.org/jira/browse/FLINK-9982 > Project: Flink > Issue Type: Bug >Affects Versions: 1.3.3, 1.4.1, 1.4.2, 1.5.1 > Reporter: zhangminglei >Assignee: zhangminglei >Priority: Major > > When execute the flink job in flink 1.3.2 version. We met the below error. > {code:java} > java.lang.NullPointerException > at > org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:50) > at > org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:36) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:92) > at > org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:42) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) > at > 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:304) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:393) > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:233) > at > org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:385) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
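The NPE above comes from copying a record whose enum field was never populated: the `copy` at EnumValueSerializer.scala:50 presumably dereferences the value it is given. As a rough illustration (a hypothetical sketch, not the actual Flink fix, with made-up names), a null-tolerant `copy()` would look like:

```java
// Hypothetical sketch of a null-tolerant copy(), illustrating the kind of
// guard that avoids the NPE in the stack trace above. Names are illustrative
// and not Flink's API.
public class NullSafeCopier {

    enum Color { RED, GREEN }

    /** Copies an enum value, guarding against a field that was never set. */
    static Color copy(Color from) {
        if (from == null) {
            // Without this guard, dereferencing 'from' (e.g. from.ordinal())
            // throws the NullPointerException seen in the stack trace.
            return null;
        }
        // Look the constant up by its ordinal, mirroring a lookup-by-id copy.
        return Color.values()[from.ordinal()];
    }

    public static void main(String[] args) {
        System.out.println(copy(Color.RED)); // RED
        System.out.println(copy(null));      // null
    }
}
```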
[jira] [Assigned] (FLINK-9985) Incorrect parameter order in document
[ https://issues.apache.org/jira/browse/FLINK-9985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei reassigned FLINK-9985: --- Assignee: zhangminglei (was: dalongliu) > Incorrect parameter order in document > - > > Key: FLINK-9985 > URL: https://issues.apache.org/jira/browse/FLINK-9985 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.5.1 >Reporter: zhangminglei > Assignee: zhangminglei >Priority: Major > > https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/windows.html#incremental-window-aggregation-with-foldfunction > {code:java} > public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, > SensorReading s) { > Integer cur = acc.getField(2); > acc.setField(2, cur + 1); // incorrect parameter order; it should be > acc.setField(cur + 1, 2) > return acc; > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9985) Incorrect parameter order in document
[ https://issues.apache.org/jira/browse/FLINK-9985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei reassigned FLINK-9985: --- Assignee: dalongliu > Incorrect parameter order in document > - > > Key: FLINK-9985 > URL: https://issues.apache.org/jira/browse/FLINK-9985 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.5.1 >Reporter: zhangminglei >Assignee: dalongliu >Priority: Major > > https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/windows.html#incremental-window-aggregation-with-foldfunction > {code:java} > public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, > SensorReading s) { > Integer cur = acc.getField(2); > acc.setField(2, cur + 1); // incorrect parameter order; it should be > acc.setField(cur + 1, 2) > return acc; > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9982) NPE in EnumValueSerializer#copy
[ https://issues.apache.org/jira/browse/FLINK-9982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei reassigned FLINK-9982: --- Assignee: dalongliu > NPE in EnumValueSerializer#copy > --- > > Key: FLINK-9982 > URL: https://issues.apache.org/jira/browse/FLINK-9982 > Project: Flink > Issue Type: Bug >Affects Versions: 1.3.3, 1.4.1, 1.4.2, 1.5.1 > Reporter: zhangminglei >Assignee: dalongliu >Priority: Major > > When execute the flink job in flink 1.3.2 version. We met the below error. > {code:java} > java.lang.NullPointerException > at > org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:50) > at > org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:36) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:92) > at > org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:42) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) > at > 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:304) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:393) > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:233) > at > org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:385) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9825) Upgrade checkstyle version to 8.6
[ https://issues.apache.org/jira/browse/FLINK-9825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei reassigned FLINK-9825: --- Assignee: dalongliu > Upgrade checkstyle version to 8.6 > - > > Key: FLINK-9825 > URL: https://issues.apache.org/jira/browse/FLINK-9825 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Ted Yu >Assignee: dalongliu >Priority: Minor > > We should upgrade checkstyle version to 8.6+ so that we can use the "match > violation message to this regex" feature for suppression. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9825) Upgrade checkstyle version to 8.6
[ https://issues.apache.org/jira/browse/FLINK-9825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560532#comment-16560532 ] zhangminglei commented on FLINK-9825: - [~lsy] You can now open a PR for this issue. If you run into any problems, you can talk with anyone you think is relevant to this topic. But before asking, you should try to solve it yourself at least once. We are all looking forward to your contributions! > Upgrade checkstyle version to 8.6 > - > > Key: FLINK-9825 > URL: https://issues.apache.org/jira/browse/FLINK-9825 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Ted Yu >Priority: Minor > > We should upgrade checkstyle version to 8.6+ so that we can use the "match > violation message to this regex" feature for suppression. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
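For reference, the "match violation message to this regex" feature works roughly like this in a checkstyle 8.6+ suppressions file (the check name and regex below are made-up examples, and the exact DTD version may differ):

```xml
<?xml version="1.0"?>
<!DOCTYPE suppressions PUBLIC
    "-//Checkstyle//DTD SuppressionFilter Configuration 1.2//EN"
    "https://checkstyle.org/dtds/suppressions_1_2.dtd">
<suppressions>
  <!-- Suppress only violations whose message matches this regex;
       the 'message' attribute is the 8.6+ feature referenced above. -->
  <suppress checks="JavadocMethod" message="Expected an @param tag.*"/>
</suppressions>
```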
[jira] [Commented] (FLINK-9985) Incorrect parameter order in document
[ https://issues.apache.org/jira/browse/FLINK-9985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559782#comment-16559782 ] zhangminglei commented on FLINK-9985: - Thanks [~lsy] for contributing to Flink! Feel free to open a PR for this issue. A committer will then give you contributor permission, so next time you can assign yourself the JIRA you would like to work on. > Incorrect parameter order in document > - > > Key: FLINK-9985 > URL: https://issues.apache.org/jira/browse/FLINK-9985 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.5.1 >Reporter: zhangminglei >Priority: Major > > https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/windows.html#incremental-window-aggregation-with-foldfunction > {code:java} > public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, > SensorReading s) { > Integer cur = acc.getField(2); > acc.setField(2, cur + 1); // incorrect parameter order; it should be > acc.setField(cur + 1, 2) > return acc; > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9985) Incorrect parameter order in document
[ https://issues.apache.org/jira/browse/FLINK-9985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-9985: Description: https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/windows.html#incremental-window-aggregation-with-foldfunction {code:java} public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) { Integer cur = acc.getField(2); acc.setField(2, cur + 1); // incorrect parameter order; it should be acc.setField(cur + 1, 2) return acc; } {code} > Incorrect parameter order in document > - > > Key: FLINK-9985 > URL: https://issues.apache.org/jira/browse/FLINK-9985 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.5.1 >Reporter: zhangminglei >Priority: Major > > https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/windows.html#incremental-window-aggregation-with-foldfunction > {code:java} > public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, > SensorReading s) { > Integer cur = acc.getField(2); > acc.setField(2, cur + 1); // incorrect parameter order; it should be > acc.setField(cur + 1, 2) > return acc; > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
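Since Flink's `Tuple.setField(T value, int pos)` takes the value first and the position second, the documented call `acc.setField(2, cur + 1)` stores the value 2 at position `cur + 1` instead of incrementing the counter at position 2. A minimal stand-in class (not Flink's real Tuple3, just a sketch of the same signature) makes the difference concrete:

```java
// A tiny stand-in for Flink's Tuple3 (NOT the real class), showing why the
// documented call compiles either way but only one argument order is correct:
// setField takes (value, position), not (position, value).
public class TupleOrderDemo {

    static class FakeTuple3 {
        private final Object[] fields = new Object[3];

        @SuppressWarnings("unchecked")
        <T> T getField(int pos) {
            return (T) fields[pos];
        }

        <T> void setField(T value, int pos) {
            fields[pos] = value;
        }
    }

    public static void main(String[] args) {
        FakeTuple3 acc = new FakeTuple3();
        acc.setField("sensor-1", 0); // key at position 0
        acc.setField(0L, 1);         // timestamp at position 1
        acc.setField(0, 2);          // counter at position 2

        // The corrected call from the report: value first, position second.
        Integer cur = acc.getField(2);
        acc.setField(cur + 1, 2);

        System.out.println((Object) acc.getField(2)); // prints 1
    }
}
```

With the arguments reversed, the same code would instead store 2 at index `cur + 1`, which is exactly the bug the report points out.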
[jira] [Commented] (FLINK-9982) NPE in EnumValueSerializer#copy
[ https://issues.apache.org/jira/browse/FLINK-9982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559771#comment-16559771 ] zhangminglei commented on FLINK-9982: - Hi, [~till.rohrmann] Could you add [~lsy] to contribution list ? It is the first time for him to contribute to apache flink. Thank you . > NPE in EnumValueSerializer#copy > --- > > Key: FLINK-9982 > URL: https://issues.apache.org/jira/browse/FLINK-9982 > Project: Flink > Issue Type: Bug >Affects Versions: 1.3.3, 1.4.1, 1.4.2, 1.5.1 > Reporter: zhangminglei >Priority: Major > > When execute the flink job in flink 1.3.2 version. We met the below error. > {code:java} > java.lang.NullPointerException > at > org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:50) > at > org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:36) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:92) > at > org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:42) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) > at > 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:304) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:393) > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:233) > at > org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:385) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9985) Incorrect parameter order in document
zhangminglei created FLINK-9985: --- Summary: Incorrect parameter order in document Key: FLINK-9985 URL: https://issues.apache.org/jira/browse/FLINK-9985 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.5.1 Reporter: zhangminglei -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9982) NPE in EnumValueSerializer#copy
[ https://issues.apache.org/jira/browse/FLINK-9982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-9982: Description: When execute the flink job in flink 1.3.2 version. We met the below error. {code:java} java.lang.NullPointerException at org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:50) at org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:36) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:92) at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:42) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:304) at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:393) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:233) at org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:385) {code} > NPE in EnumValueSerializer#copy > --- > > Key: FLINK-9982 > URL: https://issues.apache.org/jira/browse/FLINK-9982 > Project: Flink > Issue Type: Bug >Affects Versions: 1.3.3, 1.4.1, 1.4.2, 1.5.1 > Reporter: zhangminglei >Priority: Major > > When execute the flink job in flink 1.3.2 version. We met the below error. > {code:java} > java.lang.NullPointerException > at > org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:50) > at > org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:36) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:92) > at > org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:42) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) > at > 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWat
[jira] [Updated] (FLINK-9982) NPE in EnumValueSerializer#copy
[ https://issues.apache.org/jira/browse/FLINK-9982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-9982: Affects Version/s: 1.3.3 1.4.1 1.4.2 1.5.1 > NPE in EnumValueSerializer#copy > --- > > Key: FLINK-9982 > URL: https://issues.apache.org/jira/browse/FLINK-9982 > Project: Flink > Issue Type: Bug >Affects Versions: 1.3.3, 1.4.1, 1.4.2, 1.5.1 > Reporter: zhangminglei >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9982) NPE in EnumValueSerializer#copy
[ https://issues.apache.org/jira/browse/FLINK-9982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559754#comment-16559754 ] zhangminglei commented on FLINK-9982: - Hi, @vinoyang Due to a bad network connection, I was unable to provide more description here. I will add more later. Thank you > NPE in EnumValueSerializer#copy > --- > > Key: FLINK-9982 > URL: https://issues.apache.org/jira/browse/FLINK-9982 > Project: Flink > Issue Type: Bug > Reporter: zhangminglei >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9982) NPE in EnumValueSerializer#copy
zhangminglei created FLINK-9982: --- Summary: NPE in EnumValueSerializer#copy Key: FLINK-9982 URL: https://issues.apache.org/jira/browse/FLINK-9982 Project: Flink Issue Type: Bug Reporter: zhangminglei -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9825) Upgrade checkstyle version to 8.6
[ https://issues.apache.org/jira/browse/FLINK-9825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559160#comment-16559160 ] zhangminglei commented on FLINK-9825: - Hi, [~Zentol] Could you please give [~lsy] permission to contribute to Apache Flink? It will be his first time getting started with Flink. Thank you so much. > Upgrade checkstyle version to 8.6 > - > > Key: FLINK-9825 > URL: https://issues.apache.org/jira/browse/FLINK-9825 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Ted Yu >Priority: Minor > > We should upgrade checkstyle version to 8.6+ so that we can use the "match > violation message to this regex" feature for suppression. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9825) Upgrade checkstyle version to 8.6
[ https://issues.apache.org/jira/browse/FLINK-9825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei reassigned FLINK-9825: --- Assignee: (was: zhangminglei) > Upgrade checkstyle version to 8.6 > - > > Key: FLINK-9825 > URL: https://issues.apache.org/jira/browse/FLINK-9825 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Ted Yu >Priority: Minor > > We should upgrade checkstyle version to 8.6+ so that we can use the "match > violation message to this regex" feature for suppression. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9651) Add a Kafka table source factory with Protobuf format support
[ https://issues.apache.org/jira/browse/FLINK-9651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559128#comment-16559128 ] zhangminglei commented on FLINK-9651: - Hi, [~twalthr] Thanks and I am on my way to do this. > Add a Kafka table source factory with Protobuf format support > - > > Key: FLINK-9651 > URL: https://issues.apache.org/jira/browse/FLINK-9651 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL > Reporter: zhangminglei >Assignee: zhangminglei >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9609) Add bucket ready mechanism for BucketingSink when checkpoint complete
[ https://issues.apache.org/jira/browse/FLINK-9609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-9609: Fix Version/s: (was: 1.6.1) 1.7.0 > Add bucket ready mechanism for BucketingSink when checkpoint complete > - > > Key: FLINK-9609 > URL: https://issues.apache.org/jira/browse/FLINK-9609 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector, Streaming Connectors >Affects Versions: 1.5.0, 1.4.2 >Reporter: zhangminglei > Assignee: zhangminglei >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently, BucketingSink only supports {{notifyCheckpointComplete}}. However, > users want to do some extra work when a bucket is ready. It would be nice if > we could support a {{BucketReady}} mechanism, or tell users when a bucket is > ready for use. For example, if one bucket is created every 5 minutes, then at > the end of those 5 minutes, before the next bucket is created, the user might > need to do something once the previous bucket is ready, like sending the > timestamp of the bucket-ready time to a server. > Here, bucket ready means that no part file under the bucket has a suffix of > either {{.pending}} or {{.in-progress}}; then we can consider the bucket > ready for use. Just as a watermark means that no elements with a timestamp > older than or equal to the watermark timestamp should arrive at the window, > we can borrow the watermark concept here and call this *BucketWatermark*. > Recently, I found a user who wants this functionality, I would think. > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Let-BucketingSink-roll-file-on-each-checkpoint-td19034.html > Below is what he said: > My user case is we read data from message queue, write to HDFS, and our ETL > team will use the data in HDFS. 
*In the case, ETL need to know if all data is > ready to be read accurately*, so we use a counter to count how many data has > been wrote, if the counter is equal to the number we received, we think HDFS > file is ready. We send the counter message in a custom sink so ETL can know > how many data has been wrote, but if use current BucketingSink, even through > HDFS file is flushed, ETL may still cannot read the data. If we can close > file during checkpoint, then the result is accurately. And for the HDFS small > file problem, it can be controller by use bigger checkpoint interval. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
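The readiness condition described above (no part file under the bucket still suffixed {{.pending}} or {{.in-progress}}) can be sketched as a plain directory scan. This is an illustrative check only, not the mechanism proposed for BucketingSink itself:

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative check for the "bucket ready" condition described above:
// a bucket directory is ready once no part file is still suffixed
// ".pending" or ".in-progress". Not the actual BucketingSink mechanism.
public class BucketReadyCheck {

    static boolean isBucketReady(Path bucketDir) throws IOException {
        try (DirectoryStream<Path> files = Files.newDirectoryStream(bucketDir)) {
            for (Path file : files) {
                String name = file.getFileName().toString();
                if (name.endsWith(".pending") || name.endsWith(".in-progress")) {
                    return false; // a part file is still being finalized
                }
            }
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("bucket-demo");
        Files.createFile(dir.resolve("part-0-0"));
        System.out.println(isBucketReady(dir)); // true: no pending/in-progress files
    }
}
```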
[GitHub] flink issue #6103: [FLINK-9413] [distributed coordination] Tasks can fail wi...
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6103 Hi, @tillrohrmann I found that more than one user has hit this issue and it affects their work. I suggest we merge this PR as a temporary solution once Travis turns green. ---
[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/6075#discussion_r204233444 --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/OrcFileWriter.java --- @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.fs; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.IntStream; + +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo; + +/** + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}. + * + * @param The type of the elements that are being written by the sink. + */ +public class OrcFileWriter extends StreamWriterBase { + + private static final long serialVersionUID = 3L; + + private TypeDescription schema; + + private String meatSchema; + + private transient org.apache.orc.Writer writer; + + private VectorizedRowBatch rowBatch; + + private CompressionKind compressionKind; + + private long writedRowSize; + + private OrcBatchWriter orcBatchWriter; + + /** +* Creates a new {@code OrcFileWriter} that writes orc files without compression. +* +* @param metaSchema The orc schema. +*/ + public OrcFileWriter(String metaSchema) { + this(metaSchema, CompressionKind.NONE); + } + + /** +* Create a new {@code OrcFileWriter} that writes orc file with the gaven +* schema and compression kind. +* +* @param metaSchema The schema of a orc file. +* @param compressionKind The compression kind to use. 
+*/ + public OrcFileWriter(String metaSchema, CompressionKind compressionKind) { + this.meatSchema = metaSchema; + this.schema = TypeDescription.fromString(metaSchema); + this.compressionKind = compressionKind; + } + + @Override + public void open(FileSystem fs, Path path) throws IOException { + writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind)); + rowBatch = schema.createRowBatch(); + orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes())); + } + + private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) { + List fieldNames = orcSchema.getFieldNames(); + List typeDescriptions = orcSchema.getChildren(); + List typeInformations = new ArrayList<>(); + + typeDescriptions.forEach(typeDescription -> { + typeInformations.add(schemaToTypeInfo(typeDescription)); + }); + + return new TableSchema( + fieldNames.toArray(new String[fieldNames.size()]), + typeInformations.toArray(new TypeInformation[typeInformations.size()])); + } + + @Override + public void write(T element) throws IOException { + Boolean isFill = orcBatchWriter.fill(rowBatch, element)
[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/6075#discussion_r204232617 --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java --- @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.orc; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.connectors.fs.StreamWriterBase; +import org.apache.flink.streaming.connectors.fs.Writer; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.IntStream; + +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo; + +/** + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}. + * + * @param The type of the elements that are being written by the sink. + */ +public class OrcFileWriter extends StreamWriterBase { + + private static final long serialVersionUID = 3L; + + /** +* The description of the types in an ORC file. +*/ + private TypeDescription schema; + + /** +* The schema of an ORC file. +*/ + private String metaSchema; + + /** +* A row batch that will be written to the ORC file. +*/ + private VectorizedRowBatch rowBatch; + + /** +* The writer that fill the records into the batch. +*/ + private OrcBatchWriter orcBatchWriter; + + private transient org.apache.orc.Writer writer; + + private CompressionKind compressionKind; + + /** +* The number of rows that currently being written. +*/ + private long writedRowSize; + + /** +* Creates a new {@code OrcFileWriter} that writes orc files without compression. 
+* +* @param metaSchema The orc schema. +*/ + public OrcFileWriter(String metaSchema) { + this(metaSchema, CompressionKind.NONE); + } + + /** +* Create a new {@code OrcFileWriter} that writes orc file with the gaven +* schema and compression kind. +* +* @param metaSchema The schema of an orc file. +* @param compressionKind The compression kind to use. +*/ + public OrcFileWriter(String metaSchema, CompressionKind compressionKind) { + this.metaSchema = metaSchema; + this.schema = TypeDescription.fromString(metaSchema); + this.compressionKind = compressionKind; + } + + @Override + public void open(FileSystem fs, Path path) throws IOException { + writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind)); + rowBatch = schema.createRowBatch(); + orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes())); + } + + private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) { + List fieldNames = orcSchema.getFieldNames(); + List typeDescriptions = orcSchema.getChildren(); + List typeInformations = new ArrayList<>(); + + typeDescriptio
[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/6075#discussion_r204231919 --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/OrcFileWriter.java --- @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.fs; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.IntStream; + +import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo; + +/** + * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}. + * + * @param <T> The type of the elements that are being written by the sink. + */ +public class OrcFileWriter<T> extends StreamWriterBase<T> { + + private static final long serialVersionUID = 3L; + + private TypeDescription schema; + + private String metaSchema; + + private transient org.apache.orc.Writer writer; + + private VectorizedRowBatch rowBatch; + + private CompressionKind compressionKind; + + private long writedRowSize; + + private OrcBatchWriter orcBatchWriter; + + /** +* Creates a new {@code OrcFileWriter} that writes ORC files without compression. +* +* @param metaSchema The ORC schema. +*/ + public OrcFileWriter(String metaSchema) { + this(metaSchema, CompressionKind.NONE); + } + + /** +* Creates a new {@code OrcFileWriter} that writes ORC files with the given +* schema and compression kind. +* +* @param metaSchema The schema of an ORC file. +* @param compressionKind The compression kind to use. 
+*/ + public OrcFileWriter(String metaSchema, CompressionKind compressionKind) { + this.metaSchema = metaSchema; + this.schema = TypeDescription.fromString(metaSchema); + this.compressionKind = compressionKind; + } + + @Override + public void open(FileSystem fs, Path path) throws IOException { + writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind)); + rowBatch = schema.createRowBatch(); + orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes())); + } + + private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) { + List<String> fieldNames = orcSchema.getFieldNames(); + List<TypeDescription> typeDescriptions = orcSchema.getChildren(); + List<TypeInformation> typeInformations = new ArrayList<>(); + + typeDescriptions.forEach(typeDescription -> { + typeInformations.add(schemaToTypeInfo(typeDescription)); + }); + + return new TableSchema( + fieldNames.toArray(new String[fieldNames.size()]), + typeInformations.toArray(new TypeInformation[typeInformations.size()])); + } + + @Override + public void write(T element) throws IOException { + Boolean isFill = orcBatchWriter.fill(rowBatch, element)
[jira] [Updated] (FLINK-9609) Add bucket ready mechanism for BucketingSink when checkpoint complete
[ https://issues.apache.org/jira/browse/FLINK-9609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-9609: Fix Version/s: 1.6.1 > Add bucket ready mechanism for BucketingSink when checkpoint complete > - > > Key: FLINK-9609 > URL: https://issues.apache.org/jira/browse/FLINK-9609 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector, Streaming Connectors >Affects Versions: 1.5.0, 1.4.2 >Reporter: zhangminglei > Assignee: zhangminglei >Priority: Major > Labels: pull-request-available > Fix For: 1.6.1 > > > Currently, BucketingSink only supports {{notifyCheckpointComplete}}. However, > users want to do some extra work when a bucket is ready. It would be nice if > we could support a {{BucketReady}} mechanism, or tell users when > a bucket is ready for use. For example, if one bucket is created every 5 > minutes, then at the end of those 5 minutes, before creating the next bucket, the user > might need to do something once the previous bucket is ready, like sending the > timestamp of the bucket-ready time to a server. > Here, bucket ready means that no part file under the bucket ends with > {{.pending}} or {{.in-progress}}. Then we can consider the bucket > ready for use. Just as a watermark means no elements with a timestamp older > than or equal to the watermark timestamp should arrive at the window, we can > borrow the concept of a watermark here and call this *BucketWatermark*. > Recently, I found a user who wants this functionality: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Let-BucketingSink-roll-file-on-each-checkpoint-td19034.html > Below is what he said: > My use case is: we read data from a message queue, write to HDFS, and our ETL > team will use the data in HDFS. 
*In this case, ETL needs to know whether all data is > ready to be read*, so we use a counter to count how many records have > been written; if the counter equals the number we received, we consider the HDFS > file ready. We send the counter message in a custom sink so ETL can know > how many records have been written, but with the current BucketingSink, even though > the HDFS file is flushed, ETL may still be unable to read the data. If we can close the > file during a checkpoint, the result is accurate. And the HDFS small > file problem can be controlled by using a bigger checkpoint interval. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
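For illustration, the bucket-ready condition described above (no part file under the bucket still ending in {{.pending}} or {{.in-progress}}) can be sketched against a local filesystem with plain java.nio. This is a hypothetical stand-in for the Hadoop {{FileSystem}} listing BucketingSink would actually use; the class and method names here are illustrative, not Flink API:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

public class BucketReadyCheck {

    // A bucket is "ready" once no part file under it still carries a
    // .pending or .in-progress suffix.
    static boolean isBucketReady(Path bucketDir) throws IOException {
        try (Stream<Path> files = Files.list(bucketDir)) {
            return files.noneMatch(p -> {
                String name = p.getFileName().toString();
                return name.endsWith(".pending") || name.endsWith(".in-progress");
            });
        }
    }

    public static void main(String[] args) throws IOException {
        Path bucket = Files.createTempDirectory("bucket");
        Files.createFile(bucket.resolve("part-0-0"));
        Files.createFile(bucket.resolve("part-0-1.pending"));
        System.out.println(isBucketReady(bucket)); // false: an unfinished part file remains
        Files.delete(bucket.resolve("part-0-1.pending"));
        System.out.println(isBucketReady(bucket)); // true: only finalized parts remain
    }
}
```

A real implementation would iterate Hadoop's `RemoteIterator<LocatedFileStatus>` instead of `Files.list`, but the readiness predicate is the same.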
[GitHub] flink issue #6378: [FLINK-9236] [pom] upgrade the version of apache parent p...
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6378 Could you please push the code and trigger Travis again? ---
[GitHub] flink pull request #6380: [FLINK-9614] [table] Improve the error message for...
GitHub user zhangminglei opened a pull request: https://github.com/apache/flink/pull/6380 [FLINK-9614] [table] Improve the error message for Compiler#compile ## What is the purpose of the change Improve the error message for Compiler#compile You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhangminglei/flink flink-9614-improve-error Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6380.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6380 commit f4406622c67df9212c545d36b3572e774acf1df7 Author: zhangminglei Date: 2018-07-21T08:39:16Z [FLINK-9614] [table] Improve the error message for Compiler#compile ---
[GitHub] flink issue #6336: [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause...
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6336 Hi, @ubyyj Thanks for your contribution! It looks good to me. ---
[jira] [Updated] (FLINK-9900) Failed to testRestoreBehaviourWithFaultyStateHandles (org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)
[ https://issues.apache.org/jira/browse/FLINK-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-9900: Affects Version/s: 1.5.1 > Failed to testRestoreBehaviourWithFaultyStateHandles > (org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase) > --- > > Key: FLINK-9900 > URL: https://issues.apache.org/jira/browse/FLINK-9900 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.1 >Reporter: zhangminglei >Priority: Major > > https://api.travis-ci.org/v3/job/405843617/log.txt > Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 124.598 sec > <<< FAILURE! - in > org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase > > testRestoreBehaviourWithFaultyStateHandles(org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase) > Time elapsed: 120.036 sec <<< ERROR! > org.junit.runners.model.TestTimedOutException: test timed out after 12 > milliseconds > at sun.misc.Unsafe.park(Native Method) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693) > at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) > at > java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) > at > org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles(ZooKeeperHighAvailabilityITCase.java:244) > Results : > Tests in error: > > ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles:244 > » TestTimedOut > Tests run: 1453, Failures: 0, Errors: 1, Skipped: 29 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6378: [FLINK-9236] [pom] upgrade the version of apache parent p...
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6378 @TisonShadowsong is right. You can verify that, and I think it might be a bug. ---
[GitHub] flink issue #6378: [FLINK-9236] [pom] upgrade the version of apache parent p...
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6378 @YCjia The Travis test failed. Could you take a look at what is happening? ---
[GitHub] flink pull request #6375: [FLINK-9609] [connectors] Add bucket ready mechani...
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/6375#discussion_r203997376 --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java --- @@ -722,9 +727,31 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { } } } + isBucketReady(partitionPaths); } } + @Override + public boolean isBucketReady(Set<Path> bucketPathes) { + for (Path path : bucketPathes) { + try { + RemoteIterator<LocatedFileStatus> files = fs.listFiles(path, false); + while (files.hasNext()) { + LocatedFileStatus fileStatus = files.next(); + if (fileStatus.getPath().getName().endsWith(pendingSuffix) || + fileStatus.getPath().getName().endsWith(inProgressSuffix)) { + return false; + } + } + return true; --- End diff -- Ahhh. Oops! Yes, you are very right! :) ---
[GitHub] flink pull request #6375: [FLINK-9609] [connectors] Add bucket ready mechani...
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/6375#discussion_r203995868 --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java --- @@ -722,9 +727,31 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { } } } + isBucketReady(partitionPaths); } } + @Override + public boolean isBucketReady(Set<Path> bucketPathes) { --- End diff -- If we finish the for loop and haven't returned true yet, then return false. I think this is reasonable. If a file ends with .pending or .in-progress, it means the file is not ready and we cannot use it. ---
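The control-flow fix this review converges on — return {{true}} only after the outer loop over *all* buckets has finished, rather than after the first bucket's files have been scanned — can be sketched with plain collections. File listings are plain strings here, a stand-in for Hadoop's `RemoteIterator<LocatedFileStatus>`; all names are illustrative:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AllBucketsReady {

    // Return true only after every file of every bucket has been checked;
    // returning true inside the per-bucket loop (the bug spotted in the
    // review) would report readiness after inspecting just one bucket.
    static boolean allBucketsReady(Map<String, List<String>> bucketFiles) {
        for (List<String> files : bucketFiles.values()) {
            for (String name : files) {
                if (name.endsWith(".pending") || name.endsWith(".in-progress")) {
                    return false; // one unfinished file fails the whole set
                }
            }
        }
        return true; // reached only after all buckets passed
    }

    public static void main(String[] args) {
        Map<String, List<String>> buckets = new HashMap<>();
        buckets.put("2018-07-19--14", Arrays.asList("part-0-0", "part-0-1"));
        buckets.put("2018-07-19--15", Arrays.asList("part-1-0.pending"));
        System.out.println(allBucketsReady(buckets)); // false
        buckets.put("2018-07-19--15", Arrays.asList("part-1-0"));
        System.out.println(allBucketsReady(buckets)); // true
    }
}
```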
[GitHub] flink issue #6375: [FLINK-9609] [connectors] Add bucket ready mechanism for ...
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6375 Hi, @tillrohrmann Could you please take a look at this PR? Actually, I don't know which committer is most familiar with ```BucketingSink```. Thank you so much. ---
[jira] [Updated] (FLINK-9901) Refactor InputStreamReader to Channels.newReader
[ https://issues.apache.org/jira/browse/FLINK-9901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-9901: Description: From this benchmark report https://arnaudroger.github.io/blog/2017/03/20/faster-reader-inpustream-in-java.html. We can get a better performance boost by using {{Channels.newReader}}. (was: From this benchmark report https://arnaudroger.github.io/blog/2017/03/20/faster-reader-inpustream-in-java.html. We can get a better performance booth by using {{Channels.newReader}}.) > Refactor InputStreamReader to Channels.newReader > > > Key: FLINK-9901 > URL: https://issues.apache.org/jira/browse/FLINK-9901 > Project: Flink > Issue Type: Sub-task > Reporter: zhangminglei >Priority: Major > > From this benchmark report > https://arnaudroger.github.io/blog/2017/03/20/faster-reader-inpustream-in-java.html. > We can get a better performance boost by using {{Channels.newReader}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
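A minimal sketch of the refactor this ticket proposes, reading a file through `Channels.newReader` instead of `new InputStreamReader(new FileInputStream(...))`. This is JDK-only; the class and method names are ours, not Flink's:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ChannelReaderDemo {

    // Reads a whole text file through Channels.newReader, the
    // channel-backed reader that the benchmark linked above compares
    // favorably against new InputStreamReader(new FileInputStream(...)).
    static String readAll(Path path) throws IOException {
        StringBuilder sb = new StringBuilder();
        try (FileChannel ch = FileChannel.open(path, StandardOpenOption.READ);
             BufferedReader br = new BufferedReader(
                     Channels.newReader(ch, StandardCharsets.UTF_8.name()))) {
            for (int c; (c = br.read()) != -1; ) {
                sb.append((char) c);
            }
        }
        return sb.toString();
    }
}
```

The call sites and the decoding semantics stay the same; only the underlying stream implementation changes.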
[GitHub] flink issue #6365: [FLINK-9849] [hbase] Hbase upgrade to 2.0.1
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6365 Thanks @yanghua for pointing this out! ---
[jira] [Updated] (FLINK-9901) Refactor InputStreamReader to Channels.newReader
[ https://issues.apache.org/jira/browse/FLINK-9901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-9901: Description: From this benchmark report https://arnaudroger.github.io/blog/2017/03/20/faster-reader-inpustream-in-java.html. We can get a better performance booth by using {{Channels.newReader}}. > Refactor InputStreamReader to Channels.newReader > > > Key: FLINK-9901 > URL: https://issues.apache.org/jira/browse/FLINK-9901 > Project: Flink > Issue Type: Sub-task > Reporter: zhangminglei >Priority: Major > > From this benchmark report > https://arnaudroger.github.io/blog/2017/03/20/faster-reader-inpustream-in-java.html. > We can get a better performance booth by using {{Channels.newReader}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9901) Refactor InputStreamReader to Channels.newReader
zhangminglei created FLINK-9901: --- Summary: Refactor InputStreamReader to Channels.newReader Key: FLINK-9901 URL: https://issues.apache.org/jira/browse/FLINK-9901 Project: Flink Issue Type: Sub-task Reporter: zhangminglei -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9896) Fix flink documentation error
[ https://issues.apache.org/jira/browse/FLINK-9896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei reassigned FLINK-9896: --- Assignee: zhangminglei > Fix flink documentation error > - > > Key: FLINK-9896 > URL: https://issues.apache.org/jira/browse/FLINK-9896 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Hequn Cheng >Assignee: zhangminglei >Priority: Critical > Attachments: image-2018-07-19-23-19-32-259.png > > > Flink version of master has been upgraded to 1.7 snapshot, but documentation > still point to 1.6 > !image-2018-07-19-23-19-32-259.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9900) Failed to testRestoreBehaviourWithFaultyStateHandles (org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)
[ https://issues.apache.org/jira/browse/FLINK-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-9900: Description: https://api.travis-ci.org/v3/job/405843617/log.txt Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 124.598 sec <<< FAILURE! - in org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase testRestoreBehaviourWithFaultyStateHandles(org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase) Time elapsed: 120.036 sec <<< ERROR! org.junit.runners.model.TestTimedOutException: test timed out after 12 milliseconds at sun.misc.Unsafe.park(Native Method) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles(ZooKeeperHighAvailabilityITCase.java:244) Results : Tests in error: ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles:244 » TestTimedOut Tests run: 1453, Failures: 0, Errors: 1, Skipped: 29 was: Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 124.598 sec <<< FAILURE! - in org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase testRestoreBehaviourWithFaultyStateHandles(org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase) Time elapsed: 120.036 sec <<< ERROR! 
org.junit.runners.model.TestTimedOutException: test timed out after 12 milliseconds at sun.misc.Unsafe.park(Native Method) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles(ZooKeeperHighAvailabilityITCase.java:244) Results : Tests in error: ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles:244 » TestTimedOut Tests run: 1453, Failures: 0, Errors: 1, Skipped: 29 > Failed to testRestoreBehaviourWithFaultyStateHandles > (org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase) > --- > > Key: FLINK-9900 > URL: https://issues.apache.org/jira/browse/FLINK-9900 > Project: Flink > Issue Type: Bug >Reporter: zhangminglei >Priority: Major > > https://api.travis-ci.org/v3/job/405843617/log.txt > Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 124.598 sec > <<< FAILURE! - in > org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase > > testRestoreBehaviourWithFaultyStateHandles(org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase) > Time elapsed: 120.036 sec <<< ERROR! 
> org.junit.runners.model.TestTimedOutException: test timed out after 12 > milliseconds > at sun.misc.Unsafe.park(Native Method) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693) > at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) > at > java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) > at > org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles(ZooKeeperHighAvailabilityITCase.java:244) > Results : > Tests in error: > > ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles:244 > » TestTimedOut > Tests run: 1453, Failures: 0, Errors: 1, Skipped: 29 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9900) Failed to testRestoreBehaviourWithFaultyStateHandles (org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)
zhangminglei created FLINK-9900: --- Summary: Failed to testRestoreBehaviourWithFaultyStateHandles (org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase) Key: FLINK-9900 URL: https://issues.apache.org/jira/browse/FLINK-9900 Project: Flink Issue Type: Bug Reporter: zhangminglei Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 124.598 sec <<< FAILURE! - in org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase testRestoreBehaviourWithFaultyStateHandles(org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase) Time elapsed: 120.036 sec <<< ERROR! org.junit.runners.model.TestTimedOutException: test timed out after 12 milliseconds at sun.misc.Unsafe.park(Native Method) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles(ZooKeeperHighAvailabilityITCase.java:244) Results : Tests in error: ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles:244 » TestTimedOut Tests run: 1453, Failures: 0, Errors: 1, Skipped: 29 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6375: [FLINK-9609] [connectors] Add bucket ready mechani...
GitHub user zhangminglei opened a pull request: https://github.com/apache/flink/pull/6375 [FLINK-9609] [connectors] Add bucket ready mechanism for BucketingSink… ## What is the purpose of the change Currently, BucketingSink only supports ```notifyCheckpointComplete```. However, users want to do some extra work when a bucket is ready. It would be nice if we could support a BucketReady mechanism, or tell users when a bucket is ready for use. For example, if one bucket is created every 5 minutes, then at the end of those 5 minutes, before creating the next bucket, the user might need to do something once the previous bucket is ready, like sending the timestamp of the bucket-ready time to a server. Here, bucket ready means that no part file under the bucket ends with .pending or .in-progress. Then we can consider the bucket ready for use. Just as a watermark means no elements with a timestamp older than or equal to the watermark timestamp should arrive at the window, we can borrow the concept of a watermark here and call this BucketWatermark. ## Brief change log Add an interface ```BucketReady```. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? 
(not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhangminglei/flink flink-9609-bucketready Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6375.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6375 commit f95894956ac15d09b51b3a232d6f83227582e641 Author: zhangminglei Date: 2018-07-19T14:38:45Z [FLINK-9609] [connectors] Add bucket ready mechanism for BucketingSink when checkpoint complete ---
[GitHub] flink issue #6365: [FLINK-9849] [hbase] Hbase upgrade to 2.0.1
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6365 Hi, @yanghua I do not use an internal Maven repo. It does not seem to work for me. I excluded the SNAPSHOT dependency and added a stable glassfish version, but it still does not help. ---
[GitHub] flink issue #6365: [FLINK-9849] [hbase] Hbase upgrade to 2.0.1
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6365 @yanghua Could you please take a look at this issue as well? ---
[jira] [Assigned] (FLINK-9236) Use Apache Parent POM 19
[ https://issues.apache.org/jira/browse/FLINK-9236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei reassigned FLINK-9236: --- Assignee: jiayichao (was: Stephen Jason) > Use Apache Parent POM 19 > > > Key: FLINK-9236 > URL: https://issues.apache.org/jira/browse/FLINK-9236 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Ted Yu >Assignee: jiayichao >Priority: Major > > Flink is still using Apache Parent POM 18. Apache Parent POM 19 is out. > This will also fix Javadoc generation with JDK 10+ -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6365: [FLINK-9849] [hbase] Hbase upgrade to 2.0.1
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6365 Hi, @tedyu I found an issue when upgrading the HBase version to either 2.0.1 or 2.1.0. Both lead to a problem: ```Failed to read artifact descriptor for org.glassfish:javax.el:jar 3.0.1-b06-SNAPSHOT```, which then causes further failures. Maybe because of my network? ---
[jira] [Commented] (FLINK-9236) Use Apache Parent POM 19
[ https://issues.apache.org/jira/browse/FLINK-9236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16548968#comment-16548968 ] zhangminglei commented on FLINK-9236: - Hi, [~jiayichao] we are all looking forward to your first contribution to Apache Flink! And [~Stephen Jason]. > Use Apache Parent POM 19 > > > Key: FLINK-9236 > URL: https://issues.apache.org/jira/browse/FLINK-9236 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Ted Yu >Assignee: Stephen Jason >Priority: Major > > Flink is still using Apache Parent POM 18. Apache Parent POM 19 is out. > This will also fix Javadoc generation with JDK 10+ -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()
[ https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16548759#comment-16548759 ] zhangminglei commented on FLINK-4534: - Hi, [~gjy] Can we use a lightweight synchronization mechanism to solve this? For example, using {{volatile}} to avoid this issue. > Lack of synchronization in BucketingSink#restoreState() > --- > > Key: FLINK-4534 > URL: https://issues.apache.org/jira/browse/FLINK-4534 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Assignee: zhangminglei >Priority: Major > > Iteration over state.bucketStates is protected by synchronization in other > methods, except for the following in restoreState(): > {code} > for (BucketState bucketState : state.bucketStates.values()) { > {code} > and following in close(): > {code} > for (Map.Entry> entry : > state.bucketStates.entrySet()) { > closeCurrentPartFile(entry.getValue()); > {code} > w.r.t. bucketState.pendingFilesPerCheckpoint , there is similar issue > starting line 752: > {code} > Set pastCheckpointIds = > bucketState.pendingFilesPerCheckpoint.keySet(); > LOG.debug("Moving pending files to final location on restore."); > for (Long pastCheckpointId : pastCheckpointIds) { > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
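One caveat on the {{volatile}} suggestion: {{volatile}} only guarantees visibility of the field's *reference*; it does not make iteration over a map safe while another thread mutates it, so the readers and writers still need to hold the same monitor. A simplified sketch of that pattern (not BucketingSink's actual state class; names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class SyncedBucketState {

    private final Map<String, Long> bucketStates = new HashMap<>();

    void addBucket(String bucket, long checkpointId) {
        synchronized (bucketStates) {
            bucketStates.put(bucket, checkpointId);
        }
    }

    // Iteration must hold the same monitor the writers use; a volatile
    // field would only publish the map reference, not protect this loop
    // from concurrent modification.
    long sumCheckpointIds() {
        long sum = 0;
        synchronized (bucketStates) {
            for (long id : bucketStates.values()) {
                sum += id;
            }
        }
        return sum;
    }
}
```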
[GitHub] flink issue #6335: [FLINK-9675] [fs] Avoid FileInputStream/FileOutputStream
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6335 Hi, @hequn8128 I will create another JIRA for the ```new InputStreamReader``` refactor issue. What do you think? ---
[jira] [Updated] (FLINK-9675) Avoid FileInputStream/FileOutputStream
[ https://issues.apache.org/jira/browse/FLINK-9675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-9675: Description: They rely on finalizers (before Java 11), which create unnecessary GC load. The alternatives, Files.newInputStream, are as easy to use and don't have this issue. And here is a benchmark https://arnaudroger.github.io/blog/2017/03/20/faster-reader-inpustream-in-java.html was: They rely on finalizers (before Java 11), which create unnecessary GC load. The alternatives, Files.newInputStream, are as easy to use and don't have this issue. > Avoid FileInputStream/FileOutputStream > -- > > Key: FLINK-9675 > URL: https://issues.apache.org/jira/browse/FLINK-9675 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu > Assignee: zhangminglei >Priority: Minor > Labels: filesystem, pull-request-available > > They rely on finalizers (before Java 11), which create unnecessary GC load. > The alternatives, Files.newInputStream, are as easy to use and don't have > this issue. > And here is a benchmark > https://arnaudroger.github.io/blog/2017/03/20/faster-reader-inpustream-in-java.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
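A minimal JDK-only sketch of the replacement described above, using `Files.newInputStream`/`Files.newOutputStream` instead of the finalizer-backed `FileInputStream`/`FileOutputStream`. The class and method names are illustrative, and `InputStream.readAllBytes` needs Java 9+:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class NioStreamDemo {

    // Writes and re-reads a file via Files.newOutputStream/newInputStream,
    // whose streams carry no finalizer; before Java 11,
    // FileOutputStream/FileInputStream register one, adding GC load.
    static byte[] roundTrip(Path path, byte[] data) throws IOException {
        try (OutputStream out = Files.newOutputStream(path)) {
            out.write(data);
        }
        try (InputStream in = Files.newInputStream(path)) {
            return in.readAllBytes(); // Java 9+
        }
    }
}
```

The API surface is otherwise identical, which is why the refactor can be done mechanically at each call site.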
[GitHub] flink issue #6335: [FLINK-9675] [fs] Avoid FileInputStream/FileOutputStream
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6335 Thank you very much! @hequn8128 ! I will take a look. ---
[GitHub] flink issue #6365: [FLINK-9849] [hbase] Hbase upgrade to 2.0.1
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6365 Thanks @yanghua, you are right! I just want to wait for Travis to finish, then provide the old and new versions' dependency trees. ---
[GitHub] flink pull request #6365: [FLINK-9849] [hbase] Hbase upgrade
GitHub user zhangminglei opened a pull request: https://github.com/apache/flink/pull/6365 [FLINK-9849] [hbase] Hbase upgrade ## What is the purpose of the change Upgrade hbase version to 2.0.1 for hbase connector You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhangminglei/flink flink-9849-hbase-upgrade Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6365.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6365 commit cb4bc4b641565e6caf823e85d541df3022a59237 Author: zhangminglei Date: 2018-07-18T13:44:37Z [FLINK-9849] [hbase] Hbase upgrade ---
[jira] [Commented] (FLINK-9675) Avoid FileInputStream/FileOutputStream
[ https://issues.apache.org/jira/browse/FLINK-9675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16547449#comment-16547449 ] zhangminglei commented on FLINK-9675: - I will create the subtask for the rest of test code path. > Avoid FileInputStream/FileOutputStream > -- > > Key: FLINK-9675 > URL: https://issues.apache.org/jira/browse/FLINK-9675 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu > Assignee: zhangminglei >Priority: Minor > Labels: filesystem, pull-request-available > > They rely on finalizers (before Java 11), which create unnecessary GC load. > The alternatives, Files.newInputStream, are as easy to use and don't have > this issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9849) Upgrade hbase version to 2.0.1 for hbase connector
[ https://issues.apache.org/jira/browse/FLINK-9849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16547342#comment-16547342 ] zhangminglei commented on FLINK-9849: - Hi, [~yuzhih...@gmail.com] Has the problem raised by FLINK-2153 (the hbase-annotations module issue) been solved? Thank you. > Upgrade hbase version to 2.0.1 for hbase connector > -- > > Key: FLINK-9849 > URL: https://issues.apache.org/jira/browse/FLINK-9849 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu > Assignee: zhangminglei >Priority: Major > > Currently hbase 1.4.3 is used for hbase connector. > We should upgrade to 2.0.1 which was recently released. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6335: [FLINK-9675] [fs] Avoid FileInputStream/FileOutputStream
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6335 Hi, @tedyu I have updated the code. Please check again. Thank you very much. ---
[GitHub] flink issue #6335: [FLINK-9675] [fs] Avoid FileInputStream/FileOutputStream
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6335 @tedyu Could you please take a look at this? Thank you very much. ---
[GitHub] flink issue #6335: [FLINK-9675] [fs] Avoid FileInputStream/FileOutputStream
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6335 Thanks @yanghua. I will then push the rest of the code and fix the Travis error. ---
[GitHub] flink pull request #6335: [FLINK-9675] [fs] Avoid FileInputStream/FileOutput...
GitHub user zhangminglei opened a pull request: https://github.com/apache/flink/pull/6335

[FLINK-9675] [fs] Avoid FileInputStream/FileOutputStream

## What is the purpose of the change

Avoid using FileInputStream/FileOutputStream because they rely on finalizers (before Java 11), which create unnecessary GC load. The alternatives, Files.newInputStream, are as easy to use and don't have this issue.

## Brief change log

The ```FileInputStream``` usages in the test classes have been left unmodified, to preserve the original test code paths. This is the first commit of this PR.

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (yes)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (don't know)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zhangminglei/flink flink-9675-fisfos

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6335.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #6335

commit fe7c90bc3165722ef4e1289ac809fa92c8e3a08d
Author: zhangminglei
Date: 2018-07-15T02:21:45Z

    [FLINK-9675] [fs] Avoid FileInputStream/FileOutputStream

---
[jira] [Assigned] (FLINK-9849) Upgrade hbase version to 2.0.1 for hbase connector
[ https://issues.apache.org/jira/browse/FLINK-9849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei reassigned FLINK-9849: --- Assignee: zhangminglei > Upgrade hbase version to 2.0.1 for hbase connector > -- > > Key: FLINK-9849 > URL: https://issues.apache.org/jira/browse/FLINK-9849 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu > Assignee: zhangminglei >Priority: Major > > Currently hbase 1.4.3 is used for hbase connector. > We should upgrade to 2.0.1 which was recently released. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5375: [FLINK-7095] [TaskManager] Add Command line parsin...
Github user zhangminglei closed the pull request at: https://github.com/apache/flink/pull/5375 ---
[GitHub] flink issue #5375: [FLINK-7095] [TaskManager] Add Command line parsing tool ...
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/5375 Hi, @tillrohrmann. You are welcome! I still have a lot of other Flink JIRA issues that I will address in the future. ---
[jira] [Assigned] (FLINK-9825) Upgrade checkstyle version to 8.6
[ https://issues.apache.org/jira/browse/FLINK-9825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei reassigned FLINK-9825: --- Assignee: zhangminglei > Upgrade checkstyle version to 8.6 > - > > Key: FLINK-9825 > URL: https://issues.apache.org/jira/browse/FLINK-9825 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu > Assignee: zhangminglei >Priority: Minor > > We should upgrade checkstyle version to 8.6+ so that we can use the "match > violation message to this regex" feature for suppression. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
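The checkstyle feature mentioned above, suppressing by matching the violation message, looks roughly like this in a suppressions file (the check name and regex below are illustrative, not taken from Flink's configuration):

```xml
<?xml version="1.0"?>
<!DOCTYPE suppressions PUBLIC
    "-//Checkstyle//DTD SuppressionFilter Configuration 1.2//EN"
    "https://checkstyle.org/dtds/suppressions_1_2.dtd">
<suppressions>
  <!-- The "message" attribute (available since Checkstyle 8.6) suppresses
       violations whose message matches this regex. -->
  <suppress checks="JavadocMethod" message=".*Expected @return tag.*"/>
</suppressions>
```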
[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/6075#discussion_r200892031

--- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java ---
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter extends StreamWriterBase {
+
+	private static final long serialVersionUID = 3L;
+
+	/**
+	 * The description of the types in an ORC file.
+	 */
+	private TypeDescription schema;
+
+	/**
+	 * The schema of an ORC file.
+	 */
+	private String metaSchema;
+
+	/**
+	 * A row batch that will be written to the ORC file.
+	 */
+	private VectorizedRowBatch rowBatch;
+
+	/**
+	 * The writer that fill the records into the batch.
+	 */
+	private OrcBatchWriter orcBatchWriter;
+
+	private transient org.apache.orc.Writer writer;
+
+	private CompressionKind compressionKind;
+
+	/**
+	 * The number of rows that currently being written.
+	 */
+	private long writedRowSize;
+
+	/**
+	 * Creates a new {@code OrcFileWriter} that writes orc files without compression.
+	 *
+	 * @param metaSchema The orc schema.
+	 */
+	public OrcFileWriter(String metaSchema) {
+		this(metaSchema, CompressionKind.NONE);
+	}
+
+	/**
+	 * Create a new {@code OrcFileWriter} that writes orc file with the gaven
+	 * schema and compression kind.
+	 *
+	 * @param metaSchema The schema of an orc file.
+	 * @param compressionKind The compression kind to use.
+	 */
+	public OrcFileWriter(String metaSchema, CompressionKind compressionKind) {
+		this.metaSchema = metaSchema;
+		this.schema = TypeDescription.fromString(metaSchema);
+		this.compressionKind = compressionKind;
+	}
+
+	@Override
+	public void open(FileSystem fs, Path path) throws IOException {
+		writer = OrcFile.createWriter(path, OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+		rowBatch = schema.createRowBatch();
+		orcBatchWriter = new OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+	}
+
+	private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+		List fieldNames = orcSchema.getFieldNames();
+		List typeDescriptions = orcSchema.getChildren();
+		List typeInformations = new ArrayList<>();
+
+		typeDescriptio
[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/6075#discussion_r200890648 --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java --- (same diff context as quoted above) ---
[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/6075#discussion_r200889912 --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java --- (same diff context as quoted above) ---
[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/6075#discussion_r200889601 --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java --- (same diff context as quoted above) ---
[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/6075#discussion_r200888987

--- Diff: flink-connectors/flink-orc/pom.xml ---
@@ -54,6 +54,14 @@ under the License.
 	true
+
+	org.apache.flink
+	flink-connector-filesystem_${scala.binary.version}
+	${project.version}
+
+	true
+
 	org.apache.orc
 	orc-core

--- End diff --

Yes. We can upgrade it. Will update. ---