[jira] [Assigned] (FLINK-19135) (Stream)ExecutionEnvironment.execute() should not throw ExecutionException

2020-09-03 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-19135:


Assignee: Aljoscha Krettek

> (Stream)ExecutionEnvironment.execute() should not throw ExecutionException
> --
>
> Key: FLINK-19135
> URL: https://issues.apache.org/jira/browse/FLINK-19135
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataSet, API / DataStream
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>
> In FLINK-14850 we changed the {{execute()}} method to be basically
> {code}
> final JobClient jobClient = executeAsync(...);
> return jobClient.getJobExecutionResult(userClassloader).get();
> {code}
> Unfortunately, this means that {{execute()}} now throws an 
> {{ExecutionException}} instead of a {{ProgramInvocationException}} or 
> {{JobExecutionException}} as before. The {{ExecutionException}} is wrapping 
> the other exceptions that we were throwing before.
> We didn't notice this in tests because most tests use 
> {{Test(Stream)Environment}} which overrides the {{execute()}} method and so 
> doesn't go through the {{PipelineExecutor}} logic or the normal code path of 
> delegating to {{executeAsync()}}.
> We should fix this to go back to the old behaviour.
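
For illustration, a minimal sketch (not the actual Flink code; names below are made up) of how the blocking path could unwrap the {{ExecutionException}} so that callers again see the original exception, e.g. a {{JobExecutionException}}:
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public final class ExecuteUnwrapSketch {
    // Unwraps the ExecutionException thrown by Future#get() and re-throws
    // the original cause instead of the wrapper.
    static <T> T getUnwrapped(CompletableFuture<T> resultFuture) throws Exception {
        try {
            return resultFuture.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw new RuntimeException(cause);
        }
    }
}
{code}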





[jira] [Assigned] (FLINK-19123) TestStreamEnvironment does not use shared MiniCluster for executeAsync()

2020-09-02 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-19123:


Assignee: Aljoscha Krettek

> TestStreamEnvironment does not use shared MiniCluster for executeAsync()
> 
>
> Key: FLINK-19123
> URL: https://issues.apache.org/jira/browse/FLINK-19123
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Runtime / Coordination, Tests
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 1.12.0
>
>
> TestStreamEnvironment overrides {{execute()}} but not {{executeAsync()}}. 
> Now, {{execute()}} goes against the {{MiniCluster}} session that was 
> started by a {{MiniClusterWithClientResource}} or some other method that uses 
> {{TestStreamEnvironment}}. However, {{executeAsync()}} will go through the 
> normal {{StreamExecutionEnvironment}} logic, try to find an executor, and 
> not know that it is a testing environment.
> Up until recently, you would have gotten an exception telling you that no 
> executor is configured, and we would have found out that we need to override 
> {{executeAsync()}} in {{TestStreamEnvironment}}. However, we currently 
> configure a local executor in the constructor: 
> [https://github.com/apache/flink/blob/2160c3294ef87143ab9a4e8138cb618651499792/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java#L59].
>  With this, you basically get the “local environment” behaviour when you call 
> {{executeAsync()}}, which starts a cluster for the job and shuts it down when 
> the job finishes. This makes the {{TestStreamEnvironment}} cluster 
> sharing useless.
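
Illustratively, the shape of the fix could look like this (all types below are hypothetical stand-ins, not Flink's actual classes); the key point is that the async path must target the shared cluster as well:
{code:java}
// Hypothetical sketch: both execution paths must submit to the shared cluster.
final class SharedClusterTestEnvironmentSketch {
    interface Cluster { JobHandle submit(String jobName); } // stand-in for the shared MiniCluster
    interface JobHandle {}                                  // stand-in for JobClient

    private final Cluster sharedCluster;

    SharedClusterTestEnvironmentSketch(Cluster sharedCluster) {
        this.sharedCluster = sharedCluster;
    }

    // The async path must be overridden too; otherwise it falls back to the
    // default executor discovery and spins up a fresh local cluster per job.
    JobHandle executeAsync(String jobName) {
        return sharedCluster.submit(jobName);
    }

    // The blocking path can then simply delegate to the async one.
    JobHandle execute(String jobName) {
        return executeAsync(jobName);
    }
}
{code}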





[jira] [Commented] (FLINK-19124) Some JobClient methods are not ergonomic, require ClassLoader argument

2020-09-02 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17189153#comment-17189153
 ] 

Aljoscha Krettek commented on FLINK-19124:
--

I'd be very happy to work on this with a contributor if someone is interested.

> Some JobClient methods are not ergonomic, require ClassLoader argument
> --
>
> Key: FLINK-19124
> URL: https://issues.apache.org/jira/browse/FLINK-19124
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Aljoscha Krettek
>Priority: Major
>  Labels: starter
>
> Both {{getAccumulators()}} and {{getJobExecutionResult()}} require the 
> user to supply a {{ClassLoader}}. In a lot of cases the {{JobClient}} is 
> created in a context where we already know the user-code {{ClassLoader}}. We 
> should see whether we can remove the class loader from those methods. This 
> might require that users provide a class loader when constructing a 
> {{JobClient}} themselves for an already running job, but the trade-off seems 
> to be worth it.
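
A rough sketch of the proposed shape ({{ResultClient}} is a hypothetical stand-in for {{JobClient}}, not existing API): the loader is supplied once at construction instead of on every call:
{code:java}
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: capture the user-code ClassLoader at construction time.
final class ResultClient {
    private final ClassLoader userCodeClassLoader;

    ResultClient(ClassLoader userCodeClassLoader) {
        this.userCodeClassLoader = userCodeClassLoader; // supplied once, up front
    }

    // No ClassLoader parameter needed anymore; the stored loader would be
    // used internally, e.g. to deserialize accumulator values.
    CompletableFuture<String> getJobExecutionResult() {
        return CompletableFuture.completedFuture("job result placeholder");
    }
}
{code}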





[jira] [Updated] (FLINK-19123) TestStreamEnvironment does not use shared MiniCluster for executeAsync()

2020-09-02 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-19123:
-
Fix Version/s: 1.12.0

> TestStreamEnvironment does not use shared MiniCluster for executeAsync()
> 
>
> Key: FLINK-19123
> URL: https://issues.apache.org/jira/browse/FLINK-19123
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Runtime / Coordination, Tests
>Reporter: Aljoscha Krettek
>Priority: Major
> Fix For: 1.12.0
>
>
> TestStreamEnvironment overrides {{execute()}} but not {{executeAsync()}}. 
> Now, {{execute()}} goes against the {{MiniCluster}} session that was 
> started by a {{MiniClusterWithClientResource}} or some other method that uses 
> {{TestStreamEnvironment}}. However, {{executeAsync()}} will go through the 
> normal {{StreamExecutionEnvironment}} logic, try to find an executor, and 
> not know that it is a testing environment.
> Up until recently, you would have gotten an exception telling you that no 
> executor is configured, and we would have found out that we need to override 
> {{executeAsync()}} in {{TestStreamEnvironment}}. However, we currently 
> configure a local executor in the constructor: 
> [https://github.com/apache/flink/blob/2160c3294ef87143ab9a4e8138cb618651499792/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java#L59].
>  With this, you basically get the “local environment” behaviour when you call 
> {{executeAsync()}}, which starts a cluster for the job and shuts it down when 
> the job finishes. This makes the {{TestStreamEnvironment}} cluster 
> sharing useless.





[jira] [Created] (FLINK-19124) Some JobClient methods are not ergonomic, require ClassLoader argument

2020-09-02 Thread Aljoscha Krettek (Jira)
Aljoscha Krettek created FLINK-19124:


 Summary: Some JobClient methods are not ergonomic, require 
ClassLoader argument
 Key: FLINK-19124
 URL: https://issues.apache.org/jira/browse/FLINK-19124
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Aljoscha Krettek


Both {{getAccumulators()}} and {{getJobExecutionResult()}} require the 
user to supply a {{ClassLoader}}. In a lot of cases the {{JobClient}} is created 
in a context where we already know the user-code {{ClassLoader}}. We should see 
whether we can remove the class loader from those methods. This might require 
that users provide a class loader when constructing a {{JobClient}} themselves for 
an already running job, but the trade-off seems to be worth it.





[jira] [Created] (FLINK-19123) TestStreamEnvironment does not use shared MiniCluster for executeAsync()

2020-09-02 Thread Aljoscha Krettek (Jira)
Aljoscha Krettek created FLINK-19123:


 Summary: TestStreamEnvironment does not use shared MiniCluster for 
executeAsync()
 Key: FLINK-19123
 URL: https://issues.apache.org/jira/browse/FLINK-19123
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream, Runtime / Coordination, Tests
Reporter: Aljoscha Krettek


TestStreamEnvironment overrides {{execute()}} but not {{executeAsync()}}. 
Now, {{execute()}} goes against the {{MiniCluster}} session that was started by 
a {{MiniClusterWithClientResource}} or some other method that uses 
{{TestStreamEnvironment}}. However, {{executeAsync()}} will go through the 
normal {{StreamExecutionEnvironment}} logic, try to find an executor, and 
not know that it is a testing environment.

Up until recently, you would have gotten an exception telling you that no 
executor is configured, and we would have found out that we need to override 
{{executeAsync()}} in {{TestStreamEnvironment}}. However, we currently 
configure a local executor in the constructor: 
[https://github.com/apache/flink/blob/2160c3294ef87143ab9a4e8138cb618651499792/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java#L59].
 With this, you basically get the “local environment” behaviour when you call 
{{executeAsync()}}, which starts a cluster for the job and shuts it down when 
the job finishes. This makes the {{TestStreamEnvironment}} cluster 
sharing useless.





[jira] [Commented] (FLINK-19065) java.lang.IllegalStateException: Auto generated UIDs have been disabled on join

2020-09-01 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188427#comment-17188427
 ] 

Aljoscha Krettek commented on FLINK-19065:
--

Thanks! I found the culprit: 
https://github.com/apache/flink/blob/11b2fcab5841e38d712e8e919c11e6ecd4f69a5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java#L332.

The problem is that we're simulating a two-input operation by tagging elements 
from the two sides and then treating them differently in the real (one-input) 
operation. Even though I wrote this code (5 years ago), I don't like that we're 
using user-facing methods internally like this.

A simple fix would be to set some UIDs on the operations that we create 
internally but we have to make sure that they are actually unique.
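
For illustration, one way such internal UIDs could be derived (a sketch, not the actual fix): base them on the user-supplied UID of the joined operation so they stay stable and unique per operation:
{code:java}
// Hypothetical helper: derive a stable, unique UID for an internally
// created operator from the UID the user assigned to the join itself.
final class InternalUidSketch {
    static String internalUid(String userUid, String internalOperatorName) {
        return userUid + "/" + internalOperatorName; // e.g. "joined/left-tagger"
    }

    public static void main(String[] args) {
        System.out.println(internalUid("joined", "left-tagger"));  // joined/left-tagger
        System.out.println(internalUid("joined", "right-tagger")); // joined/right-tagger
    }
}
{code}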

> java.lang.IllegalStateException: Auto generated UIDs have been disabled on 
> join
> ---
>
> Key: FLINK-19065
> URL: https://issues.apache.org/jira/browse/FLINK-19065
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.0, 1.11.1
>Reporter: Maris
>Priority: Major
>
> Join operation with AutoGeneratedUID disabled leads to 
> {code:java}
> java.lang.IllegalStateException: Auto generated UIDs have been disabled but 
> no UID or hash has been assigned to operator Map
> {code}
> code to reproduce
> {code:java}
> class JoinSpec extends AnyFlatSpec with Matchers with Serializable {
>   it should "be able to join streams" in {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.getConfig.disableAutoGeneratedUIDs()
>     val a = env.fromCollection(List("1", "2", "3")).name("a").uid("source-uid")
>     val b = env.fromCollection(List("1", "2", "3")).name("b").uid("source-uid2")
>     val c = a
>       .join(b)
>       .where(identity)
>       .equalTo(identity)
>       .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))((a, b) => a + b)
>       .uid("joined").name("joined")
>     c.addSink(s => println(s))
>       .name("ab")
>       .uid("ab")
>     println(env.getExecutionPlan)
>     env.execute
>     succeed
>   }
> }
> {code}





[jira] [Closed] (FLINK-19099) consume kafka message repeat

2020-09-01 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-19099.

Resolution: Not A Problem

I'm closing this for now since I don't think it's a problem with Kafka or 
Flink. Please re-open if you disagree.

> consume kafka message repeat
> 
>
> Key: FLINK-19099
> URL: https://issues.apache.org/jira/browse/FLINK-19099
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.11.0
>Reporter: zouwenlong
>Priority: Major
> Attachments: image-2020-08-31-19-23-59-038.png, 
> image-2020-08-31-19-25-09-548.png
>
>
> When the TaskManager is killed, my job has consumed some messages but the 
> offsets are not committed.
> When I then restart it, my job consumes the same Kafka messages again. I use 
> checkpointing with a 5 second interval.
> I think this is a very common problem; how can it be solved?





[jira] [Commented] (FLINK-19099) consume kafka message repeat

2020-09-01 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188410#comment-17188410
 ] 

Aljoscha Krettek commented on FLINK-19099:
--

Did you completely cancel your job and start it fresh? Without starting it from 
a savepoint or retained checkpoint?

If you want the Flink job to keep the read offset you need to take a savepoint 
before stopping the job or stop it with a savepoint and then restart the job 
later from that savepoint.

> consume kafka message repeat
> 
>
> Key: FLINK-19099
> URL: https://issues.apache.org/jira/browse/FLINK-19099
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.11.0
>Reporter: zouwenlong
>Priority: Major
> Attachments: image-2020-08-31-19-23-59-038.png, 
> image-2020-08-31-19-25-09-548.png
>
>
> When the TaskManager is killed, my job has consumed some messages but the 
> offsets are not committed.
> When I then restart it, my job consumes the same Kafka messages again. I use 
> checkpointing with a 5 second interval.
> I think this is a very common problem; how can it be solved?





[jira] [Commented] (FLINK-19083) Remove deprecated DataStream#split

2020-09-01 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188362#comment-17188362
 ] 

Aljoscha Krettek commented on FLINK-19083:
--

Suggested release note: "The DataStream#split() operation has been removed 
after being marked as deprecated for a couple of versions. Please use Side 
Outputs instead: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html"
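
For reference, migrating from {{split()}}/{{select()}} to side outputs looks roughly like this, assuming a {{DataStream<Integer> input}} (a sketch using the documented {{OutputTag}} API):
{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Anonymous subclass so the generic type is captured.
final OutputTag<Integer> oddTag = new OutputTag<Integer>("odd") {};

SingleOutputStreamOperator<Integer> evens = input
    .process(new ProcessFunction<Integer, Integer>() {
        @Override
        public void processElement(Integer value, Context ctx, Collector<Integer> out) {
            if (value % 2 == 0) {
                out.collect(value);        // main output
            } else {
                ctx.output(oddTag, value); // side output replaces split()/select()
            }
        }
    });

DataStream<Integer> odds = evens.getSideOutput(oddTag);
{code}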

> Remove deprecated DataStream#split
> --
>
> Key: FLINK-19083
> URL: https://issues.apache.org/jira/browse/FLINK-19083
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream
>Reporter: Dawid Wysakowicz
>Priority: Major
> Fix For: 2.0.0
>
>
> {{DataStream#split()}} has been deprecated in favour of using Side Outputs 
> because: 
>  - It is less performant: {{split()}} creates and checks against {{Strings}} 
> for the splitting logic.
>  - {{split()}} was and is buggy; see FLINK-5031 and FLINK-11084, for example.
>  - The semantics of consecutive splits are not very clear in general.
>  - Side outputs are more general and everything that could be done using 
> {{split()}} can be achieved with side outputs with strictly better 
> performance.





[jira] [Updated] (FLINK-19083) Remove deprecated DataStream#split

2020-09-01 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-19083:
-
Description: 
{{DataStream#split()}} has been deprecated in favour of using Side Outputs 
because: 
 - It is less performant: {{split()}} creates and checks against {{Strings}} 
for the splitting logic.
 - {{split()}} was and is buggy; see FLINK-5031 and FLINK-11084, for example.
 - The semantics of consecutive splits are not very clear in general.
 - Side outputs are more general and everything that could be done using 
{{split()}} can be achieved with side outputs with strictly better performance.


  was:
{{DataStream#split()}} has been deprecated in favour of using Side Outputs 
because: 
 - It is less performant, {{split()}} creates and checks against {{Strings}} 
for the splitting logic
 - Has bugs: see FLINK-5031
 - The semantics of consecutive splits are not very clear in general
 - Side outputs are more general and everything that could be done using 
{{split()}} can be achieved with side outputs with strictly better performance.



> Remove deprecated DataStream#split
> --
>
> Key: FLINK-19083
> URL: https://issues.apache.org/jira/browse/FLINK-19083
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream
>Reporter: Dawid Wysakowicz
>Priority: Major
> Fix For: 2.0.0
>
>
> {{DataStream#split()}} has been deprecated in favour of using Side Outputs 
> because: 
>  - It is less performant: {{split()}} creates and checks against {{Strings}} 
> for the splitting logic.
>  - {{split()}} was and is buggy; see FLINK-5031 and FLINK-11084, for example.
>  - The semantics of consecutive splits are not very clear in general.
>  - Side outputs are more general and everything that could be done using 
> {{split()}} can be achieved with side outputs with strictly better 
> performance.





[jira] [Updated] (FLINK-19083) Remove deprecated DataStream#split

2020-09-01 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-19083:
-
Description: 
{{DataStream#split()}} has been deprecated in favour of using Side Outputs 
because: 
 - It is less performant, {{split()}} creates and checks against {{Strings}} 
for the splitting logic
 - Has bugs: see FLINK-5031
 - The semantics of consecutive splits are not very clear in general
 - Side outputs are more general and everything that could be done using 
{{split()}} can be achieved with side outputs with strictly better performance.


> Remove deprecated DataStream#split
> --
>
> Key: FLINK-19083
> URL: https://issues.apache.org/jira/browse/FLINK-19083
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream
>Reporter: Dawid Wysakowicz
>Priority: Major
> Fix For: 2.0.0
>
>
> {{DataStream#split()}} has been deprecated in favour of using Side Outputs 
> because: 
>  - It is less performant, {{split()}} creates and checks against {{Strings}} 
> for the splitting logic
>  - Has bugs: see FLINK-5031
>  - The semantics of consecutive splits are not very clear in general
>  - Side outputs are more general and everything that could be done using 
> {{split()}} can be achieved with side outputs with strictly better 
> performance.





[jira] [Commented] (FLINK-18959) Fail to archiveExecutionGraph because job is not finished when dispatcher close

2020-09-01 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188277#comment-17188277
 ] 

Aljoscha Krettek commented on FLINK-18959:
--

Agreed on everything. But that's a new Jira issue, right? For this one, we 
should just change the CANCEL path to go through all the normal steps.

> Fail to archiveExecutionGraph because job is not finished when dispatcher 
> close
> ---
>
> Key: FLINK-18959
> URL: https://issues.apache.org/jira/browse/FLINK-18959
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.12.0, 1.11.1
>Reporter: Liu
>Assignee: Liu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2, 1.10.3
>
> Attachments: flink-debug-log
>
>
> When a job is cancelled, we expect to see it in Flink's history server. But I 
> cannot see my job after it is cancelled.
> After digging into the problem, I found that the function 
> archiveExecutionGraph is not executed. Below is the brief log:
> {panel:title=log}
> 2020-08-14 15:10:06,406 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph 
> [flink-akka.actor.default-dispatcher-15] - Job EtlAndWindow 
> (6f784d4cc5bae88a332d254b21660372) switched from state RUNNING to CANCELLING.
> 2020-08-14 15:10:06,415 DEBUG 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-3] - Shutting down per-job cluster 
> because the job was canceled.
> 2020-08-14 15:10:06,629 INFO 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-3] - Stopping dispatcher 
> akka.tcp://flink@bjfk-c9865.yz02:38663/user/dispatcher.
> 2020-08-14 15:10:06,629 INFO 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-3] - Stopping all currently running jobs 
> of dispatcher akka.tcp://flink@bjfk-c9865.yz02:38663/user/dispatcher.
> 2020-08-14 15:10:06,631 INFO org.apache.flink.runtime.jobmaster.JobMaster 
> [flink-akka.actor.default-dispatcher-29] - Stopping the JobMaster for job 
> EtlAndWindow(6f784d4cc5bae88a332d254b21660372).
> 2020-08-14 15:10:06,632 DEBUG org.apache.flink.runtime.jobmaster.JobMaster 
> [flink-akka.actor.default-dispatcher-29] - Disconnect TaskExecutor 
> container_e144_1590060720089_2161_01_06 because: Stopping JobMaster for 
> job EtlAndWindow(6f784d4cc5bae88a332d254b21660372).
> 2020-08-14 15:10:06,646 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph 
> [flink-akka.actor.default-dispatcher-29] - Job EtlAndWindow 
> (6f784d4cc5bae88a332d254b21660372) switched from state CANCELLING to CANCELED.
> 2020-08-14 15:10:06,664 DEBUG 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-4] - There is a newer JobManagerRunner 
> for the job 6f784d4cc5bae88a332d254b21660372.
> {panel}
> From the log, we can see that the job is not finished when the dispatcher 
> closes. The process is as follows:
>  * Receive the cancel command and send it to all tasks asynchronously.
>  * In MiniDispatcher, begin shutting down the per-job cluster.
>  * Stop the dispatcher and remove the job.
>  * The job is cancelled and the callback is executed in the method 
> startJobManagerRunner.
>  * Because the job was removed before, currentJobManagerRunner is null and 
> does not equal the original jobManagerRunner. In this case, 
> archivedExecutionGraph will not be uploaded.
> In normal cases, I find that the job is cancelled first and the dispatcher is 
> stopped afterwards, so that archiving the execution graph succeeds. But the 
> order is not constrained and it is hard to know which comes first. 
> Above is what I suspect. If so, then we should fix it.
>  





[jira] [Commented] (FLINK-18959) Fail to archiveExecutionGraph because job is not finished when dispatcher close

2020-08-28 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17186453#comment-17186453
 ] 

Aljoscha Krettek commented on FLINK-18959:
--

I think for this issue we would just change the behaviour for cancellation. In 
the long run, I think we should not block on shutdown anymore because this can 
lead to stuck clusters if the client (or connection) is down.

The job/cluster lifecycle should be independent of the client once the job is 
submitted, and we should rely on the (long-running) cluster management system to 
allow retrieval of final job results: When running Flink in standalone/session 
mode, the Flink cluster itself is the long-running system that can return job 
results and that can be queried for past jobs. When using YARN, YARN should be 
the long-running system that allows querying jobs and job results; the same goes 
for Kubernetes. We could also think about delegating to the history server for 
these purposes. WDYT?


> Fail to archiveExecutionGraph because job is not finished when dispatcher 
> close
> ---
>
> Key: FLINK-18959
> URL: https://issues.apache.org/jira/browse/FLINK-18959
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.12.0, 1.11.1
>Reporter: Liu
>Assignee: Liu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2, 1.10.3
>
> Attachments: flink-debug-log
>
>
> When a job is cancelled, we expect to see it in Flink's history server. But I 
> cannot see my job after it is cancelled.
> After digging into the problem, I found that the function 
> archiveExecutionGraph is not executed. Below is the brief log:
> {panel:title=log}
> 2020-08-14 15:10:06,406 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph 
> [flink-akka.actor.default-dispatcher-15] - Job EtlAndWindow 
> (6f784d4cc5bae88a332d254b21660372) switched from state RUNNING to CANCELLING.
> 2020-08-14 15:10:06,415 DEBUG 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-3] - Shutting down per-job cluster 
> because the job was canceled.
> 2020-08-14 15:10:06,629 INFO 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-3] - Stopping dispatcher 
> akka.tcp://flink@bjfk-c9865.yz02:38663/user/dispatcher.
> 2020-08-14 15:10:06,629 INFO 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-3] - Stopping all currently running jobs 
> of dispatcher akka.tcp://flink@bjfk-c9865.yz02:38663/user/dispatcher.
> 2020-08-14 15:10:06,631 INFO org.apache.flink.runtime.jobmaster.JobMaster 
> [flink-akka.actor.default-dispatcher-29] - Stopping the JobMaster for job 
> EtlAndWindow(6f784d4cc5bae88a332d254b21660372).
> 2020-08-14 15:10:06,632 DEBUG org.apache.flink.runtime.jobmaster.JobMaster 
> [flink-akka.actor.default-dispatcher-29] - Disconnect TaskExecutor 
> container_e144_1590060720089_2161_01_06 because: Stopping JobMaster for 
> job EtlAndWindow(6f784d4cc5bae88a332d254b21660372).
> 2020-08-14 15:10:06,646 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph 
> [flink-akka.actor.default-dispatcher-29] - Job EtlAndWindow 
> (6f784d4cc5bae88a332d254b21660372) switched from state CANCELLING to CANCELED.
> 2020-08-14 15:10:06,664 DEBUG 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-4] - There is a newer JobManagerRunner 
> for the job 6f784d4cc5bae88a332d254b21660372.
> {panel}
> From the log, we can see that the job is not finished when the dispatcher 
> closes. The process is as follows:
>  * Receive the cancel command and send it to all tasks asynchronously.
>  * In MiniDispatcher, begin shutting down the per-job cluster.
>  * Stop the dispatcher and remove the job.
>  * The job is cancelled and the callback is executed in the method 
> startJobManagerRunner.
>  * Because the job was removed before, currentJobManagerRunner is null and 
> does not equal the original jobManagerRunner. In this case, 
> archivedExecutionGraph will not be uploaded.
> In normal cases, I find that the job is cancelled first and the dispatcher is 
> stopped afterwards, so that archiving the execution graph succeeds. But the 
> order is not constrained and it is hard to know which comes first. 
> Above is what I suspect. If so, then we should fix it.
>  





[jira] [Commented] (FLINK-19065) java.lang.IllegalStateException: Auto generated UIDs have been disabled on join

2020-08-28 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17186351#comment-17186351
 ] 

Aljoscha Krettek commented on FLINK-19065:
--

Could you quickly try and run that on a cluster without 
{{disableAutoGeneratedUIDs()}}? There's a {{map()}} used internally somewhere 
but I can't find it off the top of my head right now. 

> java.lang.IllegalStateException: Auto generated UIDs have been disabled on 
> join
> ---
>
> Key: FLINK-19065
> URL: https://issues.apache.org/jira/browse/FLINK-19065
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.0, 1.11.1
>Reporter: Maris
>Priority: Major
>
> Join operation with AutoGeneratedUID disabled leads to 
> {code:java}
> java.lang.IllegalStateException: Auto generated UIDs have been disabled but 
> no UID or hash has been assigned to operator Map
> {code}
> code to reproduce
> {code:java}
> class JoinSpec extends AnyFlatSpec with Matchers with Serializable {
>   it should "be able to join streams" in {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.getConfig.disableAutoGeneratedUIDs()
>     val a = env.fromCollection(List("1", "2", "3")).name("a").uid("source-uid")
>     val b = env.fromCollection(List("1", "2", "3")).name("b").uid("source-uid2")
>     val c = a
>       .join(b)
>       .where(identity)
>       .equalTo(identity)
>       .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))((a, b) => a + b)
>       .uid("joined").name("joined")
>     c.addSink(s => println(s))
>       .name("ab")
>       .uid("ab")
>     println(env.getExecutionPlan)
>     env.execute
>     succeed
>   }
> }
> {code}





[jira] [Commented] (FLINK-19069) finalizeOnMaster takes too much time and client timeouts

2020-08-28 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17186344#comment-17186344
 ] 

Aljoscha Krettek commented on FLINK-19069:
--

Does this really only affect Flink 1.9 or should we set _affects version_ to 
1.10.x and 1.11.x as well?

> finalizeOnMaster takes too much time and client timeouts
> 
>
> Key: FLINK-19069
> URL: https://issues.apache.org/jira/browse/FLINK-19069
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.9.0
>Reporter: Jiayi Liao
>Priority: Major
>
> Currently we execute {{finalizeOnMaster}} in the JM's main thread, which may 
> block the JM for a very long time so that the client eventually times out. 
> For example, we'd like to write data to HDFS and commit files on the JM, which 
> takes more than ten minutes when committing tens of thousands of files.
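
A generic sketch of the direction one could take (illustrative only, not the actual JobManager code): run the slow finalization on an I/O executor so the main thread is not blocked:
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class FinalizeOffloadSketch {
    public static void main(String[] args) {
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        // Offload the potentially slow work (e.g. committing many HDFS files)
        // instead of running it on the main thread.
        CompletableFuture<Void> finalization =
            CompletableFuture.runAsync(FinalizeOffloadSketch::commitFiles, ioExecutor);
        finalization.join(); // a real implementation would not block the main thread here
        ioExecutor.shutdown();
    }

    private static void commitFiles() {
        // placeholder for the long-running finalizeOnMaster() work
    }
}
{code}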





[jira] [Commented] (FLINK-19060) Checkpoint not triggered when use broadcast stream

2020-08-27 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17185740#comment-17185740
 ] 

Aljoscha Krettek commented on FLINK-19060:
--

Hi, would you mind also trying this with an earlier Flink version to see if the 
issue also exists there? Broadcast state/streams are available since Flink 1.5.

> Checkpoint not triggered when use broadcast stream
> --
>
> Key: FLINK-19060
> URL: https://issues.apache.org/jira/browse/FLINK-19060
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.1
>Reporter: henvealf
>Priority: Major
> Attachments: image-2020-08-27-16-41-23-699.png, 
> image-2020-08-27-16-44-37-442.png, image-2020-08-27-16-45-28-134.png, 
> image-2020-08-27-16-51-10-512.png
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> Code:
> !image-2020-08-27-16-51-10-512.png!
> KafkaSourceConfig:
> consumer.setStartFromGroupOffsets()
> Web UI:
>     !image-2020-08-27-16-45-28-134.png!
> Checkpoints are never triggered. Did I write something wrong?
> Thanks!





[jira] [Commented] (FLINK-19006) project transformation does not support conversion to Tuple25 type

2020-08-27 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17185727#comment-17185727
 ] 

Aljoscha Krettek commented on FLINK-19006:
--

This code is more than 6 years old by now.  We can fix it, but my general 
opinion is that we should remove these database-like or relational methods from 
the DataStream API. See also this thread: 
https://lists.apache.org/thread.html/r16bc0d1755a72e29087a97f11d26a41c35f873ef29b8fd5e0357e286%40%3Cdev.flink.apache.org%3E

> project transformation does not support conversion to Tuple25 type
> --
>
> Key: FLINK-19006
> URL: https://issues.apache.org/jira/browse/FLINK-19006
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.11.1
>Reporter: ming li
>Priority: Major
>
> The {{DataStream#project}} method checks whether the length of 
> {{fieldIndexes}} is between 1 and {{Tuple.MAX_ARITY-1}}, and then calls 
> {{projectTupleXX}} according to the length of {{fieldIndexes}}. This limits 
> the maximum length of {{Tuple}} to 24.
> {code:java}
> protected StreamProjection(DataStream<IN> dataStream, int[] fieldIndexes) {
>    if (!dataStream.getType().isTupleType()) {
>       throw new RuntimeException("Only Tuple DataStreams can be projected");
>    }
>    if (fieldIndexes.length == 0) {
>       throw new IllegalArgumentException("project() needs to select at least one (1) field.");
>    } else if (fieldIndexes.length > Tuple.MAX_ARITY - 1) {
>       throw new IllegalArgumentException(
>          "project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields.");
>    }
>    int maxFieldIndex = (dataStream.getType()).getArity();
>    for (int i = 0; i < fieldIndexes.length; i++) {
>       Preconditions.checkElementIndex(fieldIndexes[i], maxFieldIndex);
>    }
>    this.dataStream = dataStream;
>    this.fieldIndexes = fieldIndexes;
> }{code}
> This problem also appears in {{ProjectOperator}}.
> {code:java}
> public Projection(DataSet<T> ds, int[] fieldIndexes) {
>    if (!(ds.getType() instanceof TupleTypeInfo)) {
>       throw new UnsupportedOperationException("project() can only be applied to DataSets of Tuples.");
>    }
>    if (fieldIndexes.length == 0) {
>       throw new IllegalArgumentException("project() needs to select at least one (1) field.");
>    } else if (fieldIndexes.length > Tuple.MAX_ARITY - 1) {
>       throw new IllegalArgumentException(
>          "project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields.");
>    }
>    int maxFieldIndex = ds.getType().getArity();
>    for (int fieldIndexe : fieldIndexes) {
>       Preconditions.checkElementIndex(fieldIndexe, maxFieldIndex);
>    }
>    this.ds = ds;
>    this.fieldIndexes = fieldIndexes;
> }{code}
> I think the allowed length should be 1 to {{Tuple.MAX_ARITY}} instead of 1 
> to {{Tuple.MAX_ARITY-1}}.
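
A sketch of the corrected bound check, written as a standalone helper for illustration (not the actual patch):
{code:java}
// Illustrative helper: allow selecting up to maxArity fields, not maxArity - 1.
final class ProjectionAritySketch {
    static void checkProjectionArity(int numSelectedFields, int maxArity) {
        if (numSelectedFields == 0) {
            throw new IllegalArgumentException(
                "project() needs to select at least one (1) field.");
        } else if (numSelectedFields > maxArity) { // proposed: was "> maxArity - 1"
            throw new IllegalArgumentException(
                "project() may select only up to (" + maxArity + ") fields.");
        }
    }

    public static void main(String[] args) {
        checkProjectionArity(25, 25); // would throw with the current off-by-one limit
    }
}
{code}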





[jira] [Closed] (FLINK-19052) Performance issue with PojoSerializer

2020-08-27 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-19052.

Fix Version/s: 1.12.0
 Assignee: Roman Grebennikov
   Resolution: Fixed

master: 162e3ead63e5766cf2116af09922a88537889e53

> Performance issue with PojoSerializer
> -
>
> Key: FLINK-19052
> URL: https://issues.apache.org/jira/browse/FLINK-19052
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Affects Versions: 1.11.1
> Environment: Flink 1.12 master on 26.08.2020
>Reporter: Roman Grebennikov
>Assignee: Roman Grebennikov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
> Attachments: image-2020-08-26-10-46-19-800.png, 
> image-2020-08-26-10-49-59-400.png
>
>
> Currently PojoSerializer.createInstance() uses a reflective call to create a 
> class instance. As this method is called for each stream element on 
> deserialization, the reflection overhead can become noticeable in 
> serialization-bound cases when:
>  # the POJO class is small, so the instantiation cost is noticeable, and
>  # the job does not have heavy CPU-bound event processing.
> See this flamegraph built for 
> flink-benchmarks/SerializationFrameworkMiniBenchmarks.serializerPojo 
> benchmark:
> !image-2020-08-26-10-46-19-800.png!
> This Reflection.getCallerClass method consumes a lot of CPU, mostly doing a 
> security check on whether we are allowed to make this reflective call.
>  
> There is no real reason to perform this check for every deserialized event, so 
> to speed things up we can cache the constructor using a {{MethodHandle}}, so 
> that the check is performed only once. With this tiny fix, 
> getCallerClass is gone:
>  
> !image-2020-08-26-10-49-59-400.png!
>  
> The benchmark result:
> {noformat}
> serializerPojo thrpt 100 487.706 ± 30.480 ops/ms
> serializerPojo thrpt 100 569.828 ± 8.815 ops/ms{noformat}
> That is about +15% in throughput.
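
A minimal sketch of the caching idea (illustrative, not the actual patch): resolve the no-arg constructor once and reuse the cached handle for every deserialized record, so the per-call reflective check disappears:
{code:java}
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;

// Resolve the constructor once; invoke the cached handle per record.
public final class CachedInstantiatorSketch<T> {
    private final MethodHandle noArgConstructor;

    public CachedInstantiatorSketch(Class<T> clazz) throws ReflectiveOperationException {
        this.noArgConstructor = MethodHandles.lookup()
            .findConstructor(clazz, MethodType.methodType(void.class));
    }

    @SuppressWarnings("unchecked")
    public T newInstance() {
        try {
            return (T) noArgConstructor.invoke(); // no per-call caller-class check
        } catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        CachedInstantiatorSketch<StringBuilder> inst =
            new CachedInstantiatorSketch<>(StringBuilder.class);
        System.out.println(inst.newInstance().append("ok"));
    }
}
{code}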





[jira] [Commented] (FLINK-19053) Flink Kafka Connector Dependency Error

2020-08-27 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17185721#comment-17185721
 ] 

Aljoscha Krettek commented on FLINK-19053:
--

Yes, we should fix that!

> Flink Kafka Connector Dependency Error
> --
>
> Key: FLINK-19053
> URL: https://issues.apache.org/jira/browse/FLINK-19053
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.11.1
>Reporter: liugh
>Priority: Major
> Attachments: 0.10.png, 0.11.png
>
>
> From the Flink documentation at 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html]
> we can see the "Dependency" section as follows:
> {code}
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-kafka-010_2.11</artifactId>
>   <version>1.11.0</version>
> </dependency>
> {code}
> {code}
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-kafka-011_2.11</artifactId>
>   <version>1.11.0</version>
> </dependency>
> {code}
> However, I couldn't resolve the jar when the dependency was configured in 
> the pom.xml as shown above.
> Then I searched [https://mvnrepository.com/] and the Aliyun Maven repository, 
> and found that the dependency should be as follows:
> {code}
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
>   <version>1.11.0</version>
> </dependency>
> {code}
> {code}
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
>   <version>1.11.0</version>
> </dependency>
> {code}





[jira] [Closed] (FLINK-17159) ES6 ElasticsearchSinkITCase unstable

2020-08-25 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-17159.

Fix Version/s: 1.11.2
   1.12.0
   Resolution: Fixed

Fix on release-1.11: 6c97f22913197e7a5a948f67b7a0b7f8fb480fa7

Please re-open if the issue persists.

> ES6 ElasticsearchSinkITCase unstable
> 
>
> Key: FLINK-17159
> URL: https://issues.apache.org/jira/browse/FLINK-17159
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch, Tests
>Affects Versions: 1.11.0, 1.12.0
>Reporter: Chesnay Schepler
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.12.0, 1.11.2
>
>
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=7482=logs=64110e28-73be-50d7-9369-8750330e0bf1=aa84fb9a-59ae-5696-70f7-011bc086e59b]
> {code:java}
> 2020-04-15T02:37:04.4289477Z [ERROR] 
> testElasticsearchSinkWithSmile(org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase)
>   Time elapsed: 0.145 s  <<< ERROR!
> 2020-04-15T02:37:04.4290310Z 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2020-04-15T02:37:04.4290790Z  at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
> 2020-04-15T02:37:04.4291404Z  at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:659)
> 2020-04-15T02:37:04.4291956Z  at 
> org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:77)
> 2020-04-15T02:37:04.4292548Z  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1643)
> 2020-04-15T02:37:04.4293254Z  at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase.runElasticSearchSinkTest(ElasticsearchSinkTestBase.java:128)
> 2020-04-15T02:37:04.4293990Z  at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase.runElasticsearchSinkSmileTest(ElasticsearchSinkTestBase.java:106)
> 2020-04-15T02:37:04.4295096Z  at 
> org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase.testElasticsearchSinkWithSmile(ElasticsearchSinkITCase.java:45)
> 2020-04-15T02:37:04.4295923Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-04-15T02:37:04.4296489Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-04-15T02:37:04.4297076Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-04-15T02:37:04.4297513Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2020-04-15T02:37:04.4297951Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-04-15T02:37:04.4298688Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-04-15T02:37:04.4299374Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-04-15T02:37:04.4300069Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-04-15T02:37:04.4300960Z  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> 2020-04-15T02:37:04.4301705Z  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2020-04-15T02:37:04.4302204Z  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2020-04-15T02:37:04.4302661Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2020-04-15T02:37:04.4303234Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2020-04-15T02:37:04.4303706Z  at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2020-04-15T02:37:04.4304127Z  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2020-04-15T02:37:04.4304716Z  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2020-04-15T02:37:04.4305394Z  at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2020-04-15T02:37:04.4305965Z  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2020-04-15T02:37:04.4306425Z  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2020-04-15T02:37:04.4306942Z  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2020-04-15T02:37:04.4307466Z  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-04-15T02:37:04.4307920Z  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-04-15T02:37:04.4308375Z  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 

[jira] [Commented] (FLINK-17159) ES6 ElasticsearchSinkITCase unstable

2020-08-25 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184125#comment-17184125
 ] 

Aljoscha Krettek commented on FLINK-17159:
--

Potential fix on master: f6467816334ae04da9e72ee759ad007d60bfdca7

> ES6 ElasticsearchSinkITCase unstable
> 
>
> Key: FLINK-17159
> URL: https://issues.apache.org/jira/browse/FLINK-17159
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch, Tests
>Affects Versions: 1.11.0, 1.12.0
>Reporter: Chesnay Schepler
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=7482=logs=64110e28-73be-50d7-9369-8750330e0bf1=aa84fb9a-59ae-5696-70f7-011bc086e59b]
> {code:java}
> 2020-04-15T02:37:04.4289477Z [ERROR] 
> testElasticsearchSinkWithSmile(org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase)
>   Time elapsed: 0.145 s  <<< ERROR!
> 2020-04-15T02:37:04.4290310Z 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2020-04-15T02:37:04.4290790Z  at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
> 2020-04-15T02:37:04.4291404Z  at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:659)
> 2020-04-15T02:37:04.4291956Z  at 
> org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:77)
> 2020-04-15T02:37:04.4292548Z  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1643)
> 2020-04-15T02:37:04.4293254Z  at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase.runElasticSearchSinkTest(ElasticsearchSinkTestBase.java:128)
> 2020-04-15T02:37:04.4293990Z  at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase.runElasticsearchSinkSmileTest(ElasticsearchSinkTestBase.java:106)
> 2020-04-15T02:37:04.4295096Z  at 
> org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase.testElasticsearchSinkWithSmile(ElasticsearchSinkITCase.java:45)
> 2020-04-15T02:37:04.4295923Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-04-15T02:37:04.4296489Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-04-15T02:37:04.4297076Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-04-15T02:37:04.4297513Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2020-04-15T02:37:04.4297951Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-04-15T02:37:04.4298688Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-04-15T02:37:04.4299374Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-04-15T02:37:04.4300069Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-04-15T02:37:04.4300960Z  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> 2020-04-15T02:37:04.4301705Z  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2020-04-15T02:37:04.4302204Z  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2020-04-15T02:37:04.4302661Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2020-04-15T02:37:04.4303234Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2020-04-15T02:37:04.4303706Z  at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2020-04-15T02:37:04.4304127Z  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2020-04-15T02:37:04.4304716Z  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2020-04-15T02:37:04.4305394Z  at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2020-04-15T02:37:04.4305965Z  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2020-04-15T02:37:04.4306425Z  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2020-04-15T02:37:04.4306942Z  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2020-04-15T02:37:04.4307466Z  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-04-15T02:37:04.4307920Z  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-04-15T02:37:04.4308375Z  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-04-15T02:37:04.4308782Z  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2020-04-15T02:37:04.4309182Z  at 
> 

[jira] [Commented] (FLINK-18830) JoinCoGroupFunction and FlatJoinCoGroupFunction work incorrectly for outer join when one side of coGroup is empty

2020-08-25 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183991#comment-17183991
 ] 

Aljoscha Krettek commented on FLINK-18830:
--

The documentation describes {{DataStream.join()}} as an _inner join_: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/joining.html.
 The relevant excerpt is:
{quote}Some notes on semantics:

* The creation of pairwise combinations of elements of the two streams behaves 
like an inner-join, meaning elements from one stream will not be emitted if 
they don’t have a corresponding element from the other stream to be joined with.
{quote}

Is this about using {{JoinCoGroupFunction}} in another context, or about changing 
the behaviour of the DataStream API?

> JoinCoGroupFunction and FlatJoinCoGroupFunction work incorrectly for outer 
> join when one side of coGroup is empty
> -
>
> Key: FLINK-18830
> URL: https://issues.apache.org/jira/browse/FLINK-18830
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.1
>Reporter: liupengcheng
>Priority: Major
>
> Currently, the {{JoinCoGroupFunction}} and {{FlatJoinCoGroupFunction}} in 
> JoinedStreams don't respect the join type; they are implemented as a join 
> within a two-level loop. However, this is incorrect for an outer join when 
> one side of the coGroup is empty.
> {code}
> public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
>    for (T1 val1 : first) {
>       for (T2 val2 : second) {
>          wrappedFunction.join(val1, val2, out);
>       }
>    }
> }
> {code}
> The above code is the current implementation. Suppose the first input is 
> non-empty and the second input is an empty iterator; then the join 
> function ({{wrappedFunction}}) is never called, so no data is emitted for a 
> left outer join.
> So I propose to take the join type into account here and handle this case: 
> e.g., for a left outer join we can emit the record with the right side set 
> to null if the right side is empty or no match can be found on the right side.
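
For illustration, a sketch of the proposed handling for the left-outer case (hypothetical; it assumes the wrapped join function accepts a null right side):
{code:java}
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical sketch of a left-outer-join-aware coGroup body.
final class LeftOuterCoGroupSketch {
    static <T1, T2> void coGroupLeftOuter(
            Iterable<T1> first, Iterable<T2> second, BiConsumer<T1, T2> join) {
        boolean rightIsEmpty = !second.iterator().hasNext();
        for (T1 val1 : first) {
            if (rightIsEmpty) {
                join.accept(val1, null); // emit with the right side set to null
            } else {
                for (T2 val2 : second) {
                    join.accept(val1, val2);
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> left = List.of("a", "b");
        List<String> right = Collections.emptyList();
        coGroupLeftOuter(left, right, (l, r) -> System.out.println(l + " <-> " + r));
        // prints: a <-> null, b <-> null
    }
}
{code}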





[jira] [Assigned] (FLINK-17159) ES6 ElasticsearchSinkITCase unstable

2020-08-25 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-17159:


Assignee: Aljoscha Krettek

> ES6 ElasticsearchSinkITCase unstable
> 
>
> Key: FLINK-17159
> URL: https://issues.apache.org/jira/browse/FLINK-17159
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch, Tests
>Affects Versions: 1.11.0, 1.12.0
>Reporter: Chesnay Schepler
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=7482=logs=64110e28-73be-50d7-9369-8750330e0bf1=aa84fb9a-59ae-5696-70f7-011bc086e59b]
> {code:java}
> 2020-04-15T02:37:04.4289477Z [ERROR] 
> testElasticsearchSinkWithSmile(org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase)
>   Time elapsed: 0.145 s  <<< ERROR!
> 2020-04-15T02:37:04.4290310Z 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2020-04-15T02:37:04.4290790Z  at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
> 2020-04-15T02:37:04.4291404Z  at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:659)
> 2020-04-15T02:37:04.4291956Z  at 
> org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:77)
> 2020-04-15T02:37:04.4292548Z  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1643)
> 2020-04-15T02:37:04.4293254Z  at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase.runElasticSearchSinkTest(ElasticsearchSinkTestBase.java:128)
> 2020-04-15T02:37:04.4293990Z  at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase.runElasticsearchSinkSmileTest(ElasticsearchSinkTestBase.java:106)
> 2020-04-15T02:37:04.4295096Z  at 
> org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase.testElasticsearchSinkWithSmile(ElasticsearchSinkITCase.java:45)
> 2020-04-15T02:37:04.4295923Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-04-15T02:37:04.4296489Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-04-15T02:37:04.4297076Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-04-15T02:37:04.4297513Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2020-04-15T02:37:04.4297951Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-04-15T02:37:04.4298688Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-04-15T02:37:04.4299374Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-04-15T02:37:04.4300069Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-04-15T02:37:04.4300960Z  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> 2020-04-15T02:37:04.4301705Z  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2020-04-15T02:37:04.4302204Z  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2020-04-15T02:37:04.4302661Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2020-04-15T02:37:04.4303234Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2020-04-15T02:37:04.4303706Z  at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2020-04-15T02:37:04.4304127Z  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2020-04-15T02:37:04.4304716Z  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2020-04-15T02:37:04.4305394Z  at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2020-04-15T02:37:04.4305965Z  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2020-04-15T02:37:04.4306425Z  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2020-04-15T02:37:04.4306942Z  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2020-04-15T02:37:04.4307466Z  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-04-15T02:37:04.4307920Z  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-04-15T02:37:04.4308375Z  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-04-15T02:37:04.4308782Z  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2020-04-15T02:37:04.4309182Z  at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 

[jira] [Commented] (FLINK-12130) Apply command line options to configuration before installing security modules

2020-08-25 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183825#comment-17183825
 ] 

Aljoscha Krettek commented on FLINK-12130:
--

Sure, that would be good!

> Apply command line options to configuration before installing security modules
> --
>
> Key: FLINK-12130
> URL: https://issues.apache.org/jira/browse/FLINK-12130
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Major
>
> Currently, if the user configures Kerberos credentials through the command 
> line, it won't work.
> {code:java}
> // flink run -m yarn-cluster -yD 
> security.kerberos.login.keytab=/path/to/keytab -yD 
> security.kerberos.login.principal=xxx /path/to/test.jar
> {code}
> The above command causes a security failure if you do not have a ticket 
> cache created with kinit.
> Maybe we could call 
> _org.apache.flink.client.cli.AbstractCustomCommandLine#applyCommandLineOptionsToConfiguration_
>   before _SecurityUtils.install(new 
> SecurityConfiguration(cli.configuration));_
> Here is a demo patch: 
> [https://github.com/jiasheng55/flink/commit/ef6880dba8a1f36849f5d1bb308405c421b29986]
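
For illustration, the proposed ordering in a hedged, standalone sketch (stand-in types, not Flink's actual classes):
{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: merge CLI-provided options into the configuration
// *before* initializing security, so -yD security.kerberos.* takes effect.
final class SecuritySetupOrderSketch {
    public static void main(String[] args) {
        Map<String, String> configuration = new HashMap<>(); // stand-in for Configuration
        Map<String, String> cliOptions = new HashMap<>();    // parsed -yD key=value pairs
        cliOptions.put("security.kerberos.login.keytab", "/path/to/keytab");
        cliOptions.put("security.kerberos.login.principal", "xxx");

        configuration.putAll(cliOptions); // step 1: apply command line options first
        installSecurity(configuration);   // step 2: only then read Kerberos settings
    }

    static void installSecurity(Map<String, String> configuration) {
        // stands in for SecurityUtils.install(new SecurityConfiguration(configuration))
        System.out.println("keytab = " + configuration.get("security.kerberos.login.keytab"));
    }
}
{code}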





[jira] [Commented] (FLINK-18959) Fail to archiveExecutionGraph because job is not finished when dispatcher close

2020-08-25 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183821#comment-17183821
 ] 

Aljoscha Krettek commented on FLINK-18959:
--

Yes, exactly. The cluster should shut down in both modes. Maybe we can even get 
rid of the split between NORMAL and DETACHED by now.

> Fail to archiveExecutionGraph because job is not finished when dispatcher 
> close
> ---
>
> Key: FLINK-18959
> URL: https://issues.apache.org/jira/browse/FLINK-18959
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.12.0, 1.11.1
>Reporter: Liu
>Assignee: Liu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2, 1.10.3
>
> Attachments: flink-debug-log
>
>
> When a job is cancelled, we expect to see it in Flink's history server, but I 
> cannot see my job after it is cancelled.
> After digging into the problem, I found that the function 
> archiveExecutionGraph is not executed. Below is the relevant log:
> {panel:title=log}
> 2020-08-14 15:10:06,406 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph 
> [flink-akka.actor.default-dispatcher- 15] - Job EtlAndWindow 
> (6f784d4cc5bae88a332d254b21660372) switched from state RUNNING to CANCELLING.
> 2020-08-14 15:10:06,415 DEBUG 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-3] - Shutting down per-job cluster 
> because the job was canceled.
> 2020-08-14 15:10:06,629 INFO 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-3] - Stopping dispatcher 
> akka.tcp://flink@bjfk-c9865.yz02:38663/user/dispatcher.
> 2020-08-14 15:10:06,629 INFO 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-3] - Stopping all currently running jobs 
> of dispatcher akka.tcp://flink@bjfk-c9865.yz02:38663/user/dispatcher.
> 2020-08-14 15:10:06,631 INFO org.apache.flink.runtime.jobmaster.JobMaster 
> [flink-akka.actor.default-dispatcher-29] - Stopping the JobMaster for job 
> EtlAndWindow(6f784d4cc5bae88a332d254b21660372).
> 2020-08-14 15:10:06,632 DEBUG org.apache.flink.runtime.jobmaster.JobMaster 
> [flink-akka.actor.default-dispatcher-29] - Disconnect TaskExecutor 
> container_e144_1590060720089_2161_01_06 because: Stopping JobMaster for 
> job EtlAndWindow(6f784d4cc5bae88a332d254b21660372).
> 2020-08-14 15:10:06,646 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph 
> [flink-akka.actor.default-dispatcher-29] - Job EtlAndWindow 
> (6f784d4cc5bae88a332d254b21660372) switched from state CANCELLING to CANCELED.
> 2020-08-14 15:10:06,664 DEBUG 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-4] - There is a newer JobManagerRunner 
> for the job 6f784d4cc5bae88a332d254b21660372.
> {panel}
> From the log, we can see that the job is not finished when the dispatcher 
> closes. The process is as follows:
>  * The cancel command is received and sent to all tasks asynchronously.
>  * The MiniDispatcher begins shutting down the per-job cluster.
>  * The dispatcher is stopped and the job is removed.
>  * The job is cancelled and the callback in startJobManagerRunner is executed.
>  * Because the job was removed before, currentJobManagerRunner is null and 
> does not equal the original jobManagerRunner. In this case, 
> archivedExecutionGraph will not be uploaded.
> In normal cases, I find that the job is cancelled first and then the 
> dispatcher is stopped, so archiving the execution graph succeeds. But the 
> order is not constrained, and it is hard to know which comes first.
> Above is what I suspect. If so, we should fix it.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18959) Fail to archiveExecutionGraph because job is not finished when dispatcher close

2020-08-24 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183342#comment-17183342
 ] 

Aljoscha Krettek commented on FLINK-18959:
--

If that means the cluster still shuts down by itself upon cancellation, then 
yes.

> Fail to archiveExecutionGraph because job is not finished when dispatcher 
> close
> ---
>
> Key: FLINK-18959
> URL: https://issues.apache.org/jira/browse/FLINK-18959
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.12.0, 1.11.1
>Reporter: Liu
>Assignee: Liu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2, 1.10.3
>
> Attachments: flink-debug-log
>
>
> When a job is cancelled, we expect to see it in Flink's history server, but I 
> cannot see my job after it is cancelled.
> After digging into the problem, I found that the function 
> archiveExecutionGraph is not executed. Below is the relevant log:
> {panel:title=log}
> 2020-08-14 15:10:06,406 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph 
> [flink-akka.actor.default-dispatcher- 15] - Job EtlAndWindow 
> (6f784d4cc5bae88a332d254b21660372) switched from state RUNNING to CANCELLING.
> 2020-08-14 15:10:06,415 DEBUG 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-3] - Shutting down per-job cluster 
> because the job was canceled.
> 2020-08-14 15:10:06,629 INFO 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-3] - Stopping dispatcher 
> akka.tcp://flink@bjfk-c9865.yz02:38663/user/dispatcher.
> 2020-08-14 15:10:06,629 INFO 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-3] - Stopping all currently running jobs 
> of dispatcher akka.tcp://flink@bjfk-c9865.yz02:38663/user/dispatcher.
> 2020-08-14 15:10:06,631 INFO org.apache.flink.runtime.jobmaster.JobMaster 
> [flink-akka.actor.default-dispatcher-29] - Stopping the JobMaster for job 
> EtlAndWindow(6f784d4cc5bae88a332d254b21660372).
> 2020-08-14 15:10:06,632 DEBUG org.apache.flink.runtime.jobmaster.JobMaster 
> [flink-akka.actor.default-dispatcher-29] - Disconnect TaskExecutor 
> container_e144_1590060720089_2161_01_06 because: Stopping JobMaster for 
> job EtlAndWindow(6f784d4cc5bae88a332d254b21660372).
> 2020-08-14 15:10:06,646 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph 
> [flink-akka.actor.default-dispatcher-29] - Job EtlAndWindow 
> (6f784d4cc5bae88a332d254b21660372) switched from state CANCELLING to CANCELED.
> 2020-08-14 15:10:06,664 DEBUG 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-4] - There is a newer JobManagerRunner 
> for the job 6f784d4cc5bae88a332d254b21660372.
> {panel}
> From the log, we can see that the job is not finished when the dispatcher 
> closes. The process is as follows:
>  * The cancel command is received and sent to all tasks asynchronously.
>  * The MiniDispatcher begins shutting down the per-job cluster.
>  * The dispatcher is stopped and the job is removed.
>  * The job is cancelled and the callback in startJobManagerRunner is executed.
>  * Because the job was removed before, currentJobManagerRunner is null and 
> does not equal the original jobManagerRunner. In this case, 
> archivedExecutionGraph will not be uploaded.
> In normal cases, I find that the job is cancelled first and then the 
> dispatcher is stopped, so archiving the execution graph succeeds. But the 
> order is not constrained, and it is hard to know which comes first.
> Above is what I suspect. If so, we should fix it.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17260) StreamingKafkaITCase failure on Azure

2020-08-24 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183306#comment-17183306
 ] 

Aljoscha Krettek commented on FLINK-17260:
--

I believe you're right: KAFKA-4815. We should remove testing for the 0.10 
producer, or maybe remove that producer altogether.

> StreamingKafkaITCase failure on Azure
> -
>
> Key: FLINK-17260
> URL: https://issues.apache.org/jira/browse/FLINK-17260
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Tests
>Affects Versions: 1.11.0, 1.12.0
>Reporter: Roman Khachatryan
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.12.0
>
>
> [https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_apis/build/builds/7544/logs/165]
>  
> {code:java}
> 2020-04-16T00:12:32.2848429Z [INFO] Running 
> org.apache.flink.tests.util.kafka.StreamingKafkaITCase
> 2020-04-16T00:14:47.9100927Z [ERROR] Tests run: 3, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 135.621 s <<< FAILURE! - in 
> org.apache.flink.tests.util.k afka.StreamingKafkaITCase
> 2020-04-16T00:14:47.9103036Z [ERROR] testKafka[0: 
> kafka-version:0.10.2.0](org.apache.flink.tests.util.kafka.StreamingKafkaITCase)
>   Time elapsed: 46.222 s  <<<  FAILURE!
> 2020-04-16T00:14:47.9104033Z java.lang.AssertionError: 
> expected:<[elephant,27,64213]> but was:<[]>
> 2020-04-16T00:14:47.9104638Zat org.junit.Assert.fail(Assert.java:88)
> 2020-04-16T00:14:47.9105148Zat 
> org.junit.Assert.failNotEquals(Assert.java:834)
> 2020-04-16T00:14:47.9105701Zat 
> org.junit.Assert.assertEquals(Assert.java:118)
> 2020-04-16T00:14:47.9106239Zat 
> org.junit.Assert.assertEquals(Assert.java:144)
> 2020-04-16T00:14:47.9107177Zat 
> org.apache.flink.tests.util.kafka.StreamingKafkaITCase.testKafka(StreamingKafkaITCase.java:162)
> 2020-04-16T00:14:47.9107845Zat 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-04-16T00:14:47.9108434Zat 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-04-16T00:14:47.9109318Zat 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-04-16T00:14:47.9109914Zat 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2020-04-16T00:14:47.9110434Zat 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-04-16T00:14:47.9110985Zat 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-04-16T00:14:47.9111548Zat 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-04-16T00:14:47.9112083Zat 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-04-16T00:14:47.9112629Zat 
> org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-04-16T00:14:47.9113145Zat 
> org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-04-16T00:14:47.9113637Zat 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> 2020-04-16T00:14:47.9114072Zat 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2020-04-16T00:14:47.9114490Zat 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2020-04-16T00:14:47.9115256Zat 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2020-04-16T00:14:47.9115791Zat 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2020-04-16T00:14:47.9116292Zat 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2020-04-16T00:14:47.9116736Zat 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2020-04-16T00:14:47.9117779Zat 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2020-04-16T00:14:47.9118274Zat 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2020-04-16T00:14:47.9118766Zat 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2020-04-16T00:14:47.9119204Zat 
> org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 2020-04-16T00:14:47.9119625Zat 
> org.junit.runners.Suite.runChild(Suite.java:128)
> 2020-04-16T00:14:47.9120005Zat 
> org.junit.runners.Suite.runChild(Suite.java:27)
> 2020-04-16T00:14:47.9120428Zat 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2020-04-16T00:14:47.9120876Zat 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2020-04-16T00:14:47.9121350Zat 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2020-04-16T00:14:47.9121805Zat 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2020-04-16T00:14:47.9122273Zat 
> 

[jira] [Reopened] (FLINK-12130) Apply command line options to configuration before installing security modules

2020-08-24 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reopened FLINK-12130:
--

> Apply command line options to configuration before installing security modules
> --
>
> Key: FLINK-12130
> URL: https://issues.apache.org/jira/browse/FLINK-12130
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Major
>
> Currently, if the user configures Kerberos credentials through the command 
> line, it won't work.
> {code:java}
> // flink run -m yarn-cluster -yD 
> security.kerberos.login.keytab=/path/to/keytab -yD 
> security.kerberos.login.principal=xxx /path/to/test.jar
> {code}
> The above command causes a security failure if you do not have a ticket 
> cache from kinit.
> Maybe we could call 
> _org.apache.flink.client.cli.AbstractCustomCommandLine#applyCommandLineOptionsToConfiguration_
>   before _SecurityUtils.install(new 
> SecurityConfiguration(cli.configuration));_
> Here is a demo patch: 
> [https://github.com/jiasheng55/flink/commit/ef6880dba8a1f36849f5d1bb308405c421b29986]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-12130) Apply command line options to configuration before installing security modules

2020-08-24 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183260#comment-17183260
 ] 

Aljoscha Krettek commented on FLINK-12130:
--

I think this still needs work then.

> Apply command line options to configuration before installing security modules
> --
>
> Key: FLINK-12130
> URL: https://issues.apache.org/jira/browse/FLINK-12130
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Major
>
> Currently, if the user configures Kerberos credentials through the command 
> line, it won't work.
> {code:java}
> // flink run -m yarn-cluster -yD 
> security.kerberos.login.keytab=/path/to/keytab -yD 
> security.kerberos.login.principal=xxx /path/to/test.jar
> {code}
> The above command causes a security failure if you do not have a ticket 
> cache from kinit.
> Maybe we could call 
> _org.apache.flink.client.cli.AbstractCustomCommandLine#applyCommandLineOptionsToConfiguration_
>   before _SecurityUtils.install(new 
> SecurityConfiguration(cli.configuration));_
> Here is a demo patch: 
> [https://github.com/jiasheng55/flink/commit/ef6880dba8a1f36849f5d1bb308405c421b29986]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-14054) Enable checkpointing via job configuration

2020-08-24 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-14054.

Fix Version/s: 1.10.0
   Resolution: Fixed

This should be fixed by FLINK-14788.

Thanks [~qinjunjerry] for the reminder!
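
For reference, with FLINK-14788 the interval can be set via configuration 
alone, without touching the job code; a sketch, assuming a 1.10+ 
{{flink-conf.yaml}}:

{code}
# flink-conf.yaml: enable checkpointing without changing the job code
execution.checkpointing.interval: 1min
{code}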

> Enable checkpointing via job configuration
> --
>
> Key: FLINK-14054
> URL: https://issues.apache.org/jira/browse/FLINK-14054
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Configuration
>Reporter: Jun Qin
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently enabling checkpointing can only be done via the job code, see the 
> following quote from this Flink 
> [checkpointing|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing]
>  doc:
> {quote}By default, checkpointing is disabled. To enable checkpointing, call 
> {{enableCheckpointing(n)}} on the {{StreamExecutionEnvironment}}, where _n_ 
> is the checkpoint interval in milliseconds.
> {quote}
> This makes enabling checkpointing after the job code has been released 
> difficult: one has to change and rebuild the job code.
> In addition, making checkpointing configurable is of interest not only for 
> developers but also for operations teams:
>  * They may want to enable checkpointing in production but disable it in 
> test (e.g., to save storage space)
>  * They may want to try out with and without checkpointing to evaluate the 
> impact on job behaviour and performance.
> Hence this request. Thanks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11779) CLI ignores -m parameter if high-availability is ZOOKEEPER

2020-08-24 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183250#comment-17183250
 ] 

Aljoscha Krettek commented on FLINK-11779:
--

Agreed. I think we then need to make this more prominent in the documentation, 
as the original creator of the issue also suggested.

> CLI ignores -m parameter if high-availability is ZOOKEEPER 
> ---
>
> Key: FLINK-11779
> URL: https://issues.apache.org/jira/browse/FLINK-11779
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.7.2, 1.8.0
>Reporter: Gary Yao
>Assignee: leesf
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> *Description*
> The CLI ignores the host/port provided by the {{-m}} parameter if 
> {{high-availability: ZOOKEEPER}} is configured in {{flink-conf.yaml}}.
> *Expected behavior*
> * TBD: either document this behavior or give precedence to {{-m}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-18960) flink sideoutput union

2020-08-24 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17181004#comment-17181004
 ] 

Aljoscha Krettek edited comment on FLINK-18960 at 8/24/20, 12:43 PM:
-

Hi [~xiaohang.li], I checked the issue: it did exist in old versions, but it 
should be fixed by FLINK-17578. You may try upgrading to 1.10.2 or 1.11.


was (Author: gaoyunhaii):
Hi [~xiaohang.li], I checked the issue: it did exist in old versions, but it 
should be fixed by https://issues.apache.org/jira/browse/FLINK-17578. You may 
try upgrading to 1.10.2 or 1.11.

> flink sideoutput union
> --
>
> Key: FLINK-18960
> URL: https://issues.apache.org/jira/browse/FLINK-18960
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.10.1
>Reporter: xiaohang.li
>Priority: Minor
>
> Flink side-output union does not seem to work right. If we union the side 
> outputs from the same operator, the output is the last side output repeated 
> as many times as there are unions, which is not expected. For example,
> {code:java}
> val side = new OutputTag[String]("side")
>  val side2 = new OutputTag[String]("side2")
>  val side3 = new OutputTag[String]("side3")
>  val ds = env.socketTextStream("master",9001)
>  val res = ds.process(new ProcessFunction[String,String] {
>  override def processElement(value: String, ctx: ProcessFunction[String, 
> String]#Context, out: Collector[String]): Unit = {
>  if(value.contains("hello"))
> { ctx.output(side,value) }
> else if(value.contains("world"))
> { ctx.output(side2,value) }
> else if(value.contains("flink"))
> { ctx.output(side3,value) }
> out.collect(value)
>  }
>  })
> val res1 = res.getSideOutput(side)
>  val res2 = res.getSideOutput(side2)
>  val res3 = res.getSideOutput(side3)
> println( ">"+res1.getClass)
>  println( ">"+res2.getClass)
> res1.print("res1")
>  res2.print("res2")
>  res3.print("res3")
> res2.union(res1).union(res3).print("all")
> {code}
>  
>  If we input 
> {code:java}
> hello
> world
> flink
> {code}
> The output will be 
>  
> {code:java}
> res1> hello
>  res2> world
>  res3> flink
>  all> flink
>  all> flink
>  all> flink
> {code}
>  
> But the expected output would be 
> {code:java}
> res1> hello
> res2> world
> res3> flink
> all> hello 
> all> world 
> all> flink
> {code}
>  
>  
> If we add a _map_ after each side output and then union them, the output is 
> correct, but adding a map should not be needed.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18934) Idle stream does not advance watermark in connected stream

2020-08-24 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183225#comment-17183225
 ] 

Aljoscha Krettek commented on FLINK-18934:
--

Is there any update on this?

> Idle stream does not advance watermark in connected stream
> --
>
> Key: FLINK-18934
> URL: https://issues.apache.org/jira/browse/FLINK-18934
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.1
>Reporter: Truong Duc Kien
>Priority: Major
>
> Per the Flink documentation, when a stream is idle, it allows the watermarks 
> of downstream operators to advance. However, when I connect an active data 
> stream with an idle data stream, the output watermark of the 
> CoProcessOperator does not increase.
> Here's a small test that reproduces the problem.
> https://github.com/kien-truong/flink-idleness-testing



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18909) SequenceGenerator doesn't work as expected

2020-08-24 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183221#comment-17183221
 ] 

Aljoscha Krettek commented on FLINK-18909:
--

[~lzljs3620320] Could you please take a look?

> SequenceGenerator doesn't work as expected
> --
>
> Key: FLINK-18909
> URL: https://issues.apache.org/jira/browse/FLINK-18909
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.1
>Reporter: Alan
>Priority: Minor
>
> When using an 
> org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator
> with 0 and Long.MAX_VALUE, the following exception is raised:
> new SequenceGenerator(0, Long.MAX_VALUE);
>  
> Caused by: java.lang.IllegalArgumentException at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123) at 
> org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator.safeDivide(SequenceGenerator.java:110)
>  at 
> org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator.open(SequenceGenerator.java:83)
>  at 
> org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.initializeState(DataGeneratorSource.java:64)
>  
> This is because the long overflows in the check: 
> long totalNoOfElements = Math.abs(end - start + 1);
> which is incorrect; this code should use BigInteger.
>  
> Using
> new SequenceGenerator(0, Long.MAX_VALUE - 1);
> still fails on the following check:
> Caused by: java.lang.IllegalArgumentException at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123) at 
> org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator.safeDivide(SequenceGenerator.java:111)
>  at 
> org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator.open(SequenceGenerator.java:83)
>  at 
> org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.initializeState(DataGeneratorSource.java:64)
>  
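
A sketch of the overflow-safe computation suggested above, using BigInteger 
(variable names as in the report):

{code:java}
import java.math.BigInteger;

// BigInteger avoids the long overflow in Math.abs(end - start + 1) when the
// range spans more than Long.MAX_VALUE values.
BigInteger totalNoOfElements = BigInteger.valueOf(end)
        .subtract(BigInteger.valueOf(start))
        .add(BigInteger.ONE);
{code}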



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18959) Fail to archiveExecutionGraph because job is not finished when dispatcher close

2020-08-24 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183202#comment-17183202
 ] 

Aljoscha Krettek commented on FLINK-18959:
--

Yes, this is an unintended side effect of FLINK-15116. Before that change, 
shutting down a per-job cluster was handled by the client, which meant that the 
cluster could get "stuck" in case the client disconnected. With the change, 
we're eagerly shutting down the cluster when cancelling from "within the 
cluster". It seems we're shutting down too eagerly, though, and we should wait 
in the cluster until cancel/shutdown went through all the stages (including 
storing the archived execution graph).
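
A rough sketch of the intended ordering (hypothetical names; the actual 
MiniDispatcher code differs):

{code:java}
// Hypothetical sketch: only shut down the cluster once job termination,
// including archiving the execution graph, has completed.
jobTerminationFuture
        .thenCompose(result -> archiveExecutionGraph(result))
        .thenRun(() -> shutDownCluster());
{code}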

> Fail to archiveExecutionGraph because job is not finished when dispatcher 
> close
> ---
>
> Key: FLINK-18959
> URL: https://issues.apache.org/jira/browse/FLINK-18959
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.12.0, 1.11.1
>Reporter: Liu
>Assignee: Liu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2, 1.10.3
>
> Attachments: flink-debug-log
>
>
> When a job is cancelled, we expect to see it in Flink's history server, but I 
> cannot see my job after it is cancelled.
> After digging into the problem, I found that the function 
> archiveExecutionGraph is not executed. Below is the relevant log:
> {panel:title=log}
> 2020-08-14 15:10:06,406 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph 
> [flink-akka.actor.default-dispatcher- 15] - Job EtlAndWindow 
> (6f784d4cc5bae88a332d254b21660372) switched from state RUNNING to CANCELLING.
> 2020-08-14 15:10:06,415 DEBUG 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-3] - Shutting down per-job cluster 
> because the job was canceled.
> 2020-08-14 15:10:06,629 INFO 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-3] - Stopping dispatcher 
> akka.tcp://flink@bjfk-c9865.yz02:38663/user/dispatcher.
> 2020-08-14 15:10:06,629 INFO 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-3] - Stopping all currently running jobs 
> of dispatcher akka.tcp://flink@bjfk-c9865.yz02:38663/user/dispatcher.
> 2020-08-14 15:10:06,631 INFO org.apache.flink.runtime.jobmaster.JobMaster 
> [flink-akka.actor.default-dispatcher-29] - Stopping the JobMaster for job 
> EtlAndWindow(6f784d4cc5bae88a332d254b21660372).
> 2020-08-14 15:10:06,632 DEBUG org.apache.flink.runtime.jobmaster.JobMaster 
> [flink-akka.actor.default-dispatcher-29] - Disconnect TaskExecutor 
> container_e144_1590060720089_2161_01_06 because: Stopping JobMaster for 
> job EtlAndWindow(6f784d4cc5bae88a332d254b21660372).
> 2020-08-14 15:10:06,646 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph 
> [flink-akka.actor.default-dispatcher-29] - Job EtlAndWindow 
> (6f784d4cc5bae88a332d254b21660372) switched from state CANCELLING to CANCELED.
> 2020-08-14 15:10:06,664 DEBUG 
> org.apache.flink.runtime.dispatcher.MiniDispatcher 
> [flink-akka.actor.default-dispatcher-4] - There is a newer JobManagerRunner 
> for the job 6f784d4cc5bae88a332d254b21660372.
> {panel}
> From the log, we can see that the job is not finished when the dispatcher 
> closes. The process is as follows:
>  * The cancel command is received and sent to all tasks asynchronously.
>  * The MiniDispatcher begins shutting down the per-job cluster.
>  * The dispatcher is stopped and the job is removed.
>  * The job is cancelled and the callback in startJobManagerRunner is executed.
>  * Because the job was removed before, currentJobManagerRunner is null and 
> does not equal the original jobManagerRunner. In this case, 
> archivedExecutionGraph will not be uploaded.
> In normal cases, I find that the job is cancelled first and then the 
> dispatcher is stopped, so archiving the execution graph succeeds. But the 
> order is not constrained, and it is hard to know which comes first.
> Above is what I suspect. If so, we should fix it.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18770) Emitting element fails in KryoSerializer

2020-08-03 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17169849#comment-17169849
 ] 

Aljoscha Krettek commented on FLINK-18770:
--

Thanks for the analysis! Regarding object reuse: your intuition is correct, 
your code needs to be more defensive and not keep a handle to input objects, in 
a nutshell.
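
A minimal sketch of what "defensive" means here, assuming a hypothetical 
{{MyEvent}} type with a {{copy()}} method:

{code:java}
// With object reuse enabled, the runtime may mutate the same input instance
// after map() returns, so copy the input instead of keeping a handle to it.
public class DefensiveMap implements MapFunction<MyEvent, MyEvent> {
    private MyEvent lastSeen; // reference that outlives a single invocation

    @Override
    public MyEvent map(MyEvent value) {
        lastSeen = value.copy(); // defensive copy, not the input handle
        return value;
    }
}
{code}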

To answer your question: there should be no overhead of using object reuse, in 
fact the opposite should be the case because we potentially copy fewer objects.

Btw, I'll be on vacation for the next three weeks, so I'll probably not answer 
here for that time.

> Emitting element fails in KryoSerializer
> 
>
> Key: FLINK-18770
> URL: https://issues.apache.org/jira/browse/FLINK-18770
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.11.1
> Environment: Flink 1.11.1, Linux
>Reporter: Leonid Ilyevsky
>Priority: Major
> Attachments: AppMain.java, FlinkTest.scala, KryoException.txt, 
> SolaceSource.java, run_command.txt
>
>
> I wrote a simple Flink connector for Solace, see attached java file. It works 
> fine under local execution environment. However, when I deployed it in the 
> real Flink cluster, it failed with the Kryo exception, see attached.
> After a few hours of search and debugging, I can see now what is going on.
> The data I want to emit from this source is a simple byte array. In the 
> exception stack you can see that when I call 'collect' on the context, it 
> goes into OperatorChain.java:715, and then to KryoSerializer, where it 
> ultimately fails. I didn't have a chance to learn what KryoSerializer is and 
> why it would not know what to do with byte[], but that is not the point now.
> Then I used debugger in my local test, in order to figure out how it manages 
> to work. I saw that after OperatorChain.java:715 it goes into 
> BytePrimitiveArraySerializer, and then everything is working as expected. 
> Obviously BytePrimitiveArraySerializer makes sense for byte[] data.
> The question is, how can I configure the execution environment under cluster 
> so that it does serialization the same way as the local one? I looked at 
> [https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html]
>  , and I was thinking of setting disableForceKryo, but it says it is disabled 
> by default anyway.
>  
> Another question is, why cluster execution environment has different default 
> settings compare to local? This makes it difficult to rely on local tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18800) Avro serialization schema doesn't support Kafka key/value serialization

2020-08-03 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-18800:
-
Component/s: (was: API / Type Serialization System)

> Avro serialization schema doesn't support Kafka key/value serialization
> 
>
> Key: FLINK-18800
> URL: https://issues.apache.org/jira/browse/FLINK-18800
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Formats (JSON, Avro, Parquet, ORC, 
> SequenceFile)
>Affects Versions: 1.11.0, 1.11.1
>Reporter: Mohammad Hossein Gerami
>Priority: Major
>
> {color:#ff8b00}AvroSerializationSchema{color} and 
> {color:#ff8b00}ConfluentRegistryAvroSerializationSchema{color} don't 
> support Kafka key/value serialization. I implemented a custom Avro 
> serialization schema to solve this problem.
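
A hedged sketch of the workaround described above: wrapping two plain 
{{SerializationSchema}}s in a {{KafkaSerializationSchema}}. All class names 
here are illustrative, not part of any Flink API:

{code:java}
public class KeyValueKafkaSchema<T> implements KafkaSerializationSchema<T> {
    private final String topic;
    private final SerializationSchema<T> keySchema;
    private final SerializationSchema<T> valueSchema;

    public KeyValueKafkaSchema(
            String topic,
            SerializationSchema<T> keySchema,
            SerializationSchema<T> valueSchema) {
        this.topic = topic;
        this.keySchema = keySchema;
        this.valueSchema = valueSchema;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T element, Long timestamp) {
        // Serialize the same element twice: once for the key, once for the value.
        return new ProducerRecord<>(
                topic, keySchema.serialize(element), valueSchema.serialize(element));
    }
}
{code}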



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18770) Emitting element fails in KryoSerializer

2020-07-30 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17168004#comment-17168004
 ] 

Aljoscha Krettek commented on FLINK-18770:
--

So it seems that the {{KryoSerializer}} just doesn't work for those classes. 
The {{KryoSerializer}} is used by Flink as the fallback serializer when it 
cannot determine a more concrete {{TypeInformation}} for a type. In that case 
you need to make sure to specify a custom TypeSerializer/TypeInformation in all 
the places where your Protobuf objects are sent between operations.
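
For example, something like this (a sketch; {{MyProto}} and {{MyProtoTypeInfo}} 
are placeholders for the user's Protobuf class and a hand-written 
TypeInformation for it):

{code:java}
// Pin a concrete TypeInformation on the operation that produces the Protobuf
// objects, so Flink does not fall back to the generic KryoSerializer.
DataStream<MyProto> mapped = input
        .map(new MyProtoMapper())
        .returns(new MyProtoTypeInfo());
{code}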

> Emitting element fails in KryoSerializer
> 
>
> Key: FLINK-18770
> URL: https://issues.apache.org/jira/browse/FLINK-18770
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.11.1
> Environment: Flink 1.11.1, Linux
>Reporter: Leonid Ilyevsky
>Priority: Major
> Attachments: AppMain.java, FlinkTest.scala, KryoException.txt, 
> SolaceSource.java, run_command.txt
>
>
> I wrote a simple Flink connector for Solace, see attached java file. It works 
> fine under local execution environment. However, when I deployed it in the 
> real Flink cluster, it failed with the Kryo exception, see attached.
> After a few hours of search and debugging, I can see now what is going on.
> The data I want to emit from this source is a simple byte array. In the 
> exception stack you can see that when I call 'collect' on the context, it 
> goes into OperatorChain.java:715, and then to KryoSerializer, where it 
> ultimately fails. I didn't have a chance to learn what KryoSerializer is and 
> why it would not know what to do with byte[], but that is not the point now.
> Then I used debugger in my local test, in order to figure out how it manages 
> to work. I saw that after OperatorChain.java:715 it goes into 
> BytePrimitiveArraySerializer, and then everything is working as expected. 
> Obviously BytePrimitiveArraySerializer makes sense for byte[] data.
> The question is, how can I configure the execution environment on the 
> cluster so that it does serialization the same way as the local one? I 
> looked at 
> [https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html]
> and I was thinking of setting disableForceKryo, but it says it is disabled 
> by default anyway.
>  
> Another question is: why does the cluster execution environment have 
> different default settings compared to the local one? This makes it 
> difficult to rely on local tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18770) Emitting element fails in KryoSerializer

2020-07-30 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167864#comment-17167864
 ] 

Aljoscha Krettek commented on FLINK-18770:
--

Could you please post the code for the Flink application as well? It would help 
in figuring out why the {{KryoSerializer}} is being used. Also, could you please 
share exactly how you're running it locally and on the cluster?

> Emitting element fails in KryoSerializer
> 
>
> Key: FLINK-18770
> URL: https://issues.apache.org/jira/browse/FLINK-18770
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.11.1
> Environment: Flink 1.11.1, Linux
>Reporter: Leonid Ilyevsky
>Priority: Major
> Attachments: KryoException.txt, SolaceSource.java
>
>
> I wrote a simple Flink connector for Solace, see attached java file. It works 
> fine under local execution environment. However, when I deployed it in the 
> real Flink cluster, it failed with the Kryo exception, see attached.
> After a few hours of search and debugging, I can see now what is going on.
> The data I want to emit from this source is a simple byte array. In the 
> exception stack you can see that when I call 'collect' on the context, it 
> goes into OperatorChain.java:715, and then to KryoSerializer, where it 
> ultimately fails. I didn't have a chance to learn what KryoSerializer is and 
> why it would not know what to do with byte[], but that is not the point now.
> Then I used debugger in my local test, in order to figure out how it manages 
> to work. I saw that after OperatorChain.java:715 it goes into 
> BytePrimitiveArraySerializer, and then everything is working as expected. 
> Obviously BytePrimitiveArraySerializer makes sense for byte[] data.
> The question is, how can I configure the execution environment on the 
> cluster so that it does serialization the same way as the local one? I 
> looked at 
> [https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html]
> and I was thinking of setting disableForceKryo, but it says it is disabled 
> by default anyway.
>  
> Another question is: why does the cluster execution environment have 
> different default settings compared to the local one? This makes it 
> difficult to rely on local tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18770) Emitting element fails in KryoSerializer

2020-07-30 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-18770:
-
Component/s: API / Type Serialization System

> Emitting element fails in KryoSerializer
> 
>
> Key: FLINK-18770
> URL: https://issues.apache.org/jira/browse/FLINK-18770
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.11.1
> Environment: Flink 1.11.1, Linux
>Reporter: Leonid Ilyevsky
>Priority: Major
> Attachments: KryoException.txt, SolaceSource.java
>
>
> I wrote a simple Flink connector for Solace, see attached java file. It works 
> fine under local execution environment. However, when I deployed it in the 
> real Flink cluster, it failed with the Kryo exception, see attached.
> After a few hours of search and debugging, I can see now what is going on.
> The data I want to emit from this source is a simple byte array. In the 
> exception stack you can see that when I call 'collect' on the context, it 
> goes into OperatorChain.java:715, and then to KryoSerializer, where it 
> ultimately fails. I didn't have a chance to learn what KryoSerializer is and 
> why it would not know what to do with byte[], but that is not the point now.
> Then I used debugger in my local test, in order to figure out how it manages 
> to work. I saw that after OperatorChain.java:715 it goes into 
> BytePrimitiveArraySerializer, and then everything is working as expected. 
> Obviously BytePrimitiveArraySerializer makes sense for byte[] data.
> The question is, how can I configure the execution environment on the 
> cluster so that it does serialization the same way as the local one? I 
> looked at 
> [https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html]
> and I was thinking of setting disableForceKryo, but it says it is disabled 
> by default anyway.
>  
> Another question is: why does the cluster execution environment have 
> different default settings compared to the local one? This makes it 
> difficult to rely on local tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-18606) Remove generic parameter from SinkFunction.Context

2020-07-29 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-18606.

Fix Version/s: 1.12.0
   Resolution: Fixed

master: 4343e00bc07a3f31a585915da75adb232a9fa974

> Remove generic parameter from SinkFunction.Context
> -
>
> Key: FLINK-18606
> URL: https://issues.apache.org/jira/browse/FLINK-18606
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Niels Basjes
>Assignee: Niels Basjes
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> As discussed on the mailing list 
> https://lists.apache.org/thread.html/ra72d406e262f3b30ef4df95e8e4ba2d765859203499be3b6d5cd59a2%40%3Cdev.flink.apache.org%3E
> The SinkFunction.Context interface is generic but does not use its generic 
> parameter.
> In most places where this interface is used, the generic parameter is omitted, 
> which gives many warnings about using "raw types".
> This issue is to try to remove this generic parameter and assess the impact.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18752) Yarn ship logic should support files

2020-07-29 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167288#comment-17167288
 ] 

Aljoscha Krettek commented on FLINK-18752:
--

That sounds sensible!

[~tison] [~fly_in_gis] [~kkloudas] Do you remember why it's only directories 
right now?
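
For context, the usage in question would be something like this (a sketch; the 
second command shows the desired behaviour, not something that works today):

{code}
# works today: ship a whole directory
flink run -m yarn-cluster -yt /path/to/resources/ /path/to/job.jar
# desired: ship a single file as well
flink run -m yarn-cluster -yt /path/to/extra-lib.jar /path/to/job.jar
{code}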

> Yarn ship logic should support files
> 
>
> Key: FLINK-18752
> URL: https://issues.apache.org/jira/browse/FLINK-18752
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.12.0
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
>
> The --yarnship / -yt CLI parameter only supports shipping directories. In 
> many cases it would be practical to support shipping single files as well.
> Is there any good reason why only directories are supported at the moment?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-18749) Correct dependencies in Kubernetes pom

2020-07-29 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-18749:


Assignee: Yang Wang  (was: Aljoscha Krettek)

> Correct dependencies in Kubernetes pom
> --
>
> Key: FLINK-18749
> URL: https://issues.apache.org/jira/browse/FLINK-18749
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.11.1
>Reporter: Yang Wang
>Assignee: Yang Wang
>Priority: Major
> Fix For: 1.11.2
>
>
> While developing this PR [1], I found some unused dependencies (e.g. 
> {{com.github.mifmif:generex:1.0.2}}) in flink-kubernetes/pom.xml. It would be 
> good if we could remove them.
>  
> [1]. 
> [https://github.com/apache/flink/pull/12995/commits/8519f65321ba24c5164196a67a05d98fb268f490]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-18749) Correct dependencies in Kubernetes pom

2020-07-29 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-18749:


Assignee: Aljoscha Krettek  (was: Yang Wang)

> Correct dependencies in Kubernetes pom
> --
>
> Key: FLINK-18749
> URL: https://issues.apache.org/jira/browse/FLINK-18749
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.11.1
>Reporter: Yang Wang
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 1.11.2
>
>
> While developing this PR [1], I found some unused dependencies (e.g. 
> {{com.github.mifmif:generex:1.0.2}}) in flink-kubernetes/pom.xml. It would be 
> good if we could remove them.
>  
> [1]. 
> [https://github.com/apache/flink/pull/12995/commits/8519f65321ba24c5164196a67a05d98fb268f490]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18741) ProcessWindowFunction's process function exception

2020-07-29 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167276#comment-17167276
 ] 

Aljoscha Krettek commented on FLINK-18741:
--

I don't quite get what's wrong. Is it throwing an exception?

> ProcessWindowFunction's  process function exception
> ---
>
> Key: FLINK-18741
> URL: https://issues.apache.org/jira/browse/FLINK-18741
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.10.0
>Reporter: mzz
>Priority: Critical
>
> I use ProcessWindowFunction to compute PV (page views), but when overriding 
> process, the user-defined state value cannot be read back.
> Code:
> {code:java}
> tem.keyBy(x =>
>   (x._1, x._2, x._4, x._5, x._6, x._7, x._8))
>   .timeWindow(Time.seconds(15 * 60)) //15 min window
>   .process(new ProcessWindowFunction[(String, String, String, String, 
> String, String, String, String, String), CkResult, (String, String, String, 
> String, String, String, String), TimeWindow] {
>   var clickCount: ValueState[Long] = _
>   var requestCount: ValueState[Long] = _
>   var returnCount: ValueState[Long] = _
>   var videoCount: ValueState[Long] = _
>   var noVideoCount: ValueState[Long] = _
>   override def open(parameters: Configuration): Unit = {
> clickCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("clickCount", classOf[Long]))
> requestCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("requestCount", classOf[Long]))
> returnCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("returnCount", classOf[Long]))
> videoCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("videoCount", classOf[Long]))
> noVideoCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("noVideoCount", classOf[Long]))
>   }
>   override def process(key: (String, String, String, String, String, 
> String, String), context: Context, elements: Iterable[(String, String, 
> String, String, String, String, String, String, String)], out: 
> Collector[CkResult]) = {
> try {
>   var clickNum: Long = clickCount.value
>   val dateNow = 
> LocalDateTime.now().format(DateTimeFormatter.ofPattern("MMdd")).toLong
>   var requestNum: Long = requestCount.value
>   var returnNum: Long = returnCount.value
>   var videoNum: Long = videoCount.value
>   var noVideoNum: Long = noVideoCount.value
>   if (requestNum == null) {
> requestNum = 0
>   }
>   
>   val ecpm = key._7.toDouble.formatted("%.2f").toFloat
>   val created_at = getSecondTimestampTwo(new Date)
>  
>   elements.foreach(e => {
> if ("adreq".equals(e._3)) {
>   requestNum += 1
>   println(key._1, requestNum)
> }
>   })
>   requestCount.update(requestNum)
>   println(requestNum, key._1)
>   
>   out.collect(CkResult(dateNow, (created_at - getZero_time) / (60 * 
> 15), key._2, key._3, key._4, key._5, key._3 + "_" + key._4 + "_" + key._5, 
> key._6, key._1, requestCount.value, returnCount.value, fill_rate, 
> noVideoCount.value + videoCount.value,
> expose_rate, clickCount.value, click_rate, ecpm, 
> (noVideoCount.value * ecpm + videoCount.value * ecpm / 
> 1000.toFloat).formatted("%.2f").toFloat, created_at))
> }
> catch {
>   case e: Exception => println(key, e)
> }
>   }
> })
> {code}
> {code:java}
>   elements.foreach(e => {
> if ("adreq".equals(e._3)) {
>   requestNum += 1
>   println(key._1, requestNum)
> // The values printed here look like:
> //(key,1)
> //(key,2)
> //(key,3)
> }
>   })
> // But the println outside the foreach always prints:
> //(key,0)
>   println(requestNum, key._1)
> {code}
> Who can help me? Thanks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18742) Some configuration args do not take effect at client

2020-07-29 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167118#comment-17167118
 ] 

Aljoscha Krettek commented on FLINK-18742:
--

[~fly_in_gis] is right, please open a PR [~wangm92] 

> Some configuration args do not take effect at client
> 
>
> Key: FLINK-18742
> URL: https://issues.apache.org/jira/browse/FLINK-18742
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Affects Versions: 1.11.1
>Reporter: Matt Wang
>Assignee: Matt Wang
>Priority: Major
>
> Some configuration args from the command line do not take effect at the 
> client. For example, if the job sets 
> {color:#505f79}_classloader.resolve-order_{color} to 
> _{color:#505f79}parent-first{color}_, it works at the TaskManager but not at 
> the Client.
> The *FlinkUserCodeClassLoaders* are created before the method 
> _{color:#505f79}getEffectiveConfiguration(){color}_ is called in 
> {color:#505f79}org.apache.flink.client.cli.CliFrontend{color}, so the 
> _{color:#505f79}Configuration{color}_ used by 
> _{color:#505f79}PackagedProgram{color}_ does not include the command line 
> args.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-18742) Some configuration args do not take effect at client

2020-07-29 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-18742:


Assignee: Matt Wang

> Some configuration args do not take effect at client
> 
>
> Key: FLINK-18742
> URL: https://issues.apache.org/jira/browse/FLINK-18742
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Affects Versions: 1.11.1
>Reporter: Matt Wang
>Assignee: Matt Wang
>Priority: Major
>
> Some configuration args from the command line do not take effect at the 
> client. For example, if the job sets 
> {color:#505f79}_classloader.resolve-order_{color} to 
> _{color:#505f79}parent-first{color}_, it works at the TaskManager but not at 
> the Client.
> The *FlinkUserCodeClassLoaders* are created before the method 
> _{color:#505f79}getEffectiveConfiguration(){color}_ is called in 
> {color:#505f79}org.apache.flink.client.cli.CliFrontend{color}, so the 
> _{color:#505f79}Configuration{color}_ used by 
> _{color:#505f79}PackagedProgram{color}_ does not include the command line 
> args.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-18746) WindowStaggerTest.testWindowStagger failed

2020-07-29 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-18746.

Fix Version/s: 1.12.0
   Resolution: Fixed

The problem was that WindowStagger.RANDOM is inclusive on the start and 
exclusive on the end of the interval while the test was assuming that the start 
is also exclusive.
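
In other words, a sketch of the corrected assumption (variable names are 
illustrative):

{code:java}
// RANDOM staggering draws an offset in [0, windowSize), i.e. inclusive of 0,
// so the test must accept a zero offset.
assertTrue(offset >= 0 && offset < windowSize);
{code}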

master: 6efa8393be1dd7b648ad3c612c6df22068b601d2

cc [~TengHu]

> WindowStaggerTest.testWindowStagger failed
> --
>
> Key: FLINK-18746
> URL: https://issues.apache.org/jira/browse/FLINK-18746
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=4975=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=7c61167f-30b3-5893-cc38-a9e3d057e392
> {code}
> 2020-07-28T21:16:30.1350624Z [ERROR] Tests run: 1, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 0.145 s <<< FAILURE! - in 
> org.apache.flink.streaming.runtime.operators.windowing.WindowStaggerTest
> 2020-07-28T21:16:30.1352065Z [ERROR] 
> testWindowStagger(org.apache.flink.streaming.runtime.operators.windowing.WindowStaggerTest)
>   Time elapsed: 0.012 s  <<< FAILURE!
> 2020-07-28T21:16:30.1352701Z java.lang.AssertionError
> 2020-07-28T21:16:30.1353104Z  at org.junit.Assert.fail(Assert.java:86)
> 2020-07-28T21:16:30.1353810Z  at org.junit.Assert.assertTrue(Assert.java:41)
> 2020-07-28T21:16:30.1354289Z  at org.junit.Assert.assertTrue(Assert.java:52)
> 2020-07-28T21:16:30.1354914Z  at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowStaggerTest.testWindowStagger(WindowStaggerTest.java:38)
> 2020-07-28T21:16:30.1355520Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-07-28T21:16:30.1356060Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-07-28T21:16:30.1356663Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-07-28T21:16:30.1357220Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2020-07-28T21:16:30.1357775Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-07-28T21:16:30.1358383Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-07-28T21:16:30.1358986Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-07-28T21:16:30.1359623Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-07-28T21:16:30.1360187Z  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2020-07-28T21:16:30.1360740Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2020-07-28T21:16:30.1361364Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2020-07-28T21:16:30.1361916Z  at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2020-07-28T21:16:30.1362432Z  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2020-07-28T21:16:30.1362976Z  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2020-07-28T21:16:30.1363516Z  at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2020-07-28T21:16:30.1364041Z  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2020-07-28T21:16:30.1364568Z  at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 2020-07-28T21:16:30.1365139Z  at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
> 2020-07-28T21:16:30.1365764Z  at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
> 2020-07-28T21:16:30.1366413Z  at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
> 2020-07-28T21:16:30.1367036Z  at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
> 2020-07-28T21:16:30.1367671Z  at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
> 2020-07-28T21:16:30.1368337Z  at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
> 2020-07-28T21:16:30.1368956Z  at 
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
> 2020-07-28T21:16:30.1369530Z  at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-18746) WindowStaggerTest.testWindowStagger failed

2020-07-29 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-18746:


Assignee: Aljoscha Krettek

> WindowStaggerTest.testWindowStagger failed
> --
>
> Key: FLINK-18746
> URL: https://issues.apache.org/jira/browse/FLINK-18746
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Assignee: Aljoscha Krettek
>Priority: Major
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=4975=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=7c61167f-30b3-5893-cc38-a9e3d057e392
> {code}
> 2020-07-28T21:16:30.1350624Z [ERROR] Tests run: 1, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 0.145 s <<< FAILURE! - in 
> org.apache.flink.streaming.runtime.operators.windowing.WindowStaggerTest
> 2020-07-28T21:16:30.1352065Z [ERROR] 
> testWindowStagger(org.apache.flink.streaming.runtime.operators.windowing.WindowStaggerTest)
>   Time elapsed: 0.012 s  <<< FAILURE!
> 2020-07-28T21:16:30.1352701Z java.lang.AssertionError
> 2020-07-28T21:16:30.1353104Z  at org.junit.Assert.fail(Assert.java:86)
> 2020-07-28T21:16:30.1353810Z  at org.junit.Assert.assertTrue(Assert.java:41)
> 2020-07-28T21:16:30.1354289Z  at org.junit.Assert.assertTrue(Assert.java:52)
> 2020-07-28T21:16:30.1354914Z  at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowStaggerTest.testWindowStagger(WindowStaggerTest.java:38)
> 2020-07-28T21:16:30.1355520Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-07-28T21:16:30.1356060Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-07-28T21:16:30.1356663Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-07-28T21:16:30.1357220Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2020-07-28T21:16:30.1357775Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-07-28T21:16:30.1358383Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-07-28T21:16:30.1358986Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-07-28T21:16:30.1359623Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-07-28T21:16:30.1360187Z  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2020-07-28T21:16:30.1360740Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2020-07-28T21:16:30.1361364Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2020-07-28T21:16:30.1361916Z  at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2020-07-28T21:16:30.1362432Z  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2020-07-28T21:16:30.1362976Z  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2020-07-28T21:16:30.1363516Z  at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2020-07-28T21:16:30.1364041Z  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2020-07-28T21:16:30.1364568Z  at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 2020-07-28T21:16:30.1365139Z  at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
> 2020-07-28T21:16:30.1365764Z  at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
> 2020-07-28T21:16:30.1366413Z  at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
> 2020-07-28T21:16:30.1367036Z  at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
> 2020-07-28T21:16:30.1367671Z  at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
> 2020-07-28T21:16:30.1368337Z  at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
> 2020-07-28T21:16:30.1368956Z  at 
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
> 2020-07-28T21:16:30.1369530Z  at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18627) Get unmatch filter method records to side output

2020-07-28 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17166445#comment-17166445
 ] 

Aljoscha Krettek commented on FLINK-18627:
--

That will not produce the desired output: {{stream.getSideOutput()}} will only 
get the side output from the last operation. That's what I was hinting at 
above: I don't know how we can provide an ergonomic API for the pattern of 
chaining multiple filters.
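
For reference, what works today is a single {{ProcessFunction}} that forwards matching records and routes non-matching ones to a side output. A minimal sketch (the tag name, the {{Integer}} element type, and the {{datastream}} variable are assumptions for illustration, taken from the example below):

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

final OutputTag<Integer> oddNumbers = new OutputTag<Integer>("odd-numbers"){};

SingleOutputStreamOperator<Integer> evens = datastream
    .process(new ProcessFunction<Integer, Integer>() {
        @Override
        public void processElement(Integer value, Context ctx, Collector<Integer> out) {
            if (value % 2 == 0) {
                out.collect(value);            // records that pass the "filter"
            } else {
                ctx.output(oddNumbers, value); // records that would otherwise be dropped
            }
        }
    });

DataStream<Integer> odds = evens.getSideOutput(oddNumbers);
{code}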

> Get unmatch filter method records to side output
> 
>
> Key: FLINK-18627
> URL: https://issues.apache.org/jira/browse/FLINK-18627
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Roey Shem Tov
>Priority: Major
> Fix For: 1.12.0
>
>
> Unmatched records from filter functions should somehow be sent to a side output.
> Example:
>  
> {code:java}
> datastream
> .filter(i->i%2==0)
> .sideOutput(oddNumbersSideOutput);
> {code}
>  
>  
> That way we can filter multiple times and send the filtered-out records to our 
> side output instead of dropping them immediately; this can be useful in many ways.
>  
> What do you think?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-18281) Add WindowStagger into all Tumbling and Sliding Windows

2020-07-28 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-18281.

Fix Version/s: 1.12.0
 Assignee: Teng Hu
   Resolution: Fixed

master: 335c47e11478358e8514e63ca807ea765ed9dd9a

> Add WindowStagger into all Tumbling and Sliding Windows
> ---
>
> Key: FLINK-18281
> URL: https://issues.apache.org/jira/browse/FLINK-18281
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Reporter: Teng Hu
>Assignee: Teng Hu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> Adding the window staggering functionality into *TumblingEventTimeWindows*, 
> *SlidingProcessingTimeWindows* and *SlidingEventTimeWindows*.
> This is a follow-up issue of 
> [FLINK-12855|https://issues.apache.org/jira/browse/FLINK-12855]
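
For illustration, usage after this change could look like the following sketch (the {{WindowStagger}} parameter comes from FLINK-12855; the input type, key selector, and reduce function are placeholders, and names should be checked against the release you use):

{code:java}
// Assumed: stream is a DataStream<Tuple2<String, Integer>> with timestamps
// and watermarks already assigned. WindowStagger.RANDOM shifts the actual
// window start within the window size so panes do not all fire at once.
stream
    .keyBy(value -> value.f0)
    .window(TumblingEventTimeWindows.of(Time.minutes(1), Time.seconds(0), WindowStagger.RANDOM))
    .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));
{code}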



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-1140) Add a flatmap that returns an Iterable instead of aggregation to a collector

2020-07-28 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-1140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-1140.
---
Resolution: Abandoned

I'm closing this as "Abandoned", since there is no more activity and the code 
base has moved on quite a bit. Please re-open this if you feel otherwise and 
work should continue.

> Add a flatmap that returns an Iterable instead of aggregation to a collector 
> -
>
> Key: FLINK-1140
> URL: https://issues.apache.org/jira/browse/FLINK-1140
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataSet
>Affects Versions: 0.8.0
>Reporter: Márton Balassi
>Assignee: Márton Balassi
>Priority: Major
>
> This would enable more concise lambda function usage like the flatmap here:
> flatMap(sentence -> Arrays.asList(sentence.split(" ")))
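
For contrast, a minimal sketch of the Collector-based {{flatMap}} that the API does provide, written with a lambda (assuming {{sentences}} is a {{DataSet<String>}}; the explicit {{returns}} hint is needed because the lambda's output type is erased):

{code:java}
import java.util.Arrays;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.util.Collector;

// The Collector receives each word; returns(...) restores the erased type info.
DataSet<String> words = sentences
    .flatMap((String sentence, Collector<String> out) ->
        Arrays.stream(sentence.split(" ")).forEach(out::collect))
    .returns(Types.STRING);
{code}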



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-3613) Add standard deviation, mean, variance to list of Aggregations

2020-07-28 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-3613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-3613.
---
Resolution: Abandoned

I'm closing this as "Abandoned", since there is no more activity and the code 
base has moved on quite a bit. Please re-open this if you feel otherwise and 
work should continue.

> Add standard deviation, mean, variance to list of Aggregations
> --
>
> Key: FLINK-3613
> URL: https://issues.apache.org/jira/browse/FLINK-3613
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataSet
>Reporter: Todd Lisonbee
>Priority: Minor
> Attachments: DataSet-Aggregation-Design-March2016-v1.txt
>
>
> Implement standard deviation, mean, variance for 
> org.apache.flink.api.java.aggregation.Aggregations
> Ideally the implementation should be single-pass and numerically stable.
> References:
> "Scalable and Numerically Stable Descriptive Statistics in SystemML", Tian et 
> al, International Conference on Data Engineering 2012
> http://dl.acm.org/citation.cfm?id=2310392
> "The Kahan summation algorithm (also known as compensated summation) reduces 
> the numerical errors that occur when adding a sequence of finite precision 
> floating point numbers. Numerical errors arise due to truncation and 
> rounding. These errors can lead to numerical instability when calculating 
> variance."
> https://en.wikipedia.org/wiki/Kahan_summation_algorithm
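
To make the referenced technique concrete, here is a minimal, self-contained sketch of Kahan (compensated) summation; it is illustrative only and not taken from Flink's aggregation code:

{code:java}
// Carry a running compensation for the low-order bits that a plain
// floating-point addition would otherwise drop.
static double kahanSum(double[] values) {
    double sum = 0.0;
    double compensation = 0.0;
    for (double v : values) {
        double y = v - compensation;  // apply the correction from the previous step
        double t = sum + y;           // low-order bits of y may be lost here
        compensation = (t - sum) - y; // algebraically recover what was lost
        sum = t;
    }
    return sum;
}
{code}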



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-7561) Add support for pre-aggregation in DataStream API

2020-07-28 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-7561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-7561.
---
Resolution: Abandoned

I'm closing this as "Abandoned", since there is no more activity and the code 
base has moved on quite a bit. Please re-open this if you feel otherwise and 
work should continue.

> Add support for pre-aggregation in DataStream API
> -
>
> Key: FLINK-7561
> URL: https://issues.apache.org/jira/browse/FLINK-7561
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18627) Get unmatch filter method records to side output

2020-07-28 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17166214#comment-17166214
 ] 

Aljoscha Krettek commented on FLINK-18627:
--

I tend to agree with [~sjwiesman] here: {{filter()}} is classically an 
operation that only filters data, while {{split}} (or using OutputTags) allows 
splitting a stream into multiple streams.

I don't yet see how you would actually get the {{corruptedData}} in your 
example:
{code}
final OutputTag<Integer> corruptedData = new OutputTag<Integer>("side-output"){};

datastream
.filter(i->i%2==0).sideOutFilteredRecords(corruptedData)
.filter(i->i%3==0).sideOutFilteredRecords(corruptedData)
.filter(i->i%4==0).sideOutFilteredRecords(corruptedData)
.filter(i->i%5==0).sideOutFilteredRecords(corruptedData)
{code}

What would be the code to get a stream of all the {{corruptedData}} records?
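
To make the ergonomics problem concrete: as far as I can tell, every stage would have to be kept in a variable and the side outputs unioned by hand, roughly like this (purely hypothetical, since {{sideOutFilteredRecords}} does not exist and its return type is assumed):

{code:java}
SingleOutputStreamOperator<Integer> s1 = datastream.filter(i -> i % 2 == 0).sideOutFilteredRecords(corruptedData);
SingleOutputStreamOperator<Integer> s2 = s1.filter(i -> i % 3 == 0).sideOutFilteredRecords(corruptedData);
SingleOutputStreamOperator<Integer> s3 = s2.filter(i -> i % 4 == 0).sideOutFilteredRecords(corruptedData);

// The side output of each stage must be fetched from that stage and unioned manually.
DataStream<Integer> allCorrupted = s1.getSideOutput(corruptedData)
    .union(s2.getSideOutput(corruptedData), s3.getSideOutput(corruptedData));
{code}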

> Get unmatch filter method records to side output
> 
>
> Key: FLINK-18627
> URL: https://issues.apache.org/jira/browse/FLINK-18627
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Roey Shem Tov
>Priority: Major
> Fix For: 1.12.0
>
>
> Unmatched records from filter functions should somehow be sent to a side output.
> Example:
>  
> {code:java}
> datastream
> .filter(i->i%2==0)
> .sideOutput(oddNumbersSideOutput);
> {code}
>  
>  
> That way we can filter multiple times and send the filtered-out records to our 
> side output instead of dropping them immediately; this can be useful in many ways.
>  
> What do you think?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17159) ES6 ElasticsearchSinkITCase unstable

2020-07-28 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17166205#comment-17166205
 ] 

Aljoscha Krettek commented on FLINK-17159:
--

I think the reason is just that the embedded ES node does not come fully up 
before we try to run the tests.
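
If that is the case, a possible mitigation would be to block until the embedded node reports a usable cluster state before submitting the test job. A rough sketch using the classic transport-client admin API (an assumption, not the actual test code; {{client}} is the embedded node's client):

{code:java}
// Wait until cluster health reaches at least YELLOW, or fail the test setup.
ClusterHealthResponse health = client.admin().cluster()
    .prepareHealth()
    .setWaitForYellowStatus()
    .setTimeout(TimeValue.timeValueSeconds(30))
    .get();
if (health.isTimedOut()) {
    throw new AssertionError("Embedded Elasticsearch node did not come up in time");
}
{code}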

> ES6 ElasticsearchSinkITCase unstable
> 
>
> Key: FLINK-17159
> URL: https://issues.apache.org/jira/browse/FLINK-17159
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch, Tests
>Affects Versions: 1.11.0, 1.12.0
>Reporter: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
>
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=7482=logs=64110e28-73be-50d7-9369-8750330e0bf1=aa84fb9a-59ae-5696-70f7-011bc086e59b]
> {code:java}
> 2020-04-15T02:37:04.4289477Z [ERROR] 
> testElasticsearchSinkWithSmile(org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase)
>   Time elapsed: 0.145 s  <<< ERROR!
> 2020-04-15T02:37:04.4290310Z 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2020-04-15T02:37:04.4290790Z  at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
> 2020-04-15T02:37:04.4291404Z  at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:659)
> 2020-04-15T02:37:04.4291956Z  at 
> org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:77)
> 2020-04-15T02:37:04.4292548Z  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1643)
> 2020-04-15T02:37:04.4293254Z  at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase.runElasticSearchSinkTest(ElasticsearchSinkTestBase.java:128)
> 2020-04-15T02:37:04.4293990Z  at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase.runElasticsearchSinkSmileTest(ElasticsearchSinkTestBase.java:106)
> 2020-04-15T02:37:04.4295096Z  at 
> org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase.testElasticsearchSinkWithSmile(ElasticsearchSinkITCase.java:45)
> 2020-04-15T02:37:04.4295923Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-04-15T02:37:04.4296489Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-04-15T02:37:04.4297076Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-04-15T02:37:04.4297513Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2020-04-15T02:37:04.4297951Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-04-15T02:37:04.4298688Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-04-15T02:37:04.4299374Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-04-15T02:37:04.4300069Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-04-15T02:37:04.4300960Z  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> 2020-04-15T02:37:04.4301705Z  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2020-04-15T02:37:04.4302204Z  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2020-04-15T02:37:04.4302661Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2020-04-15T02:37:04.4303234Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2020-04-15T02:37:04.4303706Z  at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2020-04-15T02:37:04.4304127Z  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2020-04-15T02:37:04.4304716Z  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2020-04-15T02:37:04.4305394Z  at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2020-04-15T02:37:04.4305965Z  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2020-04-15T02:37:04.4306425Z  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2020-04-15T02:37:04.4306942Z  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2020-04-15T02:37:04.4307466Z  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-04-15T02:37:04.4307920Z  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-04-15T02:37:04.4308375Z  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-04-15T02:37:04.4308782Z  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2020-04-15T02:37:04.4309182Z  at 
> 

[jira] [Updated] (FLINK-18724) Integration with DataStream and DataSet API report error

2020-07-28 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-18724:
-
Component/s: (was: API / Core)

> Integration with DataStream and DataSet API report error 
> -
>
> Key: FLINK-18724
> URL: https://issues.apache.org/jira/browse/FLINK-18724
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Table SQL / API
>Affects Versions: 1.11.1
>Reporter: liang ding
>Priority: Major
>
> I want to create a table from a DataStream (Kafka). There are two reasons I 
> need to use DataStream:
>  # I need to decode messages into columns with a custom format; in SQL mode I 
> don't know how to do that.
>  # I have implemented both DeserializationSchema and FlatMapFunction. With a 
> DataStream I can do many things before it becomes a suitable table, which is 
> my preferred approach in general.
>  So I do it like this:
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings tSet= 
> EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
> StreamTableEnvironment tEnv=StreamTableEnvironment.create(env,tSet);
> DataStream stream = env
> .addSource(new FlinkKafkaConsumer<>("test-log", new 
> SimpleStringSchema(), properties))
> .flatMap(new LogParser());
> //stream.printToErr();
> tEnv.fromDataStream(stream).select("userId,city").execute().print();
> tEnv.execute("test-sql");
> //env.execute("test");
> {code}
> then I got message:
> {noformat}
>  [Kafka Fetcher for Source: Custom Source -> Flat Map ->* -> select: 
> (userId,city) -> to: Row (3/3)] INFO 
> org.apache.kafka.clients.FetchSessionHandler - [Consumer 
> clientId=consumer-flink-3-5, groupId=flink-3] Node 0 sent an invalid full 
> fetch response with extra=(test-log-0, response=(
>  [Kafka Fetcher for Source: Custom Source -> Flat Map ->* -> select: 
> (userId,city) -> to: Row (3/3)] INFO 
> org.apache.kafka.clients.FetchSessionHandler - [Consumer 
> clientId=consumer-flink-3-5, groupId=flink-3] Node 0 sent an invalid full 
> fetch response with extra=(test-log-1, response=({noformat}
> It seems like both StreamExecutionEnvironment and StreamTableEnvironment start 
> the fetcher, so neither one succeeds.
> There is also no integration guide, which left me confused: should I call 
> env.execute, tableEnv.execute, or both (it seems not)? And what about the 
> Blink planner way?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11779) CLI ignores -m parameter if high-availability is ZOOKEEPER

2020-07-27 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17165676#comment-17165676
 ] 

Aljoscha Krettek commented on FLINK-11779:
--

And {{jobmanager.rpc.address}}? That can't really be used from the client 
anymore but is still the fallback for {{rest.address}}. Maybe we should remove 
that as a fallback because it is the server-side config?
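
For context, the fallback in question amounts to something like this (a paraphrase, not the actual Flink code):

{code:java}
// Client-side REST endpoint resolution: rest.address first, with the
// server-side jobmanager.rpc.address as the questionable fallback.
String restAddress = configuration
    .getOptional(RestOptions.ADDRESS)
    .orElseGet(() -> configuration.getString(JobManagerOptions.ADDRESS));
{code}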

> CLI ignores -m parameter if high-availability is ZOOKEEPER 
> ---
>
> Key: FLINK-11779
> URL: https://issues.apache.org/jira/browse/FLINK-11779
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.7.2, 1.8.0
>Reporter: Gary Yao
>Assignee: leesf
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> *Description*
> The CLI ignores the host/port provided by the {{-m}} parameter if 
> {{high-availability: ZOOKEEPER}} is configured in {{flink-conf.yaml}}
> *Expected behavior*
> * TBD: either document this behavior or give precedence to {{-m}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11779) CLI ignores -m parameter if high-availability is ZOOKEEPER

2020-07-27 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17165650#comment-17165650
 ] 

Aljoscha Krettek commented on FLINK-11779:
--

Turns out this is a smidge more complicated. Should only the {{-m}} parameter 
take precedence or also a {{rest.address}} in {{flink-conf.yaml}}? Currently, 
in the part of the code where we create the client HA services, we cannot 
differentiate between the two cases. Plus, currently {{jobmanager.rpc.address}} 
is the fallback option for {{rest.address}} so most setups would get 
{{localhost}} as the {{rest.address}}.

Also, should there be a single {{rest.address}} for both the server side and 
the client side? This is a bigger and somewhat orthogonal question, though.

> CLI ignores -m parameter if high-availability is ZOOKEEPER 
> ---
>
> Key: FLINK-11779
> URL: https://issues.apache.org/jira/browse/FLINK-11779
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.7.2, 1.8.0
>Reporter: Gary Yao
>Assignee: leesf
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> *Description*
> The CLI ignores the host/port provided by the {{-m}} parameter if 
> {{high-availability: ZOOKEEPER}} is configured in {{flink-conf.yaml}}
> *Expected behavior*
> * TBD: either document this behavior or give precedence to {{-m}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-18702) Flink elasticsearch connector leaks threads and classloaders thereof

2020-07-24 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-18702:


Assignee: Jun Qin

> Flink elasticsearch connector leaks threads and classloaders thereof
> 
>
> Key: FLINK-18702
> URL: https://issues.apache.org/jira/browse/FLINK-18702
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.10.0, 1.10.1
>Reporter: Jun Qin
>Assignee: Jun Qin
>Priority: Major
>
> The Flink Elasticsearch connector leaks threads and the classloaders thereof. This 
> results in a Metaspace OOM when the ES sink fails and is restarted many times. 
> This issue is visible in Flink 1.10 but not in 1.11, because Flink 1.11 does 
> not create new class loaders for recoveries (FLINK-16408).
>  
> Reproduction:
>  * Start a job with ES sink in a Flink 1.10 cluster, without starting the ES 
> cluster.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18478) AvroDeserializationSchema does not work with types generated by avrohugger

2020-07-24 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-18478:
-
Fix Version/s: (was: 1.11.1)
   (was: 1.12.0)
   (was: 1.10.2)

> AvroDeserializationSchema does not work with types generated by avrohugger
> --
>
> Key: FLINK-18478
> URL: https://issues.apache.org/jira/browse/FLINK-18478
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
>
> The main problem is that the code in {{SpecificData.createSchema()}} tries to 
> reflectively read the {{SCHEMA$}} field, which is normally present in 
> Avro-generated classes. However, avrohugger generates this field in a companion 
> object, which the reflective Java code will therefore not find.
> This is also described in these ML threads:
>  * 
> [https://lists.apache.org/thread.html/5db58c7d15e4e9aaa515f935be3b342fe036e97d32e1fb0f0d1797ee@%3Cuser.flink.apache.org%3E]
>  * 
> [https://lists.apache.org/thread.html/cf1c5b8fa7f095739438807de9f2497e04ffe55237c5dea83355112d@%3Cuser.flink.apache.org%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-18478) AvroDeserializationSchema does not work with types generated by avrohugger

2020-07-24 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-18478:


Assignee: (was: Aljoscha Krettek)

> AvroDeserializationSchema does not work with types generated by avrohugger
> --
>
> Key: FLINK-18478
> URL: https://issues.apache.org/jira/browse/FLINK-18478
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.2, 1.12.0, 1.11.1
>
>
> The main problem is that the code in {{SpecificData.createSchema()}} tries to 
> reflectively read the {{SCHEMA$}} field, which is normally present in 
> Avro-generated classes. However, avrohugger generates this field in a companion 
> object, which the reflective Java code will therefore not find.
> This is also described in these ML threads:
>  * 
> [https://lists.apache.org/thread.html/5db58c7d15e4e9aaa515f935be3b342fe036e97d32e1fb0f0d1797ee@%3Cuser.flink.apache.org%3E]
>  * 
> [https://lists.apache.org/thread.html/cf1c5b8fa7f095739438807de9f2497e04ffe55237c5dea83355112d@%3Cuser.flink.apache.org%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (FLINK-18478) AvroDeserializationSchema does not work with types generated by avrohugger

2020-07-24 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reopened FLINK-18478:
--

> AvroDeserializationSchema does not work with types generated by avrohugger
> --
>
> Key: FLINK-18478
> URL: https://issues.apache.org/jira/browse/FLINK-18478
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.2, 1.12.0, 1.11.1
>
>
> The main problem is that the code in {{SpecificData.createSchema()}} tries to 
> reflectively read the {{SCHEMA$}} field, which is normally present in 
> Avro-generated classes. However, avrohugger generates this field in a companion 
> object, which the reflective Java code will therefore not find.
> This is also described in these ML threads:
>  * 
> [https://lists.apache.org/thread.html/5db58c7d15e4e9aaa515f935be3b342fe036e97d32e1fb0f0d1797ee@%3Cuser.flink.apache.org%3E]
>  * 
> [https://lists.apache.org/thread.html/cf1c5b8fa7f095739438807de9f2497e04ffe55237c5dea83355112d@%3Cuser.flink.apache.org%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18692) AvroSerializationSchema does not work with types generated by avrohugger

2020-07-24 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-18692:
-
Labels: starter  (was: pull-request-available)

> AvroSerializationSchema does not work with types generated by avrohugger
> 
>
> Key: FLINK-18692
> URL: https://issues.apache.org/jira/browse/FLINK-18692
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Aljoscha Krettek
>Priority: Major
>  Labels: starter
>
> The main problem is that the code in {{SpecificData.createSchema()}} tries to 
> reflectively read the {{SCHEMA$}} field, which is normally present in 
> Avro-generated classes. However, avrohugger generates this field in a companion 
> object, which the reflective Java code will therefore not find.
> This is also described in these ML threads:
>  * 
> [https://lists.apache.org/thread.html/5db58c7d15e4e9aaa515f935be3b342fe036e97d32e1fb0f0d1797ee@%3Cuser.flink.apache.org%3E]
>  * 
> [https://lists.apache.org/thread.html/cf1c5b8fa7f095739438807de9f2497e04ffe55237c5dea83355112d@%3Cuser.flink.apache.org%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-18692) AvroSerializationSchema does not work with types generated by avrohugger

2020-07-24 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-18692:


Assignee: (was: Aljoscha Krettek)

> AvroSerializationSchema does not work with types generated by avrohugger
> 
>
> Key: FLINK-18692
> URL: https://issues.apache.org/jira/browse/FLINK-18692
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
>
> The main problem is that the code in {{SpecificData.createSchema()}} tries to 
> reflectively read the {{SCHEMA$}} field, which is normally present in 
> Avro-generated classes. However, avrohugger generates this field in a companion 
> object, which the reflective Java code will therefore not find.
> This is also described in these ML threads:
>  * 
> [https://lists.apache.org/thread.html/5db58c7d15e4e9aaa515f935be3b342fe036e97d32e1fb0f0d1797ee@%3Cuser.flink.apache.org%3E]
>  * 
> [https://lists.apache.org/thread.html/cf1c5b8fa7f095739438807de9f2497e04ffe55237c5dea83355112d@%3Cuser.flink.apache.org%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18478) AvroDeserializationSchema does not work with types generated by avrohugger

2020-07-24 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17164261#comment-17164261
 ] 

Aljoscha Krettek commented on FLINK-18478:
--

So, the reason why it's deserializing the record as a generic record is that it 
cannot find the {{Tweet}} class. If you put the class in the right Java/Scala 
package ({{com.github.geoheil.streamingreference}} in your example) you will 
get one step further. Then it fails with this, though:

{code}
Exception in thread "main" org.apache.avro.AvroRuntimeException: 
avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: 
org.apache.avro.AvroRuntimeException: Not a Specific class: class 
org.apache.flink.streaming.scala.examples.windowing.Tweet
at 
org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:227)
at 
org.apache.avro.specific.SpecificDatumReader.setSchema(SpecificDatumReader.java:74)
at 
org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:134)
at 
org.apache.flink.streaming.scala.examples.windowing.AvroBug$.main(WindowWordCount.scala:67)
at 
org.apache.flink.streaming.scala.examples.windowing.AvroBug.main(WindowWordCount.scala)
Caused by: 
avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: 
org.apache.avro.AvroRuntimeException: Not a Specific class: class 
org.apache.flink.streaming.scala.examples.windowing.Tweet
at 
avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234)
at 
avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965)
at 
avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969)
at 
avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829)
at 
org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225)
... 4 more
Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: class 
org.apache.flink.streaming.scala.examples.windowing.Tweet
at 
org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
at 
avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
at 
avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
at 
avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
at 
avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
... 8 more
{code}

The reason here is that Avro-internal code tries to access the {{SCHEMA$}} 
field, which is not there for avrohugger-generated classes. The field is in the 
companion object, but that's not the same as a {{static}} field on the class 
itself, which is what Avro expects.

[~dwysakowicz] The problematic call is this in {{AvroDeserializationSchema}}:
{code}
datumReader.setSchema(readerSchema);
{code}

Maybe we could get around this by creating encoders/serializers/whatnot 
differently, but I'm slowly getting a bit fed up with avrohugger.
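
One direction (a sketch only, not necessarily what the fix should look like) would be to stop relying on the static {{SCHEMA$}} lookup and obtain the schema from an instance instead, which works for avrohugger classes because they have a no-arg constructor and implement {{getSchema()}}:

{code:java}
private static Schema extractSchema(Class<? extends SpecificRecord> recordClazz) throws Exception {
    // SpecificRecord extends GenericContainer, so every instance can report
    // its own schema without the reflective SCHEMA$ field lookup.
    SpecificRecord instance = recordClazz.getDeclaredConstructor().newInstance();
    return instance.getSchema();
}
{code}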

> AvroDeserializationSchema does not work with types generated by avrohugger
> --
>
> Key: FLINK-18478
> URL: https://issues.apache.org/jira/browse/FLINK-18478
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.2, 1.12.0, 1.11.1
>
>
> The main problem is that the code in {{SpecificData.createSchema()}} tries to 
> reflectively read the {{SCHEMA$}} field, which is normally present in 
> Avro-generated classes. However, avrohugger generates this field in a companion 
> object, which the reflective Java code will therefore not find.
> This is also described in these ML threads:
>  * 
> [https://lists.apache.org/thread.html/5db58c7d15e4e9aaa515f935be3b342fe036e97d32e1fb0f0d1797ee@%3Cuser.flink.apache.org%3E]
>  * 
> [https://lists.apache.org/thread.html/cf1c5b8fa7f095739438807de9f2497e04ffe55237c5dea83355112d@%3Cuser.flink.apache.org%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18692) AvroSerializationSchema does not work with types generated by avrohugger

2020-07-24 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-18692:
-
Fix Version/s: (was: 1.11.1)
   (was: 1.12.0)
   (was: 1.10.2)

> AvroSerializationSchema does not work with types generated by avrohugger
> 
>
> Key: FLINK-18692
> URL: https://issues.apache.org/jira/browse/FLINK-18692
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
>
> The main problem is that the code in {{SpecificData.createSchema()}} tries to 
> reflectively read the {{SCHEMA$}} field, which is normally present in 
> Avro-generated classes. However, avrohugger generates this field in a companion 
> object, which the reflective Java code will therefore not find.
> This is also described in these ML threads:
>  * 
> [https://lists.apache.org/thread.html/5db58c7d15e4e9aaa515f935be3b342fe036e97d32e1fb0f0d1797ee@%3Cuser.flink.apache.org%3E]
>  * 
> [https://lists.apache.org/thread.html/cf1c5b8fa7f095739438807de9f2497e04ffe55237c5dea83355112d@%3Cuser.flink.apache.org%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18693) AvroSerializationSchema does not work with types generated by avrohugger

2020-07-24 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-18693:
-
Fix Version/s: (was: 1.11.1)
   (was: 1.12.0)
   (was: 1.10.2)

> AvroSerializationSchema does not work with types generated by avrohugger
> 
>
> Key: FLINK-18693
> URL: https://issues.apache.org/jira/browse/FLINK-18693
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
>
> The main problem is that the code in {{SpecificData.createSchema()}} tries to 
> reflectively read the {{SCHEMA$}} field, which is normally present in 
> Avro-generated classes. However, avrohugger generates this field in a companion 
> object, which the reflective Java code will therefore not find.
> This is also described in these ML threads:
>  * 
> [https://lists.apache.org/thread.html/5db58c7d15e4e9aaa515f935be3b342fe036e97d32e1fb0f0d1797ee@%3Cuser.flink.apache.org%3E]
>  * 
> [https://lists.apache.org/thread.html/cf1c5b8fa7f095739438807de9f2497e04ffe55237c5dea83355112d@%3Cuser.flink.apache.org%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-18478) AvroDeserializationSchema does not work with types generated by avrohugger

2020-07-24 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163801#comment-17163801
 ] 

Aljoscha Krettek edited comment on FLINK-18478 at 7/24/20, 8:15 AM:


This is a simpler reproducer:
{code:java}
object AvroBug {

  def main(args: Array[String]): Unit = {

val deserSchema = AvroDeserializationSchema.forSpecific(classOf[Tweet])
val serSchema = AvroSerializationSchema.forSpecific(classOf[Tweet])

val tweet = Tweet(Some("a"))

val serializedTweet = serSchema.serialize(tweet)
val deserializedTweet: Tweet = deserSchema.deserialize(serializedTweet)

println(s"Tweet: $deserializedTweet")
  }

}

/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */

import scala.annotation.switch

/**
 * Twitter tweet record limited to basic information
 * @param tweet_id System-assigned numeric tweet ID. Cannot be changed by the 
user.
 */
final case class Tweet(var tweet_id: Option[String]) extends 
org.apache.avro.specific.SpecificRecordBase {
  def this() = this(None)
  def get(field$: Int): AnyRef = {
(field$: @switch) match {
  case 0 => {
tweet_id match {
  case Some(x) => x
  case None => null
}
  }.asInstanceOf[AnyRef]
  case _ => new org.apache.avro.AvroRuntimeException("Bad index")
}
  }
  def put(field$: Int, value: Any): Unit = {
(field$: @switch) match {
  case 0 => this.tweet_id = {
value match {
  case null => None
  case _ => Some(value.toString)
}
  }.asInstanceOf[Option[String]]
  case _ => new org.apache.avro.AvroRuntimeException("Bad index")
}
()
  }
  def getSchema: org.apache.avro.Schema = Tweet.SCHEMA$
}

object Tweet {
  val SCHEMA$ = new 
org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Tweet\",\"namespace\":\"com.github.geoheil.streamingreference\",\"doc\":\"Twitter
 tweet record limited to basic 
information\",\"fields\":[{\"name\":\"tweet_id\",\"type\":[\"null\",\"string\"],\"doc\":\"System-assigned
 numeric tweet ID. Cannot be changed by the user.\"}]}")
}
{code}
The problem is that Avro will deserialize the Tweet as a 
{{GenericData$Record}}, [~dwysakowicz] do you know if this is the expected 
behaviour here?

Also, I found another problem while working on the reproducer: FLINK-18692. The 
example only works when this is fixed, by changing {{checkAvroInitialized()}} 
in there to use
{code:java}
SpecificData specificData = new SpecificData(cl);
Schema schema = 
AvroFactory.extractAvroSpecificSchema(recordClazz, specificData);
{code}


was (Author: aljoscha):
This is a simpler reproducer:
{code}
object AvroBug {

  def main(args: Array[String]): Unit = {

val deserSchema = AvroDeserializationSchema.forSpecific(classOf[Tweet])
val serSchema = AvroSerializationSchema.forSpecific(classOf[Tweet])

val tweet = Tweet(Some("a"))

val serializedTweet = serSchema.serialize(tweet)
val deserializedTweet: Tweet = deserSchema.deserialize(serializedTweet)

println(s"Tweet: $deserializedTweet")
  }

}

/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */

import scala.annotation.switch

/**
 * Twitter tweet record limited to basic information
 * @param tweet_id System-assigned numeric tweet ID. Cannot be changed by the 
user.
 */
final case class Tweet(var tweet_id: Option[String]) extends 
org.apache.avro.specific.SpecificRecordBase {
  def this() = this(None)
  def get(field$: Int): AnyRef = {
(field$: @switch) match {
  case 0 => {
tweet_id match {
  case Some(x) => x
  case None => null
}
  }.asInstanceOf[AnyRef]
  case _ => new org.apache.avro.AvroRuntimeException("Bad index")
}
  }
  def put(field$: Int, value: Any): Unit = {
(field$: @switch) match {
  case 0 => this.tweet_id = {
value match {
  case null => None
  case _ => Some(value.toString)
}
  }.asInstanceOf[Option[String]]
  case _ => new org.apache.avro.AvroRuntimeException("Bad index")
}
()
  }
  def getSchema: org.apache.avro.Schema = Tweet.SCHEMA$
}

object Tweet {
  val SCHEMA$ = new 
org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Tweet\",\"namespace\":\"com.github.geoheil.streamingreference\",\"doc\":\"Twitter
 tweet record limited to basic 
information\",\"fields\":[{\"name\":\"tweet_id\",\"type\":[\"null\",\"string\"],\"doc\":\"System-assigned
 numeric tweet ID. Cannot be changed by the user.\"}]}")
}
{code}

The problem is that Avro will deserialize the Tweet as a 
{{GenericData$Record}}, [~dwysakowicz] do you know if this is the expected 
behaviour here?

Also, I found another problem while working on the reproducer: FLINK-18693. The 
example only works when this is fixed, by changing {{checkAvroInitialized()}} 
in there to use 

[jira] [Commented] (FLINK-18693) AvroSerializationSchema does not work with types generated by avrohugger

2020-07-24 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17164249#comment-17164249
 ] 

Aljoscha Krettek commented on FLINK-18693:
--

Yes, thanks! Something went wrong with my connection and I cloned it twice by 
mistake. 

> AvroSerializationSchema does not work with types generated by avrohugger
> 
>
> Key: FLINK-18693
> URL: https://issues.apache.org/jira/browse/FLINK-18693
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.2, 1.12.0, 1.11.1
>
>
> The main problem is that the code in {{SpecificData.createSchema()}} tries to 
> reflectively read the {{SCHEMA$}} field, which is normally present in 
> Avro-generated classes. However, avrohugger generates this field in a companion 
> object, which the reflective Java code will therefore not find.
> This is also described in these ML threads:
>  * 
> [https://lists.apache.org/thread.html/5db58c7d15e4e9aaa515f935be3b342fe036e97d32e1fb0f0d1797ee@%3Cuser.flink.apache.org%3E]
>  * 
> [https://lists.apache.org/thread.html/cf1c5b8fa7f095739438807de9f2497e04ffe55237c5dea83355112d@%3Cuser.flink.apache.org%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18478) AvroDeserializationSchema does not work with types generated by avrohugger

2020-07-23 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163801#comment-17163801
 ] 

Aljoscha Krettek commented on FLINK-18478:
--

This is a simpler reproducer:
{code}
object AvroBug {

  def main(args: Array[String]): Unit = {

val deserSchema = AvroDeserializationSchema.forSpecific(classOf[Tweet])
val serSchema = AvroSerializationSchema.forSpecific(classOf[Tweet])

val tweet = Tweet(Some("a"))

val serializedTweet = serSchema.serialize(tweet)
val deserializedTweet: Tweet = deserSchema.deserialize(serializedTweet)

println(s"Tweet: $deserializedTweet")
  }

}

/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */

import scala.annotation.switch

/**
 * Twitter tweet record limited to basic information
 * @param tweet_id System-assigned numeric tweet ID. Cannot be changed by the 
user.
 */
final case class Tweet(var tweet_id: Option[String]) extends 
org.apache.avro.specific.SpecificRecordBase {
  def this() = this(None)
  def get(field$: Int): AnyRef = {
(field$: @switch) match {
  case 0 => {
tweet_id match {
  case Some(x) => x
  case None => null
}
  }.asInstanceOf[AnyRef]
  case _ => new org.apache.avro.AvroRuntimeException("Bad index")
}
  }
  def put(field$: Int, value: Any): Unit = {
(field$: @switch) match {
  case 0 => this.tweet_id = {
value match {
  case null => None
  case _ => Some(value.toString)
}
  }.asInstanceOf[Option[String]]
  case _ => new org.apache.avro.AvroRuntimeException("Bad index")
}
()
  }
  def getSchema: org.apache.avro.Schema = Tweet.SCHEMA$
}

object Tweet {
  val SCHEMA$ = new 
org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Tweet\",\"namespace\":\"com.github.geoheil.streamingreference\",\"doc\":\"Twitter
 tweet record limited to basic 
information\",\"fields\":[{\"name\":\"tweet_id\",\"type\":[\"null\",\"string\"],\"doc\":\"System-assigned
 numeric tweet ID. Cannot be changed by the user.\"}]}")
}
{code}

The problem is that Avro will deserialize the Tweet as a 
{{GenericData$Record}}, [~dwysakowicz] do you know if this is the expected 
behaviour here?

Also, I found another problem while working on the reproducer: FLINK-18693. The 
example only works when this is fixed, by changing {{checkAvroInitialized()}} 
in there to use 
{code}
SpecificData specificData = new SpecificData(cl);
Schema schema = 
AvroFactory.extractAvroSpecificSchema(recordClazz, specificData);
{code}

> AvroDeserializationSchema does not work with types generated by avrohugger
> --
>
> Key: FLINK-18478
> URL: https://issues.apache.org/jira/browse/FLINK-18478
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.2, 1.12.0, 1.11.1
>
>
> The main problem is that the code in {{SpecificData.createSchema()}} tries to 
> reflectively read the {{SCHEMA$}} field, which is normally present in 
> Avro-generated classes. However, avrohugger generates this field in a companion 
> object, which the reflective Java code will therefore not find.
> This is also described in these ML threads:
>  * 
> [https://lists.apache.org/thread.html/5db58c7d15e4e9aaa515f935be3b342fe036e97d32e1fb0f0d1797ee@%3Cuser.flink.apache.org%3E]
>  * 
> [https://lists.apache.org/thread.html/cf1c5b8fa7f095739438807de9f2497e04ffe55237c5dea83355112d@%3Cuser.flink.apache.org%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18693) AvroSerializationSchema does not work with types generated by avrohugger

2020-07-23 Thread Aljoscha Krettek (Jira)
Aljoscha Krettek created FLINK-18693:


 Summary: AvroSerializationSchema does not work with types 
generated by avrohugger
 Key: FLINK-18693
 URL: https://issues.apache.org/jira/browse/FLINK-18693
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek
 Fix For: 1.10.2, 1.12.0, 1.11.1


The main problem is that the code in {{SpecificData.createSchema()}} tries to 
reflectively read the {{SCHEMA$}} field, which is normally present in 
Avro-generated classes. However, avrohugger generates this field in a companion 
object, which the reflective Java code will therefore not find.

This is also described in these ML threads:
 * 
[https://lists.apache.org/thread.html/5db58c7d15e4e9aaa515f935be3b342fe036e97d32e1fb0f0d1797ee@%3Cuser.flink.apache.org%3E]
 * 
[https://lists.apache.org/thread.html/cf1c5b8fa7f095739438807de9f2497e04ffe55237c5dea83355112d@%3Cuser.flink.apache.org%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18692) AvroSerializationSchema does not work with types generated by avrohugger

2020-07-23 Thread Aljoscha Krettek (Jira)
Aljoscha Krettek created FLINK-18692:


 Summary: AvroSerializationSchema does not work with types 
generated by avrohugger
 Key: FLINK-18692
 URL: https://issues.apache.org/jira/browse/FLINK-18692
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek
 Fix For: 1.10.2, 1.12.0, 1.11.1


The main problem is that the code in {{SpecificData.createSchema()}} tries to 
reflectively read the {{SCHEMA$}} field, which is normally present in 
Avro-generated classes. However, avrohugger generates this field in a companion 
object, which the reflective Java code will therefore not find.

This is also described in these ML threads:
 * 
[https://lists.apache.org/thread.html/5db58c7d15e4e9aaa515f935be3b342fe036e97d32e1fb0f0d1797ee@%3Cuser.flink.apache.org%3E]
 * 
[https://lists.apache.org/thread.html/cf1c5b8fa7f095739438807de9f2497e04ffe55237c5dea83355112d@%3Cuser.flink.apache.org%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18643) Migrate Jenkins jobs to ci-builds.apache.org

2020-07-23 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163570#comment-17163570
 ] 

Aljoscha Krettek commented on FLINK-18643:
--

Just the other day I had a user asking whether nightly builds are available, 
because they wanted to check whether a fix that had not yet been released 
worked for them. See 
here: 
https://issues.apache.org/jira/browse/FLINK-18478?focusedCommentId=17158523=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17158523

> Migrate Jenkins jobs to ci-builds.apache.org
> 
>
> Key: FLINK-18643
> URL: https://issues.apache.org/jira/browse/FLINK-18643
> Project: Flink
>  Issue Type: Improvement
>  Components: Release System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>
> Infra is [reworking the Jenkins 
> setup|https://lists.apache.org/thread.html/re974eed417a1bc294694701d5c91b4bf92689fcf32a4c91f169be87d%40%3Cbuilds.apache.org%3E],
>  so we have to migrate our jobs that do the snapshot deployments.
> Alternatively, find other ways to do this (Azure?) to reduce the number of 
> infrastructure services used.
> /cc [~rmetzger]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18685) JobClient.getAccumulators() blocks until streaming job has finished in local environment

2020-07-23 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163560#comment-17163560
 ] 

Aljoscha Krettek commented on FLINK-18685:
--

For me the only option would be to fix the behaviour of the minicluster client. 
The API is clearly meant for asynchronous use.
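
To illustrate, the intended non-blocking usage would be something like this sketch (with {{client}} and the classloader taken from the reproduction quoted below):

{code:java}
// React when the accumulators future completes instead of blocking on get().
client.getAccumulators(StreamingJob.class.getClassLoader())
      .thenAccept(accumulators -> LOG.info("accus = " + accumulators));
{code}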

> JobClient.getAccumulators() blocks until streaming job has finished in local 
> environment
> 
>
> Key: FLINK-18685
> URL: https://issues.apache.org/jira/browse/FLINK-18685
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.0
>Reporter: Robert Metzger
>Priority: Major
>  Labels: starter
>
> *Steps to reproduce:*
> {code:java}
> JobClient client = env.executeAsync("Test");
> CompletableFuture<JobStatus> status = client.getJobStatus();
> LOG.info("status = " + status.get());
> CompletableFuture<Map<String, Object>> accumulators = 
> client.getAccumulators(StreamingJob.class.getClassLoader());
> LOG.info("accus = " + accumulators.get(5, TimeUnit.SECONDS));
> {code}
> *Actual behavior*
> The accumulators future will never complete for a streaming job when calling 
> this just in your main() method from the IDE.
> *Expected behavior*
> Receive the accumulators of the running streaming job.
> The JavaDocs of the method state the following: "Accumulators can be 
> requested while it is running or after it has finished.". 
> While it is technically true that I can request accumulators, I was expecting 
> as a user that I can access the accumulators of a running job.
> Also, I can request accumulators if I submit the job to a cluster.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18685) JobClient.getAccumulators() blocks until streaming job has finished in local environment

2020-07-23 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-18685:
-
Labels: starter  (was: )

> JobClient.getAccumulators() blocks until streaming job has finished in local 
> environment
> 
>
> Key: FLINK-18685
> URL: https://issues.apache.org/jira/browse/FLINK-18685
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.0
>Reporter: Robert Metzger
>Priority: Major
>  Labels: starter
>
> *Steps to reproduce:*
> {code:java}
> JobClient client = env.executeAsync("Test");
> CompletableFuture<JobStatus> status = client.getJobStatus();
> LOG.info("status = " + status.get());
> CompletableFuture<Map<String, Object>> accumulators = 
> client.getAccumulators(StreamingJob.class.getClassLoader());
> LOG.info("accus = " + accumulators.get(5, TimeUnit.SECONDS));
> {code}
> *Actual behavior*
> The accumulators future will never complete for a streaming job when calling 
> this just in your main() method from the IDE.
> *Expected behavior*
> Receive the accumulators of the running streaming job.
> The JavaDocs of the method state the following: "Accumulators can be 
> requested while it is running or after it has finished.". 
> While it is technically true that I can request accumulators, I was expecting 
> as a user that I can access the accumulators of a running job.
> Also, I can request accumulators if I submit the job to a cluster.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-18398) ElasticSearch unavailibility causes TM shutdown

2020-07-23 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-18398.

Resolution: Won't Fix

I'm closing this for now since it seems to be caused by 
https://github.com/elastic/elasticsearch/issues/47599, which we cannot fix from 
Flink.

> ElasticSearch unavailibility causes TM shutdown
> ---
>
> Key: FLINK-18398
> URL: https://issues.apache.org/jira/browse/FLINK-18398
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.10.0
>Reporter: Alexander Fedulov
>Priority: Critical
> Attachments: elastic_jm_log.txt, elastic_tm_log.txt
>
>
> Similarly to [FLINK-17327|https://issues.apache.org/jira/browse/FLINK-17327], 
> unavailibility of ElasticSearch cluster causes Tasks cancellation to timeout 
> and Task Manager to be killed. The following exceptions can be found in the 
> logs:
>  
> {code:java}
> 2020-06-15 19:52:03.664Z ERROR [  I/O dispatcher 229] 
> .f.s.c.e.ElasticsearchSinkBase : Failed Elasticsearch bulk request: request 
> retries exceeded max retry timeout [3]java.io.IOException: request 
> retries exceeded max retry timeout [3]
> ...
> 2020-06-15 19:55:03.861Z  WARN [43df85ee0f907ae9d0).] 
> o.a.f.r.taskmanager.Task   : Task 'graph53 (1/1)' did not react to 
> cancelling signal for 30 seconds, but is stuck in method:
>  org.elasticsearch.action.bulk.BulkProcessor.flush(BulkProcessor.java:356)
> ...
> 2020-06-15 19:55:04.120Z ERROR [663038f87ef09c4da6).] 
> o.a.f.r.taskmanager.Task   : Task did not exit gracefully within 180 + 
> seconds.
> 2020-06-15 19:55:04.121Z ERROR [663038f87ef09c4da6).] o.a.f.r.t.TaskExecutor  
>: Task did not exit gracefully within 180 + seconds.
> 2020-06-15 19:55:04.121Z ERROR [663038f87ef09c4da6).] 
> o.a.f.r.t.TaskManagerRunner: Fatal error occurred while executing the 
> TaskManager. Shutting it down...
> {code}
> Detailed logs  are attached.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-18650) The description of dispatcher in Flink Architecture document is not accurate

2020-07-22 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-18650.

Fix Version/s: 1.11.2
   1.12.0
   Resolution: Fixed

release-1.11: 2a62f91d41552e481ce0ac5e7e238103ed6fae30
master: 23f65064db0d3aca6b725b14e0bd2cb4b7f6cc64

> The description of dispatcher in Flink Architecture document is not accurate
> 
>
> Key: FLINK-18650
> URL: https://issues.apache.org/jira/browse/FLINK-18650
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.11.0
>Reporter: Peng
>Assignee: Peng
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2
>
>
> The current description of the dispatcher is as follows:
> The _Dispatcher_ provides a REST interface to submit Flink applications for 
> execution and _{color:#de350b}*starts a new JobManager*{color}_ for each 
> submitted job. It also runs the Flink WebUI to provide information about job 
> executions.
>  
> As I understand it, should it say "starts a new *{color:#de350b}JobMaster{color}*" 
> rather than JobManager?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-18650) The description of dispatcher in Flink Architecture document is not accurate

2020-07-21 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-18650:


Assignee: Peng

> The description of dispatcher in Flink Architecture document is not accurate
> 
>
> Key: FLINK-18650
> URL: https://issues.apache.org/jira/browse/FLINK-18650
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.11.0
>Reporter: Peng
>Assignee: Peng
>Priority: Minor
>  Labels: pull-request-available
>
> The description of the dispatcher is as follows:
> The _Dispatcher_ provides a REST interface to submit Flink applications for 
> execution and _{color:#de350b}*starts a new JobManager*{color}_ for each 
> submitted job. It also runs the Flink WebUI to provide information about job 
> executions.
>  
> As I understand it, shouldn't it say "starts a new 
> *{color:#de350b}JobMaster{color}*" rather than JobManager?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18629) ConnectedStreams#keyBy can not derive key TypeInformation for lambda KeySelectors

2020-07-21 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17161876#comment-17161876
 ] 

Aljoscha Krettek commented on FLINK-18629:
--

I'm happy with either approach. Currently, if you used different keys, it 
would just break later, when Flink tries to assemble the operator, right?

> ConnectedStreams#keyBy can not derive key TypeInformation for lambda 
> KeySelectors
> -
>
> Key: FLINK-18629
> URL: https://issues.apache.org/jira/browse/FLINK-18629
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.10.0, 1.11.0, 1.12.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Critical
> Fix For: 1.10.2, 1.12.0, 1.11.2
>
>
> The following test fails:
> {code}
> @Test
> public void testKeyedConnectedStreamsType() {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStreamSource<Integer> stream1 = env.fromElements(1, 2);
>     DataStreamSource<Integer> stream2 = env.fromElements(1, 2);
>     ConnectedStreams<Integer, Integer> connectedStreams = stream1.connect(stream2)
>         .keyBy(v -> v, v -> v);
>     KeyedStream<Integer, ?> firstKeyedInput = (KeyedStream<Integer, ?>) connectedStreams.getFirstInput();
>     KeyedStream<Integer, ?> secondKeyedInput = (KeyedStream<Integer, ?>) connectedStreams.getSecondInput();
>     assertThat(firstKeyedInput.getKeyType(), equalTo(Types.INT));
>     assertThat(secondKeyedInput.getKeyType(), equalTo(Types.INT));
> }
> {code}
> The problem is that the wildcard type is evaluated as {{Object}} for lambdas, 
> which in turn produces a {{GenericTypeInfo}} of {{Object}} for any 
> {{KeySelector}} provided as a lambda.
> I suggest changing the method signature to:
> {code}
> public <KEY> ConnectedStreams<IN1, IN2> keyBy(
>         KeySelector<IN1, KEY> keySelector1,
>         KeySelector<IN2, KEY> keySelector2)
> {code}
> This would be a source-compatible change. It might break state backend 
> compatibility (it would change the derived key type info).
> Still, there would be a workaround for old programs: a second overload that 
> takes the key type explicitly:
> {code}
> public <KEY> ConnectedStreams<IN1, IN2> keyBy(
>         KeySelector<IN1, KEY> keySelector1,
>         KeySelector<IN2, KEY> keySelector2,
>         TypeInformation<KEY> keyType)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12751) Create file based HA support

2020-07-21 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-12751:
-
Component/s: (was: FileSystems)
 Runtime / Coordination
 Runtime / Checkpointing

> Create file based HA support
> 
>
> Key: FLINK-12751
> URL: https://issues.apache.org/jira/browse/FLINK-12751
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.8.0, 1.9.0, 2.0.0
> Environment: Flink on k8 and Mini cluster
>Reporter: Boris Lublinsky
>Priority: Major
>  Labels: features, pull-request-available
>   Original Estimate: 168h
>  Time Spent: 10m
>  Remaining Estimate: 167h 50m
>
> In the current Flink implementation, HA support can be provided either by 
> ZooKeeper or by a custom factory class.
> This issue adds an HA implementation based on a PVC (persistent volume 
> claim). The idea behind this implementation is as follows:
> * Because the implementation assumes a single Job Manager instance (Job 
> Manager selection and restarts are handled by a Kubernetes Deployment of 
> size 1), URL management is done using the StandaloneHaServices implementation 
> (in the case of a cluster) and the EmbeddedHaServices implementation (in the 
> case of a mini cluster)
> * For management of submitted job graphs, the checkpoint counter, and 
> completed checkpoints, the implementation leverages the following file 
> system layout
> {code}
>  ha -> root of the HA data
>  checkpointcounter -> checkpoint counter folder
>   -> job id folder
>   -> counter file
>   -> another job id folder
>  ...
>  completedCheckpoint -> completed checkpoint folder
>   -> job id folder
>   -> checkpoint file
>   -> checkpoint file
>  ...
>   -> another job id folder
>  ...
>  submittedJobGraph -> submitted graph folder
>   -> job id folder
>   -> graph file
>   -> another job id folder
>  ...
> {code}
> The implementation modifies two of the existing Flink files:
> * HighAvailabilityServicesUtils - added `FILESYSTEM` option for picking HA 
> service
> * HighAvailabilityMode - added `FILESYSTEM` to available HA options.
> The actual implementation adds the following classes:
> * `FileSystemHAServices` - an implementation of a `HighAvailabilityServices` 
> for file system
> * `FileSystemUtils` - support class for creation of runtime components.
> * `FileSystemStorageHelper` - file system operations implementation for 
> filesystem based HA
> * `FileSystemCheckpointRecoveryFactory` - an implementation of a 
> `CheckpointRecoveryFactory` for file system
> * `FileSystemCheckpointIDCounter` - an implementation of a 
> `CheckpointIDCounter` for file system
> * `FileSystemCompletedCheckpointStore` - an implementation of a 
> `CompletedCheckpointStore` for file system
> * `FileSystemSubmittedJobGraphStore` - an implementation of a 
> `SubmittedJobGraphStore` for file system
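
For illustration, a minimal sketch of what a file-backed checkpoint ID counter 
following the layout above could look like. This is not the code from the pull 
request; the {{FileBasedCounter}} name and the atomic-rename strategy are 
assumptions made for the example:

{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Illustrative file-backed counter for the "checkpointcounter/jobId/counter" layout. */
public class FileBasedCounter {

    private final Path counterFile;

    public FileBasedCounter(Path haRoot, String jobId) {
        this.counterFile = haRoot.resolve("checkpointcounter").resolve(jobId).resolve("counter");
    }

    /** Reads the current counter value, defaulting to 1 if nothing was written yet. */
    public synchronized long get() throws IOException {
        if (!Files.exists(counterFile)) {
            return 1L;
        }
        String contents = new String(Files.readAllBytes(counterFile), StandardCharsets.UTF_8);
        return Long.parseLong(contents.trim());
    }

    /** Returns the current value and persists the incremented one. */
    public synchronized long getAndIncrement() throws IOException {
        long current = get();
        Files.createDirectories(counterFile.getParent());
        // Write to a temp file first, then rename; this assumes a file system
        // with atomic rename semantics (e.g. POSIX, or a PVC-backed volume).
        Path tmp = counterFile.resolveSibling("counter.tmp");
        Files.write(tmp, Long.toString(current + 1).getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, counterFile, StandardCopyOption.ATOMIC_MOVE);
        return current;
    }
}
{code}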



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18650) The description of dispatcher in Flink Architecture document is not accurate

2020-07-21 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17161841#comment-17161841
 ] 

Aljoscha Krettek commented on FLINK-18650:
--

Yes, this is wrong. Do you want to open a PR to fix it?

> The description of dispatcher in Flink Architecture document is not accurate
> 
>
> Key: FLINK-18650
> URL: https://issues.apache.org/jira/browse/FLINK-18650
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.11.0
>Reporter: Peng
>Priority: Minor
>
> The description of the dispatcher is as follows:
> The _Dispatcher_ provides a REST interface to submit Flink applications for 
> execution and _{color:#de350b}*starts a new JobManager*{color}_ for each 
> submitted job. It also runs the Flink WebUI to provide information about job 
> executions.
>  
> As I understand it, shouldn't it say "starts a new 
> *{color:#de350b}JobMaster{color}*" rather than JobManager?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18649) Add a MongoDB Connector with Exactly-Once Semantics

2020-07-21 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17161840#comment-17161840
 ] 

Aljoscha Krettek commented on FLINK-18649:
--

I think this could be implemented either like the Cassandra connector, where we 
use a write-ahead log, or potentially like the ES connector, where we don't use 
a log but instead rely on idempotent writes and overwrite in case of recovery.
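
To make the idempotent variant concrete, here is a minimal sketch, assuming the 
synchronous MongoDB Java driver and records that carry a deterministic {{_id}}. 
It is not a finished connector, just an upsert-based sink where replayed records 
overwrite the earlier write:

{code:java}
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.bson.Document;

/** Sketch of an idempotent MongoDB sink: replayed records overwrite earlier writes. */
public class IdempotentMongoSink extends RichSinkFunction<Document> {

    private final String uri;        // e.g. "mongodb://localhost:27017" (illustrative)
    private final String database;
    private final String collection;

    private transient MongoClient client;
    private transient MongoCollection<Document> coll;

    public IdempotentMongoSink(String uri, String database, String collection) {
        this.uri = uri;
        this.database = database;
        this.collection = collection;
    }

    @Override
    public void open(Configuration parameters) {
        client = MongoClients.create(uri);
        coll = client.getDatabase(database).getCollection(collection);
    }

    @Override
    public void invoke(Document value, Context context) {
        // The record must carry a deterministic _id; an upsert makes replays idempotent.
        coll.replaceOne(Filters.eq("_id", value.get("_id")), value, new ReplaceOptions().upsert(true));
    }

    @Override
    public void close() {
        if (client != null) {
            client.close();
        }
    }
}
{code}

With checkpointing enabled this would give effectively exactly-once results in 
MongoDB, as long as the {{_id}} is derived deterministically from the record.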

> Add a MongoDB Connector with Exactly-Once Semantics
> ---
>
> Key: FLINK-18649
> URL: https://issues.apache.org/jira/browse/FLINK-18649
> Project: Flink
>  Issue Type: Wish
>  Components: Connectors / Common
>Reporter: Eric Holsinger
>Priority: Minor
>
> Before taking the Flink Plunge, and per the following recommendation, I'm 
> opening a Jira ticket to see if someone can provide me with a formal 
> recommendation for obtaining exactly-once semantics for MongoDB:
>  
> [https://stackoverflow.com/questions/35158683/kafka-flink-datastream-mongodb]
>  
> FYI, we cannot use Kafka or any other framework other than Flink and MongoDB, 
> and we have constraints as to what can be installed in production.
>  
> Thanks in advance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18649) Add a MongoDB Connector with Exactly-Once Semantics

2020-07-21 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-18649:
-
Summary: Add a MongoDB Connector with Exactly-Once Semantics  (was: MongoDB 
Exactly-Once Semantics - recommended approach)

> Add a MongoDB Connector with Exactly-Once Semantics
> ---
>
> Key: FLINK-18649
> URL: https://issues.apache.org/jira/browse/FLINK-18649
> Project: Flink
>  Issue Type: Wish
>  Components: Connectors / Common
>Reporter: Eric Holsinger
>Priority: Minor
>
> Before taking the Flink Plunge, and per the following recommendation, I'm 
> opening a Jira ticket to see if someone can provide me with a formal 
> recommendation for obtaining exactly-once semantics for MongoDB:
>  
> [https://stackoverflow.com/questions/35158683/kafka-flink-datastream-mongodb]
>  
> FYI, we cannot use Kafka or any other framework other than Flink and MongoDB, 
> and we have constraints as to what can be installed in production.
>  
> Thanks in advance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18649) MongoDB Exactly-Once Semantics - recommended approach

2020-07-21 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-18649:
-
Component/s: (was: API / DataStream)
 Connectors / Common

> MongoDB Exactly-Once Semantics - recommended approach
> -
>
> Key: FLINK-18649
> URL: https://issues.apache.org/jira/browse/FLINK-18649
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Eric Holsinger
>Priority: Minor
>
> Before taking the Flink Plunge, and per the following recommendation, I'm 
> opening a Jira ticket to see if someone can provide me with a formal 
> recommendation for obtaining exactly-once semantics for MongoDB:
>  
> [https://stackoverflow.com/questions/35158683/kafka-flink-datastream-mongodb]
>  
> FYI, we cannot use Kafka or any other framework other than Flink and MongoDB, 
> and we have constraints as to what can be installed in production.
>  
> Thanks in advance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18649) MongoDB Exactly-Once Semantics - recommended approach

2020-07-21 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-18649:
-
Issue Type: New Feature  (was: Task)

> MongoDB Exactly-Once Semantics - recommended approach
> -
>
> Key: FLINK-18649
> URL: https://issues.apache.org/jira/browse/FLINK-18649
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Reporter: Eric Holsinger
>Priority: Minor
>
> Before taking the Flink Plunge, and per the following recommendation, I'm 
> opening a Jira ticket to see if someone can provide me with a formal 
> recommendation for obtaining exactly-once semantics for MongoDB:
>  
> [https://stackoverflow.com/questions/35158683/kafka-flink-datastream-mongodb]
>  
> FYI, we cannot use Kafka or any other framework other than Flink and MongoDB, 
> and we have constraints as to what can be installed in production.
>  
> Thanks in advance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18649) MongoDB Exactly-Once Semantics - recommended approach

2020-07-21 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-18649:
-
Issue Type: Wish  (was: New Feature)

> MongoDB Exactly-Once Semantics - recommended approach
> -
>
> Key: FLINK-18649
> URL: https://issues.apache.org/jira/browse/FLINK-18649
> Project: Flink
>  Issue Type: Wish
>  Components: Connectors / Common
>Reporter: Eric Holsinger
>Priority: Minor
>
> Before taking the Flink Plunge, and per the following recommendation, I'm 
> opening a Jira ticket to see if someone can provide me with a formal 
> recommendation for obtaining exactly-once semantics for MongoDB:
>  
> [https://stackoverflow.com/questions/35158683/kafka-flink-datastream-mongodb]
>  
> FYI, we cannot use Kafka or any other framework other than Flink and MongoDB, 
> and we have constraints as to what can be installed in production.
>  
> Thanks in advance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18637) Key group is not in KeyGroupRange

2020-07-20 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-18637:
-
Component/s: Runtime / State Backends

> Key group is not in KeyGroupRange
> -
>
> Key: FLINK-18637
> URL: https://issues.apache.org/jira/browse/FLINK-18637
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
> Environment: Version: 1.10.0, Rev:, Date:
> OS current user: yarn
>  Current Hadoop/Kerberos user: hadoop
>  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.141-b15
>  Maximum heap size: 28960 MiBytes
>  JAVA_HOME: /usr/java/jdk1.8.0_141/jre
>  Hadoop version: 2.8.5-amzn-6
>  JVM Options:
>  -Xmx30360049728
>  -Xms30360049728
>  -XX:MaxDirectMemorySize=4429185024
>  -XX:MaxMetaspaceSize=1073741824
>  -XX:+UseG1GC
>  -XX:+UnlockDiagnosticVMOptions
>  -XX:+G1SummarizeConcMark
>  -verbose:gc
>  -XX:+PrintGCDetails
>  -XX:+PrintGCDateStamps
>  -XX:+UnlockCommercialFeatures
>  -XX:+FlightRecorder
>  -XX:+DebugNonSafepoints
>  
> -XX:FlightRecorderOptions=defaultrecording=true,settings=/home/hadoop/heap.jfc,dumponexit=true,dumponexitpath=/var/lib/hadoop-yarn/recording.jfr,loglevel=info
>  
> -Dlog.file=/var/log/hadoop-yarn/containers/application_1593935560662_0002/container_1593935560662_0002_01_02/taskmanager.log
>  -Dlog4j.configuration=[file:./log4j.properties|file:///log4j.properties]
>  Program Arguments:
>  -Dtaskmanager.memory.framework.off-heap.size=134217728b
>  -Dtaskmanager.memory.network.max=1073741824b
>  -Dtaskmanager.memory.network.min=1073741824b
>  -Dtaskmanager.memory.framework.heap.size=134217728b
>  -Dtaskmanager.memory.managed.size=23192823744b
>  -Dtaskmanager.cpu.cores=7.0
>  -Dtaskmanager.memory.task.heap.size=30225832000b
>  -Dtaskmanager.memory.task.off-heap.size=3221225472b
>  --configDir.
>  
> -Djobmanager.rpc.address=ip-10-180-30-250.us-west-2.compute.internal-Dweb.port=0
>  -Dweb.tmpdir=/tmp/flink-web-64f613cf-bf04-4a09-8c14-75c31b619574
>  -Djobmanager.rpc.port=33739
>  -Drest.address=ip-10-180-30-250.us-west-2.compute.internal
>Reporter: Ori Popowski
>Priority: Major
>
> I'm getting this error when creating a savepoint. I've read in 
> https://issues.apache.org/jira/browse/FLINK-16193 that it's caused by 
> an unstable hashCode or equals on the key, or improper use of 
> {{reinterpretAsKeyedStream}}.
>   
>  My key is a string and I don't use {{reinterpretAsKeyedStream}}.
>  
> {code:java}
> senv
>   .addSource(source)
>   .flatMap(…)
>   .filterWith { case (metadata, _, _) => … }
>   .assignTimestampsAndWatermarks(new 
> BoundedOutOfOrdernessTimestampExtractor(…))
>   .keyingBy { case (meta, _) => meta.toPathString }
>   .process(new TruncateLargeSessions(config.sessionSizeLimit))
>   .keyingBy { case (meta, _) => meta.toPathString }
>   .window(EventTimeSessionWindows.withGap(Time.of(…)))
>   .process(new ProcessSession(sessionPlayback, config))
>   .addSink(sink){code}
>  
> {code:java}
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job 
> 962fc8e984e7ca1ed65a038aa62ce124 failed.
>   at 
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:633)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:611)
>   at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>   at 
> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:608)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:910)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.CompletionException: 
> java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:744)
>   at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>   at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>   at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>   at 
> 

[jira] [Commented] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2020-07-20 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17161371#comment-17161371
 ] 

Aljoscha Krettek commented on FLINK-11654:
--

I've assigned [~freezhan] for now, unless [~becket_qin] objects.

> Multiple transactional KafkaProducers writing to same cluster have clashing 
> transaction IDs
> ---
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Assignee: freezhan
>Priority: Major
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic 
> from identically named sinks. When EXACTLY_ONCE semantics are enabled for the 
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go 
> into a restart cycle.
> Example exception from the Kafka log:
>  
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition finding-commands-dev-1-0 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 483 
> (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the 
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash 
> between different Jobs (and Clusters).
>  
>  
> {code:java}
> --- 
> a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ 
> b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer
>                 nextTransactionalIdHintState = 
> context.getOperatorStateStore().getUnionListState(
>                         NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>                 transactionalIdsGenerator = new TransactionalIdsGenerator(
> + // the prefix probably should include job id and maybe cluster id
>                         getRuntimeContext().getTaskName() + "-" + 
> ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>                         getRuntimeContext().getIndexOfThisSubtask(),
>                         
> getRuntimeContext().getNumberOfParallelSubtasks(),{code}
>  
>  
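
Purely as an illustration of the inline comment above, a collision-resistant 
prefix could fold in identifiers that are unique across jobs and clusters. The 
{{clusterId}} and {{jobId}} parameters in this sketch are assumed inputs, not 
values the current sink API provides:

{code:java}
/**
 * Illustrative sketch only: builds a transactional-id prefix that is unique
 * across jobs and clusters, assuming the caller can supply both identifiers.
 */
public final class TransactionalIdPrefixes {

    private TransactionalIdPrefixes() {}

    public static String of(String clusterId, String jobId, String taskName, String operatorUid) {
        // Folding in the cluster id and job id avoids clashes between
        // identically named sinks in different jobs writing to the same
        // Kafka cluster.
        return clusterId + "-" + jobId + "-" + taskName + "-" + operatorUid;
    }
}
{code}

Two jobs with identically named sinks would then still produce distinct 
prefixes, because their job ids differ.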



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2020-07-20 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-11654:


Assignee: freezhan  (was: Jiangjie Qin)

> Multiple transactional KafkaProducers writing to same cluster have clashing 
> transaction IDs
> ---
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Assignee: freezhan
>Priority: Major
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic 
> from identically named sinks. When EXACTLY_ONCE semantics are enabled for the 
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go 
> into a restart cycle.
> Example exception from the Kafka log:
>  
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition finding-commands-dev-1-0 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 483 
> (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the 
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash 
> between different Jobs (and Clusters).
>  
>  
> {code:java}
> --- 
> a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ 
> b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer
>                 nextTransactionalIdHintState = 
> context.getOperatorStateStore().getUnionListState(
>                         NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>                 transactionalIdsGenerator = new TransactionalIdsGenerator(
> + // the prefix probably should include job id and maybe cluster id
>                         getRuntimeContext().getTaskName() + "-" + 
> ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>                         getRuntimeContext().getIndexOfThisSubtask(),
>                         
> getRuntimeContext().getNumberOfParallelSubtasks(),{code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18478) AvroDeserializationSchema does not work with types generated by avrohugger

2020-07-20 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17161370#comment-17161370
 ] 

Aljoscha Krettek commented on FLINK-18478:
--

Now you can actually try the Flink 1.11.1 release candidate: 
https://lists.apache.org/thread.html/r29ca31ff17e213465f839b5d20c039412234f0fb0faae06897967db9%40%3Cdev.flink.apache.org%3E

You can build against the staged jars by putting this in a {{settings.xml}}:
{code}
<settings>
  <activeProfiles>
    <activeProfile>flink-1.11.1</activeProfile>
  </activeProfiles>
  <profiles>
    <profile>
      <id>flink-1.11.1</id>
      <repositories>
        <repository>
          <id>flink-1.11.1</id>
          <url>https://repository.apache.org/content/repositories/orgapacheflink-1378/</url>
        </repository>
        <repository>
          <id>archetype</id>
          <url>https://repository.apache.org/content/repositories/orgapacheflink-1378/</url>
        </repository>
      </repositories>
    </profile>
  </profiles>
</settings>
{code}

And reference that in your maven commands via {{--settings path/to/settings.xml}}.

> AvroDeserializationSchema does not work with types generated by avrohugger
> --
>
> Key: FLINK-18478
> URL: https://issues.apache.org/jira/browse/FLINK-18478
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.2, 1.12.0, 1.11.1
>
>
> The main problem is that the code in {{SpecificData.createSchema()}} tries to 
> reflectively read the {{SCHEMA$}} field, that is normally there in Avro 
> generated classes. However, avrohugger generates this field in a companion 
> object, which the reflective Java code will therefore not find.
> This is also described in these ML threads:
>  * 
> [https://lists.apache.org/thread.html/5db58c7d15e4e9aaa515f935be3b342fe036e97d32e1fb0f0d1797ee@%3Cuser.flink.apache.org%3E]
>  * 
> [https://lists.apache.org/thread.html/cf1c5b8fa7f095739438807de9f2497e04ffe55237c5dea83355112d@%3Cuser.flink.apache.org%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18629) ConnectedStreams#keyBy can not derive key TypeInformation for lambda KeySelectors

2020-07-20 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17161359#comment-17161359
 ] 

Aljoscha Krettek commented on FLINK-18629:
--

Yes, I believe this was a mistake. But why not
{code:java}
public <KEY> ConnectedStreams<IN1, IN2> keyBy(
        KeySelector<IN1, KEY> keySelector1,
        KeySelector<IN2, KEY> keySelector2) {code}
The key must be the same for the two streams after all.
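
Until the signature is fixed, one user-side workaround is to pass the selectors 
as anonymous classes instead of lambdas: an anonymous class keeps its generic 
parameters, so type extraction can derive the key type. A minimal sketch (class 
and variable names are illustrative):

{code:java}
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AnonymousKeySelectorWorkaround {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> stream1 = env.fromElements(1, 2);
        DataStream<Integer> stream2 = env.fromElements(1, 2);

        // Anonymous classes keep their generic parameters, so the key type
        // can be extracted as INT instead of falling back to GenericTypeInfo.
        ConnectedStreams<Integer, Integer> connected = stream1.connect(stream2)
                .keyBy(
                        new KeySelector<Integer, Integer>() {
                            @Override
                            public Integer getKey(Integer value) {
                                return value;
                            }
                        },
                        new KeySelector<Integer, Integer>() {
                            @Override
                            public Integer getKey(Integer value) {
                                return value;
                            }
                        });
    }
}
{code}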

> ConnectedStreams#keyBy can not derive key TypeInformation for lambda 
> KeySelectors
> -
>
> Key: FLINK-18629
> URL: https://issues.apache.org/jira/browse/FLINK-18629
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.10.0, 1.11.0, 1.12.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Critical
> Fix For: 1.10.2, 1.12.0, 1.11.2
>
>
> The following test fails:
> {code}
> @Test
> public void testKeyedConnectedStreamsType() {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStreamSource<Integer> stream1 = env.fromElements(1, 2);
>     DataStreamSource<Integer> stream2 = env.fromElements(1, 2);
>     ConnectedStreams<Integer, Integer> connectedStreams = stream1.connect(stream2)
>         .keyBy(v -> v, v -> v);
>     KeyedStream<Integer, ?> firstKeyedInput = (KeyedStream<Integer, ?>) connectedStreams.getFirstInput();
>     KeyedStream<Integer, ?> secondKeyedInput = (KeyedStream<Integer, ?>) connectedStreams.getSecondInput();
>     assertThat(firstKeyedInput.getKeyType(), equalTo(Types.INT));
>     assertThat(secondKeyedInput.getKeyType(), equalTo(Types.INT));
> }
> {code}
> The problem is that the wildcard type is evaluated as {{Object}} for lambdas, 
> which in turn produces a {{GenericTypeInfo}} of {{Object}} for any 
> {{KeySelector}} provided as a lambda.
> I suggest changing the method signature to:
> {code}
> public <KEY> ConnectedStreams<IN1, IN2> keyBy(
>         KeySelector<IN1, KEY> keySelector1,
>         KeySelector<IN2, KEY> keySelector2)
> {code}
> This would be a source-compatible change. It might break state backend 
> compatibility (it would change the derived key type info).
> Still, there would be a workaround for old programs: a second overload that 
> takes the key type explicitly:
> {code}
> public <KEY> ConnectedStreams<IN1, IN2> keyBy(
>         KeySelector<IN1, KEY> keySelector1,
>         KeySelector<IN2, KEY> keySelector2,
>         TypeInformation<KEY> keyType)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18599) Compile error when use windowAll and TumblingProcessingTimeWindows

2020-07-15 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-18599:
-
Component/s: (was: API / DataStream)
 API / Scala

> Compile error when use windowAll and TumblingProcessingTimeWindows
> --
>
> Key: FLINK-18599
> URL: https://issues.apache.org/jira/browse/FLINK-18599
> Project: Flink
>  Issue Type: Bug
>  Components: API / Scala
>Affects Versions: 1.11.0
>Reporter: henvealf
>Priority: Major
>
> Code:
> {code:java}
> import org.apache.commons.lang3.StringUtils
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import 
> org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows}
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.scala._
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val stream = env.fromElements("a", "b", "c")
> stream
>   .filter((str: String) => StringUtils.isNotEmpty(str))
>   .map( _ => 1)
>   .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>   .reduce((a1, a2) => a1 + a2)
>   .print()
> {code}
> Compile failed:
> {code:java}
> error: type mismatch;
>  found   : 
> org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
>  required: 
> org.apache.flink.streaming.api.windowing.assigners.WindowAssigner[_ >: Int, ?]
> Note: Object <: Any (and 
> org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
>  <: 
> org.apache.flink.streaming.api.windowing.assigners.WindowAssigner[Object,org.apache.flink.streaming.api.windowing.windows.TimeWindow]),
>  but Java-defined class WindowAssigner is invariant in type T.
> You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
>   .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>  ^
> one error found
> {code}
>  What went wrong?
>  Scala version: 2.11
>  Flink version: 1.11
>  Thanks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18599) Compile error when use windowAll and TumblingProcessingTimeWindows

2020-07-15 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17158241#comment-17158241
 ] 

Aljoscha Krettek commented on FLINK-18599:
--

There is a quirk in the Scala compiler which causes {{windowAll}} (or 
{{window()}}) to not work when the type of the {{DataStream}} is a primitive 
type ({{Int}} in your case). If you change it to another type, for example 
{{("foo", 1)}} (a tuple type), the code will work when you also adapt the 
subsequent {{reduce()}}.

> Compile error when use windowAll and TumblingProcessingTimeWindows
> --
>
> Key: FLINK-18599
> URL: https://issues.apache.org/jira/browse/FLINK-18599
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.0
>Reporter: henvealf
>Priority: Major
>
> Code:
> {code:java}
> import org.apache.commons.lang3.StringUtils
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import 
> org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows}
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.scala._
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val stream = env.fromElements("a", "b", "c")
> stream
>   .filter((str: String) => StringUtils.isNotEmpty(str))
>   .map( _ => 1)
>   .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>   .reduce((a1, a2) => a1 + a2)
>   .print()
> {code}
> Compile failed:
> {code:java}
> error: type mismatch;
>  found   : 
> org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
>  required: 
> org.apache.flink.streaming.api.windowing.assigners.WindowAssigner[_ >: Int, ?]
> Note: Object <: Any (and 
> org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
>  <: 
> org.apache.flink.streaming.api.windowing.assigners.WindowAssigner[Object,org.apache.flink.streaming.api.windowing.windows.TimeWindow]),
>  but Java-defined class WindowAssigner is invariant in type T.
> You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
>   .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>  ^
> one error found
> {code}
>  What went wrong?
>  Scala version: 2.11
>  Flink version: 1.11
>  Thanks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-18599) Compile error when use windowAll and TumblingProcessingTimeWindows

2020-07-15 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-18599.

Resolution: Not A Bug

> Compile error when use windowAll and TumblingProcessingTimeWindows
> --
>
> Key: FLINK-18599
> URL: https://issues.apache.org/jira/browse/FLINK-18599
> Project: Flink
>  Issue Type: Bug
>  Components: API / Scala
>Affects Versions: 1.11.0
>Reporter: henvealf
>Priority: Major
>
> Code:
> {code:java}
> import org.apache.commons.lang3.StringUtils
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import 
> org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows}
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.scala._
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val stream = env.fromElements("a", "b", "c")
> stream
>   .filter((str: String) => StringUtils.isNotEmpty(str))
>   .map( _ => 1)
>   .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>   .reduce((a1, a2) => a1 + a2)
>   .print()
> {code}
> Compile failed:
> {code:java}
> error: type mismatch;
>  found   : 
> org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
>  required: 
> org.apache.flink.streaming.api.windowing.assigners.WindowAssigner[_ >: Int, ?]
> Note: Object <: Any (and 
> org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
>  <: 
> org.apache.flink.streaming.api.windowing.assigners.WindowAssigner[Object,org.apache.flink.streaming.api.windowing.windows.TimeWindow]),
>  but Java-defined class WindowAssigner is invariant in type T.
> You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
>   .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>  ^
> one error found
> {code}
>  What went wrong?
>  Scala version: 2.11
>  Flink version: 1.11
>  Thanks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18478) AvroDeserializationSchema does not work with types generated by avrohugger

2020-07-15 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17158218#comment-17158218
 ] 

Aljoscha Krettek commented on FLINK-18478:
--

From the log you posted in the gist it seems that the code is still using the 
Flink 1.11.0 version of {{AvroDeserializationSchema}}, which doesn't have the 
fix.

You can try using the snapshots repository and depending on version 
{{1.12-SNAPSHOT}}. In Maven you can do it by adding this repository:
{code:xml}
<repositories>
  <repository>
    <id>apache.snapshots</id>
    <name>Apache Development Snapshot Repository</name>
    <url>https://repository.apache.org/content/repositories/snapshots/</url>
    <releases>
      <enabled>false</enabled>
    </releases>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>
{code}
 

> AvroDeserializationSchema does not work with types generated by avrohugger
> --
>
> Key: FLINK-18478
> URL: https://issues.apache.org/jira/browse/FLINK-18478
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.2, 1.12.0, 1.11.1
>
>
> The main problem is that the code in {{SpecificData.createSchema()}} tries to 
> reflectively read the {{SCHEMA$}} field, that is normally there in Avro 
> generated classes. However, avrohugger generates this field in a companion 
> object, which the reflective Java code will therefore not find.
> This is also described in these ML threads:
>  * 
> [https://lists.apache.org/thread.html/5db58c7d15e4e9aaa515f935be3b342fe036e97d32e1fb0f0d1797ee@%3Cuser.flink.apache.org%3E]
>  * 
> [https://lists.apache.org/thread.html/cf1c5b8fa7f095739438807de9f2497e04ffe55237c5dea83355112d@%3Cuser.flink.apache.org%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18589) java.io.EOFException when reading integer

2020-07-15 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-18589:
-
Component/s: Runtime / State Backends

> java.io.EOFException when reading integer
> -
>
> Key: FLINK-18589
> URL: https://issues.apache.org/jira/browse/FLINK-18589
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
> Environment: {code:java}
> Starting YARN TaskExecutor runner (Version: 1.10.0, Rev:, 
> Date:)
> OS current user: yarn
> Current Hadoop/Kerberos user: hadoop
> JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.141-b15
> Maximum heap size: 28960 MiBytes
> JAVA_HOME: /usr/java/jdk1.8.0_141/jre
> Hadoop version: 2.8.5-amzn-6
> JVM Options:
>  -Xmx30360049728
>  -Xms30360049728
>  -XX:MaxDirectMemorySize=4429185024
>  -XX:MaxMetaspaceSize=1073741824
>  -XX:+UseG1GC
>  
> -Dlog.file=/var/log/hadoop-yarn/containers/application_1594539266405_0024/container_1594539266405_0024_01_50/taskmanager.log
>  -Dlog4j.configuration=file:./log4j.properties
> Program Arguments:
>  -D taskmanager.memory.framework.off-heap.size=134217728b
>  -D taskmanager.memory.network.max=1073741824b
>  -D taskmanager.memory.network.min=1073741824b
>  -D taskmanager.memory.framework.heap.size=134217728b
>  -D taskmanager.memory.managed.size=23192823744b
>  -D taskmanager.cpu.cores=7.0
>  -D taskmanager.memory.task.heap.size=30225832000b
>  -D taskmanager.memory.task.off-heap.size=3221225472b
>  --configDir .
>  -Djobmanager.rpc.address=ip-10-180-28-59.us-west-2.compute.internal
>  -Dweb.port=0
>  -Dweb.tmpdir=/tmp/flink-web-90514561-0a82-48c4-a8b1-c23bf7d814c0
>  -Djobmanager.rpc.port=38887
>  -Drest.address=ip-10-180-28-59.us-west-2.compute.internal{code}
>Reporter: Ori Popowski
>Priority: Major
>
> Getting {{java.io.EOFException}} when calling {{value()}} on 
> {{ValueState[java.lang.Integer]}}
>  
> Stacktrace:
>  
> {code:java}
> 2020-07-13 19:25:11
> org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from 
> RocksDB.
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:92)
>   at 
> org.apache.flink.runtime.state.ttl.AbstractTtlDecorator.getWrappedWithTtlCheckAndUpdate(AbstractTtlDecorator.java:92)
>   at 
> org.apache.flink.runtime.state.ttl.AbstractTtlDecorator.getWithTtlCheckAndUpdate(AbstractTtlDecorator.java:84)
>   at 
> org.apache.flink.runtime.state.ttl.AbstractTtlState.getWithTtlCheckAndUpdate(AbstractTtlState.java:56)
>   at 
> org.apache.flink.runtime.state.ttl.TtlValueState.value(TtlValueState.java:44)
>   at 
> walkme.flink.TruncateLargeSessions.processElement(TruncateLargeSessions.scala:28)
>   at 
> walkme.flink.TruncateLargeSessions.processElement(TruncateLargeSessions.scala:13)
>   at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>   at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException
>   at 
> org.apache.flink.core.memory.DataInputDeserializer.readLong(DataInputDeserializer.java:231)
>   at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:74)
>   at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:32)
>   at 
> org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:151)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:90)
>   ... 18 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18587) java.lang.IllegalArgumentException: Position out of bounds when reading Long state

2020-07-15 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-18587:
-
Component/s: Runtime / State Backends

> java.lang.IllegalArgumentException: Position out of bounds when reading Long 
> state
> --
>
> Key: FLINK-18587
> URL: https://issues.apache.org/jira/browse/FLINK-18587
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.10.0
> Environment: {code:java}
> Starting YARN TaskExecutor runner (Version: 1.10.0, Rev:, 
> Date:)
> OS current user: yarn
> Current Hadoop/Kerberos user: hadoop
> JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.141-b15
> Maximum heap size: 28960 MiBytes
> JAVA_HOME: /usr/java/jdk1.8.0_141/jre
> Hadoop version: 2.8.5-amzn-6
> JVM Options:
>  -Xmx30360049728
>  -Xms30360049728
>  -XX:MaxDirectMemorySize=4429185024
>  -XX:MaxMetaspaceSize=1073741824
>  -XX:+UseG1GC
>  
> -Dlog.file=/var/log/hadoop-yarn/containers/application_1594539266405_0024/container_1594539266405_0024_01_50/taskmanager.log
>  -Dlog4j.configuration=file:./log4j.properties
> Program Arguments:
>  -D taskmanager.memory.framework.off-heap.size=134217728b
>  -D taskmanager.memory.network.max=1073741824b
>  -D taskmanager.memory.network.min=1073741824b
>  -D taskmanager.memory.framework.heap.size=134217728b
>  -D taskmanager.memory.managed.size=23192823744b
>  -D taskmanager.cpu.cores=7.0
>  -D taskmanager.memory.task.heap.size=30225832000b
>  -D taskmanager.memory.task.off-heap.size=3221225472b
>  --configDir .
>  -Djobmanager.rpc.address=ip-10-180-28-59.us-west-2.compute.internal
>  -Dweb.port=0
>  -Dweb.tmpdir=/tmp/flink-web-90514561-0a82-48c4-a8b1-c23bf7d814c0
>  -Djobmanager.rpc.port=38887
>  -Drest.address=ip-10-180-28-59.us-west-2.compute.internal{code}
>Reporter: Ori Popowski
>Priority: Major
>
> Getting {{java.lang.IllegalArgumentException: Position out of bounds}} when 
> calling {{value()}} on {{ValueState[java.lang.Long]}}
>  
> Stacktrace:
>  
> {code:java}
> Caused by: java.lang.IllegalArgumentException: Position out of bounds.
>   at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
>   at 
> org.apache.flink.core.memory.DataOutputSerializer.setPosition(DataOutputSerializer.java:368)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.resetToKey(RocksDBSerializedCompositeKeyBuilder.java:189)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamespace(RocksDBSerializedCompositeKeyBuilder.java:116)
>   at 
> org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:84)
>   at 
> org.apache.flink.runtime.state.ttl.AbstractTtlDecorator.getWrappedWithTtlCheckAndUpdate(AbstractTtlDecorator.java:92)
>   at 
> org.apache.flink.runtime.state.ttl.AbstractTtlDecorator.getWithTtlCheckAndUpdate(AbstractTtlDecorator.java:84)
>   at 
> org.apache.flink.runtime.state.ttl.AbstractTtlState.getWithTtlCheckAndUpdate(AbstractTtlState.java:56)
>   at 
> org.apache.flink.runtime.state.ttl.TtlValueState.value(TtlValueState.java:44)
>   at 
> walkme.flink.TruncateLargeSessions.processElement(TruncateLargeSessions.scala:28)
>   at 
> walkme.flink.TruncateLargeSessions.processElement(TruncateLargeSessions.scala:14)
>   at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>   at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>   at 

[jira] [Updated] (FLINK-18577) Mail.run Blocked By the Legacy Source Thread

2020-07-15 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-18577:
-
Component/s: Runtime / Task

> Mail.run Blocked By the Legacy Source Thread
> 
>
> Key: FLINK-18577
> URL: https://issues.apache.org/jira/browse/FLINK-18577
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.11.0
>Reporter: zhangyunyun
>Priority: Major
>
> I found a BLOCKED entry in the "Thread Dump". It looks like this:
> ```
> "Source: [Kafka:FILEBEAT_LOG] -> Flat Map -> Timestamps/Watermarks -> 
> [CategoryFill] (4/12)" Id=95 BLOCKED on java.lang.Object@297f1806 owned by 
> "Legacy Source Thread - Source: [Kafka:FILEBEAT_LOG] -> Flat Map -> 
> Timestamps/Watermarks -> [CategoryFill] (4/12)" Id=178
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:91)
> - blocked on java.lang.Object@297f1806
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:282)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> ```
> Is there something wrong with my code? What should I do?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

