Re: Writing the results of the stream onto a CSV File
Hi,

Instead of writeAsText you have writeAsCsv: https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/DataStream.html

You can call it with just the string path (like you have), or you can use the overwrite flag if it suits your needs.

Best Regards,
Fábio Dias.

Abdul Salam Shaikh wrote on Friday, 28/04/2017 at 10:18:

> Hi,
>
> I am trying to write the results of my stream into CSV format using the
> following code, and it has compilation issues:
>
> DataStream<DetectorStatistics> objectStream =
>     windowedStream.flatMap(new WindowObjectStreamTransformer());
> objectStream.writeAsText("H:\\data.csv", new TextFormatter<DetectorStatistics>() {
>     public String format(DetectorStatistics value) {
>         return value.getDetectorId() + " ," + value.getActualPhaseTime();
>     }
> });
>
> What am I doing wrong here? Or is there an alternative way to write
> records to a CSV file?
>
> Thanks!
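Whichever sink is used, the record-to-line conversion itself can be checked in isolation. A minimal plain-Java sketch of the formatting logic from the question (no Flink dependency; `DetectorStatistics` here is a stand-in POJO assumed from the thread, and the stray space before the comma is dropped):

```java
// Plain-Java sketch of the CSV line formatting from the question above.
// DetectorStatistics is a stand-in POJO; the real class and its getters
// may differ.
public class CsvFormatSketch {
    public static class DetectorStatistics {
        private final String detectorId;
        private final long actualPhaseTime;

        public DetectorStatistics(String detectorId, long actualPhaseTime) {
            this.detectorId = detectorId;
            this.actualPhaseTime = actualPhaseTime;
        }

        public String getDetectorId() { return detectorId; }
        public long getActualPhaseTime() { return actualPhaseTime; }
    }

    // One CSV line per record, mirroring the format() body in the question.
    public static String format(DetectorStatistics value) {
        return value.getDetectorId() + "," + value.getActualPhaseTime();
    }

    public static void main(String[] args) {
        System.out.println(format(new DetectorStatistics("D1", 42)));
    }
}
```

In a real job this `format` body would live inside whichever formatter or map function the chosen sink expects.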
Flink Scheduling and FlinkML
Hi to all,

I'm building a recommendation system for my application. I have a set of logs (containing the user info, the hour, the button that was clicked, etc.) that arrive at Flink via Kafka, and I save every log to HDFS (Hadoop). But now I have a problem: I want to apply ML to (all) my data. I'm thinking of two scenarios:

First: transform my DataStream into a DataSet and perform the ML task. Is that possible?

Second: run a task in Flink that gets the data from Hadoop and performs the ML task.

What is the best way to do it? I already checked the IncrementalLearningSkeleton, but I didn't understand how to apply it to an actual real case. Is there some complex example that I could look at? (https://github.com/apache/flink/tree/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml)

Another thing I would like to ask is about the second scenario, where I need to perform this task every hour: what is the best way to do that?

Thanks,
Fábio Dias.
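For the hourly part of the second scenario, one common pattern is a separate driver process that periodically submits the batch ML job over the data already sitting in HDFS. A minimal plain-Java sketch of such a periodic trigger (a sketch under assumptions, not the Flink-recommended mechanism: `job` stands in for the actual submission logic, e.g. invoking the Flink CLI, and the period is shortened here for illustration; in practice it would be one hour):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: fire a job-submission callback on a fixed schedule a given
// number of times, then shut the scheduler down and report how many
// runs actually happened.
public class HourlyTrigger {
    public static int runPeriodically(Runnable job, long periodMillis, int times) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(times);
        AtomicInteger runs = new AtomicInteger();
        scheduler.scheduleAtFixedRate(() -> {
            job.run();               // hypothetical: submit the batch ML job here
            runs.incrementAndGet();
            done.countDown();
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();            // block until the requested runs completed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdownNow();
        return runs.get();
    }

    public static void main(String[] args) {
        int runs = runPeriodically(
                () -> System.out.println("submitting batch ML job"), 10, 3);
        System.out.println("runs=" + runs);
    }
}
```

An external scheduler such as cron invoking `bin/flink run` would achieve the same effect without a resident driver process.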
Elasticsearch 5.x connection
Hi,

Last Friday I was running Elasticsearch 5.x with Flink 1.2.0. In the pom.xml I added this dependency:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
  <version>1.3-SNAPSHOT</version>
  <exclusions>
    <exclusion>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch</artifactId>
    </exclusion>
  </exclusions>
</dependency>

And I added two files, ElasticsearchSink.java and Elasticsearch5ApiCallBridge.java, from the Flink GitHub repository. This code was running with no problem:

public static void writeToElastic(DataStream<HashMap<String, String>> elasticStream) {
    HashMap<String, String> config = new HashMap<>();
    config.put("bulk.flush.max.actions", "1");
    config.put("cluster.name", "clouduxlogs");

    try {
        ArrayList<InetSocketAddress> transports = new ArrayList<>();
        transports.add(new InetSocketAddress(InetAddress.getByName("localhost"), 9300));

        ElasticsearchSinkFunction<HashMap<String, String>> indexLog =
                new ElasticsearchSinkFunction<HashMap<String, String>>() {

            private static final long serialVersionUID = 8802869701292023100L;

            public IndexRequest createIndexRequest(HashMap<String, String> element) {
                HashMap<String, HashMap<String, String>> valueOfLog = new HashMap<>();
                element.put("timestamp", (new Timestamp((new Date()).getTime())).toString());
                valueOfLog.put("data", element);
                // {aggregation : { aggregationType : "value", "value" : 468, "count" : 1, "timestamp": } }
                return Requests
                        .indexRequest()
                        .index("logs")
                        .type("object")
                        .source(valueOfLog);
            }

            public void process(HashMap<String, String> element, RuntimeContext ctx,
                                RequestIndexer indexer) {
                indexer.add(createIndexRequest(element));
            }
        };

        SinkFunction<HashMap<String, String>> esSink =
                new ElasticsearchSink<HashMap<String, String>>(config, transports, indexLog);
        elasticStream.addSink(esSink);
    } catch (Exception e) {
        System.out.println(e);
    }
}

But on Monday those files (ElasticsearchSink.java and Elasticsearch5ApiCallBridge.java) were changed, and now my code doesn't work.
I have tried to use this dependency:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch5_2.10</artifactId>
  <version>1.3-SNAPSHOT</version>
</dependency>

but I'm getting this error:

java.lang.NoSuchMethodError: org.apache.flink.util.InstantiationUtil.isSerializable(Ljava/lang/Object;)Z
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.<init>(ElasticsearchSinkBase.java:195)
    at org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(ElasticsearchSink.java:95)
    at org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(ElasticsearchSink.java:78)
    at ux.App.writeToElastic(App.java:102)
    at ux.App.main(App.java:55)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
    at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)

Do I need to downgrade my Elasticsearch version, or is there some other way to make it work?

Thanks,
Fábio Dias.
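The NoSuchMethodError suggests a Flink version mismatch rather than an Elasticsearch problem: the 1.3-SNAPSHOT connector calls `InstantiationUtil.isSerializable`, which the 1.2.0 `flink-core` on the running cluster does not have. A hedged sketch of aligned Maven coordinates, assuming the cluster itself also runs a 1.3-SNAPSHOT build (artifact names as in the thread):

```xml
<!-- Sketch: keep the connector and the core Flink dependencies on the
     same version; mixing a 1.2.0 runtime with a 1.3-SNAPSHOT connector
     is the likely cause of the NoSuchMethodError above. -->
<properties>
  <flink.version>1.3-SNAPSHOT</flink.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.10</artifactId>
    <version>${flink.version}</version>
  </dependency>
</dependencies>
```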
Re: Apache Flink and Elasticsearch send Json Object instead of string
Hi, thanks for the reply.

Isn't there another way to do that? Using REST you can send JSON like this:

curl -XPOST 'localhost:9200/customer/external?pretty&pretty' -H 'Content-Type: application/json' -d'
{
  "name": "Jane Doe"
}
'

In my case I have JSON like this:

{
  "filters" : {
    "id" : 1,
    "name": "abc"
  }
}

How can I handle these cases? Isn't there a way to send the whole JSON element and have it indexed like in the REST request?

Thanks.

Tzu-Li (Gordon) Tai wrote on Tuesday, 21/02/2017 at 07:54:

> Hi,
>
> I'll use your code to explain.
>
> public IndexRequest createIndexRequest(String element) {
>     HashMap<String, String> esJson = new HashMap<>();
>     esJson.put("data", element);
>
> What you should do here is parse the field values from `element`, and
> simply treat them as key-value pairs of the `esJson` map.
>
> So, the `esJson` should be prepared by doing:
>
> esJson.put("id", 6);
> esJson.put("name", "A green door");
> esJson.put("price", 12.5);
>
> etc.
>
> Cheers,
> Gordon
>
> On February 21, 2017 at 12:41:40 AM, Fábio Dias (fabiodio...@gmail.com) wrote:
>
> Hi,
>
> I'm using Flink and Elasticsearch and I want to receive in Elasticsearch a
> JSON object ({"id":1, "name":"X"}, etc.). I already have a string with
> this information, but I don't want to save it as a string.
>
> I receive this:
>
> {
>   "_index": "logs",
>   "_type": "object",
>   "_id": "AVpcARfkfYWqSubr0ZvK",
>   "_score": 1,
>   "_source": {
>     "data": "{\"id\":6,\"name\":\"A green door\",\"price\":12.5,\"tags\":[\"home\",\"green\"]}"
>   }
> }
>
> And I want to receive this:
>
> {
>   "_index": "logs",
>   "_type": "external",
>   "_id": "AVpcARfkfYWqSubr0ZvK",
>   "_score": 1,
>   "_source": {
>     "data": {
>       "id": 6,
>       "name": "A green door",
>       "price": 12.5,
>       "tags": ["home", "green"]
>     }
>   }
> }
>
> My Java code:
>
> try {
>     ArrayList<InetSocketAddress> transports = new ArrayList<>();
>     transports.add(new InetSocketAddress("127.0.0.1", 9300));
>
>     ElasticsearchSinkFunction<String> indexLog = new ElasticsearchSinkFunction<String>() {
>
>         private static final long serialVersionUID = 8802869701292023100L;
>
>         public IndexRequest createIndexRequest(String element) {
>             HashMap<String, String> esJson = new HashMap<>();
>             esJson.put("data", element);
>             return Requests
>                     .indexRequest()
>                     .index("logs")
>                     .type("object")
>                     .source(esJson);
>         }
>
>         @Override
>         public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
>             indexer.add(createIndexRequest(element));
>         }
>     };
>
>     ElasticsearchSink<String> esSink = new ElasticsearchSink<>(config, transports, indexLog);
>     input.addSink(esSink);
> } catch (Exception e) {
>     System.out.println(e);
> }
>
> Do I need to treat every entry as a map? Can I just send an object with key-value pairs?
>
> Thanks.
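For nested JSON like the `filters` example, the key-value approach extends naturally, because the values of the source map can themselves be maps. A plain-Java sketch of building such a nested structure (no Flink or Elasticsearch dependency; in the real sink this map would be handed to `Requests.indexRequest().source(...)`):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: represent {"filters": {"id": 1, "name": "abc"}} as nested maps,
// which the Elasticsearch client serializes as a real JSON object rather
// than an escaped string.
public class NestedSourceSketch {
    public static Map<String, Object> buildSource() {
        Map<String, Object> filters = new HashMap<>();
        filters.put("id", 1);
        filters.put("name", "abc");

        Map<String, Object> source = new HashMap<>();
        source.put("filters", filters);
        return source;
    }

    public static void main(String[] args) {
        System.out.println(buildSource());
    }
}
```

In practice the incoming string would not be built by hand but parsed into such a map with a JSON library, e.g. Jackson's `ObjectMapper.readValue(json, Map.class)`, before being passed to the index request.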
Apache Flink and Elasticsearch send Json Object instead of string
Hi,

I'm using Flink and Elasticsearch and I want to receive in Elasticsearch a JSON object ({"id":1, "name":"X"}, etc.). I already have a string with this information, but I don't want to save it as a string.

I receive this:

{
  "_index": "logs",
  "_type": "object",
  "_id": "AVpcARfkfYWqSubr0ZvK",
  "_score": 1,
  "_source": {
    "data": "{\"id\":6,\"name\":\"A green door\",\"price\":12.5,\"tags\":[\"home\",\"green\"]}"
  }
}

And I want to receive this:

{
  "_index": "logs",
  "_type": "external",
  "_id": "AVpcARfkfYWqSubr0ZvK",
  "_score": 1,
  "_source": {
    "data": {
      "id": 6,
      "name": "A green door",
      "price": 12.5,
      "tags": ["home", "green"]
    }
  }
}

My Java code:

try {
    ArrayList<InetSocketAddress> transports = new ArrayList<>();
    transports.add(new InetSocketAddress("127.0.0.1", 9300));

    ElasticsearchSinkFunction<String> indexLog = new ElasticsearchSinkFunction<String>() {

        private static final long serialVersionUID = 8802869701292023100L;

        public IndexRequest createIndexRequest(String element) {
            HashMap<String, String> esJson = new HashMap<>();
            esJson.put("data", element);
            return Requests
                    .indexRequest()
                    .index("logs")
                    .type("object")
                    .source(esJson);
        }

        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }
    };

    ElasticsearchSink<String> esSink = new ElasticsearchSink<>(config, transports, indexLog);
    input.addSink(esSink);
} catch (Exception e) {
    System.out.println(e);
}

Do I need to treat every entry as a map? Can I just send an object with key-value pairs?

Thanks.