I opened an issue for this: https://issues.apache.org/jira/browse/FLINK-7605
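
For reference, the change tracked there is essentially the auto-commit guard Fabian suggests below in this thread. A minimal, self-contained sketch of the idea (the helper class and method name here are made up for illustration; they are not Flink API):

----------------------
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

// Illustration only: mirrors the guard proposed for JdbcOutputFormat.open().
public final class AutoCommitGuard {

    // Opens a JDBC connection and enables auto-commit only if the driver
    // (e.g. Phoenix) left it disabled, so that executed batches get
    // committed without an explicit commit() call.
    public static Connection openWithAutoCommit(String url) throws SQLException {
        Connection conn = DriverManager.getConnection(url);
        if (!conn.getAutoCommit()) {
            conn.setAutoCommit(true);
        }
        return conn;
    }
}
----------------------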
On Wed, Sep 6, 2017 at 4:24 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

Maybe this should be documented as well... is there a dedicated page on
Flink and JDBC connectors?

On Wed, Sep 6, 2017 at 4:12 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Great!

If you want to, you can open a PR that adds

if (!conn.getAutoCommit()) {
    conn.setAutoCommit(true);
}

to JdbcOutputFormat.open().

Cheers, Fabian

2017-09-06 15:55 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

Hi Fabian,
thanks for the detailed answer. Obviously you are right :)
As stated at https://phoenix.apache.org/tuning.html, auto-commit is
disabled by default in Phoenix, but it can easily be enabled by appending
AutoCommit=true to the connection URL or, equivalently, by setting the
corresponding property in the conf object passed to Phoenix's
QueryUtil.getConnectionUrl method, which autogenerates the connection URL,
i.e.:

----------------------
Job job = Job.getInstance(HBaseConfiguration.create(), "phoenix-mr-job");
final org.apache.hadoop.conf.Configuration jobConf = job.getConfiguration();
jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
    PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
final Properties phoenixProps = PropertiesUtil.extractProperties(new Properties(), jobConf);
String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);
----------------------

Now my job also works with the standard Flink JDBCOutputFormat.
Just to help other people who want to play with Phoenix and HBase, I paste
below my simple test job:

@Test
public void testPhoenixOutputFormat() throws Exception {

    final StreamExecutionEnvironment senv = getStreamingExecutionEnv();
    senv.enableCheckpointing(5000);
    DataStream<String> testStream = senv.fromElements("1,aaa,XXX", "2,bbb,YYY", "3,ccc,ZZZ");

    // Parse each CSV string into a 3-field Row
    DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() {

        private static final long serialVersionUID = 1L;

        @Override
        public Row map(String str) throws Exception {
            String[] split = str.split(Pattern.quote(","));
            Row ret = new Row(3);
            ret.setField(0, split[0]);
            ret.setField(1, split[1]);
            ret.setField(2, split[2]);
            return ret;
        }
    }).returns(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));

    // Set the target Phoenix table and its columns, and enable auto-commit
    Job job = Job.getInstance(HBaseConfiguration.create(), "phoenix-mr-job");
    PhoenixMapReduceUtil.setOutput(job, "MY_TABLE", "FIELD_1,FIELD_2,FIELD_3");
    final org.apache.hadoop.conf.Configuration jobConf = job.getConfiguration();
    jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
        PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
    final String upsertStatement = PhoenixConfigurationUtil.getUpsertStatement(jobConf);
    final Properties phoenixProps = PropertiesUtil.extractProperties(new Properties(), jobConf);
    String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);

    // Write the rows with the standard Flink JDBCOutputFormat and the Phoenix driver
    rows.writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername(org.apache.phoenix.jdbc.PhoenixDriver.class.getCanonicalName())
        .setDBUrl(connUrl)
        .setQuery(upsertStatement)
        .setSqlTypes(new int[]{Types.VARCHAR, Types.VARCHAR, Types.VARCHAR})
        .finish());

    senv.execute();
}

Best,
Flavio
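
For anyone who builds the connection URL by hand instead of going through
QueryUtil, the same setting can be appended directly to the URL, as Flavio
notes above. A minimal standalone sketch (the ZooKeeper quorum, table, and
column names are made up):

----------------------
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public final class PhoenixAutoCommitExample {

    public static void main(String[] args) throws Exception {
        // Hypothetical quorum; ";AutoCommit=true" enables auto-commit so
        // each executed batch is committed without an explicit commit().
        String url = "jdbc:phoenix:zk-host:2181;AutoCommit=true";
        try (Connection conn = DriverManager.getConnection(url);
             PreparedStatement ps = conn.prepareStatement(
                 "UPSERT INTO MY_TABLE (FIELD_1, FIELD_2, FIELD_3) VALUES (?, ?, ?)")) {
            ps.setString(1, "1");
            ps.setString(2, "aaa");
            ps.setString(3, "XXX");
            ps.addBatch();
            ps.executeBatch(); // committed automatically via AutoCommit=true
        }
    }
}
----------------------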
On Wed, Sep 6, 2017 at 3:26 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi,

According to the JavaDocs of java.sql.Connection, commit() will throw an
exception if the connection is in auto-commit mode, which should be the
default. So adding this change to the JdbcOutputFormat seems a bit risky.

Maybe the Phoenix JDBC connector does not enable auto-commit by default
(or doesn't support it). Can you check that, Flavio?
If the Phoenix connector supports auto-commit but does not activate it by
default, we can enable it in JdbcOutputFormat.open().
If auto-commit is not supported, we can add a check after executeBatch()
and call commit() only if Connection.getAutoCommit() returns false.

Best, Fabian

2017-09-06 11:38 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

Hi to all,
I'm writing a job that uses Apache Phoenix.

At first I used the PhoenixOutputFormat as a (Hadoop) OutputFormat, but
it's not well suited to working with the Table API because it cannot
handle generic objects like Rows (it needs a DBWritable object that must
already exist at compile time). So I've looked into the code of the
PhoenixOutputFormat, and it's basically nothing more than a
JDBCOutputFormat.

However, to make it work I had to slightly modify the Flink
JDBCOutputFormat, adding a dbConn.commit() after the executeBatch() on the
PreparedStatement, e.g.:

upload.executeBatch();
dbConn.commit();

For the moment I've just created a dedicated PhoenixJdbcOutputFormat where
I've added these two lines of code, starting from the code of the
JDBCOutputFormat (it couldn't be extended in this case because all its
fields are private).

What do you think about this? Should I open a ticket to add a connection
commit after executeBatch (in order to be compatible with Phoenix), or
something else (e.g. create a Phoenix connector that basically extends
JDBCOutputFormat, overriding two methods and also changing the visibility
of its fields to protected)?

Best,
Flavio
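
A minimal sketch of the guarded variant Fabian describes above, committing
after executeBatch() only when the connection is not in auto-commit mode
(the class and method names are hypothetical, not Flink's actual code):

----------------------
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Illustration only: not the actual JDBCOutputFormat implementation.
final class GuardedBatchFlusher {

    // Executes the batch and commits only when auto-commit is off, since
    // commit() throws on an auto-commit connection per the
    // java.sql.Connection JavaDocs.
    static void flush(Connection dbConn, PreparedStatement upload) throws SQLException {
        upload.executeBatch();
        if (!dbConn.getAutoCommit()) {
            dbConn.commit();
        }
    }
}
----------------------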