Hi,

I couldn’t spot anything off in the code snippet you provided. So you should be 
ok with this :)

Cheers,
Gordon


On 15 August 2017 at 9:18:59 PM, mingleizhang (18717838...@163.com) wrote:

BTW, ActivityInfo is an PB object build from xxx.proto. And already has it's 
value setted to itself.




At 2017-08-15 21:17:00, "mingleizhang" <18717838...@163.com> wrote:
Hi, flink experts!

I sinked my data ( PB objects ) to elasticsearch. I dont know whether the 
sinked data is correct or incorrect. The codes like following, Could you help 
me check it please ? Im not familar with ES. Now, I want to install a kibana to 
view my data. But I dont know the below codes is correct or incorrect. I ran 
the flink program. it does not give me an error. I just want to confirm.


// sink the filtered data to ElasticSearch
clickStreamFiltered.addSink(new 
ElasticsearchSink[ActivityInfo](configElasticSearch, transportAddress, new 
ElasticsearchSinkFunction[ActivityInfo] {
  def createIndexRequest(element: ActivityInfo): IndexRequest = {
    val json = new java.util.HashMap[String, ActivityInfo]
    json.put("data", element)
    
Requests.indexRequest().index("filter-index-s").`type`("my-type").source(json)
  }
  override def process(activityInfo: ActivityInfo, runtimeContext: 
RuntimeContext, requestIndexer: RequestIndexer): Unit = {
    requestIndexer.add(createIndexRequest(activityInfo))
  }
}))

Thanks
mingleizhang



 



【网易自营】好吃到爆!鲜香弹滑加热即食,经典13香/麻辣小龙虾仅75元3斤>>        

Reply via email to