[jira] [Updated] (FLINK-17921) RpcGlobalAggregateManager#updateGlobalAggregate would cause unexpected akka.timeout

2020-05-25 Thread zhangminglei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-17921:
-
Description: 
As described in the summary, {{RpcGlobalAggregateManager#updateGlobalAggregate}} 
can cause an akka.timeout, and the resulting error message is not the one we 
actually want to surface.

If {{org.apache.flink.api.common.functions.AggregateFunction#getResult}} returns 
{{null}} and the function is used in {{RpcGlobalAggregateManager#updateGlobalAggregate}}, 
this causes the following exception, which is not expected to happen there. 
Increasing {{akka.ask.timeout}} to a larger value does not help; the exception 
still occurs.

{code:java}
java.io.IOException: Error updating global aggregate.
at 
org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:47)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: 
akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/jobmanager_1#-1046052429]] after [1 ms]. Message 
of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical 
reason for `AskTimeoutException` is that the recipient actor didn't send a 
reply.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45)
... 8 more
Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/jobmanager_1#-1046052429]] after [1 ms]. Message 
of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical 
reason for `AskTimeoutException` is that the recipient actor didn't send a 
reply.
at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
{code}
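
For reference, a minimal sketch of an {{AggregateFunction}} whose {{getResult}} 
returns {{null}} and so triggers the exception above (hypothetical class and 
types; any job that passes such a function to 
{{GlobalAggregateManager#updateGlobalAggregate}} should behave the same):

{code:java}
import org.apache.flink.api.common.functions.AggregateFunction;

// Hypothetical reproducer: only getResult() matters here.
public class NullResultAggregate implements AggregateFunction<Long, Long, Long> {

    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(Long value, Long accumulator) {
        return accumulator + value;
    }

    @Override
    public Long getResult(Long accumulator) {
        return null; // the problematic case described above
    }

    @Override
    public Long merge(Long a, Long b) {
        return a + b;
    }
}
{code}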

The following stack trace shows the root cause. The key frame is 
{{CompletableFuture.waitingGet}}: the {{CompletableFuture}} parks the current 
thread while it waits for a reply that never arrives, and that is what makes 
Flink's Akka communication time out. Therefore, even a timeout of one hour 
cannot solve the problem.

{code:java}
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0007b76617a8> (a 
java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}
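
A simplified sketch of the call pattern these frames point at (not the actual 
Flink source; the gateway call and names are assumptions read off the stack 
trace): the RPC returns a {{CompletableFuture}} and the caller blocks on 
{{get()}}, so the task thread parks until the ask times out.

{code:java}
// Simplified sketch of the blocking pattern visible in the thread dump above.
// jobMasterGateway, aggregateName, accumulator and serializedFunction are
// assumed names, not the actual Flink fields.
CompletableFuture<Object> response =
        jobMasterGateway.updateGlobalAggregate(aggregateName, accumulator, serializedFunction);
return response.get(); // parks in CompletableFuture.waitingGet(); an
                       // AskTimeoutException surfaces as ExecutionException
{code}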


[jira] [Commented] (FLINK-17921) RpcGlobalAggregateManager#updateGlobalAggregate would cause unexpected akka.timeout

2020-05-25 Thread zhangminglei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17115882#comment-17115882
 ] 

zhangminglei commented on FLINK-17921:
--

cc [~jgrier] Could you please take a look at this? Thank you very much.


[jira] [Updated] (FLINK-17921) RpcGlobalAggregateManager#updateGlobalAggregate would cause unexpected akka.timeout

2020-05-25 Thread zhangminglei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-17921:
-
Affects Version/s: 1.8.1
   1.10.1


[jira] [Updated] (FLINK-17921) RpcGlobalAggregateManager#updateGlobalAggregate would cause unexpected akka.timeout

2020-05-25 Thread zhangminglei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-17921:
-
Summary: RpcGlobalAggregateManager#updateGlobalAggregate would cause 
unexpected akka.timeout  (was: RpcGlobalAggregateManager#updateGlobalAggregate 
would cause akka.timeout)



[jira] [Updated] (FLINK-17921) RpcGlobalAggregateManager#updateGlobalAggregate would cause akka.timeout

2020-05-25 Thread zhangminglei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-17921:
-
Component/s: Runtime / Task



[jira] [Created] (FLINK-17921) RpcGlobalAggregateManager#updateGlobalAggregate would cause akka.timeout

2020-05-25 Thread zhangminglei (Jira)
zhangminglei created FLINK-17921:


 Summary: RpcGlobalAggregateManager#updateGlobalAggregate would 
cause akka.timeout
 Key: FLINK-17921
 URL: https://issues.apache.org/jira/browse/FLINK-17921
 Project: Flink
  Issue Type: Improvement
Reporter: zhangminglei


As described in the summary, {{RpcGlobalAggregateManager#updateGlobalAggregate}} 
can cause an akka.timeout. 





[jira] [Commented] (FLINK-17919) KafkaConsumerThread should add ratelimiter functionality as well

2020-05-25 Thread zhangminglei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17115860#comment-17115860
 ] 

zhangminglei commented on FLINK-17919:
--

Hi, [~tzulitai] Could you please take a look at this Jira? If possible, I would 
like to take this ticket. Thank you.



[jira] [Created] (FLINK-17919) KafkaConsumerThread should add ratelimiter functionality as well

2020-05-25 Thread zhangminglei (Jira)
zhangminglei created FLINK-17919:


 Summary: KafkaConsumerThread should add ratelimiter functionality 
as well
 Key: FLINK-17919
 URL: https://issues.apache.org/jira/browse/FLINK-17919
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Reporter: zhangminglei


Currently, {{KafkaConsumerThread}} within {{flink-connector-kafka-09}} has 
rate-limiter support. However, the one under {{flink-connector-kafka}} does not. 
I would suggest adding it there as well.
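
For illustration, a minimal sketch of the kind of throttling meant here 
(hypothetical poll loop; it assumes Guava's {{RateLimiter}}, which the rate 
limiting in the 0.9 connector is built on):

{code:java}
import com.google.common.util.concurrent.RateLimiter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RateLimitedPollLoop {

    // Cap consumption at roughly 1 MB/s; the rate would come from user config.
    private final RateLimiter rateLimiter = RateLimiter.create(1_000_000);
    private volatile boolean running = true;

    void run(KafkaConsumer<byte[], byte[]> consumer) {
        while (running) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
            int bytes = 0;
            for (ConsumerRecord<byte[], byte[]> record : records) {
                bytes += Math.max(record.serializedValueSize(), 0);
                // hand the record to the fetcher / downstream here
            }
            if (bytes > 0) {
                rateLimiter.acquire(bytes); // block until the byte budget allows it
            }
        }
    }
}
{code}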





[jira] [Updated] (FLINK-17878) StreamingFileWriter watermark attribute is transient, this might be different with origin value

2020-05-22 Thread zhangminglei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-17878:
-
Component/s: (was: Table SQL / API)
 Connectors / FileSystem

> StreamingFileWriter watermark attribute is transient, this might be different 
> with origin value
> ---
>
> Key: FLINK-17878
> URL: https://issues.apache.org/jira/browse/FLINK-17878
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.11.0
>Reporter: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> {{StreamingFileWriter}} has a
> {{private transient long currentWatermark = Long.MIN_VALUE;}}
> 
> If a developer creates a custom bucket assigner, it will receive a
> {{currentWatermark}} of 0 after the operator is deserialized, which conflicts
> with Flink's original approach of using {{Long.MIN_VALUE}} as the minimum
> watermark.
> 
> Should we remove the {{transient}} keyword?
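
As a generic Java illustration (not Flink code): a {{transient}} field that is 
initialized at its declaration comes back as {{0}} after deserialization, 
because field initializers do not run again when an object is deserialized.

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDefaultDemo implements Serializable {

    // Initialized at declaration, but transient: skipped by serialization.
    private transient long currentWatermark = Long.MIN_VALUE;

    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(new TransientDefaultDemo());

        TransientDefaultDemo restored = (TransientDefaultDemo)
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray())).readObject();

        // Prints 0, not Long.MIN_VALUE.
        System.out.println(restored.currentWatermark);
    }
}
{code}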





[jira] [Assigned] (FLINK-9410) Replace NMClient with NMClientAsync in YarnResourceManager

2019-05-08 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei reassigned FLINK-9410:
---

Assignee: zhangminglei  (was: lihongli)

> Replace NMClient with NMClientAsync in YarnResourceManager
> --
>
> Key: FLINK-9410
> URL: https://issues.apache.org/jira/browse/FLINK-9410
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>    Assignee: zhangminglei
>Priority: Critical
>
> Currently, the {{YarnResourceManager}} uses the synchronous {{NMClient}} 
> which is called from within the main thread of the {{ResourceManager}}. Since 
> these operations are blocking, we should replace the client with the 
> {{NMClientAsync}} and make the calls non-blocking.
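
For illustration, a rough sketch of the asynchronous variant (assuming Hadoop's 
{{org.apache.hadoop.yarn.client.api.async.NMClientAsync}} callback API; this is 
not the actual Flink change):

{code:java}
import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;

// Container start becomes non-blocking; results arrive via callbacks instead
// of blocking the ResourceManager's main thread.
public class AsyncContainerStarter implements NMClientAsync.CallbackHandler {

    public void start(Configuration conf, Container container, ContainerLaunchContext ctx) {
        NMClientAsync nmClient = NMClientAsync.createNMClientAsync(this);
        nmClient.init(conf);
        nmClient.start();
        nmClient.startContainerAsync(container, ctx); // returns immediately
    }

    @Override
    public void onContainerStarted(ContainerId id, Map<String, ByteBuffer> allServiceResponse) {
        // container is running; hand the result back to the main thread
    }

    @Override
    public void onStartContainerError(ContainerId id, Throwable t) {
        // handle the failure without ever having blocked the main thread
    }

    @Override
    public void onContainerStatusReceived(ContainerId id, ContainerStatus status) {}

    @Override
    public void onContainerStopped(ContainerId id) {}

    @Override
    public void onGetContainerStatusError(ContainerId id, Throwable t) {}

    @Override
    public void onStopContainerError(ContainerId id, Throwable t) {}
}
{code}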





[jira] [Assigned] (FLINK-10114) Support Orc for StreamingFileSink

2018-10-27 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei reassigned FLINK-10114:


Assignee: lihongli  (was: vinoyang)

> Support Orc for StreamingFileSink
> -
>
> Key: FLINK-10114
> URL: https://issues.apache.org/jira/browse/FLINK-10114
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>    Reporter: zhangminglei
>Assignee: lihongli
>Priority: Major
>






[jira] [Assigned] (FLINK-9410) Replace NMClient with NMClientAsync in YarnResourceManager

2018-10-24 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei reassigned FLINK-9410:
---

Assignee: lihongli



[jira] [Assigned] (FLINK-9410) Replace NMClient with NMClientAsync in YarnResourceManager

2018-10-24 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei reassigned FLINK-9410:
---

Assignee: (was: zhangminglei)



[jira] [Issue Comment Deleted] (FLINK-10355) The order of the column should start from 1.

2018-09-17 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-10355:
-
Comment: was deleted

(was: Hi, [~lihongli] Thanks for opening this jira. I would not think so, since 
the second column corresponds to index 1: column indices start from 0 and are 
counted up one by one.)

> The order of the column should start from 1.
> 
>
> Key: FLINK-10355
> URL: https://issues.apache.org/jira/browse/FLINK-10355
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: lihongli
>Priority: Major
>  Labels: easyfix
> Attachments: B0C32FD9-47FE-4F63-921F-A9E49C0CB5CD.png
>
>
> When I register an external Table using a CsvTableSource, it throws an 
> exception: "Parsing error for column 1". But I finally found that the second 
> column is the error column. I think that the order of the columns should 
> start from 1.





[jira] [Updated] (FLINK-10355) The order of the column should start from 1.

2018-09-17 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-10355:
-
Component/s: (was: Java API)
 Table API & SQL



[jira] [Updated] (FLINK-10355) The order of the column should start from 1.

2018-09-17 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-10355:
-
Labels: easyfix  (was: )



[jira] [Commented] (FLINK-10355) The order of the column should start from 1.

2018-09-17 Thread zhangminglei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617741#comment-16617741
 ] 

zhangminglei commented on FLINK-10355:
--

Hi, [~lihongli] Thanks for opening this jira. I would not think so, since the 
second column corresponds to index 1: column indices start from 0 and are 
counted up one by one.



[jira] [Commented] (FLINK-10114) Support Orc for StreamingFileSink

2018-09-04 Thread zhangminglei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16602721#comment-16602721
 ] 

zhangminglei commented on FLINK-10114:
--

Thank you so much @vinoyang. Lots of users need this functionality indeed.



[jira] [Commented] (FLINK-10114) Support Orc for StreamingFileSink

2018-08-31 Thread zhangminglei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16599433#comment-16599433
 ] 

zhangminglei commented on FLINK-10114:
--

I can't continue to work on this JIRA because I've just joined Alibaba. Anyone 
who wants to take this over can finish the PR. Thank you very much.



[jira] [Assigned] (FLINK-10114) Support Orc for StreamingFileSink

2018-08-31 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei reassigned FLINK-10114:


Assignee: (was: zhangminglei)



[jira] [Created] (FLINK-10114) Support Orc for StreamingFileSink

2018-08-09 Thread zhangminglei (JIRA)
zhangminglei created FLINK-10114:


 Summary: Support Orc for StreamingFileSink
 Key: FLINK-10114
 URL: https://issues.apache.org/jira/browse/FLINK-10114
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming Connectors
Reporter: zhangminglei
Assignee: zhangminglei








[jira] [Resolved] (FLINK-9411) Support parquet rolling sink writer

2018-08-09 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei resolved FLINK-9411.
-
Resolution: Won't Fix

> Support parquet rolling sink writer
> ---
>
> Key: FLINK-9411
> URL: https://issues.apache.org/jira/browse/FLINK-9411
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>    Reporter: zhangminglei
>Assignee: Triones Deng
>Priority: Major
>
> Like the ORC rolling sink writer support in FLINK-9407, we should also 
> support a Parquet rolling sink writer.





[jira] [Updated] (FLINK-7829) Remove (or at least deprecate) DataStream.writeToFile/Csv

2018-08-09 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-7829:

Description: These methods are seductive for users but they should never 
actually use them in a production streaming job. For those cases the 
{{StreamingFileSink}} should be used.  (was: These methods are seductive for 
users but they should never actually use them in a production streaming job. 
For those cases the {{BucketingSink}} should be used.)

> Remove (or at least deprecate) DataStream.writeToFile/Csv
> -
>
> Key: FLINK-7829
> URL: https://issues.apache.org/jira/browse/FLINK-7829
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Priority: Major
> Fix For: 1.7.0
>
>
> These methods are seductive for users but they should never actually use them 
> in a production streaming job. For those cases the {{StreamingFileSink}} 
> should be used.
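
For instance, a minimal sketch of the recommended replacement (assuming the 
row-format {{StreamingFileSink}} API available since Flink 1.6; {{stream}} is a 
placeholder for any {{DataStream<String>}}):

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Instead of stream.writeAsText(...) / stream.writeAsCsv(...), use the
// checkpoint-aware file sink:
StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("hdfs:///output"), new SimpleStringEncoder<String>("UTF-8"))
        .build();
stream.addSink(sink);
{code}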





[jira] [Updated] (FLINK-9675) Avoid FileInputStream/FileOutputStream

2018-08-08 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-9675:

Affects Version/s: 1.4.1
   1.4.2
   1.5.1

> Avoid FileInputStream/FileOutputStream
> --
>
> Key: FLINK-9675
> URL: https://issues.apache.org/jira/browse/FLINK-9675
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.1, 1.4.2, 1.5.1
>Reporter: Ted Yu
>Assignee: zhangminglei
>Priority: Minor
>  Labels: filesystem, pull-request-available
> Fix For: 1.7.0
>
>
> They rely on finalizers (before Java 11), which create unnecessary GC load.
> The alternative, {{Files.newInputStream}}, is as easy to use and doesn't have 
> this issue.
> Here is a benchmark: 
> https://arnaudroger.github.io/blog/2017/03/20/faster-reader-inpustream-in-java.html
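
For example, a short sketch of the suggested replacement (standard 
{{java.nio.file}} API; the paths are made up):

{code:java}
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class CopyWithoutFinalizers {

    public static void main(String[] args) throws Exception {
        Path source = Paths.get("/tmp/data.bin");
        Path target = Paths.get("/tmp/copy.bin");

        // Before: new FileInputStream(...) / new FileOutputStream(...) rely on
        // finalizers (pre-Java 11) and add GC pressure.
        // After: the streams below behave the same but have no finalizer.
        try (InputStream in = Files.newInputStream(source);
             OutputStream out = Files.newOutputStream(target)) {
            byte[] buffer = new byte[8192];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        }
    }
}
{code}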





[jira] [Updated] (FLINK-9675) Avoid FileInputStream/FileOutputStream

2018-08-08 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-9675:

Fix Version/s: 1.7.0



[jira] [Commented] (FLINK-7829) Remove (or at least deprecate) DataStream.writeToFile/Csv

2018-08-08 Thread zhangminglei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16574080#comment-16574080
 ] 

zhangminglei commented on FLINK-7829:
-

[~aljoscha] We should replace the use of {{BucketingSink}} with 
{{StreamingFileSink}}, since {{BucketingSink}} cannot support exactly-once.



[jira] [Commented] (FLINK-5789) Make Bucketing Sink independent of Hadoop's FileSystem

2018-08-07 Thread zhangminglei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16571483#comment-16571483
 ] 

zhangminglei commented on FLINK-5789:
-

Thank you [~till.rohrmann]

> Make Bucketing Sink independent of Hadoop's FileSystem
> --
>
> Key: FLINK-5789
> URL: https://issues.apache.org/jira/browse/FLINK-5789
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.4, 1.2.0
>Reporter: Stephan Ewen
>Priority: Major
>
> The {{BucketingSink}} is hard wired to Hadoop's FileSystem, bypassing Flink's 
> file system abstraction.
> This causes several issues:
>   - The bucketing sink will behave differently from other file sinks with 
> respect to configuration
>   - Directly supported file systems (not through Hadoop) like the MapR File 
> System do not work in the same way with the BucketingSink as other file 
> systems
>   - The previous point is all the more problematic in the effort to make 
> Hadoop an optional dependency and within other stacks (Mesos, Kubernetes, 
> AWS, GCE, Azure) with ideally no Hadoop dependency.
> We should port the {{BucketingSink}} to use Flink's FileSystem classes.
> To support the *truncate* functionality that is needed for the exactly-once 
> semantics of the Bucketing Sink, we should extend Flink's FileSystem 
> abstraction to have the methods
>   - {{boolean supportsTruncate()}}
>   - {{void truncate(Path, long)}}
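
A rough sketch of what that extension could look like (hypothetical; the real 
base class is {{org.apache.flink.core.fs.FileSystem}}, abbreviated here):

{code:java}
import java.io.IOException;
import org.apache.flink.core.fs.Path;

// Hypothetical sketch of the two proposed methods on the abstraction.
public abstract class TruncatableFileSystem {

    /** Whether this file system can truncate files in place. */
    public boolean supportsTruncate() {
        return false; // conservative default for file systems without truncate
    }

    /** Truncates the file at the given path to the desired length. */
    public void truncate(Path path, long desiredLength) throws IOException {
        throw new UnsupportedOperationException("truncate is not supported");
    }
}
{code}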





[jira] [Commented] (FLINK-5789) Make Bucketing Sink independent of Hadoop's FileSystem

2018-08-06 Thread zhangminglei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16571065#comment-16571065
 ] 

zhangminglei commented on FLINK-5789:
-

I would ask: in the future, would we no longer recommend users to use 
{{BucketingSink}}, and point them to {{StreamingFileSink}} instead?



[jira] [Assigned] (FLINK-9982) NPE in EnumValueSerializer#copy

2018-07-31 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei reassigned FLINK-9982:
---

Assignee: dalongliu  (was: zhangminglei)

> NPE in EnumValueSerializer#copy
> ---
>
> Key: FLINK-9982
> URL: https://issues.apache.org/jira/browse/FLINK-9982
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.3, 1.4.1, 1.4.2, 1.5.1
>    Reporter: zhangminglei
>Assignee: dalongliu
>Priority: Major
>
> When executing the Flink job with Flink 1.3.2, we met the below error.
> {code:java}
> java.lang.NullPointerException
>   at 
> org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:50)
>   at 
> org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:36)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at 
> org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:92)
>   at 
> org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:42)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:304)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:393)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:233)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:385)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9982) NPE in EnumValueSerializer#copy

2018-07-31 Thread zhangminglei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16563648#comment-16563648
 ] 

zhangminglei commented on FLINK-9982:
-

[~lsy] Could you tell [~Zentol] which enum field in your case class was null? I 
do not have the environment and the program anymore, since I left that job.

> NPE in EnumValueSerializer#copy
> ---
>
> Key: FLINK-9982
> URL: https://issues.apache.org/jira/browse/FLINK-9982
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.3, 1.4.1, 1.4.2, 1.5.1
>    Reporter: zhangminglei
>Assignee: zhangminglei
>Priority: Major
>
> When executing the Flink job with Flink 1.3.2, we met the below error.
> {code:java}
> java.lang.NullPointerException
>   at 
> org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:50)
>   at 
> org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:36)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at 
> org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:92)
>   at 
> org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:42)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:304)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:393)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:233)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:385)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9614) Improve the error message for Compiler#compile

2018-07-29 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei resolved FLINK-9614.
-
Resolution: Won't Fix

> Improve the error message for Compiler#compile
> --
>
> Key: FLINK-9614
> URL: https://issues.apache.org/jira/browse/FLINK-9614
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Affects Versions: 1.5.0
>Reporter: zhangminglei
>    Assignee: zhangminglei
>Priority: Major
>  Labels: pull-request-available
>
> When the SQL below is too long, like
> case when ... case when ...
>  when host in 
> ('114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247')
>  then 'condition'
> then it causes a {{StackOverflowError}}. The current code is below; we could 
> handle this by suggesting a larger stack size such as {{-Xss 20m}}, instead 
> of saying {{This is a bug.}}
> {code:java}
> trait Compiler[T] {
>   @throws(classOf[CompileException])
>   def compile(cl: ClassLoader, name: String, code: String): Class[T] = {
>     require(cl != null, "Classloader must not be null.")
>     val compiler = new SimpleCompiler()
>     compiler.setParentClassLoader(cl)
>     try {
>       compiler.cook(code)
>     } catch {
>       case t: Throwable =>
>         throw new InvalidProgramException("Table program cannot be compiled. " +
>           "This is a bug. Please file an issue.", t)
>     }
>     compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]]
>   }
> }
> {code}
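One hedged way to improve the message, sketched here in Java rather than the Scala trait above, is to catch {{StackOverflowError}} separately and point users at the JVM thread stack size. The wrapping class and method names are illustrative, not Flink's actual code:

{code:java}
import org.apache.flink.api.common.InvalidProgramException;

import org.codehaus.commons.compiler.CompileException;
import org.codehaus.janino.SimpleCompiler;

// Sketch only: distinguish "generated code too large for the stack" from a
// genuine compiler bug before wrapping the failure.
final class CompilerMessageSketch {

	static Class<?> compile(ClassLoader cl, String name, String code) throws Exception {
		SimpleCompiler compiler = new SimpleCompiler();
		compiler.setParentClassLoader(cl);
		try {
			compiler.cook(code);
		} catch (StackOverflowError soe) {
			// A very long generated expression (e.g. a huge CASE WHEN chain)
			// can exhaust the compiler's stack; suggest a remedy instead of
			// claiming a Flink bug.
			throw new InvalidProgramException(
				"Table program is too large to compile with the current thread "
					+ "stack size. Try increasing it, e.g. with -Xss20m.", soe);
		} catch (CompileException ce) {
			throw new InvalidProgramException(
				"Table program cannot be compiled. This is a bug. Please file an issue.", ce);
		}
		return compiler.getClassLoader().loadClass(name);
	}
}
{code}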



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9982) NPE in EnumValueSerializer#copy

2018-07-28 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei reassigned FLINK-9982:
---

Assignee: zhangminglei  (was: dalongliu)

> NPE in EnumValueSerializer#copy
> ---
>
> Key: FLINK-9982
> URL: https://issues.apache.org/jira/browse/FLINK-9982
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.3, 1.4.1, 1.4.2, 1.5.1
>    Reporter: zhangminglei
>Assignee: zhangminglei
>Priority: Major
>
> When executing the Flink job with Flink 1.3.2, we met the below error.
> {code:java}
> java.lang.NullPointerException
>   at 
> org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:50)
>   at 
> org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:36)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at 
> org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:92)
>   at 
> org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:42)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:304)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:393)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:233)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:385)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9985) Incorrect parameter order in document

2018-07-28 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei reassigned FLINK-9985:
---

Assignee: zhangminglei  (was: dalongliu)

> Incorrect parameter order in document
> -
>
> Key: FLINK-9985
> URL: https://issues.apache.org/jira/browse/FLINK-9985
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.1
>Reporter: zhangminglei
>    Assignee: zhangminglei
>Priority: Major
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/windows.html#incremental-window-aggregation-with-foldfunction
> {code:java}
> public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
>   Integer cur = acc.getField(2);
>   acc.setField(2, cur + 1); // incorrect parameter order, it should be acc.setField(cur + 1, 2)
>   return acc;
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9985) Incorrect parameter order in document

2018-07-27 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei reassigned FLINK-9985:
---

Assignee: dalongliu

> Incorrect parameter order in document
> -
>
> Key: FLINK-9985
> URL: https://issues.apache.org/jira/browse/FLINK-9985
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.1
>Reporter: zhangminglei
>Assignee: dalongliu
>Priority: Major
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/windows.html#incremental-window-aggregation-with-foldfunction
> {code:java}
> public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
>   Integer cur = acc.getField(2);
>   acc.setField(2, cur + 1); // incorrect parameter order, it should be acc.setField(cur + 1, 2)
>   return acc;
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9982) NPE in EnumValueSerializer#copy

2018-07-27 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei reassigned FLINK-9982:
---

Assignee: dalongliu

> NPE in EnumValueSerializer#copy
> ---
>
> Key: FLINK-9982
> URL: https://issues.apache.org/jira/browse/FLINK-9982
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.3, 1.4.1, 1.4.2, 1.5.1
>    Reporter: zhangminglei
>Assignee: dalongliu
>Priority: Major
>
> When executing the Flink job with Flink 1.3.2, we met the below error.
> {code:java}
> java.lang.NullPointerException
>   at 
> org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:50)
>   at 
> org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:36)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at 
> org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:92)
>   at 
> org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:42)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:304)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:393)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:233)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:385)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9825) Upgrade checkstyle version to 8.6

2018-07-27 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei reassigned FLINK-9825:
---

Assignee: dalongliu

> Upgrade checkstyle version to 8.6
> -
>
> Key: FLINK-9825
> URL: https://issues.apache.org/jira/browse/FLINK-9825
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>Assignee: dalongliu
>Priority: Minor
>
> We should upgrade checkstyle version to 8.6+ so that we can use the "match 
> violation message to this regex" feature for suppression. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9825) Upgrade checkstyle version to 8.6

2018-07-27 Thread zhangminglei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560532#comment-16560532
 ] 

zhangminglei commented on FLINK-9825:
-

[~lsy] Now you can open a PR for this issue. If you face any problem, you can 
talk with anyone you think is relevant to this topic. But before asking, try 
to solve it yourself at least once. We are all looking forward to your 
contributions!

> Upgrade checkstyle version to 8.6
> -
>
> Key: FLINK-9825
> URL: https://issues.apache.org/jira/browse/FLINK-9825
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>Priority: Minor
>
> We should upgrade checkstyle version to 8.6+ so that we can use the "match 
> violation message to this regex" feature for suppression. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9985) Incorrect parameter order in document

2018-07-27 Thread zhangminglei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559782#comment-16559782
 ] 

zhangminglei commented on FLINK-9985:
-

Thanks [~lsy] for contributing to Flink! Feel free to open a PR for this 
issue. A committer will then give you contributor permissions, so next time 
you can assign yourself the JIRA you would like to work on.

> Incorrect parameter order in document
> -
>
> Key: FLINK-9985
> URL: https://issues.apache.org/jira/browse/FLINK-9985
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.1
>Reporter: zhangminglei
>Priority: Major
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/windows.html#incremental-window-aggregation-with-foldfunction
> {code:java}
> public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
>   Integer cur = acc.getField(2);
>   acc.setField(2, cur + 1); // incorrect parameter order, it should be acc.setField(cur + 1, 2)
>   return acc;
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9985) Incorrect parameter order in document

2018-07-27 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-9985:

Description: 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/windows.html#incremental-window-aggregation-with-foldfunction

{code:java}
public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
  Integer cur = acc.getField(2);
  acc.setField(2, cur + 1); // incorrect parameter order, it should be acc.setField(cur + 1, 2)
  return acc;
}
{code}
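For reference, a minimal check of the corrected call: {{Tuple}}'s {{setField}} takes the value first and the field position second. The tuple contents here are illustrative:

{code:java}
import org.apache.flink.api.java.tuple.Tuple3;

// Quick check of the corrected call order: value first, position second.
public class SetFieldCheck {
	public static void main(String[] args) {
		Tuple3<String, Long, Integer> acc = new Tuple3<>("sensor-1", 0L, 0);
		Integer cur = acc.getField(2);
		acc.setField(cur + 1, 2); // value = cur + 1, position = 2
		System.out.println(acc);  // prints (sensor-1,0,1)
	}
}
{code}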


> Incorrect parameter order in document
> -
>
> Key: FLINK-9985
> URL: https://issues.apache.org/jira/browse/FLINK-9985
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.1
>Reporter: zhangminglei
>Priority: Major
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/windows.html#incremental-window-aggregation-with-foldfunction
> {code:java}
> public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
>   Integer cur = acc.getField(2);
>   acc.setField(2, cur + 1); // incorrect parameter order, it should be acc.setField(cur + 1, 2)
>   return acc;
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9982) NPE in EnumValueSerializer#copy

2018-07-27 Thread zhangminglei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559771#comment-16559771
 ] 

zhangminglei commented on FLINK-9982:
-

Hi [~till.rohrmann], could you add [~lsy] to the contributor list? It is his 
first time contributing to Apache Flink. Thank you.

> NPE in EnumValueSerializer#copy
> ---
>
> Key: FLINK-9982
> URL: https://issues.apache.org/jira/browse/FLINK-9982
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.3, 1.4.1, 1.4.2, 1.5.1
>    Reporter: zhangminglei
>Priority: Major
>
> When executing the Flink job with Flink 1.3.2, we met the below error.
> {code:java}
> java.lang.NullPointerException
>   at 
> org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:50)
>   at 
> org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:36)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at 
> org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:92)
>   at 
> org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:42)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:304)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:393)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:233)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:385)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9985) Incorrect parameter order in document

2018-07-27 Thread zhangminglei (JIRA)
zhangminglei created FLINK-9985:
---

 Summary: Incorrect parameter order in document
 Key: FLINK-9985
 URL: https://issues.apache.org/jira/browse/FLINK-9985
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.5.1
Reporter: zhangminglei






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9982) NPE in EnumValueSerializer#copy

2018-07-27 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-9982:

Description: 
When executing the Flink job with Flink 1.3.2, we met the below error.

{code:java}
java.lang.NullPointerException
at 
org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:50)
at 
org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:36)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
at 
org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:92)
at 
org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:42)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:304)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:393)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:233)
at 
org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:385)
{code}


> NPE in EnumValueSerializer#copy
> ---
>
> Key: FLINK-9982
> URL: https://issues.apache.org/jira/browse/FLINK-9982
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.3, 1.4.1, 1.4.2, 1.5.1
>    Reporter: zhangminglei
>Priority: Major
>
> When executing the Flink job with Flink 1.3.2, we met the below error.
> {code:java}
> java.lang.NullPointerException
>   at 
> org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:50)
>   at 
> org.apache.flink.api.scala.typeutils.EnumValueSerializer.copy(EnumValueSerializer.scala:36)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at 
> org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:92)
>   at 
> org.apache.flink.api.common.typeutils.base.GenericArraySerializer.copy(GenericArraySerializer.java:42)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWat

[jira] [Updated] (FLINK-9982) NPE in EnumValueSerializer#copy

2018-07-27 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-9982:

Affects Version/s: 1.3.3
   1.4.1
   1.4.2
   1.5.1

> NPE in EnumValueSerializer#copy
> ---
>
> Key: FLINK-9982
> URL: https://issues.apache.org/jira/browse/FLINK-9982
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.3, 1.4.1, 1.4.2, 1.5.1
>    Reporter: zhangminglei
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9982) NPE in EnumValueSerializer#copy

2018-07-27 Thread zhangminglei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559754#comment-16559754
 ] 

zhangminglei commented on FLINK-9982:
-

Hi @vinoyang, due to a bad network connection I failed to provide more 
description here. I will add more later. Thank you.

> NPE in EnumValueSerializer#copy
> ---
>
> Key: FLINK-9982
> URL: https://issues.apache.org/jira/browse/FLINK-9982
> Project: Flink
>  Issue Type: Bug
>    Reporter: zhangminglei
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9982) NPE in EnumValueSerializer#copy

2018-07-27 Thread zhangminglei (JIRA)
zhangminglei created FLINK-9982:
---

 Summary: NPE in EnumValueSerializer#copy
 Key: FLINK-9982
 URL: https://issues.apache.org/jira/browse/FLINK-9982
 Project: Flink
  Issue Type: Bug
Reporter: zhangminglei






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9825) Upgrade checkstyle version to 8.6

2018-07-26 Thread zhangminglei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559160#comment-16559160
 ] 

zhangminglei commented on FLINK-9825:
-

Hi [~Zentol], could you please give [~lsy] permission to contribute to Apache 
Flink? It will be his first time getting started with Flink. Thank you so 
much.

> Upgrade checkstyle version to 8.6
> -
>
> Key: FLINK-9825
> URL: https://issues.apache.org/jira/browse/FLINK-9825
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>Priority: Minor
>
> We should upgrade checkstyle version to 8.6+ so that we can use the "match 
> violation message to this regex" feature for suppression. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9825) Upgrade checkstyle version to 8.6

2018-07-26 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei reassigned FLINK-9825:
---

Assignee: (was: zhangminglei)

> Upgrade checkstyle version to 8.6
> -
>
> Key: FLINK-9825
> URL: https://issues.apache.org/jira/browse/FLINK-9825
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>Priority: Minor
>
> We should upgrade checkstyle version to 8.6+ so that we can use the "match 
> violation message to this regex" feature for suppression. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9651) Add a Kafka table source factory with Protobuf format support

2018-07-26 Thread zhangminglei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559128#comment-16559128
 ] 

zhangminglei commented on FLINK-9651:
-

Hi [~twalthr], thanks, and I am on my way to doing this.

> Add a Kafka table source factory with Protobuf format support
> -
>
> Key: FLINK-9651
> URL: https://issues.apache.org/jira/browse/FLINK-9651
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>    Reporter: zhangminglei
>Assignee: zhangminglei
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9609) Add bucket ready mechanism for BucketingSink when checkpoint complete

2018-07-23 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-9609:

Fix Version/s: (was: 1.6.1)
   1.7.0

> Add bucket ready mechanism for BucketingSink when checkpoint complete
> -
>
> Key: FLINK-9609
> URL: https://issues.apache.org/jira/browse/FLINK-9609
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector, Streaming Connectors
>Affects Versions: 1.5.0, 1.4.2
>Reporter: zhangminglei
>    Assignee: zhangminglei
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently, BucketingSink only supports {{notifyCheckpointComplete}}. However, 
> users may want to do some extra work when a bucket is ready. It would be nice 
> if we could support a {{BucketReady}} mechanism, i.e. tell users when a 
> bucket is ready for use. For example, if one bucket is created every 5 
> minutes, then at the end of those 5 minutes, before the next bucket is 
> created, the user might need to react to the previous bucket becoming ready, 
> like sending the timestamp of the bucket-ready time to a server.
> Here, "bucket ready" means that no part file under the bucket has a 
> {{.pending}} or {{.in-progress}} suffix; then we can consider the bucket 
> ready for users. Just as a watermark means that no elements with a timestamp 
> older than or equal to the watermark timestamp should arrive at the window, 
> we can borrow the concept of a watermark here and call this a 
> *BucketWatermark*.
> Recently, I found a user who wants this functionality:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Let-BucketingSink-roll-file-on-each-checkpoint-td19034.html
> Below is what he said:
> My use case is: we read data from a message queue and write it to HDFS, and 
> our ETL team will use the data in HDFS. *In this case, ETL needs to know 
> whether all data is ready to be read accurately*, so we use a counter to 
> count how much data has been written; if the counter is equal to the number 
> we received, we consider the HDFS file ready. We send the counter message in 
> a custom sink so ETL can know how much data has been written, but with the 
> current BucketingSink, even though the HDFS file is flushed, ETL may still 
> not be able to read the data. If we can close the file during a checkpoint, 
> the result is accurate. And the HDFS small-file problem can be controlled by 
> using a bigger checkpoint interval.
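A minimal sketch of the readiness check described above, against Hadoop's {{FileSystem}} API; the suffix constants and the helper name are illustrative assumptions, not the PR's final code:

{code:java}
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

import java.io.IOException;

// A bucket is "ready" once none of its part files carries a pending or
// in-progress suffix.
public class BucketReadyCheck {

	static boolean isBucketReady(FileSystem fs, Path bucketPath) throws IOException {
		RemoteIterator<LocatedFileStatus> files = fs.listFiles(bucketPath, false);
		while (files.hasNext()) {
			String name = files.next().getPath().getName();
			if (name.endsWith(".pending") || name.endsWith(".in-progress")) {
				return false; // some part file is still being written or finalized
			}
		}
		return true; // no pending/in-progress part files left in this bucket
	}
}
{code}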



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6103: [FLINK-9413] [distributed coordination] Tasks can fail wi...

2018-07-22 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6103
  
Hi @tillrohrmann, I found that more than one user has hit this issue and it 
affects their work. I suggest we merge this PR as a temporary solution as soon 
as Travis turns green.


---


[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

2018-07-22 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r204233444
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/OrcFileWriter.java
 ---
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter<T> extends StreamWriterBase<T> {
+
+   private static final long serialVersionUID = 3L;
+
+   private TypeDescription schema;
+
+   private String meatSchema;
+
+   private transient org.apache.orc.Writer writer;
+
+   private VectorizedRowBatch rowBatch;
+
+   private CompressionKind compressionKind;
+
+   private long writedRowSize;
+
+   private OrcBatchWriter orcBatchWriter;
+
+   /**
+* Creates a new {@code OrcFileWriter} that writes orc files without 
compression.
+*
+* @param metaSchema The orc schema.
+*/
+   public OrcFileWriter(String metaSchema) {
+   this(metaSchema, CompressionKind.NONE);
+   }
+
+   /**
+* Create a new {@code OrcFileWriter} that writes orc file with the 
gaven
+* schema and compression kind.
+*
+* @param metaSchema  The schema of a orc file.
+* @param compressionKind The compression kind to use.
+*/
+   public OrcFileWriter(String metaSchema, CompressionKind 
compressionKind) {
+   this.meatSchema = metaSchema;
+   this.schema = TypeDescription.fromString(metaSchema);
+   this.compressionKind = compressionKind;
+   }
+
+   @Override
+   public void open(FileSystem fs, Path path) throws IOException {
+   writer = OrcFile.createWriter(path, 
OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+   rowBatch = schema.createRowBatch();
+   orcBatchWriter = new 
OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+   }
+
+   private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+   List<String> fieldNames = orcSchema.getFieldNames();
+   List<TypeDescription> typeDescriptions = orcSchema.getChildren();
+   List<TypeInformation> typeInformations = new ArrayList<>();
+
+   typeDescriptions.forEach(typeDescription -> {
+   typeInformations.add(schemaToTypeInfo(typeDescription));
+   });
+
+   return new TableSchema(
+   fieldNames.toArray(new String[fieldNames.size()]),
+   typeInformations.toArray(new TypeInformation[typeInformations.size()]));
+   }
+
+   @Override
+   public void write(T element) throws IOException {
+   Boolean isFill = orcBatchWriter.fill(rowBatch, element)

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

2018-07-22 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r204232617
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java
 ---
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter<T> extends StreamWriterBase<T> {
+
+   private static final long serialVersionUID = 3L;
+
+   /**
+* The description of the types in an ORC file.
+*/
+   private TypeDescription schema;
+
+   /**
+* The schema of an ORC file.
+*/
+   private String metaSchema;
+
+   /**
+* A row batch that will be written to the ORC file.
+*/
+   private VectorizedRowBatch rowBatch;
+
+   /**
+* The writer that fill the records into the batch.
+*/
+   private OrcBatchWriter orcBatchWriter;
+
+   private transient org.apache.orc.Writer writer;
+
+   private CompressionKind compressionKind;
+
+   /**
+* The number of rows that currently being written.
+*/
+   private long writedRowSize;
+
+   /**
+* Creates a new {@code OrcFileWriter} that writes orc files without 
compression.
+*
+* @param metaSchema The orc schema.
+*/
+   public OrcFileWriter(String metaSchema) {
+   this(metaSchema, CompressionKind.NONE);
+   }
+
+   /**
+* Create a new {@code OrcFileWriter} that writes orc file with the 
gaven
+* schema and compression kind.
+*
+* @param metaSchema  The schema of an orc file.
+* @param compressionKind The compression kind to use.
+*/
+   public OrcFileWriter(String metaSchema, CompressionKind 
compressionKind) {
+   this.metaSchema = metaSchema;
+   this.schema = TypeDescription.fromString(metaSchema);
+   this.compressionKind = compressionKind;
+   }
+
+   @Override
+   public void open(FileSystem fs, Path path) throws IOException {
+   writer = OrcFile.createWriter(path, 
OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+   rowBatch = schema.createRowBatch();
+   orcBatchWriter = new 
OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+   }
+
+   private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+   List<String> fieldNames = orcSchema.getFieldNames();
+   List<TypeDescription> typeDescriptions = orcSchema.getChildren();
+   List<TypeInformation> typeInformations = new ArrayList<>();
+
+   typeDescriptio

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

2018-07-22 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r204231919
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/OrcFileWriter.java
 ---
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter<T> extends StreamWriterBase<T> {
+
+   private static final long serialVersionUID = 3L;
+
+   private TypeDescription schema;
+
+   private String meatSchema;
+
+   private transient org.apache.orc.Writer writer;
+
+   private VectorizedRowBatch rowBatch;
+
+   private CompressionKind compressionKind;
+
+   private long writedRowSize;
+
+   private OrcBatchWriter orcBatchWriter;
+
+   /**
+* Creates a new {@code OrcFileWriter} that writes orc files without 
compression.
+*
+* @param metaSchema The orc schema.
+*/
+   public OrcFileWriter(String metaSchema) {
+   this(metaSchema, CompressionKind.NONE);
+   }
+
+   /**
+* Create a new {@code OrcFileWriter} that writes orc file with the 
gaven
+* schema and compression kind.
+*
+* @param metaSchema  The schema of a orc file.
+* @param compressionKind The compression kind to use.
+*/
+   public OrcFileWriter(String metaSchema, CompressionKind 
compressionKind) {
+   this.meatSchema = metaSchema;
+   this.schema = TypeDescription.fromString(metaSchema);
+   this.compressionKind = compressionKind;
+   }
+
+   @Override
+   public void open(FileSystem fs, Path path) throws IOException {
+   writer = OrcFile.createWriter(path, 
OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+   rowBatch = schema.createRowBatch();
+   orcBatchWriter = new 
OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+   }
+
+   private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+   List<String> fieldNames = orcSchema.getFieldNames();
+   List<TypeDescription> typeDescriptions = orcSchema.getChildren();
+   List<TypeInformation> typeInformations = new ArrayList<>();
+
+   typeDescriptions.forEach(typeDescription -> {
+   typeInformations.add(schemaToTypeInfo(typeDescription));
+   });
+
+   return new TableSchema(
+   fieldNames.toArray(new String[fieldNames.size()]),
+   typeInformations.toArray(new TypeInformation[typeInformations.size()]));
+   }
+
+   @Override
+   public void write(T element) throws IOException {
+   Boolean isFill = orcBatchWriter.fill(rowBatch, element)

[jira] [Updated] (FLINK-9609) Add bucket ready mechanism for BucketingSink when checkpoint complete

2018-07-21 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-9609:

Fix Version/s: 1.6.1

> Add bucket ready mechanism for BucketingSink when checkpoint complete
> -
>
> Key: FLINK-9609
> URL: https://issues.apache.org/jira/browse/FLINK-9609
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector, Streaming Connectors
>Affects Versions: 1.5.0, 1.4.2
>Reporter: zhangminglei
>    Assignee: zhangminglei
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.1
>
>
> Currently, BucketingSink only supports {{notifyCheckpointComplete}}. However, 
> users may want to do some extra work when a bucket is ready. It would be nice 
> if we could support a {{BucketReady}} mechanism, i.e. tell users when a 
> bucket is ready for use. For example, if one bucket is created every 5 
> minutes, then at the end of those 5 minutes, before the next bucket is 
> created, the user might need to react to the previous bucket becoming ready, 
> like sending the timestamp of the bucket-ready time to a server.
> Here, "bucket ready" means that no part file under the bucket has a 
> {{.pending}} or {{.in-progress}} suffix; then we can consider the bucket 
> ready for users. Just as a watermark means that no elements with a timestamp 
> older than or equal to the watermark timestamp should arrive at the window, 
> we can borrow the concept of a watermark here and call this a 
> *BucketWatermark*.
> Recently, I found a user who wants this functionality:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Let-BucketingSink-roll-file-on-each-checkpoint-td19034.html
> Below is what he said:
> My use case is: we read data from a message queue and write it to HDFS, and 
> our ETL team will use the data in HDFS. *In this case, ETL needs to know 
> whether all data is ready to be read accurately*, so we use a counter to 
> count how much data has been written; if the counter is equal to the number 
> we received, we consider the HDFS file ready. We send the counter message in 
> a custom sink so ETL can know how much data has been written, but with the 
> current BucketingSink, even though the HDFS file is flushed, ETL may still 
> not be able to read the data. If we can close the file during a checkpoint, 
> the result is accurate. And the HDFS small-file problem can be controlled by 
> using a bigger checkpoint interval.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6378: [FLINK-9236] [pom] upgrade the version of apache parent p...

2018-07-21 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6378
  
Could you please push the code and trigger Travis again?


---


[GitHub] flink pull request #6380: [FLINK-9614] [table] Improve the error message for...

2018-07-21 Thread zhangminglei
GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/6380

[FLINK-9614] [table] Improve the error message for Compiler#compile

## What is the purpose of the change
Improve the error message for Compiler#compile

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-9614-improve-error

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6380.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6380


commit f4406622c67df9212c545d36b3572e774acf1df7
Author: zhangminglei 
Date:   2018-07-21T08:39:16Z

[FLINK-9614] [table] Improve the error message for Compiler#compile




---


[GitHub] flink issue #6336: [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause...

2018-07-20 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6336
  
Hi, @ubyyj Thanks for your contribution! It looks good to me.


---


[jira] [Updated] (FLINK-9900) Failed to testRestoreBehaviourWithFaultyStateHandles (org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)

2018-07-20 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-9900:

Affects Version/s: 1.5.1

> Failed to testRestoreBehaviourWithFaultyStateHandles 
> (org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase) 
> ---
>
> Key: FLINK-9900
> URL: https://issues.apache.org/jira/browse/FLINK-9900
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.1
>Reporter: zhangminglei
>Priority: Major
>
> https://api.travis-ci.org/v3/job/405843617/log.txt
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 124.598 sec 
> <<< FAILURE! - in 
> org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase
>  
> testRestoreBehaviourWithFaultyStateHandles(org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)
>  Time elapsed: 120.036 sec <<< ERROR!
>  org.junit.runners.model.TestTimedOutException: test timed out after 12 
> milliseconds
>  at sun.misc.Unsafe.park(Native Method)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>  at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>  at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>  at 
> org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles(ZooKeeperHighAvailabilityITCase.java:244)
> Results :
> Tests in error: 
>  
> ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles:244
>  » TestTimedOut
> Tests run: 1453, Failures: 0, Errors: 1, Skipped: 29



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6378: [FLINK-9236] [pom] upgrade the version of apache parent p...

2018-07-20 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6378
  
@TisonShadowsong is right. You can verify that, and I think it might be a 
bug.


---


[GitHub] flink issue #6378: [FLINK-9236] [pom] upgrade the version of apache parent p...

2018-07-20 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6378
  
@YCjia The Travis build failed. Could you take a look at what is happening?


---


[GitHub] flink pull request #6375: [FLINK-9609] [connectors] Add bucket ready mechani...

2018-07-20 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6375#discussion_r203997376
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -722,9 +727,31 @@ public void notifyCheckpointComplete(long 
checkpointId) throws Exception {
}
}
}
+   isBucketReady(partitionPaths);
}
}
 
+   @Override
+   public boolean isBucketReady(Set<Path> bucketPathes) {
+   for (Path path : bucketPathes) {
+   try {
+   RemoteIterator<LocatedFileStatus> files = fs.listFiles(path, false);
+   while (files.hasNext()) {
+   LocatedFileStatus fileStatus = 
files.next();
+   if 
(fileStatus.getPath().getName().endsWith(pendingSuffix) ||
+   
fileStatus.getPath().getName().endsWith(inProgressSuffix)) {
+   return false;
+   }
+   }
+   return true;
--- End diff --

Ahhh. Oops! Yes, you are very right! :)


---


[GitHub] flink pull request #6375: [FLINK-9609] [connectors] Add bucket ready mechani...

2018-07-20 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6375#discussion_r203995868
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -722,9 +727,31 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
 				}
 			}
 		}
+		isBucketReady(partitionPaths);
 	}
 }
 
+	@Override
+	public boolean isBucketReady(Set<Path> bucketPathes) {
--- End diff --

If we finish the for loop and haven't returned true yet, then we return 
false. I think this is reasonable. If a file ends with the pending or 
in-progress suffix, the file is not ready and we cannot use it.
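
For reference, a minimal sketch of a version that only returns true after every path has been checked (assuming the surrounding ```BucketingSink``` fields ```fs```, ```pendingSuffix``` and ```inProgressSuffix```; exception handling elided):

```java
public boolean isBucketReady(Set<Path> bucketPaths) throws IOException {
	for (Path path : bucketPaths) {
		RemoteIterator<LocatedFileStatus> files = fs.listFiles(path, false);
		while (files.hasNext()) {
			String name = files.next().getPath().getName();
			if (name.endsWith(pendingSuffix) || name.endsWith(inProgressSuffix)) {
				// a pending or in-progress part file means the bucket is not ready
				return false;
			}
		}
	}
	// no unfinished part file was found under any bucket path
	return true;
}
```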




---


[GitHub] flink issue #6375: [FLINK-9609] [connectors] Add bucket ready mechanism for ...

2018-07-20 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6375
  
Hi, @tillrohrmann Could you please take a look at this PR? Actually, I 
don't know which committer is most familiar with ```BucketingSink```. Thank you 
so much.


---


[jira] [Updated] (FLINK-9901) Refactor InputStreamReader to Channels.newReader

2018-07-19 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-9901:

Description: From this benchmark report 
https://arnaudroger.github.io/blog/2017/03/20/faster-reader-inpustream-in-java.html.
 We can get a better performance boost by using {{Channels.newReader}}.  (was: 
From this benchmark report 
https://arnaudroger.github.io/blog/2017/03/20/faster-reader-inpustream-in-java.html.
 We can get a better performance booth by using {{Channels.newReader}}.)

> Refactor InputStreamReader to Channels.newReader
> 
>
> Key: FLINK-9901
> URL: https://issues.apache.org/jira/browse/FLINK-9901
> Project: Flink
>  Issue Type: Sub-task
>    Reporter: zhangminglei
>Priority: Major
>
> From this benchmark report 
> https://arnaudroger.github.io/blog/2017/03/20/faster-reader-inpustream-in-java.html.
>  We can get a better performance boost by using {{Channels.newReader}}.
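
For illustration, a minimal sketch of the pattern the benchmark describes (file name and buffer size are arbitrary):

{code:java}
import java.io.Reader;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;

// A Reader backed by a FileChannel, instead of
// new InputStreamReader(new FileInputStream(...)).
try (Reader reader = Channels.newReader(FileChannel.open(Paths.get("data.txt")), "UTF-8")) {
	char[] buf = new char[8192];
	int n;
	while ((n = reader.read(buf)) != -1) {
		// consume buf[0..n) here
	}
}
{code}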



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6365: [FLINK-9849] [hbase] Hbase upgrade to 2.0.1

2018-07-19 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6365
  
Thanks @yanghua for pointing this out!


---


[jira] [Updated] (FLINK-9901) Refactor InputStreamReader to Channels.newReader

2018-07-19 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-9901:

Description: From this benchmark report 
https://arnaudroger.github.io/blog/2017/03/20/faster-reader-inpustream-in-java.html.
 We can get a better performance booth by using {{Channels.newReader}}.

> Refactor InputStreamReader to Channels.newReader
> 
>
> Key: FLINK-9901
> URL: https://issues.apache.org/jira/browse/FLINK-9901
> Project: Flink
>  Issue Type: Sub-task
>    Reporter: zhangminglei
>Priority: Major
>
> From this benchmark report 
> https://arnaudroger.github.io/blog/2017/03/20/faster-reader-inpustream-in-java.html.
>  We can get a better performance booth by using {{Channels.newReader}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9901) Refactor InputStreamReader to Channels.newReader

2018-07-19 Thread zhangminglei (JIRA)
zhangminglei created FLINK-9901:
---

 Summary: Refactor InputStreamReader to Channels.newReader
 Key: FLINK-9901
 URL: https://issues.apache.org/jira/browse/FLINK-9901
 Project: Flink
  Issue Type: Sub-task
Reporter: zhangminglei






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9896) Fix flink documentation error

2018-07-19 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei reassigned FLINK-9896:
---

Assignee: zhangminglei

> Fix flink documentation error
> -
>
> Key: FLINK-9896
> URL: https://issues.apache.org/jira/browse/FLINK-9896
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Hequn Cheng
>Assignee: zhangminglei
>Priority: Critical
> Attachments: image-2018-07-19-23-19-32-259.png
>
>
> The Flink version on master has been upgraded to 1.7-SNAPSHOT, but the 
> documentation still points to 1.6
>  !image-2018-07-19-23-19-32-259.png! 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9900) Failed to testRestoreBehaviourWithFaultyStateHandles (org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)

2018-07-19 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-9900:

Description: 
https://api.travis-ci.org/v3/job/405843617/log.txt

Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 124.598 sec <<< 
FAILURE! - in 
org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase
 
testRestoreBehaviourWithFaultyStateHandles(org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)
 Time elapsed: 120.036 sec <<< ERROR!
 org.junit.runners.model.TestTimedOutException: test timed out after 12 
milliseconds
 at sun.misc.Unsafe.park(Native Method)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
 at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
 at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
 at 
org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles(ZooKeeperHighAvailabilityITCase.java:244)

Results :

Tests in error: 
 ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles:244 
» TestTimedOut

Tests run: 1453, Failures: 0, Errors: 1, Skipped: 29

  was:
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 124.598 sec <<< 
FAILURE! - in 
org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase
testRestoreBehaviourWithFaultyStateHandles(org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)
  Time elapsed: 120.036 sec  <<< ERROR!
org.junit.runners.model.TestTimedOutException: test timed out after 12 
milliseconds
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles(ZooKeeperHighAvailabilityITCase.java:244)


Results :

Tests in error: 
  
ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles:244 
» TestTimedOut

Tests run: 1453, Failures: 0, Errors: 1, Skipped: 29



> Failed to testRestoreBehaviourWithFaultyStateHandles 
> (org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase) 
> ---
>
> Key: FLINK-9900
> URL: https://issues.apache.org/jira/browse/FLINK-9900
> Project: Flink
>  Issue Type: Bug
>Reporter: zhangminglei
>Priority: Major
>
> https://api.travis-ci.org/v3/job/405843617/log.txt
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 124.598 sec 
> <<< FAILURE! - in 
> org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase
>  
> testRestoreBehaviourWithFaultyStateHandles(org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)
>  Time elapsed: 120.036 sec <<< ERROR!
>  org.junit.runners.model.TestTimedOutException: test timed out after 12 
> milliseconds
>  at sun.misc.Unsafe.park(Native Method)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>  at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>  at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>  at 
> org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles(ZooKeeperHighAvailabilityITCase.java:244)
> Results :
> Tests in error: 
>  
> ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles:244
>  » TestTimedOut
> Tests run: 1453, Failures: 0, Errors: 1, Skipped: 29



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9900) Failed to testRestoreBehaviourWithFaultyStateHandles (org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)

2018-07-19 Thread zhangminglei (JIRA)
zhangminglei created FLINK-9900:
---

 Summary: Failed to testRestoreBehaviourWithFaultyStateHandles 
(org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase) 
 Key: FLINK-9900
 URL: https://issues.apache.org/jira/browse/FLINK-9900
 Project: Flink
  Issue Type: Bug
Reporter: zhangminglei


Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 124.598 sec <<< 
FAILURE! - in 
org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase
testRestoreBehaviourWithFaultyStateHandles(org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)
  Time elapsed: 120.036 sec  <<< ERROR!
org.junit.runners.model.TestTimedOutException: test timed out after 12 
milliseconds
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles(ZooKeeperHighAvailabilityITCase.java:244)


Results :

Tests in error: 
  
ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles:244 
» TestTimedOut

Tests run: 1453, Failures: 0, Errors: 1, Skipped: 29




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6375: [FLINK-9609] [connectors] Add bucket ready mechani...

2018-07-19 Thread zhangminglei
GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/6375

[FLINK-9609] [connectors] Add bucket ready mechanism for BucketingSin…

## What is the purpose of the change
Currently, BucketingSink only supports ```notifyCheckpointComplete```. 
However, users may want to do some extra work once a bucket is ready. It 
would be nice to support a bucket-ready mechanism, i.e. to tell users when a 
bucket is ready for use. For example, if one bucket is created every 5 
minutes, then at the end of those 5 minutes, before the next bucket is 
created, the user might need to react to the previous bucket becoming ready, 
such as sending the bucket-ready timestamp to a server.

Here, "bucket ready" means that no part file under the bucket has a .pending 
or .in-progress suffix; only then can the bucket be considered ready for use. 
This is analogous to a watermark, which signals that no elements with a 
timestamp older than or equal to the watermark timestamp should still arrive 
at the window; borrowing that concept, we could call this a BucketWatermark.

## Brief change log
Add an interface ```BucketReady```, sketched below.
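
Based on the diff in this PR, the new interface presumably looks roughly like this (a sketch, not the exact committed code):

```java
import org.apache.hadoop.fs.Path;

import java.util.Set;

/**
 * A bucket is considered ready once no part file under any of the given
 * paths carries the .pending or .in-progress suffix.
 */
public interface BucketReady {
	boolean isBucketReady(Set<Path> bucketPaths);
}
```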


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-9609-bucketready

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6375.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6375


commit f95894956ac15d09b51b3a232d6f83227582e641
Author: zhangminglei 
Date:   2018-07-19T14:38:45Z

[FLINK-9609] [connectors] Add bucket ready mechanism for BucketingSink when 
checkpoint complete




---


[GitHub] flink issue #6365: [FLINK-9849] [hbase] Hbase upgrade to 2.0.1

2018-07-19 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6365
  
Hi, @yanghua I do not use an internal Maven repo, so that does not seem to 
work for me. I excluded the SNAPSHOT dependency and added a stable glassfish 
version, but it still does not help.


---


[GitHub] flink issue #6365: [FLINK-9849] [hbase] Hbase upgrade to 2.0.1

2018-07-19 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6365
  
@yanghua Could you please take a look at this issue as well?


---


[jira] [Assigned] (FLINK-9236) Use Apache Parent POM 19

2018-07-19 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei reassigned FLINK-9236:
---

Assignee: jiayichao  (was: Stephen Jason)

> Use Apache Parent POM 19
> 
>
> Key: FLINK-9236
> URL: https://issues.apache.org/jira/browse/FLINK-9236
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>Assignee: jiayichao
>Priority: Major
>
> Flink is still using Apache Parent POM 18. Apache Parent POM 19 is out.
> This will also fix Javadoc generation with JDK 10+



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6365: [FLINK-9849] [hbase] Hbase upgrade to 2.0.1

2018-07-19 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6365
  
Hi, @tedyu I found an issue when upgrading the HBase version to either 2.0.1 
or 2.1.0. Both lead to the same problem: ```Failed to read artifact descriptor 
for org.glassfish:javax.el:jar 3.0.1-b06-SNAPSHOT```, which then causes further 
failures. Maybe it is because of my network?


---


[jira] [Commented] (FLINK-9236) Use Apache Parent POM 19

2018-07-19 Thread zhangminglei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16548968#comment-16548968
 ] 

zhangminglei commented on FLINK-9236:
-

Hi, [~jiayichao] we are all looking forward to your first contribution to 
Apache Flink! And to yours, [~Stephen Jason].

> Use Apache Parent POM 19
> 
>
> Key: FLINK-9236
> URL: https://issues.apache.org/jira/browse/FLINK-9236
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>Assignee: Stephen Jason
>Priority: Major
>
> Flink is still using Apache Parent POM 18. Apache Parent POM 19 is out.
> This will also fix Javadoc generation with JDK 10+



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()

2018-07-18 Thread zhangminglei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16548759#comment-16548759
 ] 

zhangminglei commented on FLINK-4534:
-

Hi, [~gjy] Can we use a lightweight synchronization mechanism to solve this? 
For example, use {{volatile}} to avoid this issue.
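
A minimal sketch of the kind of fix being discussed, guarding the iteration with the same monitor the other methods use; note that {{volatile}} alone would only guarantee visibility of the reference, not safe iteration over the map:

{code:java}
synchronized (state.bucketStates) {
	for (BucketState<T> bucketState : state.bucketStates.values()) {
		// ... existing restore logic for this bucket ...
	}
}
{code}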

> Lack of synchronization in BucketingSink#restoreState()
> ---
>
> Key: FLINK-4534
> URL: https://issues.apache.org/jira/browse/FLINK-4534
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: zhangminglei
>Priority: Major
>
> Iteration over state.bucketStates is protected by synchronization in other 
> methods, except for the following in restoreState():
> {code}
> for (BucketState bucketState : state.bucketStates.values()) {
> {code}
> and following in close():
> {code}
> for (Map.Entry> entry : 
> state.bucketStates.entrySet()) {
>   closeCurrentPartFile(entry.getValue());
> {code}
> w.r.t. bucketState.pendingFilesPerCheckpoint , there is similar issue 
> starting line 752:
> {code}
>   Set pastCheckpointIds = 
> bucketState.pendingFilesPerCheckpoint.keySet();
>   LOG.debug("Moving pending files to final location on restore.");
>   for (Long pastCheckpointId : pastCheckpointIds) {
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6335: [FLINK-9675] [fs] Avoid FileInputStream/FileOutputStream

2018-07-18 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6335
  
Hi, @hequn8128 I will create another JIRA issue for the ```new InputStreamReader``` 
refactoring. What do you think?


---


[jira] [Updated] (FLINK-9675) Avoid FileInputStream/FileOutputStream

2018-07-18 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-9675:

Description: 
They rely on finalizers (before Java 11), which create unnecessary GC load.

The alternatives, Files.newInputStream, are as easy to use and don't have this 
issue.

And here is a benchmark 
https://arnaudroger.github.io/blog/2017/03/20/faster-reader-inpustream-in-java.html

  was:
They rely on finalizers (before Java 11), which create unnecessary GC load.

The alternatives, Files.newInputStream, are as easy to use and don't have this 
issue.


> Avoid FileInputStream/FileOutputStream
> --
>
> Key: FLINK-9675
> URL: https://issues.apache.org/jira/browse/FLINK-9675
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>    Assignee: zhangminglei
>Priority: Minor
>  Labels: filesystem, pull-request-available
>
> They rely on finalizers (before Java 11), which create unnecessary GC load.
> The alternatives, Files.newInputStream, are as easy to use and don't have 
> this issue.
> And here is a benchmark 
> https://arnaudroger.github.io/blog/2017/03/20/faster-reader-inpustream-in-java.html
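
For illustration, a minimal sketch of the replacement pattern (file names are arbitrary):

{code:java}
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

// Streams obtained from java.nio.file.Files do not rely on finalizers,
// unlike FileInputStream/FileOutputStream before Java 11.
try (InputStream in = Files.newInputStream(Paths.get("in.bin"));
		OutputStream out = Files.newOutputStream(Paths.get("out.bin"))) {
	byte[] buf = new byte[4096];
	int n;
	while ((n = in.read(buf)) != -1) {
		out.write(buf, 0, n);
	}
}
{code}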



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6335: [FLINK-9675] [fs] Avoid FileInputStream/FileOutputStream

2018-07-18 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6335
  
Thank you very much, @hequn8128! I will take a look.


---


[GitHub] flink issue #6365: [FLINK-9849] [hbase] Hbase upgrade to 2.0.1

2018-07-18 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6365
  
Thanks @yanghua you are right! I just want to wait for the Travis run to 
finish, then provide the dependency trees of the old and new versions.


---


[GitHub] flink pull request #6365: [FLINK-9849] [hbase] Hbase upgrade

2018-07-18 Thread zhangminglei
GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/6365

[FLINK-9849] [hbase] Hbase upgrade

## What is the purpose of the change

Upgrade hbase version to 2.0.1 for hbase connector



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-9849-hbase-upgrade

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6365.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6365


commit cb4bc4b641565e6caf823e85d541df3022a59237
Author: zhangminglei 
Date:   2018-07-18T13:44:37Z

[FLINK-9849] [hbase] Hbase upgrade




---


[jira] [Commented] (FLINK-9675) Avoid FileInputStream/FileOutputStream

2018-07-18 Thread zhangminglei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16547449#comment-16547449
 ] 

zhangminglei commented on FLINK-9675:
-

I will create subtasks for the rest of the test code paths.

> Avoid FileInputStream/FileOutputStream
> --
>
> Key: FLINK-9675
> URL: https://issues.apache.org/jira/browse/FLINK-9675
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>    Assignee: zhangminglei
>Priority: Minor
>  Labels: filesystem, pull-request-available
>
> They rely on finalizers (before Java 11), which create unnecessary GC load.
> The alternatives, Files.newInputStream, are as easy to use and don't have 
> this issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9849) Upgrade hbase version to 2.0.1 for hbase connector

2018-07-17 Thread zhangminglei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16547342#comment-16547342
 ] 

zhangminglei commented on FLINK-9849:
-

Hi, [~yuzhih...@gmail.com] Is the problem raised in FLINK-2153 (the 
hbase-annotations module issue) solved? Thank you.

> Upgrade hbase version to 2.0.1 for hbase connector
> --
>
> Key: FLINK-9849
> URL: https://issues.apache.org/jira/browse/FLINK-9849
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>    Assignee: zhangminglei
>Priority: Major
>
> Currently hbase 1.4.3 is used for hbase connector.
> We should upgrade to 2.0.1 which was recently released.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6335: [FLINK-9675] [fs] Avoid FileInputStream/FileOutputStream

2018-07-17 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6335
  
Hi, @tedyu I have updated the code. Please check again. Thank you very much.


---


[GitHub] flink issue #6335: [FLINK-9675] [fs] Avoid FileInputStream/FileOutputStream

2018-07-15 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6335
  
@tedyu Could you please take a look at this? Thank you very much.


---


[GitHub] flink issue #6335: [FLINK-9675] [fs] Avoid FileInputStream/FileOutputStream

2018-07-15 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6335
  
Thanks @yanghua I will then push the rest of the code and fix the Travis 
error.


---


[GitHub] flink pull request #6335: [FLINK-9675] [fs] Avoid FileInputStream/FileOutput...

2018-07-14 Thread zhangminglei
GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/6335

[FLINK-9675] [fs] Avoid FileInputStream/FileOutputStream

## What is the purpose of the change

Avoid using FileInputStream/FileOutputStream because they rely on 
finalizers (before Java 11), which create unnecessary GC load. The 
alternatives, Files.newInputStream, are as easy to use and don't have this 
issue.

## Brief change log
The ```FileInputStream``` usages on test code paths have deliberately not 
been modified, to keep the original appearance.

This is the first commit of this PR.

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (yes)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (don't know)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-9675-fisfos

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6335.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6335


commit fe7c90bc3165722ef4e1289ac809fa92c8e3a08d
Author: zhangminglei 
Date:   2018-07-15T02:21:45Z

[FLINK-9675] [fs] Avoid FileInputStream/FileOutputStream




---


[jira] [Assigned] (FLINK-9849) Upgrade hbase version to 2.0.1 for hbase connector

2018-07-13 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei reassigned FLINK-9849:
---

Assignee: zhangminglei

> Upgrade hbase version to 2.0.1 for hbase connector
> --
>
> Key: FLINK-9849
> URL: https://issues.apache.org/jira/browse/FLINK-9849
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>    Assignee: zhangminglei
>Priority: Major
>
> Currently hbase 1.4.3 is used for hbase connector.
> We should upgrade to 2.0.1 which was recently released.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5375: [FLINK-7095] [TaskManager] Add Command line parsin...

2018-07-13 Thread zhangminglei
Github user zhangminglei closed the pull request at:

https://github.com/apache/flink/pull/5375


---


[GitHub] flink issue #5375: [FLINK-7095] [TaskManager] Add Command line parsing tool ...

2018-07-13 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5375
  
Hi, @tillrohrmann. You are welcome ~ There are still a lot of other Flink 
JIRA issues that I will address in the future.


---


[jira] [Assigned] (FLINK-9825) Upgrade checkstyle version to 8.6

2018-07-11 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei reassigned FLINK-9825:
---

Assignee: zhangminglei

> Upgrade checkstyle version to 8.6
> -
>
> Key: FLINK-9825
> URL: https://issues.apache.org/jira/browse/FLINK-9825
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>    Assignee: zhangminglei
>Priority: Minor
>
> We should upgrade checkstyle version to 8.6+ so that we can use the "match 
> violation message to this regex" feature for suppression. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

2018-07-09 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200892031
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java
 ---
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param  The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter extends StreamWriterBase {
+
+   private static final long serialVersionUID = 3L;
+
+   /**
+* The description of the types in an ORC file.
+*/
+   private TypeDescription schema;
+
+   /**
+* The schema of an ORC file.
+*/
+   private String metaSchema;
+
+   /**
+* A row batch that will be written to the ORC file.
+*/
+   private VectorizedRowBatch rowBatch;
+
+   /**
+* The writer that fill the records into the batch.
+*/
+   private OrcBatchWriter orcBatchWriter;
+
+   private transient org.apache.orc.Writer writer;
+
+   private CompressionKind compressionKind;
+
+   /**
+* The number of rows that currently being written.
+*/
+   private long writedRowSize;
+
+   /**
+* Creates a new {@code OrcFileWriter} that writes orc files without 
compression.
+*
+* @param metaSchema The orc schema.
+*/
+   public OrcFileWriter(String metaSchema) {
+   this(metaSchema, CompressionKind.NONE);
+   }
+
+   /**
+* Create a new {@code OrcFileWriter} that writes orc file with the 
gaven
+* schema and compression kind.
+*
+* @param metaSchema  The schema of an orc file.
+* @param compressionKind The compression kind to use.
+*/
+   public OrcFileWriter(String metaSchema, CompressionKind 
compressionKind) {
+   this.metaSchema = metaSchema;
+   this.schema = TypeDescription.fromString(metaSchema);
+   this.compressionKind = compressionKind;
+   }
+
+   @Override
+   public void open(FileSystem fs, Path path) throws IOException {
+   writer = OrcFile.createWriter(path, 
OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+   rowBatch = schema.createRowBatch();
+   orcBatchWriter = new 
OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+   }
+
+   private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+   List fieldNames = orcSchema.getFieldNames();
+   List typeDescriptions = 
orcSchema.getChildren();
+   List typeInformations = new ArrayList<>();
+
+   typeDescriptio
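
For context, a hypothetical usage of this writer with ```BucketingSink``` (the schema string, the output path, and the ```DataStream<Row> stream``` variable are illustrative assumptions, not taken from the PR):

```java
// Illustrative only: wire the ORC writer from this PR into a BucketingSink.
BucketingSink<Row> sink = new BucketingSink<>("hdfs:///tmp/orc-out");
sink.setWriter(new OrcFileWriter<>("struct<name:string,age:int>"));
stream.addSink(sink);
```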

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

2018-07-09 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200890648
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java
 ---
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param  The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter extends StreamWriterBase {
+
+   private static final long serialVersionUID = 3L;
+
+   /**
+* The description of the types in an ORC file.
+*/
+   private TypeDescription schema;
+
+   /**
+* The schema of an ORC file.
+*/
+   private String metaSchema;
+
+   /**
+* A row batch that will be written to the ORC file.
+*/
+   private VectorizedRowBatch rowBatch;
+
+   /**
+* The writer that fill the records into the batch.
+*/
+   private OrcBatchWriter orcBatchWriter;
+
+   private transient org.apache.orc.Writer writer;
+
+   private CompressionKind compressionKind;
+
+   /**
+* The number of rows that currently being written.
+*/
+   private long writedRowSize;
+
+   /**
+* Creates a new {@code OrcFileWriter} that writes orc files without 
compression.
+*
+* @param metaSchema The orc schema.
+*/
+   public OrcFileWriter(String metaSchema) {
+   this(metaSchema, CompressionKind.NONE);
+   }
+
+   /**
+* Create a new {@code OrcFileWriter} that writes orc file with the 
gaven
+* schema and compression kind.
+*
+* @param metaSchema  The schema of an orc file.
+* @param compressionKind The compression kind to use.
+*/
+   public OrcFileWriter(String metaSchema, CompressionKind 
compressionKind) {
+   this.metaSchema = metaSchema;
+   this.schema = TypeDescription.fromString(metaSchema);
+   this.compressionKind = compressionKind;
+   }
+
+   @Override
+   public void open(FileSystem fs, Path path) throws IOException {
+   writer = OrcFile.createWriter(path, 
OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+   rowBatch = schema.createRowBatch();
+   orcBatchWriter = new 
OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+   }
+
+   private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+   List fieldNames = orcSchema.getFieldNames();
+   List typeDescriptions = 
orcSchema.getChildren();
+   List typeInformations = new ArrayList<>();
+
+   typeDescriptio

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

2018-07-09 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200889912
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java
 ---
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param  The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter extends StreamWriterBase {
+
+   private static final long serialVersionUID = 3L;
+
+   /**
+* The description of the types in an ORC file.
+*/
+   private TypeDescription schema;
+
+   /**
+* The schema of an ORC file.
+*/
+   private String metaSchema;
+
+   /**
+* A row batch that will be written to the ORC file.
+*/
+   private VectorizedRowBatch rowBatch;
+
+   /**
+* The writer that fill the records into the batch.
+*/
+   private OrcBatchWriter orcBatchWriter;
+
+   private transient org.apache.orc.Writer writer;
+
+   private CompressionKind compressionKind;
+
+   /**
+* The number of rows that currently being written.
+*/
+   private long writedRowSize;
+
+   /**
+* Creates a new {@code OrcFileWriter} that writes orc files without 
compression.
+*
+* @param metaSchema The orc schema.
+*/
+   public OrcFileWriter(String metaSchema) {
+   this(metaSchema, CompressionKind.NONE);
+   }
+
+   /**
+* Create a new {@code OrcFileWriter} that writes orc file with the 
gaven
+* schema and compression kind.
+*
+* @param metaSchema  The schema of an orc file.
+* @param compressionKind The compression kind to use.
+*/
+   public OrcFileWriter(String metaSchema, CompressionKind 
compressionKind) {
+   this.metaSchema = metaSchema;
+   this.schema = TypeDescription.fromString(metaSchema);
+   this.compressionKind = compressionKind;
+   }
+
+   @Override
+   public void open(FileSystem fs, Path path) throws IOException {
+   writer = OrcFile.createWriter(path, 
OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+   rowBatch = schema.createRowBatch();
+   orcBatchWriter = new 
OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+   }
+
+   private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+   List fieldNames = orcSchema.getFieldNames();
+   List typeDescriptions = 
orcSchema.getChildren();
+   List typeInformations = new ArrayList<>();
+
+   typeDescriptio

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

2018-07-09 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200889601
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcFileWriter.java
 ---
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.orc.OrcBatchReader.schemaToTypeInfo;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link OrcFile}.
+ *
+ * @param  The type of the elements that are being written by the sink.
+ */
+public class OrcFileWriter extends StreamWriterBase {
+
+   private static final long serialVersionUID = 3L;
+
+   /**
+* The description of the types in an ORC file.
+*/
+   private TypeDescription schema;
+
+   /**
+* The schema of an ORC file.
+*/
+   private String metaSchema;
+
+   /**
+* A row batch that will be written to the ORC file.
+*/
+   private VectorizedRowBatch rowBatch;
+
+   /**
+* The writer that fill the records into the batch.
+*/
+   private OrcBatchWriter orcBatchWriter;
+
+   private transient org.apache.orc.Writer writer;
+
+   private CompressionKind compressionKind;
+
+   /**
+* The number of rows that currently being written.
+*/
+   private long writedRowSize;
+
+   /**
+* Creates a new {@code OrcFileWriter} that writes orc files without 
compression.
+*
+* @param metaSchema The orc schema.
+*/
+   public OrcFileWriter(String metaSchema) {
+   this(metaSchema, CompressionKind.NONE);
+   }
+
+   /**
+* Create a new {@code OrcFileWriter} that writes orc file with the 
gaven
+* schema and compression kind.
+*
+* @param metaSchema  The schema of an orc file.
+* @param compressionKind The compression kind to use.
+*/
+   public OrcFileWriter(String metaSchema, CompressionKind 
compressionKind) {
+   this.metaSchema = metaSchema;
+   this.schema = TypeDescription.fromString(metaSchema);
+   this.compressionKind = compressionKind;
+   }
+
+   @Override
+   public void open(FileSystem fs, Path path) throws IOException {
+   writer = OrcFile.createWriter(path, 
OrcFile.writerOptions(fs.getConf()).setSchema(schema).compress(compressionKind));
+   rowBatch = schema.createRowBatch();
+   orcBatchWriter = new 
OrcBatchWriter(Arrays.asList(orcSchemaToTableSchema(schema).getTypes()));
+   }
+
+   private TableSchema orcSchemaToTableSchema(TypeDescription orcSchema) {
+   List fieldNames = orcSchema.getFieldNames();
+   List typeDescriptions = 
orcSchema.getChildren();
+   List typeInformations = new ArrayList<>();
+
+   typeDescriptio

[GitHub] flink pull request #6075: [FLINK-9407] [hdfs connector] Support orc rolling ...

2018-07-09 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6075#discussion_r200888987
  
--- Diff: flink-connectors/flink-orc/pom.xml ---
@@ -54,6 +54,14 @@ under the License.
 			<optional>true</optional>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<optional>true</optional>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.orc</groupId>
 			<artifactId>orc-core</artifactId>
--- End diff --

Yes. We can upgrade it. Will update.


---

