[jira] [Comment Edited] (FLINK-17969) Enhance Flink (Task) logging to include job name as context diagnostic information

2020-06-03 Thread Bhagavan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17125396#comment-17125396
 ] 

Bhagavan edited comment on FLINK-17969 at 6/3/20, 11:32 PM:


PR: https://github.com/apache/flink/pull/12472
I think we may also need to include a configuration option to disable MDC.
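For reference, a minimal standalone sketch of the SLF4J MDC usage the patch relies on (the logger and the literal job name below are made up for illustration; in the patch the value comes from JobInformation#getJobName):
{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class MdcSketch {
    private static final Logger LOG = LoggerFactory.getLogger(MdcSketch.class);

    public static void main(String[] args) {
        // Tag everything logged from this thread with the job name.
        MDC.put("jobName", "word-count");
        try {
            // With a pattern containing %X{jobName}
            // (e.g. "%d %-5level [%X{jobName}] %logger - %msg%n" in Logback),
            // this line carries the job name.
            LOG.info("task started");
        } finally {
            // Clear the entry so the value does not leak to the next task
            // executed on this thread.
            MDC.remove("jobName");
        }
    }
}
{code}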


was (Author: dasbh):
PR: https://github.com/apache/flink/pull/12472

> Enhance Flink (Task) logging to include job name as context diagnostic 
> information
> --
>
> Key: FLINK-17969
> URL: https://issues.apache.org/jira/browse/FLINK-17969
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.10.0
>Reporter: Bhagavan
>Priority: Trivial
>  Labels: pull-request-available
>
> Problem statement:
> We use a shared session cluster (Standalone/YARN) to execute jobs. All logs 
> from the cluster are shipped using a log aggregation framework 
> (Logstash/Splunk) so that application diagnostics are easier.
> However, one vital piece of information is missing from each log line: the 
> job name, which would let us filter the logs for a single job.
> Background
> Currently, Flink uses SLF4J as a logging API to abstract away from the 
> concrete logging implementation (Log4j 1.x, Logback or Log4j2), and the 
> logging pattern and implementation can be configured at deployment time. 
> However, there is no MDC information from the framework indicating the job 
> context.
> Proposed improvement.
> Add a jobName field to the Task class so that it can be added to the MDC 
> when the task thread starts executing.
> The change is trivial and uses the SLF4J MDC API.
> With this change, users can customise the logging pattern to include the 
> MDC (e.g. %X{jobName} in Logback).
> Change required.
> {code:java}
> @@ -319,6 +323,7 @@ public class Task implements Runnable, TaskSlotPayload, 
> TaskActions, PartitionPr
>  
> this.jobId = jobInformation.getJobId();
> +   this.jobName = jobInformation.getJobName();
> this.vertexId = taskInformation.getJobVertexId();
> @@ -530,8 +535,10 @@ public class Task implements Runnable, TaskSlotPayload, 
> TaskActions, PartitionPr
> @Override
> public void run() {
> try {
> +   MDC.put("jobName", this.jobName);
> doRun();
> } finally {
> +   MDC.remove("jobName");
> terminationFuture.complete(executionState);
> }
> }
> {code}
> If we are in agreement on this small change, I will raise a PR.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17969) Enhance Flink (Task) logging to include job name as context diagnostic information

2020-06-03 Thread Bhagavan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17125396#comment-17125396
 ] 

Bhagavan commented on FLINK-17969:
--

PR: https://github.com/apache/flink/pull/12472

> Enhance Flink (Task) logging to include job name as context diagnostic 
> information
> --
>
> Key: FLINK-17969
> URL: https://issues.apache.org/jira/browse/FLINK-17969
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.10.0
>Reporter: Bhagavan
>Priority: Trivial
>  Labels: pull-request-available
>
> Problem statement:
> We use a shared session cluster (Standalone/YARN) to execute jobs. All logs 
> from the cluster are shipped using a log aggregation framework 
> (Logstash/Splunk) so that application diagnostics are easier.
> However, one vital piece of information is missing from each log line: the 
> job name, which would let us filter the logs for a single job.
> Background
> Currently, Flink uses SLF4J as a logging API to abstract away from the 
> concrete logging implementation (Log4j 1.x, Logback or Log4j2), and the 
> logging pattern and implementation can be configured at deployment time. 
> However, there is no MDC information from the framework indicating the job 
> context.
> Proposed improvement.
> Add a jobName field to the Task class so that it can be added to the MDC 
> when the task thread starts executing.
> The change is trivial and uses the SLF4J MDC API.
> With this change, users can customise the logging pattern to include the 
> MDC (e.g. %X{jobName} in Logback).
> Change required.
> {code:java}
> @@ -319,6 +323,7 @@ public class Task implements Runnable, TaskSlotPayload, 
> TaskActions, PartitionPr
>  
> this.jobId = jobInformation.getJobId();
> +   this.jobName = jobInformation.getJobName();
> this.vertexId = taskInformation.getJobVertexId();
> @@ -530,8 +535,10 @@ public class Task implements Runnable, TaskSlotPayload, 
> TaskActions, PartitionPr
> @Override
> public void run() {
> try {
> +   MDC.put("jobName", this.jobName);
> doRun();
> } finally {
> +   MDC.remove("jobName");
> terminationFuture.complete(executionState);
> }
> }
> {code}
> If we are in agreement on this small change, I will raise a PR.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17969) Enhance Flink (Task) logging to include job name as context diagnostic information

2020-05-27 Thread Bhagavan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bhagavan updated FLINK-17969:
-
Description: 
Problem statement:
We use a shared session cluster (Standalone/YARN) to execute jobs. All logs 
from the cluster are shipped using a log aggregation framework (Logstash/Splunk) 
so that application diagnostics are easier.
However, one vital piece of information is missing from each log line: the job 
name, which would let us filter the logs for a single job.

Background
Currently, Flink uses SLF4J as a logging API to abstract away from the concrete 
logging implementation (Log4j 1.x, Logback or Log4j2), and the logging pattern 
and implementation can be configured at deployment time. However, there is no 
MDC information from the framework indicating the job context.

Proposed improvement.

Add a jobName field to the Task class so that it can be added to the MDC when 
the task thread starts executing.

The change is trivial and uses the SLF4J MDC API.

With this change, users can customise the logging pattern to include the MDC 
(e.g. %X{jobName} in Logback).

Change required.
{code:java}
@@ -319,6 +323,7 @@ public class Task implements Runnable, TaskSlotPayload, 
TaskActions, PartitionPr
 
this.jobId = jobInformation.getJobId();
+   this.jobName = jobInformation.getJobName();
this.vertexId = taskInformation.getJobVertexId();
@@ -530,8 +535,10 @@ public class Task implements Runnable, TaskSlotPayload, 
TaskActions, PartitionPr
@Override
public void run() {
try {
+   MDC.put("jobName", this.jobName);
doRun();
} finally {
+   MDC.remove("jobName");
terminationFuture.complete(executionState);
}
}
{code}

If we are in agreement on this small change, I will raise a PR.

  was:
Problem statement:
We use a shared session cluster (Standalone/YARN) to execute jobs. All logs 
from the cluster are shipped using a log aggregation framework (Logstash/Splunk) 
so that application diagnostics are easier.
However, one vital piece of information is missing from each log line: the job 
name, which would let us filter the logs for a single job.

Background
Currently, Flink uses SLF4J as a logging API to abstract away from the concrete 
logging implementation (Log4j 1.x, Logback or Log4j2), and the logging pattern 
and implementation can be configured at deployment time. However, there is no 
MDC information from the framework indicating the job context.

Proposed improvement.

Add a jobName field to the Task class so that it can be added to the MDC when 
the task thread starts executing.

The change is trivial and uses the SLF4J MDC API.

With this change, users can customise the logging pattern to include the MDC 
(e.g. [%X{jobName}] in Logback).

Change required.
{code:java}
@@ -319,6 +323,7 @@ public class Task implements Runnable, TaskSlotPayload, 
TaskActions, PartitionPr
 
this.jobId = jobInformation.getJobId();
+   this.jobName = jobInformation.getJobName();
this.vertexId = taskInformation.getJobVertexId();
@@ -530,8 +535,10 @@ public class Task implements Runnable, TaskSlotPayload, 
TaskActions, PartitionPr
@Override
public void run() {
try {
+   MDC.put("jobName", this.jobName);
doRun();
} finally {
+   MDC.remove("jobName");
terminationFuture.complete(executionState);
}
}
{code}

If we are in agreement on this small change, I will raise a PR.


> Enhance Flink (Task) logging to include job name as context diagnostic 
> information
> --
>
> Key: FLINK-17969
> URL: https://issues.apache.org/jira/browse/FLINK-17969
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.10.0
>Reporter: Bhagavan
>Priority: Trivial
>
> Problem statement:
> We use a shared session cluster (Standalone/YARN) to execute jobs. All logs 
> from the cluster are shipped using a log aggregation framework 
> (Logstash/Splunk) so that application diagnostics are easier.
> However, one vital piece of information is missing from each log line: the 
> job name, which would let us filter the logs for a single job.
> Background
> Currently, Flink uses SLF4J as a logging API to abstract away from the 
> concrete logging implementation (Log4j 1.x, Logback or Log4j2), and the 
> logging pattern and implementation can be configured at deployment time. 
> However, there is no MDC information from the framework indicating the job 
> context.
> Proposed improvement.
> Add a jobName field to the Task class so that it can be added to the MDC 
> when the task thread starts executing.
> The change is trivial and uses the SLF4J MDC API.
> With 

[jira] [Created] (FLINK-17969) Enhance Flink (Task) logging to include job name as context diagnostic information

2020-05-27 Thread Bhagavan (Jira)
Bhagavan created FLINK-17969:


 Summary: Enhance Flink (Task) logging to include job name as 
context diagnostic information
 Key: FLINK-17969
 URL: https://issues.apache.org/jira/browse/FLINK-17969
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Affects Versions: 1.10.0
Reporter: Bhagavan


Problem statement:
We use a shared session cluster (Standalone/YARN) to execute jobs. All logs 
from the cluster are shipped using a log aggregation framework (Logstash/Splunk) 
so that application diagnostics are easier.
However, one vital piece of information is missing from each log line: the job 
name, which would let us filter the logs for a single job.

Background
Currently, Flink uses SLF4J as a logging API to abstract away from the concrete 
logging implementation (Log4j 1.x, Logback or Log4j2), and the logging pattern 
and implementation can be configured at deployment time. However, there is no 
MDC information from the framework indicating the job context.

Proposed improvement.

Add a jobName field to the Task class so that it can be added to the MDC when 
the task thread starts executing.

The change is trivial and uses the SLF4J MDC API.

With this change, users can customise the logging pattern to include the MDC 
(e.g. [%X{jobName}] in Logback).

Change required.
{code:java}
@@ -319,6 +323,7 @@ public class Task implements Runnable, TaskSlotPayload, 
TaskActions, PartitionPr
 
this.jobId = jobInformation.getJobId();
+   this.jobName = jobInformation.getJobName();
this.vertexId = taskInformation.getJobVertexId();
@@ -530,8 +535,10 @@ public class Task implements Runnable, TaskSlotPayload, 
TaskActions, PartitionPr
@Override
public void run() {
try {
+   MDC.put("jobName", this.jobName);
doRun();
} finally {
+   MDC.remove("jobName");
terminationFuture.complete(executionState);
}
}
{code}

If we are in agreement on this small change, I will raise a PR.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17464) Standalone HA Cluster crash with non-recoverable cluster state - need to wipe cluster to recover service

2020-05-13 Thread Bhagavan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17106297#comment-17106297
 ] 

Bhagavan commented on FLINK-17464:
--

Maybe we need a configuration parameter for the standalone session cluster to 
"allow failure on recovering job graphs" and mark the affected job as failed 
instead of crashing the JobManager.
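A self-contained sketch of what such sandboxed recovery could look like (all types and names below are illustrative stand-ins, not Flink's actual classes or configuration):
{code:java}
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration of "sandboxed" job graph recovery: a failure while recovering
// one job is recorded instead of taking down the whole process.
public class SandboxedRecoverySketch {

    interface JobGraphStore {
        Object recoverJobGraph(String jobId) throws Exception;
    }

    private final Map<String, Exception> failedJobs = new LinkedHashMap<>();

    void recoverAll(JobGraphStore store, List<String> jobIds, boolean tolerateFailures) throws Exception {
        for (String jobId : jobIds) {
            try {
                // Hand the recovered graph over for execution here.
                store.recoverJobGraph(jobId);
            } catch (Exception e) {
                if (!tolerateFailures) {
                    // Current behaviour: escalate, which ends in System.exit.
                    throw e;
                }
                // Proposed behaviour: mark only this job as failed, move on.
                failedJobs.put(jobId, e);
            }
        }
    }
}
{code}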

> Standalone HA Cluster crash with non-recoverable cluster state - need to wipe 
> cluster to recover service
> ---
>
> Key: FLINK-17464
> URL: https://issues.apache.org/jira/browse/FLINK-17464
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: John Lonergan
>Priority: Critical
>
> When recovering job graphs after a failover of the JobManager, or after a 
> restart of the cluster, the HA cluster can get into a state where it cannot 
> be restarted, and the only resolution we have identified is to destroy the 
> ZooKeeper job graph store.
> This happens when any job graph that is being recovered throws an exception 
> during recovery on the master. 
> Whilst we encountered this issue on a sink that extends "InitializeOnMaster", 
> we believe the vulnerability is generic in nature, and the unrecoverable 
> problems encountered will occur if the application code throws any exception 
> for any reason during recovery on the main line. 
> These application exceptions propagate up to the JobManager ClusterEntrypoint 
> class, at which point the JM leader does a System.exit. Any remaining 
> JobManagers then follow leader election and encounter the same sequence of 
> events. Ultimately all JMs exit and then all TMs fail as well. 
> The entire cluster is destroyed.
> Because these events happen during job graph recovery, merely attempting a 
> restart of the cluster will fail, leaving the only option as destroying the 
> job graph state. 
> If one is running a shared cluster with many jobs then this is effectively a 
> DoS and results in prolonged downtime, as code or data changes are necessary 
> to work around the issue.
> --
> Of course, if the same exception were to be thrown during job submission 
> using the CLI, then we would not see the cluster crashing nor the cluster 
> being corrupted; the job would merely fail.
> Our feeling is that the job graph recovery process ought to behave in a 
> similar fashion to the job submission process.
> If a job submission fails then the job is recorded as failed and there is no 
> further impact on the cluster. However, if job recovery fails then the entire 
> cluster is taken down and may, as we have seen, become inoperable.
> We feel that a failure to restore a single job graph ought merely to result 
> in the job being recorded as failed. It should not result in a cluster-wide 
> impact.
> We do not understand the logic of the design in this space. However, if the 
> existing logic was for the benefit of single-job clusters then this is a poor 
> result for multi-job clusters. In that case we ought to be able to configure 
> a cluster for "multi-job mode" so that job graph recovery is "sandboxed" and 
> doesn't take out the entire cluster.
> ---
> It is easy to demonstrate the problem using the built-in Flink streaming Word 
> Count example.
> For this to work, configure the job to write a single output file, and write 
> it to HDFS, not to a local disk. 
> Note that the class FileOutputFormat extends InitializeOnMaster, and its 
> initializeGlobal() function executes only when the file is on HDFS, not on 
> local disk.
> When this function runs, it will generate an exception if the output already 
> exists.
> Therefore, to demonstrate the issue, do the following:
> - configure the job to write a single file to HDFS
> - configure the job to read a large file, so that the job takes some time 
> to execute and we have time to complete the next few steps before the job 
> finishes
> - run the job on an HA cluster with two JM nodes
> - wait for the job to start and the output file to be created
> - kill the leader JM before the job has finished 
> - observe JM failover occurring ... 
> - recovery during failover will NOT succeed, because the recovery of the Word 
> Count job will fail due to the presence of the output file
> - observe all JMs and TMs ultimately terminating
> Once the cluster has failed outright, try to restart it.
> During the restart, the cluster will detect the presence of job graphs in ZK 
> and attempt to restore them. This, however, is doomed due to the same 
> vulnerability that causes the global outage above.
> ---
> For operability, Flink needs a change such that the job graph recovery 
> process is entirely sandboxed and the failure of a given job 

[jira] [Commented] (FLINK-15728) JDBCUpsertOutputFormat does not set bind parameter keyFields in updateStatement

2020-01-22 Thread Bhagavan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17021482#comment-17021482
 ] 

Bhagavan commented on FLINK-15728:
--

Also, I have found a way to achieve upsert in Oracle and H2. I can add the 
dialect to this PR.

SQL
{code:sql}
MERGE INTO TAB t 
   USING (SELECT ? "id", ? "msg" FROM DUAL) v 
   ON ( t."id"=v."id")
WHEN MATCHED THEN 
   UPDATE SET t."msg"=v."msg"
WHEN NOT MATCHED THEN 
INSERT ("id","msg") 
VALUES ( v."id", v."msg" )
{code}
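For illustration, a minimal JDBC sketch of how such a statement keeps its bind parameters in plain row order (in-memory H2 in Oracle compatibility mode so that DUAL is available; the CASTs are added here to help H2 infer parameter types, and the table and values are made up):
{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;

public class MergeUpsertSketch {
    public static void main(String[] args) throws Exception {
        // In-memory H2 in Oracle compatibility mode (provides DUAL);
        // assumes the H2 driver is on the classpath.
        try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:test;MODE=Oracle")) {
            try (Statement stmt = conn.createStatement()) {
                stmt.execute("CREATE TABLE TAB (\"id\" INT PRIMARY KEY, \"msg\" VARCHAR(64))");
            }
            String merge =
                "MERGE INTO TAB t "
                + "USING (SELECT CAST(? AS INT) \"id\", CAST(? AS VARCHAR) \"msg\" FROM DUAL) v "
                + "ON (t.\"id\" = v.\"id\") "
                + "WHEN MATCHED THEN UPDATE SET t.\"msg\" = v.\"msg\" "
                + "WHEN NOT MATCHED THEN INSERT (\"id\", \"msg\") VALUES (v.\"id\", v.\"msg\")";
            try (PreparedStatement ps = conn.prepareStatement(merge)) {
                // Bind parameters follow plain row order: (id, msg).
                ps.setInt(1, 1);
                ps.setString(2, "hello");
                ps.executeUpdate();
            }
        }
    }
}
{code}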

> JDBCUpsertOutputFormat does not set bind parameter keyFields in 
> updateStatement
> ---
>
> Key: FLINK-15728
> URL: https://issues.apache.org/jira/browse/FLINK-15728
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.9.1
>Reporter: Bhagavan
>Assignee: Bhagavan
>Priority: Major
>
> When using JDBCUpsertOutputFormat with a custom dialect (e.g. H2/Oracle) that 
> uses UpsertWriterUsingInsertUpdateStatement, the code fails with the error 
> below.
> {code:java}
> Caused by: org.h2.jdbc.JdbcSQLDataException: Parameter "#6" is not set 
> [90012-200] 
> at org.h2.message.DbException.getJdbcSQLException(DbException.java:590) 
> at org.h2.message.DbException.getJdbcSQLException(DbException.java:429) 
> at org.h2.message.DbException.get(DbException.java:205) 
> at org.h2.message.DbException.get(DbException.java:181) 
> at org.h2.expression.Parameter.checkSet(Parameter.java:83) 
> at 
> org.h2.jdbc.JdbcPreparedStatement.addBatch(JdbcPreparedStatement.java:1275) 
> at 
> org.apache.flink.api.java.io.jdbc.writer.UpsertWriter$UpsertWriterUsingInsertUpdateStatement.processOneRowInBatch(UpsertWriter.java:233)
>  
> at 
> org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:111)
>  {code}
> This is because UpsertWriterUsingInsertUpdateStatement#processOneRowInBatch 
> does not set all bind parameters in the Update case.
> This bug does not surface while using Derby DB. If we replace Derby with H2 
> in JDBCUpsertOutputFormatTest, we can reproduce the bug.
> The fix is trivial. Happy to raise a PR.
> {code:java}
> //for update case replace below
> setRecordToStatement(updateStatement, fieldTypes, row); 
> //with
> setRecordToStatement(updateStatement, fieldTypes + pkTypes, row  + pkRow);
> //NOTE: as the prepared updateStatement contains an additional WHERE clause, 
> //we need to pass additional bind values and their SQL types ('+' above 
> //denotes concatenation)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15728) JDBCUpsertOutputFormat does not set bind parameter keyFields in updateStatement

2020-01-22 Thread Bhagavan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17021232#comment-17021232
 ] 

Bhagavan commented on FLINK-15728:
--

Raised PR https://github.com/apache/flink/pull/10926

> JDBCUpsertOutputFormat does not set bind parameter keyFields in 
> updateStatement
> ---
>
> Key: FLINK-15728
> URL: https://issues.apache.org/jira/browse/FLINK-15728
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.9.1
>Reporter: Bhagavan
>Assignee: Bhagavan
>Priority: Major
>
> When using JDBCUpsertOutputFormat with a custom dialect (e.g. H2/Oracle) that 
> uses UpsertWriterUsingInsertUpdateStatement, the code fails with the error 
> below.
> {code:java}
> Caused by: org.h2.jdbc.JdbcSQLDataException: Parameter "#6" is not set 
> [90012-200] 
> at org.h2.message.DbException.getJdbcSQLException(DbException.java:590) 
> at org.h2.message.DbException.getJdbcSQLException(DbException.java:429) 
> at org.h2.message.DbException.get(DbException.java:205) 
> at org.h2.message.DbException.get(DbException.java:181) 
> at org.h2.expression.Parameter.checkSet(Parameter.java:83) 
> at 
> org.h2.jdbc.JdbcPreparedStatement.addBatch(JdbcPreparedStatement.java:1275) 
> at 
> org.apache.flink.api.java.io.jdbc.writer.UpsertWriter$UpsertWriterUsingInsertUpdateStatement.processOneRowInBatch(UpsertWriter.java:233)
>  
> at 
> org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:111)
>  {code}
> This is because UpsertWriterUsingInsertUpdateStatement#processOneRowInBatch 
> does not set all bind parameters in the Update case.
> This bug does not surface while using Derby DB. If we replace Derby with H2 
> in JDBCUpsertOutputFormatTest, we can reproduce the bug.
> The fix is trivial. Happy to raise a PR.
> {code:java}
> //for update case replace below
> setRecordToStatement(updateStatement, fieldTypes, row); 
> //with
> setRecordToStatement(updateStatement, fieldTypes + pkTypes, row  + pkRow);
> //NOTE: as the prepared updateStatement contains an additional WHERE clause, 
> //we need to pass additional bind values and their SQL types ('+' above 
> //denotes concatenation)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15728) JDBCUpsertOutputFormat does not set bind parameter keyFields in updateStatement

2020-01-22 Thread Bhagavan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020991#comment-17020991
 ] 

Bhagavan commented on FLINK-15728:
--

Sorry, should have added more context.

I added an H2Dialect as below (i.e. with no native UPSERT support), which is 
similar to the Derby dialect apart from the driver name and URL.
Note: it does not override the _getUpsertStatement_ method from JDBCDialect 
(same as Derby), so it uses *UpsertWriterUsingInsertUpdateStatement*.

{code:java}
private static class H2Dialect implements JDBCDialect {

    private static final long serialVersionUID = 1L;

    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:h2:");
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("org.h2.Driver");
    }

    @Override
    public String quoteIdentifier(String identifier) {
        // No quoting; identifiers are passed through as-is.
        return identifier;
    }
}
{code}


With this, if you run the JDBCUpsertOutputFormatTest test, you will get:
{code:java}
Caused by: org.h2.jdbc.JdbcSQLDataException: Parameter "#6" is not set 
[90012-200] 
{code}

Also, with the current *UpsertWriter* implementation, a dialect's upsert 
statement can only have bind parameters ('?') matching the Row in number and 
order.
{code:sql}

-- MySQL uses VALUES, so the number of ? equals the Row length
INSERT INTO `TAB`(`id`, `msg`) VALUES (?, ?) 
ON DUPLICATE KEY 
UPDATE `id`=VALUES(`id`), `msg`=VALUES(`msg`)

-- Postgres uses EXCLUDED, so the number of ? equals the Row length
INSERT INTO TAB(id, msg) VALUES (?, ?) 
ON CONFLICT (id) 
DO UPDATE SET id=EXCLUDED.id, msg=EXCLUDED.msg

{code}

But if I write the Oracle upsert as below, setRecordToStatement in UpsertWriter 
will not work:
{code:sql}
MERGE INTO TAB USING dual ON ( "id"=? )
WHEN MATCHED THEN UPDATE SET "msg"=?
WHEN NOT MATCHED THEN INSERT ("id","msg") 
VALUES ( ?, ? )
{code}

I hope I have clarified the issue.
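To spell out the '+' shorthand from the proposed fix in the description, it stands for plain array concatenation, roughly as below (a sketch, not the actual Flink helper):
{code:java}
import java.sql.Types;
import java.util.Arrays;

// Sketch of the fix: the prepared UPDATE statement carries extra '?'
// placeholders for its WHERE clause, so the SQL types (and likewise the row
// values) must be extended with the key fields before binding.
public class BindParamSketch {

    static int[] concat(int[] a, int[] b) {
        int[] out = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }

    public static void main(String[] args) {
        int[] fieldTypes = {Types.INTEGER, Types.VARCHAR};
        int[] pkTypes = {Types.INTEGER};
        // UPDATE TAB SET "id"=?, "msg"=? WHERE "id"=?  -> three placeholders,
        // so three SQL types are needed:
        System.out.println(Arrays.toString(concat(fieldTypes, pkTypes)));
    }
}
{code}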

> JDBCUpsertOutputFormat does not set bind parameter keyFields in 
> updateStatement
> ---
>
> Key: FLINK-15728
> URL: https://issues.apache.org/jira/browse/FLINK-15728
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.9.1
>Reporter: Bhagavan
>Assignee: Bhagavan
>Priority: Major
>
> When using JDBCUpsertOutputFormat with a custom dialect (e.g. H2/Oracle) that 
> uses UpsertWriterUsingInsertUpdateStatement, the code fails with the error 
> below.
> {code:java}
> Caused by: org.h2.jdbc.JdbcSQLDataException: Parameter "#6" is not set 
> [90012-200] 
> at org.h2.message.DbException.getJdbcSQLException(DbException.java:590) 
> at org.h2.message.DbException.getJdbcSQLException(DbException.java:429) 
> at org.h2.message.DbException.get(DbException.java:205) 
> at org.h2.message.DbException.get(DbException.java:181) 
> at org.h2.expression.Parameter.checkSet(Parameter.java:83) 
> at 
> org.h2.jdbc.JdbcPreparedStatement.addBatch(JdbcPreparedStatement.java:1275) 
> at 
> org.apache.flink.api.java.io.jdbc.writer.UpsertWriter$UpsertWriterUsingInsertUpdateStatement.processOneRowInBatch(UpsertWriter.java:233)
>  
> at 
> org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:111)
>  {code}
> This is because UpsertWriterUsingInsertUpdateStatement#processOneRowInBatch 
> does not set all bind parameters in the Update case.
> This bug does not surface while using Derby DB. If we replace Derby with H2 
> in JDBCUpsertOutputFormatTest, we can reproduce the bug.
> The fix is trivial. Happy to raise a PR.
> {code:java}
> //for update case replace below
> setRecordToStatement(updateStatement, fieldTypes, row); 
> //with
> setRecordToStatement(updateStatement, fieldTypes + pkTypes, row  + pkRow);
> //NOTE: as the prepared updateStatement contains an additional WHERE clause, 
> //we need to pass additional bind values and their SQL types ('+' above 
> //denotes concatenation)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15728) JDBCUpsertOutputFormat does not set bind parameter keyFields in updateStatement

2020-01-21 Thread Bhagavan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020494#comment-17020494
 ] 

Bhagavan commented on FLINK-15728:
--

Added [~jark], as they reviewed the original work.

> JDBCUpsertOutputFormat does not set bind parameter keyFields in 
> updateStatement
> ---
>
> Key: FLINK-15728
> URL: https://issues.apache.org/jira/browse/FLINK-15728
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.9.1
>Reporter: Bhagavan
>Priority: Major
>
> When using JDBCUpsertOutputFormat with a custom dialect (e.g. H2/Oracle) that 
> uses UpsertWriterUsingInsertUpdateStatement, the code fails with the error 
> below.
> {code:java}
> Caused by: org.h2.jdbc.JdbcSQLDataException: Parameter "#6" is not set 
> [90012-200] 
> at org.h2.message.DbException.getJdbcSQLException(DbException.java:590) 
> at org.h2.message.DbException.getJdbcSQLException(DbException.java:429) 
> at org.h2.message.DbException.get(DbException.java:205) 
> at org.h2.message.DbException.get(DbException.java:181) 
> at org.h2.expression.Parameter.checkSet(Parameter.java:83) 
> at 
> org.h2.jdbc.JdbcPreparedStatement.addBatch(JdbcPreparedStatement.java:1275) 
> at 
> org.apache.flink.api.java.io.jdbc.writer.UpsertWriter$UpsertWriterUsingInsertUpdateStatement.processOneRowInBatch(UpsertWriter.java:233)
>  
> at 
> org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:111)
>  {code}
> This is because UpsertWriterUsingInsertUpdateStatement#processOneRowInBatch 
> does not set all bind parameters in the Update case.
> This bug does not surface while using Derby DB. If we replace Derby with H2 
> in JDBCUpsertOutputFormatTest, we can reproduce the bug.
> The fix is trivial. Happy to raise a PR.
> {code:java}
> //for update case replace below
> setRecordToStatement(updateStatement, fieldTypes, row); 
> //with
> setRecordToStatement(updateStatement, fieldTypes + pkTypes, row  + pkRow);
> //NOTE: as the prepared updateStatement contains an additional WHERE clause, 
> //we need to pass additional bind values and their SQL types ('+' above 
> //denotes concatenation)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15728) JDBCUpsertOutputFormat does not set bind parameter keyFields in updateStatement

2020-01-21 Thread Bhagavan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bhagavan updated FLINK-15728:
-
Description: 
When using JDBCUpsertOutputFormat with a custom dialect (e.g. H2/Oracle) that 
uses UpsertWriterUsingInsertUpdateStatement, the code fails with the error below.
{code:java}
Caused by: org.h2.jdbc.JdbcSQLDataException: Parameter "#6" is not set 
[90012-200] 
at org.h2.message.DbException.getJdbcSQLException(DbException.java:590) 
at org.h2.message.DbException.getJdbcSQLException(DbException.java:429) 
at org.h2.message.DbException.get(DbException.java:205) 
at org.h2.message.DbException.get(DbException.java:181) 
at org.h2.expression.Parameter.checkSet(Parameter.java:83) 
at org.h2.jdbc.JdbcPreparedStatement.addBatch(JdbcPreparedStatement.java:1275) 
at 
org.apache.flink.api.java.io.jdbc.writer.UpsertWriter$UpsertWriterUsingInsertUpdateStatement.processOneRowInBatch(UpsertWriter.java:233)
 
at 
org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:111)
 {code}
This is because UpsertWriterUsingInsertUpdateStatement#processOneRowInBatch 
does not set all bind parameters in the Update case.

This bug does not surface while using Derby DB. 
If we replace Derby with H2 in JDBCUpsertOutputFormatTest, we can reproduce the 
bug.

The fix is trivial. Happy to raise a PR.
{code:java}
//for update case replace below
setRecordToStatement(updateStatement, fieldTypes, row); 
//with
setRecordToStatement(updateStatement, fieldTypes + pkTypes, row  + pkRow);
//NOTE: as the prepared updateStatement contains an additional WHERE clause, we 
//need to pass additional bind values and their SQL types ('+' above denotes 
//concatenation)
{code}

  was:
When using JDBCUpsertOutputFormat with a custom dialect (e.g. H2/Oracle) that 
uses UpsertWriterUsingInsertUpdateStatement, the code fails with the error below.
{code:java}
Caused by: org.h2.jdbc.JdbcSQLDataException: Parameter "#6" is not set 
[90012-200] 
at org.h2.message.DbException.getJdbcSQLException(DbException.java:590) 
at org.h2.message.DbException.getJdbcSQLException(DbException.java:429) 
at org.h2.message.DbException.get(DbException.java:205) 
at org.h2.message.DbException.get(DbException.java:181) 
at org.h2.expression.Parameter.checkSet(Parameter.java:83) 
at org.h2.jdbc.JdbcPreparedStatement.addBatch(JdbcPreparedStatement.java:1275) 
at 
org.apache.flink.api.java.io.jdbc.writer.UpsertWriter$UpsertWriterUsingInsertUpdateStatement.processOneRowInBatch(UpsertWriter.java:233)
 
at 
org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:111)
 {code}
This is because UpsertWriterUsingInsertUpdateStatement#processOneRowInBatch 
does not set all bind parameters in the Update case.

This bug does not surface while using Derby DB. 
If we replace Derby with H2 in JDBCUpsertOutputFormatTest, we can reproduce the 
bug.

The fix is trivial. Happy to raise a PR.
{code:java}
//for update case replace below
setRecordToStatement(updateStatement, fieldTypes, row); 
//with
setRecordToStatement(updateStatement, fieldTypes + pkTypes, row  + pkRow);
//NOTE: as the prepared updateStatement contains an additional WHERE clause, we 
//need to pass additional bind values and their SQL types ('+' above denotes 
//concatenation)
{code}


> JDBCUpsertOutputFormat does not set bind parameter keyFields in 
> updateStatement
> ---
>
> Key: FLINK-15728
> URL: https://issues.apache.org/jira/browse/FLINK-15728
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.9.1
>Reporter: Bhagavan
>Priority: Major
>
> When using JDBCUpsertOutputFormat with a custom dialect (e.g. H2/Oracle) that 
> uses UpsertWriterUsingInsertUpdateStatement, the code fails with the error 
> below.
> {code:java}
> Caused by: org.h2.jdbc.JdbcSQLDataException: Parameter "#6" is not set 
> [90012-200] 
> at org.h2.message.DbException.getJdbcSQLException(DbException.java:590) 
> at org.h2.message.DbException.getJdbcSQLException(DbException.java:429) 
> at org.h2.message.DbException.get(DbException.java:205) 
> at org.h2.message.DbException.get(DbException.java:181) 
> at org.h2.expression.Parameter.checkSet(Parameter.java:83) 
> at 
> org.h2.jdbc.JdbcPreparedStatement.addBatch(JdbcPreparedStatement.java:1275) 
> at 
> org.apache.flink.api.java.io.jdbc.writer.UpsertWriter$UpsertWriterUsingInsertUpdateStatement.processOneRowInBatch(UpsertWriter.java:233)
>  
> at 
> org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:111)
>  {code}
> This is because UpsertWriterUsingInsertUpdateStatement#processOneRowInBatch 
> does not set all bind parameters in the Update case.
> This bug does not surface while using Derby DB. If we replace Derby with H2 
> in JDBCUpsertOutputFormatTest, we can reproduce the bug.
> The fix is trivial. Happy to raise a PR.
> {code:java}
> //for update case replace below
> 

[jira] [Updated] (FLINK-15728) JDBCUpsertOutputFormat does not set bind parameter keyFields in updateStatement

2020-01-21 Thread Bhagavan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bhagavan updated FLINK-15728:
-
Description: 
When using JDBCUpsertOutputFormat with a custom dialect (e.g. H2/Oracle) that 
uses UpsertWriterUsingInsertUpdateStatement, the code fails with the error below.
{code:java}
Caused by: org.h2.jdbc.JdbcSQLDataException: Parameter "#6" is not set 
[90012-200] 
at org.h2.message.DbException.getJdbcSQLException(DbException.java:590) 
at org.h2.message.DbException.getJdbcSQLException(DbException.java:429) 
at org.h2.message.DbException.get(DbException.java:205) 
at org.h2.message.DbException.get(DbException.java:181) 
at org.h2.expression.Parameter.checkSet(Parameter.java:83) 
at org.h2.jdbc.JdbcPreparedStatement.addBatch(JdbcPreparedStatement.java:1275) 
at 
org.apache.flink.api.java.io.jdbc.writer.UpsertWriter$UpsertWriterUsingInsertUpdateStatement.processOneRowInBatch(UpsertWriter.java:233)
 
at 
org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:111)
 {code}
This is because UpsertWriterUsingInsertUpdateStatement#processOneRowInBatch 
does not set all bind parameters in the Update case.

This bug does not surface while using Derby DB. 
If we replace Derby with H2 in JDBCUpsertOutputFormatTest, we can reproduce the 
bug.

The fix is trivial. Happy to raise a PR.
{code:java}
//for update case replace below
setRecordToStatement(updateStatement, fieldTypes, row); 
//with
setRecordToStatement(updateStatement, fieldTypes + pkTypes, row  + pkRow);
//NOTE: as the prepared updateStatement contains an additional WHERE clause, we 
//need to pass additional bind values and their SQL types ('+' above denotes 
//concatenation)
{code}

  was:
When using JDBCUpsertOutputFormat with a custom dialect (e.g. H2/Oracle) that 
uses UpsertWriterUsingInsertUpdateStatement, the code fails with the error below.
{code:java}
Caused by: org.h2.jdbc.JdbcSQLDataException: Parameter "#6" is not set 
[90012-200] 
at org.h2.message.DbException.getJdbcSQLException(DbException.java:590) 
at org.h2.message.DbException.getJdbcSQLException(DbException.java:429) 
at org.h2.message.DbException.get(DbException.java:205) 
at org.h2.message.DbException.get(DbException.java:181) 
at org.h2.expression.Parameter.checkSet(Parameter.java:83) 
at org.h2.jdbc.JdbcPreparedStatement.addBatch(JdbcPreparedStatement.java:1275) 
at 
org.apache.flink.api.java.io.jdbc.writer.UpsertWriter$UpsertWriterUsingInsertUpdateStatement.processOneRowInBatch(UpsertWriter.java:233)
 
at 
org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:111)
 {code}
This is because UpsertWriterUsingInsertUpdateStatement#processOneRowInBatch 
does not set all bind parameters in the Update case.

This bug does not surface while using Derby DB. 
If we replace Derby with H2 in JDBCUpsertOutputFormatTest, we can reproduce the 
bug.

The fix is trivial. Happy to raise a PR.
{code:java}
//for update case replace below
setRecordToStatement(updateStatement, fieldTypes, row); 
//with
setRecordToStatement(updateStatement, fieldTypes + pkTypes, row  + pkRow);
//NOTE: as the prepared updateStatement contains an additional WHERE clause, we 
//need to pass additional bind values and their SQL types ('+' above denotes 
//concatenation)
{code}


> JDBCUpsertOutputFormat does not set bind parameter keyFields in 
> updateStatement
> ---
>
> Key: FLINK-15728
> URL: https://issues.apache.org/jira/browse/FLINK-15728
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.9.1
>Reporter: Bhagavan
>Priority: Major
>
> When using JDBCUpsertOutputFormat with a custom dialect (e.g. H2/Oracle) that 
> uses UpsertWriterUsingInsertUpdateStatement, the code fails with the error 
> below.
> {code:java}
> Caused by: org.h2.jdbc.JdbcSQLDataException: Parameter "#6" is not set 
> [90012-200] 
> at org.h2.message.DbException.getJdbcSQLException(DbException.java:590) 
> at org.h2.message.DbException.getJdbcSQLException(DbException.java:429) 
> at org.h2.message.DbException.get(DbException.java:205) 
> at org.h2.message.DbException.get(DbException.java:181) 
> at org.h2.expression.Parameter.checkSet(Parameter.java:83) 
> at 
> org.h2.jdbc.JdbcPreparedStatement.addBatch(JdbcPreparedStatement.java:1275) 
> at 
> org.apache.flink.api.java.io.jdbc.writer.UpsertWriter$UpsertWriterUsingInsertUpdateStatement.processOneRowInBatch(UpsertWriter.java:233)
>  
> at 
> org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:111)
>  {code}
> This is because UpsertWriterUsingInsertUpdateStatement#processOneRowInBatch 
> does not set all bind parameters in the Update case.
> This bug does not surface while using Derby DB. If we replace Derby with H2 
> in JDBCUpsertOutputFormatTest, we can reproduce the bug.
> The fix is trivial. Happy to raise a PR.
> {code:java}
> //for update case replace below
> 

[jira] [Updated] (FLINK-15728) JDBCUpsertOutputFormat does not set bind parameter keyFields in updateStatement

2020-01-21 Thread Bhagavan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bhagavan updated FLINK-15728:
-
Description: 
When using JDBCUpsertOutputFormat with a custom dialect (e.g. H2/Oracle) that 
uses UpsertWriterUsingInsertUpdateStatement, the code fails with the error below.
{code:java}
Caused by: org.h2.jdbc.JdbcSQLDataException: Parameter "#6" is not set 
[90012-200] 
at org.h2.message.DbException.getJdbcSQLException(DbException.java:590) 
at org.h2.message.DbException.getJdbcSQLException(DbException.java:429) 
at org.h2.message.DbException.get(DbException.java:205) 
at org.h2.message.DbException.get(DbException.java:181) 
at org.h2.expression.Parameter.checkSet(Parameter.java:83) 
at org.h2.jdbc.JdbcPreparedStatement.addBatch(JdbcPreparedStatement.java:1275) 
at 
org.apache.flink.api.java.io.jdbc.writer.UpsertWriter$UpsertWriterUsingInsertUpdateStatement.processOneRowInBatch(UpsertWriter.java:233)
 
at 
org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:111)
 {code}
This is because UpsertWriterUsingInsertUpdateStatement#processOneRowInBatch 
does not set all bind parameters in the Update case.

This bug does not surface while using Derby DB. 
If we replace Derby with H2 in JDBCUpsertOutputFormatTest, we can reproduce the 
bug.

The fix is trivial. Happy to raise a PR.
{code:java}
//for update case replace below
setRecordToStatement(updateStatement, fieldTypes, row); 
//with
setRecordToStatement(updateStatement, fieldTypes + pkTypes, row  + pkRow);
//NOTE: as the prepared updateStatement contains an additional WHERE clause, we 
//need to pass additional bind values and their SQL types ('+' above denotes 
//concatenation)
{code}

  was:
When using JDBCUpsertOutputFormat with a custom dialect (e.g. H2/Oracle) that 
uses UpsertWriterUsingInsertUpdateStatement, the code fails with the error below.
{code:java}
Caused by: org.h2.jdbc.JdbcSQLDataException: Parameter "#6" is not set 
[90012-200] 
at org.h2.message.DbException.getJdbcSQLException(DbException.java:590) 
at org.h2.message.DbException.getJdbcSQLException(DbException.java:429) 
at org.h2.message.DbException.get(DbException.java:205) 
at org.h2.message.DbException.get(DbException.java:181) 
at org.h2.expression.Parameter.checkSet(Parameter.java:83) 
at org.h2.jdbc.JdbcPreparedStatement.addBatch(JdbcPreparedStatement.java:1275) 
at 
org.apache.flink.api.java.io.jdbc.writer.UpsertWriter$UpsertWriterUsingInsertUpdateStatement.processOneRowInBatch(UpsertWriter.java:233)
 
at 
org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:111)
 {code}
This is because UpsertWriterUsingInsertUpdateStatement#processOneRowInBatch 
does not set all bind parameters in the Update case.

This bug does not surface while using Derby DB. 
If we replace Derby with H2 in JDBCUpsertOutputFormatTest, we can reproduce the 
bug.

The fix is trivial. Happy to raise a PR.
{code:java}
//for update case replace below
setRecordToStatement(updateStatement, fieldTypes, row); 
//with
setRecordToStatement(updateStatement, fieldTypes + pkTypes, row  + pkRow);
//NOTE: as the prepared updateStatement contains an additional WHERE clause, we 
//need to pass additional bind values and their SQL types ('+' above denotes 
//concatenation)
{code}


> JDBCUpsertOutputFormat does not set bind parameter keyFields in 
> updateStatement
> ---
>
> Key: FLINK-15728
> URL: https://issues.apache.org/jira/browse/FLINK-15728
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.9.1
>Reporter: Bhagavan
>Priority: Major
>
> When using JDBCUpsertOutputFormat with a custom dialect (e.g. H2/Oracle) that 
> uses UpsertWriterUsingInsertUpdateStatement, the code fails with the error 
> below.
> {code:java}
> Caused by: org.h2.jdbc.JdbcSQLDataException: Parameter "#6" is not set 
> [90012-200] 
> at org.h2.message.DbException.getJdbcSQLException(DbException.java:590) 
> at org.h2.message.DbException.getJdbcSQLException(DbException.java:429) 
> at org.h2.message.DbException.get(DbException.java:205) 
> at org.h2.message.DbException.get(DbException.java:181) 
> at org.h2.expression.Parameter.checkSet(Parameter.java:83) 
> at 
> org.h2.jdbc.JdbcPreparedStatement.addBatch(JdbcPreparedStatement.java:1275) 
> at 
> org.apache.flink.api.java.io.jdbc.writer.UpsertWriter$UpsertWriterUsingInsertUpdateStatement.processOneRowInBatch(UpsertWriter.java:233)
>  
> at 
> org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:111)
>  {code}
> This is because UpsertWriterUsingInsertUpdateStatement#processOneRowInBatch 
> does not set all bind parameters in the Update case.
> This bug does not surface while using Derby DB. If we replace Derby with H2 
> in JDBCUpsertOutputFormatTest, we can reproduce the bug.
> The fix is trivial. Happy 

[jira] [Updated] (FLINK-15728) JDBCUpsertOutputFormat does not set bind parameter keyFields in updateStatement

2020-01-21 Thread Bhagavan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bhagavan updated FLINK-15728:
-
Description: 
When using JDBCUpsertOutputFormat with a custom dialect (e.g. H2/Oracle) that 
uses UpsertWriterUsingInsertUpdateStatement, the code fails with the error below.
{code:java}
Caused by: org.h2.jdbc.JdbcSQLDataException: Parameter "#6" is not set 
[90012-200] 
at org.h2.message.DbException.getJdbcSQLException(DbException.java:590) 
at org.h2.message.DbException.getJdbcSQLException(DbException.java:429) 
at org.h2.message.DbException.get(DbException.java:205) 
at org.h2.message.DbException.get(DbException.java:181) 
at org.h2.expression.Parameter.checkSet(Parameter.java:83) 
at org.h2.jdbc.JdbcPreparedStatement.addBatch(JdbcPreparedStatement.java:1275) 
at 
org.apache.flink.api.java.io.jdbc.writer.UpsertWriter$UpsertWriterUsingInsertUpdateStatement.processOneRowInBatch(UpsertWriter.java:233)
 
at 
org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:111)
 {code}
This is because UpsertWriterUsingInsertUpdateStatement#processOneRowInBatch 
does not set all bind parameters in the Update case.

This bug does not surface while using Derby DB. 
If we replace Derby with H2 in JDBCUpsertOutputFormatTest, we can reproduce the 
bug.

The fix is trivial. Happy to raise a PR.
{code:java}
//for update case replace below
setRecordToStatement(updateStatement, fieldTypes, row); 
//with
setRecordToStatement(updateStatement, fieldTypes + pkTypes, row  + pkRow);
//NOTE: as the prepared updateStatement contains an additional WHERE clause, we 
//need to pass additional bind values and their SQL types ('+' above denotes 
//concatenation)
{code}

  was:
When using JDBCUpsertOutputFormat with a custom dialect (e.g. H2/Oracle) that 
uses UpsertWriterUsingInsertUpdateStatement, the code fails with the error below.
{code:java}
Caused by: org.h2.jdbc.JdbcSQLDataException: Parameter "#6" is not set 
[90012-200] 
at org.h2.message.DbException.getJdbcSQLException(DbException.java:590) 
at org.h2.message.DbException.getJdbcSQLException(DbException.java:429) 
at org.h2.message.DbException.get(DbException.java:205) 
at org.h2.message.DbException.get(DbException.java:181) 
at org.h2.expression.Parameter.checkSet(Parameter.java:83) 
at org.h2.jdbc.JdbcPreparedStatement.addBatch(JdbcPreparedStatement.java:1275) 
at 
org.apache.flink.api.java.io.jdbc.writer.UpsertWriter$UpsertWriterUsingInsertUpdateStatement.processOneRowInBatch(UpsertWriter.java:233)
 at 
org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:111)
 {code}
This is because UpsertWriterUsingInsertUpdateStatement#processOneRowInBatch 
does not set all bind parameters in the Update case.

This bug does not surface while using Derby DB. 
If we replace Derby with H2 in JDBCUpsertOutputFormatTest, we can reproduce the 
bug.

The fix is trivial. Happy to raise a PR.
{code:java}
//for update case replace below
setRecordToStatement(updateStatement, fieldTypes, row); 
//with
setRecordToStatement(updateStatement, fieldTypes + pkTypes, row  + pkRow);
//NOTE: as the prepared updateStatement contains an additional WHERE clause, we 
//need to pass additional bind values and their SQL types ('+' above denotes 
//concatenation)
{code}


> JDBCUpsertOutputFormat does not set bind parameter keyFields in 
> updateStatement
> ---
>
> Key: FLINK-15728
> URL: https://issues.apache.org/jira/browse/FLINK-15728
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.9.1
>Reporter: Bhagavan
>Priority: Major
>
> When using JDBCUpsertOutputFormat with a custom dialect (e.g. H2/Oracle) that 
> uses UpsertWriterUsingInsertUpdateStatement, the code fails with the error 
> below.
> {code:java}
> Caused by: org.h2.jdbc.JdbcSQLDataException: Parameter "#6" is not set 
> [90012-200] 
> at org.h2.message.DbException.getJdbcSQLException(DbException.java:590) 
> at org.h2.message.DbException.getJdbcSQLException(DbException.java:429) 
> at org.h2.message.DbException.get(DbException.java:205) 
> at org.h2.message.DbException.get(DbException.java:181) 
> at org.h2.expression.Parameter.checkSet(Parameter.java:83) 
> at 
> org.h2.jdbc.JdbcPreparedStatement.addBatch(JdbcPreparedStatement.java:1275) 
> at 
> org.apache.flink.api.java.io.jdbc.writer.UpsertWriter$UpsertWriterUsingInsertUpdateStatement.processOneRowInBatch(UpsertWriter.java:233)
>  
> at 
> org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:111)
>  {code}
> This is because UpsertWriterUsingInsertUpdateStatement#processOneRowInBatch 
> does not set all bind parameters in the Update case.
> 

[jira] [Updated] (FLINK-15728) JDBCUpsertOutputFormat does not set bind parameter keyFields in updateStatement

2020-01-21 Thread Bhagavan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bhagavan updated FLINK-15728:
-
Summary: JDBCUpsertOutputFormat does not set bind parameter keyFields in 
updateStatement  (was: JDBCUpsertOutputFormat does not set bind parameter 
keyFields in updateStaement)

> JDBCUpsertOutputFormat does not set bind parameter keyFields in 
> updateStatement
> ---
>
> Key: FLINK-15728
> URL: https://issues.apache.org/jira/browse/FLINK-15728
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.9.1
>Reporter: Bhagavan
>Priority: Major
>
> When using JDBCUpsertOutputFormat with a custom dialect (e.g. H2/Oracle) that 
> uses UpsertWriterUsingInsertUpdateStatement, the code fails with the error 
> below.
> {code:java}
> Caused by: org.h2.jdbc.JdbcSQLDataException: Parameter "#6" is not set 
> [90012-200] 
> at org.h2.message.DbException.getJdbcSQLException(DbException.java:590) 
> at org.h2.message.DbException.getJdbcSQLException(DbException.java:429) 
> at org.h2.message.DbException.get(DbException.java:205) 
> at org.h2.message.DbException.get(DbException.java:181) 
> at org.h2.expression.Parameter.checkSet(Parameter.java:83) 
> at org.h2.jdbc.JdbcPreparedStatement.addBatch(JdbcPreparedStatement.java:1275) 
> at 
> org.apache.flink.api.java.io.jdbc.writer.UpsertWriter$UpsertWriterUsingInsertUpdateStatement.processOneRowInBatch(UpsertWriter.java:233)
>  at 
> org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:111)
>  {code}
> This is because UpsertWriterUsingInsertUpdateStatement#processOneRowInBatch 
> does not set all bind parameters in the Update case.
> This bug does not surface while using Derby DB. If we replace Derby with H2 
> in JDBCUpsertOutputFormatTest, we can reproduce the bug.
> The fix is trivial. Happy to raise a PR.
> {code:java}
> //for update case replace below
> setRecordToStatement(updateStatement, fieldTypes, row); 
> //with
> setRecordToStatement(updateStatement, fieldTypes + pkTypes, row  + pkRow);
> //NOTE: as the prepared updateStatement contains an additional WHERE clause, 
> //we need to pass additional bind values and their SQL types ('+' above 
> //denotes concatenation)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15728) JDBCUpsertOutputFormat does not set bind parameter keyFields in updateStaement

2020-01-21 Thread Bhagavan (Jira)
Bhagavan created FLINK-15728:


 Summary: JDBCUpsertOutputFormat does not set bind parameter 
keyFields in updateStaement
 Key: FLINK-15728
 URL: https://issues.apache.org/jira/browse/FLINK-15728
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC
Affects Versions: 1.9.1
Reporter: Bhagavan


When using JDBCUpsertOutputFormat with a custom dialect (e.g. H2/Oracle) that 
uses UpsertWriterUsingInsertUpdateStatement, the code fails with the error below.
{code:java}
Caused by: org.h2.jdbc.JdbcSQLDataException: Parameter "#6" is not set 
[90012-200] 
at org.h2.message.DbException.getJdbcSQLException(DbException.java:590) 
at org.h2.message.DbException.getJdbcSQLException(DbException.java:429) 
at org.h2.message.DbException.get(DbException.java:205) 
at org.h2.message.DbException.get(DbException.java:181) 
at org.h2.expression.Parameter.checkSet(Parameter.java:83) 
at org.h2.jdbc.JdbcPreparedStatement.addBatch(JdbcPreparedStatement.java:1275) 
at 
org.apache.flink.api.java.io.jdbc.writer.UpsertWriter$UpsertWriterUsingInsertUpdateStatement.processOneRowInBatch(UpsertWriter.java:233)
 at 
org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:111)
 {code}
This is because UpsertWriterUsingInsertUpdateStatement#processOneRowInBatch 
does not set all bind parameters in the Update case.

This bug does not surface while using Derby DB. 
If we replace Derby with H2 in JDBCUpsertOutputFormatTest, we can reproduce the 
bug.

The fix is trivial. Happy to raise a PR.
{code:java}
//for update case replace below
setRecordToStatement(updateStatement, fieldTypes, row); 
//with
setRecordToStatement(updateStatement, fieldTypes + pkTypes, row  + pkRow);
//NOTE: as the prepared updateStatement contains an additional WHERE clause, we 
//need to pass additional bind values and their SQL types ('+' above denotes 
//concatenation)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15174) FLINK security using PKI mutual auth needs certificate pinning or Private CA

2020-01-07 Thread Bhagavan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17010030#comment-17010030
 ] 

Bhagavan commented on FLINK-15174:
--

The proposed setup is as you have noted.
 * KeyStore = specific certificate (shared secret)
 * TrustStore = internal CA
 * Certificate fingerprint = ensuring only that single specific certificate is 
accepted

However, for internal communication, one could use the same keystore file as 
the truststore.

Re: "it really not possible to set up a TrustStore with only a non-self-signed 
certificate" - correct. Tried that as the first step. JDK does not like this 
setup. Code throws NPE while validating as the issuer is null.

So, It is not possible to have only the actual specific certificate in the 
TrustStore (and not have the CA in there).
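For illustration, a minimal client-side sketch of the pinning idea with Netty (the fingerprint and file names are placeholders; newer Netty versions replace this deprecated constructor with a builder):
{code:java}
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.FingerprintTrustManagerFactory;

import java.io.File;

public class PinnedSslSketch {
    static SslContext clientContext() throws Exception {
        // Placeholder SHA-1 fingerprint of the one pinned certificate.
        String pinnedFingerprint = "00:11:22:33:44:55:66:77:88:99:AA:BB:CC:DD:EE:FF:00:11:22:33";
        return SslContextBuilder.forClient()
            // Accept only the certificate with this exact fingerprint,
            // regardless of which CA signed it.
            .trustManager(new FingerprintTrustManagerFactory(pinnedFingerprint))
            // Placeholder client certificate and key for mutual auth.
            .keyManager(new File("client-cert.pem"), new File("client-key.pem"))
            .build();
    }
}
{code}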

 

> FLINK security using PKI mutual auth needs certificate pinning or Private CA
> 
>
> Key: FLINK-15174
> URL: https://issues.apache.org/jira/browse/FLINK-15174
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration, Runtime / REST
>Affects Versions: 1.9.0, 1.9.1, 1.10.0
>Reporter: Bhagavan
>Assignee: Bhagavan
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.9.2, 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The current design of Flink security for internal/REST connections relies on 
> PKI mutual authentication. However, the design is not robust if the CA used 
> for generating certificates is a public CA or a firmwide internal CA. This is 
> due to how the chain of trust works whilst validating the client certificate: 
> any certificate signed by the same CA would be able to make a connection to 
> the internal Flink network.
> Proposed improvement.
> In an environment where operators are constrained to use a firmwide internal 
> CA, allow the operator to specify the certificate fingerprint to further 
> protect the cluster, accepting only that specific certificate.
> This should be a backward-compatible change, where one can still use just a 
> certificate with a private CA.
> The changes are easy to implement, as all network communication is done using 
> Netty, and Netty provides a FingerprintTrustManagerFactory.
> Happy to send a PR if we agree on the change.
> Document corrections.
> From the security documentation:
> [https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-ssl.html]
> _"All internal connections are SSL authenticated and encrypted. The 
> connections use *mutual authentication*, meaning both server and client-side 
> of each connection need to present the certificate to each other. The 
> certificate acts effectively as a shared secret."_
> - This is not exactly true. Any party who obtains a certificate from the same 
> CA would be able to form the connection even though the certificate 
> public/private keys are different. So it's not *a* shared secret (merely a 
> common signature).
> _The doc further says: "A common setup is to generate a dedicated certificate 
> (maybe self-signed) for a Flink deployment."_
> - I think this is the only way to make the cluster secure, i.e. create a 
> private CA just for the cluster.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15174) FLINK security using PKI mutual auth needs certificate pinning or Private CA

2019-12-10 Thread Bhagavan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bhagavan updated FLINK-15174:
-
Priority: Blocker  (was: Major)

> FLINK security using PKI mutual auth needs certificate pinning or Private CA
> 
>
> Key: FLINK-15174
> URL: https://issues.apache.org/jira/browse/FLINK-15174
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration, Runtime / REST
>Affects Versions: 1.9.0, 1.9.1
>Reporter: Bhagavan
>Priority: Blocker
>
> The current design for Flink security (internal/REST) relies on PKI mutual 
> authentication. However, the design is not robust if the CA used to generate 
> certificates is a public CA or a firmwide internal CA. This is due to how the 
> chain of trust works whilst validating the client certificate: any certificate 
> signed by the same CA would be able to make a connection to the internal Flink 
> network.
> Proposed improvement:
> In an environment where operators are constrained to use a firmwide internal 
> public CA, allow the operator to specify the certificate fingerprint to further 
> protect the cluster, admitting only the specific pinned certificate.
> This should be a backward-compatible change: one can still use just a 
> certificate issued by a private CA.
> The change is easy to implement, as all network communication is done using 
> Netty, and Netty provides a FingerprintTrustManagerFactory.
> Happy to send a PR if we agree on the change.
> Document corrections, from the security documentation:
> [https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-ssl.html]
> _"All internal connections are SSL authenticated and encrypted. The 
> connections use *mutual authentication*, meaning both server and client-side 
> of each connection need to present the certificate to each other. The 
> certificate acts effectively as a shared secret."_
> - This is not exactly true. Any party who obtains a client certificate from 
> the same CA would be able to form the connection even though the certificate's 
> public/private keys are different. So it's not *a* shared secret (merely a 
> common signature).
> The doc further says: _"A common setup is to generate a dedicated certificate 
> (maybe self-signed) for a Flink deployment."_
> - I think this is the only way to make the cluster secure, i.e. create a 
> private CA just for the cluster.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14170) Support hadoop < 2.7 with StreamingFileSink.BulkFormatBuilder

2019-12-10 Thread Bhagavan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bhagavan updated FLINK-14170:
-
Priority: Major  (was: Blocker)

> Support hadoop < 2.7 with StreamingFileSink.BulkFormatBuilder
> -
>
> Key: FLINK-14170
> URL: https://issues.apache.org/jira/browse/FLINK-14170
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0, 1.8.1, 1.8.2, 1.9.0
>Reporter: Bhagavan
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, StreamingFileSink is supported only with Hadoop >= 2.7, irrespective 
> of the row/bulk format builder. This restriction exists because truncate is not 
> supported in Hadoop < 2.7.
> However, BulkFormatBuilder does not use the truncate method to restore a file, 
> so restricting StreamingFileSink.BulkFormatBuilder to Hadoop >= 2.7 is 
> unnecessary.
> The requested improvement is to remove the precondition in 
> HadoopRecoverableWriter and allow BulkFormatBuilder (Parquet) to be used with 
> Hadoop 2.6 (most enterprises are still on CDH 5.x).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14170) Support hadoop < 2.7 with StreamingFileSink.BulkFormatBuilder

2019-12-10 Thread Bhagavan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bhagavan updated FLINK-14170:
-
Priority: Blocker  (was: Major)

> Support hadoop < 2.7 with StreamingFileSink.BulkFormatBuilder
> -
>
> Key: FLINK-14170
> URL: https://issues.apache.org/jira/browse/FLINK-14170
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0, 1.8.1, 1.8.2, 1.9.0
>Reporter: Bhagavan
>Priority: Blocker
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, StreamingFileSink is supported only with Hadoop >= 2.7, irrespective 
> of the row/bulk format builder. This restriction exists because truncate is not 
> supported in Hadoop < 2.7.
> However, BulkFormatBuilder does not use the truncate method to restore a file, 
> so restricting StreamingFileSink.BulkFormatBuilder to Hadoop >= 2.7 is 
> unnecessary.
> The requested improvement is to remove the precondition in 
> HadoopRecoverableWriter and allow BulkFormatBuilder (Parquet) to be used with 
> Hadoop 2.6 (most enterprises are still on CDH 5.x).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-14170) Support hadoop < 2.7 with StreamingFileSink.BulkFormatBuilder

2019-12-10 Thread Bhagavan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16992443#comment-16992443
 ] 

Bhagavan edited comment on FLINK-14170 at 12/10/19 11:10 AM:
-

[~aljoscha] I have a PR for this. Please assign this ticket to me.


was (Author: dasbh):
[~aljoscha] I have a PR for this please assign this ticket to me

> Support hadoop < 2.7 with StreamingFileSink.BulkFormatBuilder
> -
>
> Key: FLINK-14170
> URL: https://issues.apache.org/jira/browse/FLINK-14170
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0, 1.8.1, 1.8.2, 1.9.0
>Reporter: Bhagavan
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, StreamingFileSink is supported only with Hadoop >= 2.7, irrespective 
> of the row/bulk format builder. This restriction exists because truncate is not 
> supported in Hadoop < 2.7.
> However, BulkFormatBuilder does not use the truncate method to restore a file, 
> so restricting StreamingFileSink.BulkFormatBuilder to Hadoop >= 2.7 is 
> unnecessary.
> The requested improvement is to remove the precondition in 
> HadoopRecoverableWriter and allow BulkFormatBuilder (Parquet) to be used with 
> Hadoop 2.6 (most enterprises are still on CDH 5.x).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14170) Support hadoop < 2.7 with StreamingFileSink.BulkFormatBuilder

2019-12-10 Thread Bhagavan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16992443#comment-16992443
 ] 

Bhagavan commented on FLINK-14170:
--

[~aljoscha] I have a PR for this. Please assign this ticket to me.

> Support hadoop < 2.7 with StreamingFileSink.BulkFormatBuilder
> -
>
> Key: FLINK-14170
> URL: https://issues.apache.org/jira/browse/FLINK-14170
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0, 1.8.1, 1.8.2, 1.9.0
>Reporter: Bhagavan
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, StreamingFileSink is supported only with Hadoop >= 2.7, irrespective 
> of the row/bulk format builder. This restriction exists because truncate is not 
> supported in Hadoop < 2.7.
> However, BulkFormatBuilder does not use the truncate method to restore a file, 
> so restricting StreamingFileSink.BulkFormatBuilder to Hadoop >= 2.7 is 
> unnecessary.
> The requested improvement is to remove the precondition in 
> HadoopRecoverableWriter and allow BulkFormatBuilder (Parquet) to be used with 
> Hadoop 2.6 (most enterprises are still on CDH 5.x).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15174) FLINK security using PKI mutual auth needs certificate pinning or Private CA

2019-12-10 Thread Bhagavan (Jira)
Bhagavan created FLINK-15174:


 Summary: FLINK security using PKI mutual auth needs certificate 
pinning or Private CA
 Key: FLINK-15174
 URL: https://issues.apache.org/jira/browse/FLINK-15174
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration, Runtime / REST
Affects Versions: 1.9.1, 1.9.0
Reporter: Bhagavan


The current design for Flink security (internal/REST) relies on PKI mutual 
authentication. However, the design is not robust if the CA used to generate 
certificates is a public CA or a firmwide internal CA. This is due to how the chain 
of trust works whilst validating the client certificate: any certificate signed by 
the same CA would be able to make a connection to the internal Flink network.

Proposed improvement:

In an environment where operators are constrained to use a firmwide internal public 
CA, allow the operator to specify the certificate fingerprint to further protect 
the cluster, admitting only the specific pinned certificate.

This should be a backward-compatible change: one can still use just a certificate 
issued by a private CA.

The change is easy to implement, as all network communication is done using Netty, 
and Netty provides a FingerprintTrustManagerFactory.
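
For illustration, a minimal sketch of what the pinned trust setup could look like 
with Netty (PinnedSslContexts is a hypothetical helper, not the actual Flink 
change; the exact wiring into Flink's SSL setup would be part of the PR):

{code:java}
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.FingerprintTrustManagerFactory;

// Hypothetical helper: trusts only the peer certificate whose SHA-1
// fingerprint matches the pinned value, regardless of which CA signed it.
public final class PinnedSslContexts {

    private PinnedSslContexts() {}

    public static SslContext clientContext(String pinnedSha1Fingerprint) throws Exception {
        // Netty's FingerprintTrustManagerFactory accepts hex fingerprints,
        // with or without colon separators.
        FingerprintTrustManagerFactory tmf =
                new FingerprintTrustManagerFactory(pinnedSha1Fingerprint);
        return SslContextBuilder.forClient()
                .trustManager(tmf)
                .build();
    }
}
{code}

The fingerprint to pin can be read off the deployed certificate, e.g. with 
openssl x509 -noout -fingerprint -sha1 -in cert.pem.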

Happy to send a PR if we agree on the change.

Document corrections, from the security documentation:

[https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-ssl.html]

_"All internal connections are SSL authenticated and encrypted. The connections 
use *mutual authentication*, meaning both server and client-side of each 
connection need to present the certificate to each other. The certificate acts 
effectively as a shared secret."_

- This is not exactly true. Any party who obtains a client certificate from the 
same CA would be able to form the connection even though the certificate's 
public/private keys are different. So it's not *a* shared secret (merely a common 
signature).

The doc further says: _"A common setup is to generate a dedicated certificate 
(maybe self-signed) for a Flink deployment."_

- I think this is the only way to make the cluster secure, i.e. create a private 
CA just for the cluster.


--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14170) Support hadoop < 2.7 with StreamingFileSink.BulkFormatBuilder

2019-09-23 Thread Bhagavan (Jira)
Bhagavan created FLINK-14170:


 Summary: Support hadoop < 2.7 with 
StreamingFileSink.BulkFormatBuilder
 Key: FLINK-14170
 URL: https://issues.apache.org/jira/browse/FLINK-14170
 Project: Flink
  Issue Type: Improvement
  Components: API / DataSet
Affects Versions: 1.9.0, 1.8.2, 1.8.1, 1.8.0
Reporter: Bhagavan


Currently, StreamingFileSink is supported only with Hadoop >= 2.7, irrespective of 
the row/bulk format builder. This restriction exists because truncate is not 
supported in Hadoop < 2.7.

However, BulkFormatBuilder does not use the truncate method to restore a file, so 
restricting StreamingFileSink.BulkFormatBuilder to Hadoop >= 2.7 is unnecessary.

The requested improvement is to remove the precondition in HadoopRecoverableWriter 
and allow BulkFormatBuilder (Parquet) to be used with Hadoop 2.6 (most enterprises 
are still on CDH 5.x).
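
For context, a minimal sketch of the bulk-format usage this would unlock on 
Hadoop 2.6 (MyEvent, stream, and the output path are placeholders):

{code:java}
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Bulk formats roll part files on every checkpoint, so recovery opens a new
// part file instead of truncating a half-written one -- hence the
// Hadoop >= 2.7 truncate requirement should not apply to this builder.
StreamingFileSink<MyEvent> sink = StreamingFileSink
        .forBulkFormat(
                new Path("hdfs:///data/output"),
                ParquetAvroWriters.forReflectRecord(MyEvent.class))
        .build();

stream.addSink(sink);
{code}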



--
This message was sent by Atlassian Jira
(v8.3.4#803005)