[jira] [Commented] (BAHIR-134) Add InfluxDB Sink for Flink Streaming

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16173157#comment-16173157
 ] 

ASF GitHub Bot commented on BAHIR-134:
--

Github user asfgit closed the pull request at:

https://github.com/apache/bahir-flink/pull/21


> Add InfluxDB Sink for Flink Streaming
> -
>
> Key: BAHIR-134
> URL: https://issues.apache.org/jira/browse/BAHIR-134
> Project: Bahir
>  Issue Type: Wish
>  Components: Flink Streaming Connectors
>Reporter: Hai Zhou_UTC+8
>Assignee: Hai Zhou_UTC+8
>
> Implement InfluxDBSink by extending RichSinkFunction.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BAHIR-134) Add InfluxDB Sink for Flink Streaming

2017-09-20 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16173156#comment-16173156
 ] 

ASF subversion and git services commented on BAHIR-134:
---

Commit f07276eef2babc52ffdb43c5fcb76f9d51b9153f in bahir-flink's branch 
refs/heads/master from zhouhai02
[ https://git-wip-us.apache.org/repos/asf?p=bahir-flink.git;h=f07276e ]

[BAHIR-134] Add InfluxDb sink for flink stream

This closes #21




[jira] [Commented] (BAHIR-134) Add InfluxDB Sink for Flink Streaming

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16173121#comment-16173121
 ] 

ASF GitHub Bot commented on BAHIR-134:
--

Github user rmetzger commented on the issue:

https://github.com/apache/bahir-flink/pull/21
  
Thanks. I'll merge the change.




[jira] [Commented] (BAHIR-134) Add InfluxDB Sink for Flink Streaming

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16168973#comment-16168973
 ] 

ASF GitHub Bot commented on BAHIR-134:
--

Github user yew1eb commented on the issue:

https://github.com/apache/bahir-flink/pull/21
  
Hi @rmetzger, thanks for your review. I have updated the PR.
Best,
Hai Zhou
:beers:




[jira] [Commented] (BAHIR-134) Add InfluxDB Sink for Flink Streaming

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16168970#comment-16168970
 ] 

ASF GitHub Bot commented on BAHIR-134:
--

Github user yew1eb commented on a diff in the pull request:

https://github.com/apache/bahir-flink/pull/21#discussion_r139290380
  
--- Diff: flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.influxdb;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.Point;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Sink to save data into a InfluxDB cluster.
+ */
+public class InfluxDBSink extends RichSinkFunction {
+
+private transient InfluxDB influxDB = null;
+private final String dbName;
+private final String username;
+private final String password;
+private final String host;
+private boolean batchEnabled = true;
+
+/**
+ * Creates a new {@link InfluxDBSink} that connects to the InfluxDB server.
+ *
+ * @param host the url to connect to.
+ * @param username the username which is used to authorize against the influxDB instance.
+ * @param password the password for the username which is used to authorize against the influxDB instance.
+ * @param dbName   the database to write to.
+ */
+public InfluxDBSink(String host, String username, String password, String dbName) {
+this.host = Preconditions.checkNotNull(host, "host can not be null");
+this.username = Preconditions.checkNotNull(username, "username can not be null");
+this.password = Preconditions.checkNotNull(password, "password can not be null");
+this.dbName = Preconditions.checkNotNull(dbName, "dbName can not be null");
+}
+
+public InfluxDBSink(String host, String username, String password, String dbName, boolean batchEnabled) {
+this(host, username, password, dbName);
+this.batchEnabled = Preconditions.checkNotNull(batchEnabled, "batchEnabled can not be null");
+}
+
+/**
+ * Initializes the connection to InfluxDB by either cluster or sentinels or single server.
+ */
+@Override
+public void open(Configuration parameters) throws Exception {
+super.open(parameters);
+
+influxDB = InfluxDBFactory.connect(host, username, password);
+if (!influxDB.databaseExists(dbName)) {
+influxDB.createDatabase(dbName);
--- End diff --

Thank you for the review.
I will throw a `RuntimeException` instead of creating the database for the user,
because the `open` method is executed by every parallel sink task.
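
The behaviour described in this comment can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from the PR: `DbClient`, `FailFastSink` and `FailFastSinkDemo` are hypothetical stand-ins for the influxdb-java client and the Flink sink, so the example runs without either library, and the exception message is made up. It shows `open` failing fast when the database is missing rather than creating it, so that parallel sink tasks never race to create the same database.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the InfluxDB client; only the one call needed here.
interface DbClient {
    boolean databaseExists(String name);
}

// Hypothetical stand-in for the sink; not the class from the PR.
class FailFastSink {
    private final DbClient client;
    private final String dbName;

    FailFastSink(DbClient client, String dbName) {
        this.client = client;
        this.dbName = dbName;
    }

    // Runs once per parallel sink task, so it only checks and never
    // changes anything on the server.
    void open() {
        if (!client.databaseExists(dbName)) {
            throw new RuntimeException("Database does not exist: " + dbName);
        }
    }
}

class FailFastSinkDemo {
    public static void main(String[] args) {
        Set<String> existing = new HashSet<>();
        existing.add("metrics");
        DbClient client = existing::contains;

        // The database exists, so open() returns normally.
        new FailFastSink(client, "metrics").open();

        // The database is missing, so open() throws instead of creating it.
        try {
            new FailFastSink(client, "missing").open();
        } catch (RuntimeException e) {
            System.out.println("open() failed: " + e.getMessage());
        }
    }
}
```

With this approach the user has to create the database before starting the job; the sink only verifies that it is there.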




[jira] [Commented] (BAHIR-134) Add InfluxDB Sink for Flink Streaming

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16168860#comment-16168860
 ] 

ASF GitHub Bot commented on BAHIR-134:
--

Github user rmetzger commented on the issue:

https://github.com/apache/bahir-flink/pull/21
  
Flink 1.3, the current release, still supports Java 7, but Java 7 support
will be dropped starting with Flink 1.4.
I'll start a quick discussion on the dev@ list to make sure nobody
disagrees.




[jira] [Commented] (BAHIR-134) Add InfluxDB Sink for Flink Streaming

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16168422#comment-16168422
 ] 

ASF GitHub Bot commented on BAHIR-134:
--

Github user mridulm commented on the issue:

https://github.com/apache/bahir-flink/pull/21
  
As of Spark 2.2, Java 7 support has been removed.
I think Flink also requires Java 8, right?

Perhaps we can do the same for Bahir.




[jira] [Commented] (BAHIR-134) Add InfluxDB Sink for Flink Streaming

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16168416#comment-16168416
 ] 

ASF GitHub Bot commented on BAHIR-134:
--

Github user rmetzger commented on the issue:

https://github.com/apache/bahir-flink/pull/21
  
Overall, I like the change.
The only problem is that it breaks the build because of missing Java 7
support.

I guess we need to decide whether we want to drop Java 7 support.




[jira] [Commented] (BAHIR-134) Add InfluxDB Sink for Flink Streaming

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16168413#comment-16168413
 ] 

ASF GitHub Bot commented on BAHIR-134:
--

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/bahir-flink/pull/21#discussion_r139234041
  
--- Diff: flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.influxdb;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.Point;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Sink to save data into a InfluxDB cluster.
+ */
+public class InfluxDBSink extends RichSinkFunction {
+
+private transient InfluxDB influxDB = null;
+private final String dbName;
+private final String username;
+private final String password;
+private final String host;
+private boolean batchEnabled = true;
+
+/**
+ * Creates a new {@link InfluxDBSink} that connects to the InfluxDB server.
+ *
+ * @param host the url to connect to.
+ * @param username the username which is used to authorize against the influxDB instance.
+ * @param password the password for the username which is used to authorize against the influxDB instance.
+ * @param dbName   the database to write to.
+ */
+public InfluxDBSink(String host, String username, String password, String dbName) {
+this.host = Preconditions.checkNotNull(host, "host can not be null");
+this.username = Preconditions.checkNotNull(username, "username can not be null");
+this.password = Preconditions.checkNotNull(password, "password can not be null");
+this.dbName = Preconditions.checkNotNull(dbName, "dbName can not be null");
+}
+
+public InfluxDBSink(String host, String username, String password, String dbName, boolean batchEnabled) {
+this(host, username, password, dbName);
+this.batchEnabled = Preconditions.checkNotNull(batchEnabled, "batchEnabled can not be null");
+}
+
+/**
+ * Initializes the connection to InfluxDB by either cluster or sentinels or single server.
+ */
+@Override
+public void open(Configuration parameters) throws Exception {
+super.open(parameters);
+
+influxDB = InfluxDBFactory.connect(host, username, password);
+if (!influxDB.databaseExists(dbName)) {
+influxDB.createDatabase(dbName);
+}
+
+if (batchEnabled) {
+// Flush every 2000 Points, at least every 100ms
+influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS);
--- End diff --

Ideally we'll make this configurable.
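
One way this could be made configurable, sketched under assumptions: `BatchSettings` is a hypothetical value class and is not part of the PR or of influxdb-java. It carries the three values that are hard-coded in the `enableBatch(2000, 100, TimeUnit.MILLISECONDS)` call above, offers those same values as defaults, and implements `Serializable` on the assumption that it would be kept as a field of the sink.

```java
import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

// Hypothetical settings holder; class and method names are illustrative only.
final class BatchSettings implements Serializable {
    private static final long serialVersionUID = 1L;

    private final int actions;         // flush after this many points
    private final int flushDuration;   // or after this much time has passed
    private final TimeUnit flushUnit;  // unit of flushDuration

    BatchSettings(int actions, int flushDuration, TimeUnit flushUnit) {
        if (actions <= 0 || flushDuration <= 0) {
            throw new IllegalArgumentException("actions and flushDuration must be positive");
        }
        this.actions = actions;
        this.flushDuration = flushDuration;
        this.flushUnit = Objects.requireNonNull(flushUnit, "flushUnit");
    }

    // Same values as the hard-coded call in the diff.
    static BatchSettings defaults() {
        return new BatchSettings(2000, 100, TimeUnit.MILLISECONDS);
    }

    int getActions() { return actions; }
    int getFlushDuration() { return flushDuration; }
    TimeUnit getFlushUnit() { return flushUnit; }
}

class BatchSettingsDemo {
    public static void main(String[] args) {
        BatchSettings s = new BatchSettings(500, 1, TimeUnit.SECONDS);
        // In the sink's open() the hard-coded call would then become, roughly:
        // influxDB.enableBatch(s.getActions(), s.getFlushDuration(), s.getFlushUnit());
        System.out.println("flush every " + s.getActions() + " points or every "
                + s.getFlushDuration() + " " + s.getFlushUnit());
    }
}
```

A constructor overload of the sink taking such an object would keep the existing four-argument constructor working with the current defaults.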




[jira] [Commented] (BAHIR-134) Add InfluxDB Sink for Flink Streaming

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16168412#comment-16168412
 ] 

ASF GitHub Bot commented on BAHIR-134:
--

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/bahir-flink/pull/21#discussion_r139233921
  
--- Diff: flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.influxdb;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.Point;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Sink to save data into a InfluxDB cluster.
+ */
+public class InfluxDBSink extends RichSinkFunction {
+
+private transient InfluxDB influxDB = null;
+private final String dbName;
+private final String username;
+private final String password;
+private final String host;
+private boolean batchEnabled = true;
+
+/**
+ * Creates a new {@link InfluxDBSink} that connects to the InfluxDB server.
+ *
+ * @param host the url to connect to.
+ * @param username the username which is used to authorize against the influxDB instance.
+ * @param password the password for the username which is used to authorize against the influxDB instance.
+ * @param dbName   the database to write to.
+ */
+public InfluxDBSink(String host, String username, String password, String dbName) {
+this.host = Preconditions.checkNotNull(host, "host can not be null");
+this.username = Preconditions.checkNotNull(username, "username can not be null");
+this.password = Preconditions.checkNotNull(password, "password can not be null");
+this.dbName = Preconditions.checkNotNull(dbName, "dbName can not be null");
+}
+
+public InfluxDBSink(String host, String username, String password, String dbName, boolean batchEnabled) {
+this(host, username, password, dbName);
+this.batchEnabled = Preconditions.checkNotNull(batchEnabled, "batchEnabled can not be null");
+}
+
+/**
+ * Initializes the connection to InfluxDB by either cluster or sentinels or single server.
+ */
+@Override
+public void open(Configuration parameters) throws Exception {
+super.open(parameters);
+
+influxDB = InfluxDBFactory.connect(host, username, password);
+if (!influxDB.databaseExists(dbName)) {
+influxDB.createDatabase(dbName);
--- End diff --

Maybe for the future: I would actually log an info message stating that
you've created the database.
I think it's good if code always tells the user when it is doing magic.
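
A rough sketch of that suggestion, under assumptions: `InMemoryDb` is a hypothetical in-memory stand-in for the InfluxDB client, and `java.util.logging` is used only to keep the example self-contained; the connector would presumably use the project's own logging setup. It shows the info message being emitted only when the database is actually created.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Logger;

// Hypothetical in-memory stand-in for the InfluxDB client.
class InMemoryDb {
    private final Set<String> databases = new HashSet<>();

    boolean databaseExists(String name) { return databases.contains(name); }

    void createDatabase(String name) { databases.add(name); }
}

class LoggingCreateDemo {
    private static final Logger LOG = Logger.getLogger(LoggingCreateDemo.class.getName());

    // Creates the database if it is missing and tells the user about it.
    // Returns true if this call created the database.
    static boolean ensureDatabase(InMemoryDb db, String dbName) {
        if (db.databaseExists(dbName)) {
            return false;
        }
        db.createDatabase(dbName);
        LOG.info("Database '" + dbName + "' did not exist and was created.");
        return true;
    }

    public static void main(String[] args) {
        InMemoryDb db = new InMemoryDb();
        ensureDatabase(db, "metrics"); // logs the info message
        ensureDatabase(db, "metrics"); // silent: the database already exists
    }
}
```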




[jira] [Commented] (BAHIR-134) Add InfluxDB Sink for Flink Streaming

2017-08-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143347#comment-16143347
 ] 

ASF GitHub Bot commented on BAHIR-134:
--

GitHub user yew1eb opened a pull request:

https://github.com/apache/bahir-flink/pull/21

[BAHIR-134] Add InfluxDB sink for flink stream

add InfluxDBSink for flink stream

## Verifying this change
Added the example `InfluxDBSinkExample`.
Running environment:
Flink version: 1.3.0
InfluxDB version: 1.3.4
influxdb-java (InfluxDB client) version: 2.7 (compatible with InfluxDB
versions 0.9 to 1.3.x)

It works well.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yew1eb/bahir-flink BAHIR-134

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/bahir-flink/pull/21.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21


commit 1249a8c26fa261719431a269f4b46203e748774f
Author: zhouhai02 
Date:   2017-08-27T11:35:31Z

add InfluxDb sink for flink stream



