[jira] [Comment Edited] (FLINK-8318) Conflict jackson library with ElasticSearch connector

2018-01-05 Thread Jihyun Cho (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313362#comment-16313362
 ] 

Jihyun Cho edited comment on FLINK-8318 at 1/5/18 4:14 PM:
---

Here are my code and pom files.
{code:title=Test.scala}
import java.net.{InetAddress, InetSocketAddress}
import java.util.{Properties, TimeZone}

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import org.json4s.DefaultFormats
import org.json4s.JsonAST._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write

object Test {
  val consumeProperties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka-001:9092")
    props.setProperty("group.id", "test")
    props.setProperty("auto.offset.reset", "latest")
    props
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream = env.addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), consumeProperties))

    val config = new java.util.HashMap[String, String]
    config.put("cluster.name", "test")

    val transportAddresses = new java.util.ArrayList[InetSocketAddress]
    transportAddresses.add(new InetSocketAddress(InetAddress.getByName("es-001"), 9300))

    val esSink = new ElasticsearchSink[String](config, transportAddresses,
      new ElasticsearchSinkFunction[String] {
        def createIndexRequest(t: String): IndexRequest = {
          return Requests.indexRequest()
            .index("test")
            .`type`("message")
            .source(t)
        }

        override def process(t: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer) = {
          requestIndexer.add(createIndexRequest(t))
        }
      }
    )

    stream.map { value =>
      try {
        val esDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS", TimeZone.getTimeZone("UTC"))
        implicit val formats = DefaultFormats
        val json = parse(value)
        val transJson = json transformField {
          case JField("short_message", JString(s)) => ("message", JString(s))
          case JField("host", JString(s)) => ("source", JString(s))
          case JField("timestamp", JInt(i)) => ("timestamp", JString(esDateFormat.format((i * 1000L).toLong)))
          case JField("timestamp", JDouble(d)) => ("timestamp", JString(esDateFormat.format((d * 1000L).toLong)))
          case JField(k, v) => (k.stripPrefix("_"), v)
        }
        write(transJson)
      } catch {
        case _: Exception => ""
      }
    }.filter(_.nonEmpty).addSink(esSink)

    env.execute("Test")
  }
}
{code}
{code:title=pom.xml}

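<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>test.streaming</groupId>
    <artifactId>test</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.4.0</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-contrib_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.json4s</groupId>
            <artifactId>json4s-native_2.11</artifactId>
            <version>3.5.3</version>
        </dependency>
    </dependencies>
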
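    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <configuration>
                    <transformers>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <mainClass>test.streaming.Test</mainClass>
                        </transformer>
                    </transformers>
                </configuration>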

[jira] [Comment Edited] (FLINK-8318) Conflict jackson library with ElasticSearch connector

2018-01-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308217#comment-16308217
 ] 

Aljoscha Krettek edited comment on FLINK-8318 at 1/2/18 3:28 PM:
-

Do you have a specific reason for using {{parent-first}}? I think your case 
should work if you include ES and Jackson in your user jar and use 
{{child-first}}, which was introduced for exactly such cases of dependency 
clashes.
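
As a minimal sketch of that setup (assuming the option is set in {{conf/flink-conf.yaml}}, the usual place for the {{classloader.resolve-order}} key quoted in the issue description), the cluster configuration would look roughly like this:
{code:title=flink-conf.yaml}
# Resolve classes from the user jar before the Flink/Hadoop classpath,
# so the Jackson and ES versions bundled in the job jar take precedence.
classloader.resolve-order: child-first
{code}
With this setting, the ElasticSearch connector and Jackson would be expected to come from the user jar (for example, bundled by the shade plugin) rather than from the cluster classpath.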


was (Author: aljoscha):
Do you have a specific reason for using `parent-first`. I think you case should 
work if you include ES and Jackson in your user jar and use `child-first`, 
which was introduced for exactly such cases of dependency clashes.

> Conflict jackson library with ElasticSearch connector
> -
>
> Key: FLINK-8318
> URL: https://issues.apache.org/jira/browse/FLINK-8318
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector, Startup Shell Scripts
>Affects Versions: 1.4.0
>Reporter: Jihyun Cho
>
> My Flink job fails after updating Flink to version 1.4.0. It uses the 
> ElasticSearch connector.
> I'm using CDH Hadoop with the Flink option "classloader.resolve-order: 
> parent-first".
> The failure log is below.
> {noformat}
> Using the result of 'hadoop classpath' to augment the Hadoop classpath: 
> /etc/hadoop/conf:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/./:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//*
> 2017-12-26 14:13:21,160 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -
> 2017-12-26 14:13:21,161 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  Starting TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
> 2017-12-26 14:13:21,161 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  OS current user: www
> 2017-12-26 14:13:21,446 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  Current Hadoop/Kerberos user: www
> 2017-12-26 14:13:21,446 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11
> 2017-12-26 14:13:21,447 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  Maximum heap size: 31403 MiBytes
> 2017-12-26 14:13:21,447 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  JAVA_HOME: (not set)
> 2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  Hadoop version: 2.6.5
> 2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  JVM Options:
> 2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - -Xms32768M
> 2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - -Xmx32768M
> 2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - -XX:MaxDirectMemorySize=8388607T
> 2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - -Djava.library.path=/home/cloudera/parcels/CDH/lib/hadoop/lib/native/
> 2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - -Dlog4j.configuration=file:/home/www/service/flink-1.4.0/conf/log4j-console.properties
> 2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - -Dlogback.configurationFile=file:/home/www/service/flink-1.4.0/conf/logback-console.xml
> 2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  Program Arguments:
> 2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - --configDir
> 2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - /home/www/service/flink-1.4.0/conf
> 2017-12-26 14:13:21,449 INFO  
>