Re: TABLE API + DataStream outsourcing schema or Pojo?
Hi Steve,

The in-memory catalog does not persist metadata and needs to be repopulated every time. However, you can implement a catalog that persists the metadata to a file or a database. There is also an effort to implement the Catalog interface on top of Hive's metastore; a preview is available in the latest release (1.9.0).

Best, Fabian

On Thu, Sep 5, 2019 at 14:52, Steve Robert <contact.steverob...@gmail.com> wrote:
> [...]
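Fabian's suggestion of a catalog backed by Hive's metastore could, as a rough sketch, look like the following with the Flink 1.9 APIs. The catalog name, default database, hive-conf directory, and Hive version are illustrative assumptions, and flink-connector-hive plus the Hive dependencies must be on the classpath.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class SharedCatalogSketch {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Catalog name, default database, hive-site.xml directory, and Hive
        // version are placeholders -- adjust them to your setup.
        HiveCatalog hiveCatalog = new HiveCatalog(
                "myhive", "default", "/opt/hive-conf", "2.3.4");

        // Tables registered through this catalog are stored in the Hive
        // metastore, so other jobs on the same cluster can see them too.
        tEnv.registerCatalog("myhive", hiveCatalog);
        tEnv.useCatalog("myhive");
    }
}
```

Because the metadata lives in the metastore rather than in the TableEnvironment's memory, it survives restarts and is visible across jobs, unlike GenericInMemoryCatalog, which is scoped to a single session.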
Re: TABLE API + DataStream outsourcing schema or Pojo?
Hi Fabian,

Thank you for your answer; it is indeed the solution I am currently testing. I use

    TypeInformation<Row> convert = JsonRowSchemaConverter.convert(JSON_SCHEMA);

provided by flink-json and pass the TypeInformation to the stream operator. It seems to work :) and with this solution my schema can live outside my package.

One additional question about GenericInMemoryCatalog
(https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/catalogs.html):
can a catalog be used across multiple jobs running on the same cluster, or is it scoped to the job session only?

    DataStream<JsonNode> dataStreamJson = dataStream.map(new MapFunction<String, JsonNode>() {
        @Override
        public JsonNode map(String s) throws Exception {
            ObjectMapper objectMapper = new ObjectMapper();
            return objectMapper.readTree(s);
        }
    });

    DataStream<Row> dataStreamRow = dataStreamJson.map(new MapFunction<JsonNode, Row>() {
        @Override
        public Row map(JsonNode jsonNode) throws Exception {
            int pos = 0;
            Row row = new Row(jsonNode.size());
            Iterator<String> iterator = jsonNode.fieldNames();
            while (iterator.hasNext()) {
                String key = iterator.next();
                row.setField(pos, jsonNode.get(key).asText());
                pos++;
            }
            return row;
        }
    }).returns(convert);

    Table tableA = tEnv.fromDataStream(dataStreamRow);

On Thu, Sep 5, 2019 at 13:23, Fabian Hueske wrote:
> [...]
Re: TABLE API + DataStream outsourcing schema or Pojo?
Hi Steve,

Maybe you could implement a custom TableSource that queries the data from the REST API and converts the JSON directly into a Row data type. This would also avoid going through the DataStream API just for ingesting the data.

Best, Fabian

On Wed, Sep 4, 2019 at 15:57, Steve Robert <contact.steverob...@gmail.com> wrote:
> Hi guys,
>
> I have been studying the Table API for a while for integration into my
> system. Looking at this documentation:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors
>
> I understand that it is possible to apply a JSON format on the connector
> and apply a JSON schema without any hardcoded Java POJO:
>
>     .jsonSchema(
>         "{" +
>         "  type: 'object'," +
>         "  properties: {" +
>         "    lon: {" +
>         "      type: 'number'" +
>         "    }," +
>         "    rideTime: {" +
>         "      type: 'string'," +
>         "      format: 'date-time'" +
>         "    }" +
>         "  }" +
>         "}"
>     )
>
> But my problem is the following: my data comes from a REST API, so I
> have to process the data and pass it along as a DataStream. The issue is
> that the conversion from a DataStream to a Table goes through a Java
> POJO:
>
>     DataStream<MyPojo> input = ...
>     Table table = tEnv.fromDataStream(input);
>
> I tried a workaround, converting my JSON to Avro using a GenericRecord,
> but that does not seem possible.
>
> My use case is to add REST API processing at runtime and to outsource
> and dynamically load my POJO/schema without hardcoding a Java POJO
> class.
>
> Do you have an approach to suggest?
>
> Thanks a lot
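A custom TableSource along the lines Fabian describes could, as a rough sketch against the Flink 1.9 APIs, look like this. RestSourceFunction, the endpoint URL, and the field names are hypothetical placeholders, not real classes or values from this thread:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

public class RestTableSource implements StreamTableSource<Row> {

    private final String[] fieldNames = {"lon", "rideTime"};
    private final TypeInformation<?>[] fieldTypes = {Types.DOUBLE, Types.STRING};

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
        // RestSourceFunction is a hypothetical SourceFunction<Row> that polls
        // the REST endpoint and converts each JSON object into a Row.
        return env.addSource(new RestSourceFunction("https://example.com/api/rides"))
                  .returns(getReturnType());
    }

    @Override
    public TypeInformation<Row> getReturnType() {
        return Types.ROW_NAMED(fieldNames, fieldTypes);
    }

    @Override
    public TableSchema getTableSchema() {
        return new TableSchema(fieldNames, fieldTypes);
    }
}
```

Such a source could then be registered with tEnv.registerTableSource("rides", new RestTableSource()) and queried directly, with no POJO in between.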