Hi Madhu,

Great. Do you want to contribute it back via a GitHub pull request? If not, that's also fine. We will try to look into the 2.0 connector next week.
Best,
Max

On Fri, Dec 4, 2015 at 4:16 PM, Madhukar Thota <madhukar.th...@gmail.com> wrote:
> I have created a working connector for Elasticsearch 2.0 based on the
> elasticsearch-flink connector. I am using it right now, but I want an
> official connector from Flink.
>
> ElasticsearchSink.java
>
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.net.InetAddress;
> import java.net.UnknownHostException;
> import java.util.List;
> import java.util.Map;
> import java.util.concurrent.atomic.AtomicBoolean;
> import java.util.concurrent.atomic.AtomicReference;
>
> import org.elasticsearch.action.bulk.BulkItemResponse;
> import org.elasticsearch.action.bulk.BulkProcessor;
> import org.elasticsearch.action.bulk.BulkRequest;
> import org.elasticsearch.action.bulk.BulkResponse;
> import org.elasticsearch.action.index.IndexRequest;
> import org.elasticsearch.client.Client;
> import org.elasticsearch.client.transport.TransportClient;
> import org.elasticsearch.cluster.node.DiscoveryNode;
> import org.elasticsearch.common.settings.Settings;
> import org.elasticsearch.common.transport.InetSocketTransportAddress;
> import org.elasticsearch.common.unit.ByteSizeUnit;
> import org.elasticsearch.common.unit.ByteSizeValue;
> import org.elasticsearch.common.unit.TimeValue;
>
> public class ElasticsearchSink<T> extends RichSinkFunction<T> {
>
>     public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
>     public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
>     public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
>
>     private static final long serialVersionUID = 1L;
>     private static final int DEFAULT_PORT = 9300;
>     private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
>
>     /**
>      * The user-specified config map that we forward to Elasticsearch when we create the Client.
>      */
>     private final Map<String, String> userConfig;
>
>     /**
>      * The builder that is used to construct an {@link IndexRequest} from the incoming element.
>      */
>     private final IndexRequestBuilder<T> indexRequestBuilder;
>
>     /**
>      * The Client that was either retrieved from a Node or is a TransportClient.
>      */
>     private transient Client client;
>
>     /**
>      * Bulk processor that was created using the client.
>      */
>     private transient BulkProcessor bulkProcessor;
>
>     /**
>      * This is set from inside the BulkProcessor listener if there were failures in processing.
>      */
>     private final AtomicBoolean hasFailure = new AtomicBoolean(false);
>
>     /**
>      * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing.
>      */
>     private final AtomicReference<Throwable> failureThrowable = new AtomicReference<Throwable>();
>
>     public ElasticsearchSink(Map<String, String> userConfig, IndexRequestBuilder<T> indexRequestBuilder) {
>         this.userConfig = userConfig;
>         this.indexRequestBuilder = indexRequestBuilder;
>     }
>
>     @Override
>     public void open(Configuration configuration) {
>         ParameterTool params = ParameterTool.fromMap(userConfig);
>         Settings settings = Settings.settingsBuilder()
>                 .put(userConfig)
>                 .build();
>
>         TransportClient transportClient = TransportClient.builder().settings(settings).build();
>         for (String server : params.get("esHost").split(";")) {
>             String[] components = server.trim().split(":");
>             String host = components[0];
>             int port = DEFAULT_PORT;
>             if (components.length > 1) {
>                 port = Integer.parseInt(components[1]);
>             }
>
>             try {
>                 transportClient = transportClient.addTransportAddress(
>                         new InetSocketTransportAddress(InetAddress.getByName(host), port));
>             } catch (UnknownHostException e) {
>                 throw new RuntimeException("Unknown Elasticsearch host: " + host, e);
>             }
>         }
>
>         List<DiscoveryNode> nodes = transportClient.connectedNodes();
>         if (nodes.isEmpty()) {
>             throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
>         } else if (LOG.isDebugEnabled()) {
>             LOG.debug("Connected to nodes: " + nodes.toString());
>         }
>         client = transportClient;
>
>         BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
>                 client,
>                 new BulkProcessor.Listener() {
>                     public void beforeBulk(long executionId, BulkRequest request) {
>                     }
>
>                     public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
>                         if (response.hasFailures()) {
>                             for (BulkItemResponse itemResp : response.getItems()) {
>                                 if (itemResp.isFailed()) {
>                                     LOG.error("Failed to index document in Elasticsearch: "
>                                             + itemResp.getFailureMessage());
>                                     failureThrowable.compareAndSet(null,
>                                             new RuntimeException(itemResp.getFailureMessage()));
>                                 }
>                             }
>                             hasFailure.set(true);
>                         }
>                     }
>
>                     public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
>                         LOG.error(failure.getMessage());
>                         failureThrowable.compareAndSet(null, failure);
>                         hasFailure.set(true);
>                     }
>                 });
>
>         // This makes flush() blocking
>         bulkProcessorBuilder.setConcurrentRequests(0);
>
>         if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
>             bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
>         }
>
>         if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
>             bulkProcessorBuilder.setBulkSize(new ByteSizeValue(
>                     params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
>         }
>
>         if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
>             bulkProcessorBuilder.setFlushInterval(
>                     TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
>         }
>
>         bulkProcessor = bulkProcessorBuilder.build();
>     }
>
>     @Override
>     public void invoke(T element) {
>         IndexRequest indexRequest = indexRequestBuilder.createIndexRequest(element, getRuntimeContext());
>
>         if (LOG.isDebugEnabled()) {
>             LOG.debug("Emitting IndexRequest: {}", indexRequest);
>         }
>
>         bulkProcessor.add(indexRequest);
>     }
>
>     @Override
>     public void close() {
>         if (bulkProcessor != null) {
>             bulkProcessor.close();
>             bulkProcessor = null;
>         }
>
>         if (client != null) {
>             client.close();
>         }
>
>         if (hasFailure.get()) {
>             Throwable cause = failureThrowable.get();
>             if (cause != null) {
>                 throw new RuntimeException("An error occurred in ElasticsearchSink.", cause);
>             } else {
>                 throw new RuntimeException("An error occurred in ElasticsearchSink.");
>             }
>         }
>     }
> }
>
>
> In my main class:
>
> Map<String, String> config = Maps.newHashMap();
>
> // Elasticsearch parameters
> config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS,
>         parameter.get("elasticsearch.bulk.flush.max.actions", "1"));
> config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS,
>         parameter.get("elasticsearch.bulk.flush.interval.ms", "2"));
> config.put("cluster.name", parameter.get("elasticsearch.cluster.name"));
> config.put("esHost", parameter.get("elasticsearch.server", "localhost:9300"));
>
> DataStreamSink<String> elastic = messageStream.rebalance().addSink(
>         new ElasticsearchSink<>(config, (IndexRequestBuilder<String>) (element, runtimeContext) -> {
>     String[] line = element.toLowerCase().split(" +(?=(?:([^\"]*\"){2})*[^\"]*$)");
>     String measureAndTags = line[0];
>     String[] kvSplit = line[1].split("=");
>     String fieldName = kvSplit[0];
>     String fieldValue = kvSplit[1];
>     Map<String, String> tags = new HashMap<>();
>     String measure = parseMeasureAndTags(measureAndTags, tags);
>     long time = (long) (Double.valueOf(line[2]) / 1000000);
>
>     Map<String, Object> test = new HashMap<>();
>     DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
>     dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
>
>     test.put(fieldName, setValue(fieldValue));
>     test.put("tags", tags);
>     test.put("measurement", measure);
>     test.put("@timestamp", dateFormat.format(new Date(time)));
>
>     return Requests.indexRequest()
>             .index("metrics")
>             .type("test")
>             .source(new Gson().toJson(test).toLowerCase());
> }));
>
> -Madhu
>
>
> On Fri, Dec 4, 2015 at 9:18 AM, Maximilian Michels <m...@apache.org> wrote:
>>
>> Hi Madhu,
>>
>> Not yet. The API has changed slightly. We'll add one very soon. In the
>> meantime I've created an issue to keep track of the status:
>>
>> https://issues.apache.org/jira/browse/FLINK-3115
>>
>> Thanks,
>> Max
>>
>> On Thu, Dec 3, 2015 at 10:50 PM, Madhukar Thota
>> <madhukar.th...@gmail.com> wrote:
>> > Does the current elasticsearch-flink connector support the
>> > Elasticsearch 2.x version?
>> >
>> > -Madhu
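
[Editor's note] The two trickiest pieces of the lambda in Madhu's main class are the quote-aware split regex and the nanosecond-to-millisecond timestamp conversion. Below is a minimal, self-contained sketch of just those two steps. The regex and the conversion are taken verbatim from the snippet above; the sample input line (an InfluxDB-style "measurement,tags field=value timestamp" string) and the class name are made up for illustration.

```java
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class LineParseDemo {
    public static void main(String[] args) {
        // Hypothetical input: <measurement,tags> <field>=<value> <timestamp in ns>
        String element = "cpu,host=a usage=\"42 percent\" 1449247000000000000.0";

        // Split on runs of spaces that are followed by an even number of
        // double quotes, i.e. only on spaces outside quoted strings.
        // The space inside "42 percent" is therefore NOT a split point.
        String[] line = element.split(" +(?=(?:([^\"]*\"){2})*[^\"]*$)");
        // line[0] = measurement+tags, line[1] = field=value, line[2] = timestamp

        // Nanoseconds -> milliseconds, as in the sink's lambda.
        long time = (long) (Double.valueOf(line[2]) / 1000000);

        DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
        dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

        System.out.println(line[0]);                        // cpu,host=a
        System.out.println(line[1]);                        // usage="42 percent"
        System.out.println(dateFormat.format(new Date(time)));
    }
}
```

One caveat worth noting: going through Double.valueOf loses precision for large nanosecond timestamps (doubles have 53 bits of mantissa), which is harmless at millisecond granularity here but would matter if sub-millisecond precision were needed.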