[jira] [Commented] (BAHIR-32) Github Integration Tests
[ https://issues.apache.org/jira/browse/BAHIR-32?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15377302#comment-15377302 ]

ASF GitHub Bot commented on BAHIR-32:
-------------------------------------

Github user lresende commented on the issue:

    https://github.com/apache/bahir/pull/9

    Great, this seems to be working now.


> Github Integration Tests
> ------------------------
>
> Key: BAHIR-32
> URL: https://issues.apache.org/jira/browse/BAHIR-32
> Project: Bahir
> Issue Type: Bug
> Components: Build
> Reporter: Luciano Resende
> Assignee: Luciano Resende
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (BAHIR-35) Include Python code in the binary jars for use with "--packages ..."
[ https://issues.apache.org/jira/browse/BAHIR-35?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15390147#comment-15390147 ]

ASF GitHub Bot commented on BAHIR-35:
-------------------------------------

Github user deroneriksson commented on the issue:

    https://github.com/apache/bahir/pull/11

    LGTM @ckadner

    Building spark-streaming-mqtt with the current master results in no *.py files in the artifact. After this PR, building spark-streaming-mqtt results in an artifact with *.py files at the root level of the artifact (__init__.py, dstream.py, and mqtt.py).


> Include Python code in the binary jars for use with "--packages ..."
> --------------------------------------------------------------------
>
> Key: BAHIR-35
> URL: https://issues.apache.org/jira/browse/BAHIR-35
> Project: Bahir
> Issue Type: Task
> Components: Build
> Affects Versions: 2.0.0
> Reporter: Christian Kadner
> Original Estimate: 8h
> Remaining Estimate: 8h
>
> Currently, to make use of the PySpark code (i.e. streaming-mqtt/python) a user
> will have to download the jar from Maven central or clone the code from
> GitHub, then find the individual *.py files, create a zip, and add that
> to the {{spark-submit}} command with the {{--py-files}} option, or add them
> to the {{PYTHONPATH}} when running locally.
> If we include the Python code in the binary build (the jar that gets
> uploaded to Maven central), then users need not do any acrobatics besides
> using the {{--packages ...}} option.
> An example where the Python code is part of the binary jar is the
> [GraphFrames|https://spark-packages.org/package/graphframes/graphframes]
> package.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15401593#comment-15401593 ]

ASF GitHub Bot commented on BAHIR-39:
-------------------------------------

Github user ScrapCodes commented on the issue:

    https://github.com/apache/bahir/pull/13

    @lresende If the release can hold a bit, that would be good.


> MQTT as a streaming source for SQL Streaming.
> ---------------------------------------------
>
> Key: BAHIR-39
> URL: https://issues.apache.org/jira/browse/BAHIR-39
> Project: Bahir
> Issue Type: New Feature
> Components: Spark SQL Data Sources
> Affects Versions: 2.1.0
> Reporter: Prashant Sharma
>
> MQTT compatible streaming source for Spark SQL Streaming.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15401991#comment-15401991 ]

ASF GitHub Bot commented on BAHIR-39:
-------------------------------------

Github user ScrapCodes commented on the issue:

    https://github.com/apache/bahir/pull/13

    Potential issues with this source:

    1) Not all MQTT options are exposed to the end user. Not sure how useful they would be.
    2) The message parser is not pluggable, which limits what a user can do with our source.
    3) Currently the persistence layer creates a file per message it stores; this can lead to serious problems if the number of messages grows very large. Not every filesystem handles very many files in a directory well (and it uses the local filesystem).
    4) I have not yet run a very long job with this source to study the memory usage and so on.


> MQTT as a streaming source for SQL Streaming.
> ---------------------------------------------
>
> Key: BAHIR-39
> URL: https://issues.apache.org/jira/browse/BAHIR-39
> Project: Bahir
> Issue Type: New Feature
> Components: Spark SQL Data Sources
> Affects Versions: 2.1.0
> Reporter: Prashant Sharma
>
> MQTT compatible streaming source for Spark SQL Streaming.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
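[Editor's note] ScrapCodes' second point concerns the parser function the source uses internally, whose shape — `messageParser: Array[Byte] => (String, Timestamp)` — appears in the PR #13 diff quoted later in this digest. A minimal sketch of what a pluggable parser hook could look like; the `MessageParsers` object and the pipe-delimited variant are illustrative assumptions, not part of the PR:

```scala
import java.nio.charset.StandardCharsets
import java.sql.Timestamp

object MessageParsers {
  // The parser shape used internally by MQTTTextStreamSource in PR #13:
  // raw MQTT payload bytes in, (value, timestamp) tuple out.
  type MessageParser = Array[Byte] => (String, Timestamp)

  // Default-style behavior: treat the payload as a UTF-8 string,
  // stamped with the arrival time.
  val utf8Parser: MessageParser =
    payload => (new String(payload, StandardCharsets.UTF_8),
                new Timestamp(System.currentTimeMillis()))

  // A hypothetical alternative: payloads of the form "epochMillis|text",
  // where the producer embeds its own timestamp in the message.
  val pipeDelimitedParser: MessageParser = { payload =>
    val s = new String(payload, StandardCharsets.UTF_8)
    val Array(millis, text) = s.split("\\|", 2)
    (text, new Timestamp(millis.toLong))
  }
}
```

Making such a function a user-supplied option would address point (2) without changing the source's default schema.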
[jira] [Commented] (BAHIR-29) Add documentation for streaming-mqtt connector
[ https://issues.apache.org/jira/browse/BAHIR-29?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15402345#comment-15402345 ]

ASF GitHub Bot commented on BAHIR-29:
-------------------------------------

GitHub user lresende opened a pull request:

    https://github.com/apache/bahir/pull/17

    [BAHIR-29] Add basic documentation for MQTT Connector

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lresende/bahir doc-mqtt

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/bahir/pull/17.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #17

----
commit 086784aa4177449c3f73380f72b057355e85015d
Author: Luciano Resende
Date:   2016-08-01T16:18:35Z

    [BAHIR-29] Add basic documentation for MQTT Connector

----


> Add documentation for streaming-mqtt connector
> ----------------------------------------------
>
> Key: BAHIR-29
> URL: https://issues.apache.org/jira/browse/BAHIR-29
> Project: Bahir
> Issue Type: Sub-task
> Components: Spark Streaming Connectors
> Reporter: Luciano Resende
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (BAHIR-31) Add documentation for streaming-zeromq connector
[ https://issues.apache.org/jira/browse/BAHIR-31?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15402342#comment-15402342 ]

ASF GitHub Bot commented on BAHIR-31:
-------------------------------------

GitHub user lresende opened a pull request:

    https://github.com/apache/bahir/pull/15

    [BAHIR-31] Add basic documentation for ZeroMQ connector

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lresende/bahir doc-zeromq

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/bahir/pull/15.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #15

----
commit 0a6f8443ce6b6f53df565e35822ab178e03e4f96
Author: Luciano Resende
Date:   2016-08-01T16:21:24Z

    [BAHIR-31] Add basic documentation for ZeroMQ connector

----


> Add documentation for streaming-zeromq connector
> ------------------------------------------------
>
> Key: BAHIR-31
> URL: https://issues.apache.org/jira/browse/BAHIR-31
> Project: Bahir
> Issue Type: Sub-task
> Components: Spark Streaming Connectors
> Reporter: Luciano Resende
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (BAHIR-38) Spark-submit does not use latest locally installed Bahir packages
[ https://issues.apache.org/jira/browse/BAHIR-38?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15402362#comment-15402362 ]

ASF GitHub Bot commented on BAHIR-38:
-------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/bahir/pull/14


> Spark-submit does not use latest locally installed Bahir packages
> -----------------------------------------------------------------
>
> Key: BAHIR-38
> URL: https://issues.apache.org/jira/browse/BAHIR-38
> Project: Bahir
> Issue Type: Bug
> Components: Build
> Affects Versions: 2.0.0
> Environment: Maven (3.3.9) on Mac OS X
> Reporter: Christian Kadner
> Assignee: Christian Kadner
> Fix For: 2.0.0
>
> Original Estimate: 4h
> Remaining Estimate: 4h
>
> We use {{`spark-submit --packages ...`}} to run Spark
> with any of the Bahir extensions.
> In order to perform a _manual integration test_ of a Bahir code change,
> developers have to _build_ the respective Bahir module and then _install_ it
> into their *local Maven repository*. Then, when running {{`spark-submit
> --packages ...`}}, Spark will use *Ivy* to resolve the
> given _maven-coordinates_ in order to add the necessary jar files to the
> classpath.
> The first time Ivy encounters new maven coordinates, it will download them
> from the local or remote Maven repository. All consecutive times Ivy will
> just use the previously cached jar files based on group ID, artifact ID and
> version, but irrespective of creation time stamp.
> This behavior is fine when using spark-submit with released versions of Spark
> packages. For continuous development and integration testing, however, that Ivy
> caching behavior poses a problem.
> To *work around* it, developers have to *clear the local Ivy cache* each time
> they _install_ a new version of a Bahir package into their local Maven
> repository and before they run spark-submit.
> For example, to test a code change in module streaming-mqtt, we would have to
> do ...
> {code}
> mvn clean install -pl streaming-mqtt
> rm -rf ~/.ivy2/cache/org.apache.bahir/spark-streaming-mqtt_2.11/
> ${SPARK_HOME}/bin/spark-submit \
>     --packages org.apache.bahir:spark-streaming-mqtt_2.11:2.0.0-SNAPSHOT \
>     test.py
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (BAHIR-30) Add documentation for streaming-twitter connector
[ https://issues.apache.org/jira/browse/BAHIR-30?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15402343#comment-15402343 ]

ASF GitHub Bot commented on BAHIR-30:
-------------------------------------

GitHub user lresende opened a pull request:

    https://github.com/apache/bahir/pull/16

    [BAHIR-30] Add basic documentation for Twitter connector

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lresende/bahir doc-twitter

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/bahir/pull/16.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16

----
commit d2c84ff23eddbe50f93b884f4869a59d7929d992
Author: Luciano Resende
Date:   2016-08-01T16:20:20Z

    [BAHIR-30] Add basic documentation for Twitter connector

----


> Add documentation for streaming-twitter connector
> -------------------------------------------------
>
> Key: BAHIR-30
> URL: https://issues.apache.org/jira/browse/BAHIR-30
> Project: Bahir
> Issue Type: Sub-task
> Components: Spark Streaming Connectors
> Reporter: Luciano Resende
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (BAHIR-38) Spark-submit does not use latest locally installed Bahir packages
[ https://issues.apache.org/jira/browse/BAHIR-38?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15402321#comment-15402321 ]

ASF GitHub Bot commented on BAHIR-38:
-------------------------------------

Github user lresende commented on the issue:

    https://github.com/apache/bahir/pull/14

    LGTM, merging soon


> Spark-submit does not use latest locally installed Bahir packages
> -----------------------------------------------------------------
>
> Key: BAHIR-38
> URL: https://issues.apache.org/jira/browse/BAHIR-38
> Project: Bahir
> Issue Type: Bug
> Components: Build
> Affects Versions: 2.0.0
> Environment: Maven (3.3.9) on Mac OS X
> Reporter: Christian Kadner
> Assignee: Christian Kadner
> Original Estimate: 4h
> Remaining Estimate: 4h
>
> We use {{`spark-submit --packages ...`}} to run Spark
> with any of the Bahir extensions.
> In order to perform a _manual integration test_ of a Bahir code change,
> developers have to _build_ the respective Bahir module and then _install_ it
> into their *local Maven repository*. Then, when running {{`spark-submit
> --packages ...`}}, Spark will use *Ivy* to resolve the
> given _maven-coordinates_ in order to add the necessary jar files to the
> classpath.
> The first time Ivy encounters new maven coordinates, it will download them
> from the local or remote Maven repository. All consecutive times Ivy will
> just use the previously cached jar files based on group ID, artifact ID and
> version, but irrespective of creation time stamp.
> This behavior is fine when using spark-submit with released versions of Spark
> packages. For continuous development and integration testing, however, that Ivy
> caching behavior poses a problem.
> To *work around* it, developers have to *clear the local Ivy cache* each time
> they _install_ a new version of a Bahir package into their local Maven
> repository and before they run spark-submit.
> For example, to test a code change in module streaming-mqtt, we would have to
> do ...
> {code}
> mvn clean install -pl streaming-mqtt
> rm -rf ~/.ivy2/cache/org.apache.bahir/spark-streaming-mqtt_2.11/
> ${SPARK_HOME}/bin/spark-submit \
>     --packages org.apache.bahir:spark-streaming-mqtt_2.11:2.0.0-SNAPSHOT \
>     test.py
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (BAHIR-28) Add documentation for streaming-akka connector
[ https://issues.apache.org/jira/browse/BAHIR-28?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15402346#comment-15402346 ]

ASF GitHub Bot commented on BAHIR-28:
-------------------------------------

GitHub user lresende opened a pull request:

    https://github.com/apache/bahir/pull/18

    [BAHIR-28] Add basic documentation for Akka Connector

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lresende/bahir doc-akka

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/bahir/pull/18.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #18

----
commit eb7899b11b9e770e7a75774dccfb2110c851a267
Author: Luciano Resende
Date:   2016-08-01T16:17:02Z

    [BAHIR-28] Add basic documentation for Akka Connector

----


> Add documentation for streaming-akka connector
> ----------------------------------------------
>
> Key: BAHIR-28
> URL: https://issues.apache.org/jira/browse/BAHIR-28
> Project: Bahir
> Issue Type: Sub-task
> Components: Spark Streaming Connectors
> Reporter: Luciano Resende
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15405435#comment-15405435 ]

ASF GitHub Bot commented on BAHIR-39:
-------------------------------------

Github user ScrapCodes commented on the issue:

    https://github.com/apache/bahir/pull/13

    @lresende Can you please review this PR?

    @mridulm Can you please take a look now and see if the concurrency issue that you sensed earlier exists?


> MQTT as a streaming source for SQL Streaming.
> ---------------------------------------------
>
> Key: BAHIR-39
> URL: https://issues.apache.org/jira/browse/BAHIR-39
> Project: Bahir
> Issue Type: New Feature
> Components: Spark SQL Data Sources
> Affects Versions: 2.1.0
> Reporter: Prashant Sharma
>
> MQTT compatible streaming source for Spark SQL Streaming.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (BAHIR-51) Add additional MQTT options/parameters to MQTTInputDStream and MqttStreamSource
[ https://issues.apache.org/jira/browse/BAHIR-51?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15416990#comment-15416990 ]

ASF GitHub Bot commented on BAHIR-51:
-------------------------------------

GitHub user ScrapCodes opened a pull request:

    https://github.com/apache/bahir/pull/22

    [BAHIR-51] MqttStreamSource supports a few important MqttConnectOptions.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ScrapCodes/bahir BAHIR-51

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/bahir/pull/22.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22

----
commit de08f208454f9a96381d5fac90afb4d33b6ef352
Author: Prashant Sharma
Date:   2016-08-11T10:26:20Z

    [BAHIR-51] MqttStreamSource supports a few important MqttConnectOptions.

----


> Add additional MQTT options/parameters to MQTTInputDStream and
> MqttStreamSource
> ---------------------------------------------------------------
>
> Key: BAHIR-51
> URL: https://issues.apache.org/jira/browse/BAHIR-51
> Project: Bahir
> Issue Type: Improvement
> Components: Spark Streaming Connectors, Spark Structured Streaming Connectors
> Reporter: Sebastian Woehrl
>
> We are using Spark Streaming in the automotive IoT environment with MQTT as
> the data source.
> For security reasons our MQTT broker is protected by username and password
> (as is standard for these kinds of environments). At the moment it is not
> possible to set a username/password when creating an MQTT receiver
> (MQTTInputDStream or MqttStreamSource).
> I propose that MQTTInputDStream and MqttStreamSource be extended to
> optionally allow setting the following MQTT options/parameters:
> * username
> * password
> * clientId
> * cleanSession
> * QoS
> * connectionTimeout
> * keepAliveInterval
> * mqttVersion
> If this proposal meets your approval I am willing to create a pull request
> with these changes implemented.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (BAHIR-51) Add additional MQTT options/parameters to MQTTInputDStream and MqttStreamSource
[ https://issues.apache.org/jira/browse/BAHIR-51?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15417334#comment-15417334 ]

ASF GitHub Bot commented on BAHIR-51:
-------------------------------------

Github user swoehrl-mw commented on the issue:

    https://github.com/apache/bahir/pull/22

    Change looks good to me. It adds all the important MQTT options and uses sensible defaults.


> Add additional MQTT options/parameters to MQTTInputDStream and
> MqttStreamSource
> ---------------------------------------------------------------
>
> Key: BAHIR-51
> URL: https://issues.apache.org/jira/browse/BAHIR-51
> Project: Bahir
> Issue Type: Improvement
> Components: Spark Streaming Connectors, Spark Structured Streaming Connectors
> Reporter: Sebastian Woehrl
>
> We are using Spark Streaming in the automotive IoT environment with MQTT as
> the data source.
> For security reasons our MQTT broker is protected by username and password
> (as is standard for these kinds of environments). At the moment it is not
> possible to set a username/password when creating an MQTT receiver
> (MQTTInputDStream or MqttStreamSource).
> I propose that MQTTInputDStream and MqttStreamSource be extended to
> optionally allow setting the following MQTT options/parameters:
> * username
> * password
> * clientId
> * cleanSession
> * QoS
> * connectionTimeout
> * keepAliveInterval
> * mqttVersion
> If this proposal meets your approval I am willing to create a pull request
> with these changes implemented.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
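[Editor's note] Combined with the `.option(...)` style shown in the sql-streaming-mqtt README quoted later in this digest, usage of the proposed parameters could look roughly like the sketch below. The option keys are taken verbatim from the BAHIR-51 proposal above; the exact keys and value formats accepted by the merged PR #22 may differ:

```scala
// Sketch only: option keys follow the list proposed in BAHIR-51 and may
// differ in the merged implementation of PR #22.
val lines = sqlContext.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", "mytopic")
  .option("username", "scott")        // broker credentials
  .option("password", "tiger")
  .option("clientId", "my-client-id")
  .option("cleanSession", "false")    // keep session state for recovery
  .option("QoS", "1")
  .option("connectionTimeout", "30")
  .option("keepAliveInterval", "60")
  .option("mqttVersion", "3.1.1")
  .load("tcp://localhost:1883")
```

Since Structured Streaming passes source options through as plain strings, exposing these `MqttConnectOptions` settings this way requires no API change beyond parsing the values.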
[jira] [Commented] (BAHIR-52) Update extension documentation formats for code sections
[ https://issues.apache.org/jira/browse/BAHIR-52?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15415986#comment-15415986 ]

ASF GitHub Bot commented on BAHIR-52:
-------------------------------------

Github user lresende closed the pull request at:

    https://github.com/apache/bahir/pull/21


> Update extension documentation formats for code sections
> --------------------------------------------------------
>
> Key: BAHIR-52
> URL: https://issues.apache.org/jira/browse/BAHIR-52
> Project: Bahir
> Issue Type: Bug
> Reporter: Luciano Resende
> Assignee: Luciano Resende
>
> The ``` fenced-code format is not working properly for pure Jekyll HTML
> generation, and tab indentation seems to be the supported way in vanilla
> Jekyll. We should update the Bahir extension READMEs to use the supported
> format.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (BAHIR-52) Update extension documentation formats for code sections
[ https://issues.apache.org/jira/browse/BAHIR-52?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15415964#comment-15415964 ]

ASF GitHub Bot commented on BAHIR-52:
-------------------------------------

Github user lresende commented on the issue:

    https://github.com/apache/bahir/pull/21

    @deroneriksson This is what you suggested on the other PR, could you please take a quick look at it?


> Update extension documentation formats for code sections
> --------------------------------------------------------
>
> Key: BAHIR-52
> URL: https://issues.apache.org/jira/browse/BAHIR-52
> Project: Bahir
> Issue Type: Bug
> Reporter: Luciano Resende
> Assignee: Luciano Resende
>
> The ``` fenced-code format is not working properly for pure Jekyll HTML
> generation, and tab indentation seems to be the supported way in vanilla
> Jekyll. We should update the Bahir extension READMEs to use the supported
> format.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (BAHIR-52) Update extension documentation formats for code sections
[ https://issues.apache.org/jira/browse/BAHIR-52?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15415960#comment-15415960 ]

ASF GitHub Bot commented on BAHIR-52:
-------------------------------------

GitHub user lresende opened a pull request:

    https://github.com/apache/bahir/pull/21

    [BAHIR-52] Update README.md formatting for source code

    Update source code paragraphs to use tabs instead of ```, which is the supported way in vanilla Jekyll.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lresende/bahir readme-formatting

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/bahir/pull/21.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21

----
commit 6494c1fd3d26851f905bde281a692eb73c8de80f
Author: Luciano Resende
Date:   2016-08-10T19:47:07Z

    [BAHIR-52] Update README.md formatting for source code

    Update source code paragraphs to use tabs instead of ```, which is the supported way in vanilla Jekyll.

----


> Update extension documentation formats for code sections
> --------------------------------------------------------
>
> Key: BAHIR-52
> URL: https://issues.apache.org/jira/browse/BAHIR-52
> Project: Bahir
> Issue Type: Bug
> Reporter: Luciano Resende
> Assignee: Luciano Resende
>
> The ``` fenced-code format is not working properly for pure Jekyll HTML
> generation, and tab indentation seems to be the supported way in vanilla
> Jekyll. We should update the Bahir extension READMEs to use the supported
> format.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (BAHIR-52) Update extension documentation formats for code sections
[ https://issues.apache.org/jira/browse/BAHIR-52?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15415979#comment-15415979 ]

ASF GitHub Bot commented on BAHIR-52:
-------------------------------------

Github user deroneriksson commented on the issue:

    https://github.com/apache/bahir/pull/21

    Formatting looks great for all READMEs. LGTM.


> Update extension documentation formats for code sections
> --------------------------------------------------------
>
> Key: BAHIR-52
> URL: https://issues.apache.org/jira/browse/BAHIR-52
> Project: Bahir
> Issue Type: Bug
> Reporter: Luciano Resende
> Assignee: Luciano Resende
>
> The ``` fenced-code format is not working properly for pure Jekyll HTML
> generation, and tab indentation seems to be the supported way in vanilla
> Jekyll. We should update the Bahir extension READMEs to use the supported
> format.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15407153#comment-15407153 ]

ASF GitHub Bot commented on BAHIR-39:
-------------------------------------

Github user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r73461457

    --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.bahir.sql.streaming.mqtt
    +
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.Calendar
    +import java.util.concurrent.CountDownLatch
    +
    +import scala.collection.concurrent.TrieMap
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.{Failure, Success, Try}
    +
    +import org.apache.bahir.utils.Logging
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
    +
    +import org.apache.spark.sql.{DataFrame, SQLContext}
    +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
    +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +
    +object MQTTStreamConstants {
    +
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    +
    +  val SCHEMA_DEFAULT = StructType(StructField("value", StringType)
    +    :: StructField("timestamp", TimestampType) :: Nil)
    +}
    +
    +class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence,
    +    topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp),
    +    sqlContext: SQLContext) extends Source with Logging {
    +
    +  override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT
    +
    +  private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf)
    +
    +  private val messages = new TrieMap[Int, (String, Timestamp)]
    +
    +  private val initLock = new CountDownLatch(1)
    +
    +  private var offset = 0
    +
    +  private var client: MqttClient = _
    +
    +  private def fetchLastProcessedOffset(): Int = {
    +    Try(store.maxProcessedOffset) match {
    +      case Success(x) =>
    +        log.info(s"Recovering from last stored offset $x")
    +        x
    +      case Failure(e) => 0
    +    }
    +  }
    +
    +  initialize()
    +  private def initialize(): Unit = {
    +
    +    client = new MqttClient(brokerUrl, clientId, persistence)
    +    val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
    +    mqttConnectOptions.setAutomaticReconnect(true)
    +    // This is required to support recovery. TODO: configurable?
    +    mqttConnectOptions.setCleanSession(false)
    +
    +    val callback = new MqttCallbackExtended() {
    +
    +      override def messageArrived(topic_ : String, message: MqttMessage): Unit = synchronized {
    +        initLock.await() // Wait for initialization to complete.
    +        val temp = offset + 1
    +        messages.put(temp, messageParser(message.getPayload))
    +        offset = temp
    +        log.trace(s"Message arrived, $topic_ $message")
    +      }
    +
    +      override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
    +      }
    +
    +      override def connectionLost(cause: Throwable): Unit = {
    +        log.warn("Connection to mqtt server lost.", cause)
    +      }
    +
    +      override def connectComplete(reconnect: Boolean, serverURI: String): Unit = {
    +        log.info(s"Connect complete $serverURI. Is it a reconnect?: $reconnect")
    +      }
    +    }
    +    client.setCallback(callback)
    +    client.connect(mqttConnectOptions)
    +    client.subscribe(topic)
    +    // It is not possible to initialize offset without `client.connect`
    +    offset = fetchLastProcessedOffset()
    --- End diff --
[jira] [Commented] (BAHIR-31) Add documentation for streaming-zeromq connector
[ https://issues.apache.org/jira/browse/BAHIR-31?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15406464#comment-15406464 ]

ASF GitHub Bot commented on BAHIR-31:
-------------------------------------

Github user lresende closed the pull request at:

    https://github.com/apache/bahir/pull/15


> Add documentation for streaming-zeromq connector
> ------------------------------------------------
>
> Key: BAHIR-31
> URL: https://issues.apache.org/jira/browse/BAHIR-31
> Project: Bahir
> Issue Type: Sub-task
> Components: Spark Streaming Connectors
> Reporter: Luciano Resende
> Assignee: Luciano Resende
> Fix For: 2.0.0
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (BAHIR-29) Add documentation for streaming-mqtt connector
[ https://issues.apache.org/jira/browse/BAHIR-29?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15406466#comment-15406466 ]

ASF GitHub Bot commented on BAHIR-29:
-------------------------------------

Github user lresende closed the pull request at:

    https://github.com/apache/bahir/pull/17


> Add documentation for streaming-mqtt connector
> ----------------------------------------------
>
> Key: BAHIR-29
> URL: https://issues.apache.org/jira/browse/BAHIR-29
> Project: Bahir
> Issue Type: Sub-task
> Components: Spark Streaming Connectors
> Reporter: Luciano Resende
> Assignee: Luciano Resende
> Fix For: 2.0.0
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15406522#comment-15406522 ]

ASF GitHub Bot commented on BAHIR-39:
-------------------------------------

Github user lresende commented on the issue:

    https://github.com/apache/bahir/pull/13

    @ScrapCodes I added some comments; they should be easy to address. Also, I am OK if not all of the issues you mentioned above are resolved before we commit this code, but I would appreciate it if you could raise JIRAs to make sure we address them in the future.


> MQTT as a streaming source for SQL Streaming.
> ---------------------------------------------
>
> Key: BAHIR-39
> URL: https://issues.apache.org/jira/browse/BAHIR-39
> Project: Bahir
> Issue Type: New Feature
> Components: Spark SQL Data Sources
> Affects Versions: 2.1.0
> Reporter: Prashant Sharma
>
> MQTT compatible streaming source for Spark SQL Streaming.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (BAHIR-30) Add documentation for streaming-twitter connector
[ https://issues.apache.org/jira/browse/BAHIR-30?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15406465#comment-15406465 ]

ASF GitHub Bot commented on BAHIR-30:
-------------------------------------

Github user lresende closed the pull request at:

    https://github.com/apache/bahir/pull/16


> Add documentation for streaming-twitter connector
> -------------------------------------------------
>
> Key: BAHIR-30
> URL: https://issues.apache.org/jira/browse/BAHIR-30
> Project: Bahir
> Issue Type: Sub-task
> Components: Spark Streaming Connectors
> Reporter: Luciano Resende
> Assignee: Luciano Resende
> Fix For: 2.0.0
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15406487#comment-15406487 ]

ASF GitHub Bot commented on BAHIR-39:
-------------------------------------

Github user lresende commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r73405558

    --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.bahir.sql.streaming.mqtt
    +
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.Calendar
    +import java.util.concurrent.CountDownLatch
    +
    +import scala.collection.concurrent.TrieMap
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.{Failure, Success, Try}
    +
    +import org.apache.bahir.utils.Logging
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
    +
    +import org.apache.spark.sql.{DataFrame, SQLContext}
    +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
    +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +
    +object MQTTStreamConstants {
    +
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    --- End diff --

    Not thread safe, see comments below on calendar usage...


> MQTT as a streaming source for SQL Streaming.
> ---------------------------------------------
>
> Key: BAHIR-39
> URL: https://issues.apache.org/jira/browse/BAHIR-39
> Project: Bahir
> Issue Type: New Feature
> Components: Spark SQL Data Sources
> Affects Versions: 2.1.0
> Reporter: Prashant Sharma
>
> MQTT compatible streaming source for Spark SQL Streaming.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406494#comment-15406494 ]

ASF GitHub Bot commented on BAHIR-39:
-------------------------------------

Github user lresende commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r73406376

    --- Diff: sql-streaming-mqtt/README.md ---
    @@ -0,0 +1,121 @@
    +A library for reading data from MQTT Servers using Spark SQL Streaming ( or Structured streaming.).
    +
    +## Linking
    +
    +Using SBT:
    +
    +```scala
    +libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.0.0"
    +```
    +
    +Using Maven:
    +
    +```xml
    +<dependency>
    +    <groupId>org.apache.bahir</groupId>
    +    <artifactId>spark-sql-streaming-mqtt_2.11</artifactId>
    +    <version>2.0.0</version>
    +</dependency>
    +```
    +
    +This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
    +For example, to include it when starting the spark shell:
    +
    +```
    +$ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.0.0
    +```
    +
    +Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
    +The `--packages` argument can also be used with `bin/spark-submit`.
    +
    +This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards.
    +
    +## Examples
    +
    +A SQL Stream can be created with data streams received through MQTT Server using,
    +
    +```scala
    +sqlContext.readStream
    +  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    +  .option("topic", "mytopic")
    +  .load("tcp://localhost:1883")
    +```
    +
    +## Enable recovering from failures.
    +
    +Setting values for option `localStorage` and `clientId` helps in recovering in case of a restart, by restoring the state where it left off before the shutdown.
    +
    +```scala
    +sqlContext.readStream
    +  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    +  .option("topic", "mytopic")
    +  .option("localStorage", "/path/to/localdir")
    +  .option("clientId", "some-client-id")
    +  .load("tcp://localhost:1883")
    +```
    +
    +### Scala API
    +
    +An example, for scala API to count words from incoming message stream.
    +
    +```scala
    +// Create DataFrame representing the stream of input lines from connection to mqtt server
    +val lines = spark.readStream
    +  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    +  .option("topic", topic)
    +  .load(brokerUrl).as[(String, Timestamp)]
    +
    +// Split the lines into words
    +val words = lines.map(_._1).flatMap(_.split(" "))
    +
    +// Generate running word count
    +val wordCounts = words.groupBy("value").count()
    +
    +// Start running the query that prints the running counts to the console
    +val query = wordCounts.writeStream
    +  .outputMode("complete")
    +  .format("console")
    +  .start()
    +
    +query.awaitTermination()
    +```
    +
    +Please see `MQTTStreamWordCount.scala` for full example.
    +
    +### Java API
    +
    +An example, for Java API to count words from incoming message stream.
    +
    +```java
    +// Create DataFrame representing the stream of input lines from connection to mqtt server.
    +Dataset<String> lines = spark
    +    .readStream()
    +    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    +    .option("topic", topic)
    +    .load(brokerUrl).select("value").as(Encoders.STRING());
    +
    +// Split the lines into words
    +Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    +    @Override
    +    public Iterator<String> call(String x) {
    +        return Arrays.asList(x.split(" ")).iterator();
    +    }
    +}, Encoders.STRING());
    +
    +// Generate running word count
    +Dataset<Row> wordCounts = words.groupBy("value").count();
    +
    +// Start running the query that prints the running counts to the console
    +StreamingQuery query = wordCounts.writeStream()
    +    .outputMode("complete")
    +    .format("console")
    +    .start();
    +
    +query.awaitTermination();
    +```
    +
    +Please see `JavaMQTTStreamWordCount.java` for full example.
    --- End diff --

    Please add a link to the example directory. Also, the example file name is MQTTStreamWordCount.java

> MQTT as a streaming source for SQL Streaming.
> ---------------------------------------------
>
>                 Key: BAHIR-39
>                 URL: https://issues.apache.org/jira/browse/BAHIR-39
[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406625#comment-15406625 ]

ASF GitHub Bot commented on BAHIR-39:
-------------------------------------

Github user frreiss commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r73422375

    --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.bahir.sql.streaming.mqtt
    +
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.Calendar
    +import java.util.concurrent.CountDownLatch
    +
    +import scala.collection.concurrent.TrieMap
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.{Failure, Success, Try}
    +
    +import org.apache.bahir.utils.Logging
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
    +
    +import org.apache.spark.sql.{DataFrame, SQLContext}
    +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
    +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +
    +object MQTTStreamConstants {
    +
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    +
    +  val SCHEMA_DEFAULT = StructType(StructField("value", StringType)
    +    :: StructField("timestamp", TimestampType) :: Nil)
    +}
    +
    +class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence,
    +    topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp),
    +    sqlContext: SQLContext) extends Source with Logging {
    +
    +  override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT
    +
    +  private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf)
    +
    +  private val messages = new TrieMap[Int, (String, Timestamp)]
    +
    +  private val initLock = new CountDownLatch(1)
    +
    +  private var offset = 0
    +
    +  private var client: MqttClient = _
    +
    +  private def fetchLastProcessedOffset(): Int = {
    +    Try(store.maxProcessedOffset) match {
    +      case Success(x) =>
    +        log.info(s"Recovering from last stored offset $x")
    +        x
    +      case Failure(e) => 0
    +    }
    +  }
    +
    +  initialize()
    +  private def initialize(): Unit = {
    +
    +    client = new MqttClient(brokerUrl, clientId, persistence)
    +    val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
    +    mqttConnectOptions.setAutomaticReconnect(true)
    +    // This is required to support recovery. TODO: configurable ?
    +    mqttConnectOptions.setCleanSession(false)
    +
    +    val callback = new MqttCallbackExtended() {
    +
    +      override def messageArrived(topic_ : String, message: MqttMessage): Unit = synchronized {
    +        initLock.await() // Wait for initialization to complete.
    +        val temp = offset + 1
    +        messages.put(temp, messageParser(message.getPayload))
    +        offset = temp
    +        log.trace(s"Message arrived, $topic_ $message")
    +      }
    +
    +      override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
    --- End diff --

    I feel like this callback probably needs to do something to ensure that there are no duplicate or out of order messages in the message buffer. What is the interaction between these two callbacks (messageArrived/deliveryComplete) and the different QoS levels in MQTT?

> MQTT as a streaming source for SQL Streaming.
> ---------------------------------------------
>
>                 Key: BAHIR-39
>                 URL: https://issues.apache.org/jira/browse/BAHIR-39
>             Project: Bahir
>          Issue Type: New Feature
>          Components: Spark SQL Data Sources
>    Affects Versions: 2.1.0
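The duplicate-delivery concern raised above (QoS 1 redelivers a message when no acknowledgement arrives) can be illustrated with a small stand-alone buffer that assigns an offset only to unseen message ids. This is a sketch only, not the PR's code: real MQTT message ids are 16-bit and wrap around, so production deduplication needs a bounded window rather than an ever-growing set.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class DedupBuffer {
    private final ConcurrentHashMap<Integer, String> messages = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<Integer, Boolean> seenIds = new ConcurrentHashMap<>();
    private final AtomicInteger offset = new AtomicInteger(0);

    // Returns the assigned offset, or -1 when the MQTT message id was already
    // seen (i.e. a QoS 1 redelivery that must not get a second offset).
    int onMessage(int mqttMessageId, String payload) {
        if (seenIds.putIfAbsent(mqttMessageId, Boolean.TRUE) != null) {
            return -1; // duplicate delivery, drop it
        }
        int next = offset.incrementAndGet();
        messages.put(next, payload);
        return next;
    }
}
```

A redelivered message id then maps to the offset it already received instead of creating a gap or a duplicate row in the source buffer.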
[jira] [Commented] (BAHIR-51) Add additional MQTT options/parameters to MQTTInputDStream and MqttStreamSource
[ https://issues.apache.org/jira/browse/BAHIR-51?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15418377#comment-15418377 ] ASF GitHub Bot commented on BAHIR-51: - Github user ScrapCodes commented on the issue: https://github.com/apache/bahir/pull/22 Thanks @swoehrl-mw, @lresende Please take a look ! > Add additional MQTT options/parameters to MQTTInputDStream and > MqttStreamSource > --- > > Key: BAHIR-51 > URL: https://issues.apache.org/jira/browse/BAHIR-51 > Project: Bahir > Issue Type: Improvement > Components: Spark Streaming Connectors, Spark Structured Streaming > Connectors >Reporter: Sebastian Woehrl > > We are using Spark Streaming in the automotive IOT environment with MQTT as > the data source. > For security reasons our MQTT broker is protected by username and password > (as is default for these kind of environments). At the moment it is not > possible to set username/password when creating an MQTT Receiver > (MQTTInputDStream or MqttStreamSource). > I propose that the MQTTInputDStream and MqttStreamSource be extended to > optionally allow setting the following mqtt options / parameters: > * username > * password > * clientId > * cleanSession > * QoS > * connectionTimeout > * keepAliveInterval > * mqttVersion > If this proposal meets your approval I am willing to create a pull request > with these changes implemented. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-51) Add additional MQTT options/parameters to MQTTInputDStream and MqttStreamSource
[ https://issues.apache.org/jira/browse/BAHIR-51?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15420265#comment-15420265 ]

ASF GitHub Bot commented on BAHIR-51:
-------------------------------------

GitHub user swoehrl-mw opened a pull request:

    https://github.com/apache/bahir/pull/23

    [BAHIR-51] Add important mqtt options to MQTTInputDStream

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/swoehrl-mw/bahir bahir-51

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/bahir/pull/23.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #23

commit ab0d69a60f908af91a57f3ac00f4fb668ec64f0d
Author: Sebastian Woehrl
Date:   2016-08-13T13:00:13Z

    [BAHIR-51] Add important mqtt options to MQTTInputDStream

> Add additional MQTT options/parameters to MQTTInputDStream and
> MqttStreamSource
> ---------------------------------------------------------------
>
>                 Key: BAHIR-51
>                 URL: https://issues.apache.org/jira/browse/BAHIR-51
>             Project: Bahir
>          Issue Type: Improvement
>          Components: Spark Streaming Connectors, Spark Structured Streaming
> Connectors
>            Reporter: Sebastian Woehrl
>
> We are using Spark Streaming in the automotive IOT environment with MQTT as
> the data source.
> For security reasons our MQTT broker is protected by username and password
> (as is default for these kind of environments). At the moment it is not
> possible to set username/password when creating an MQTT Receiver
> (MQTTInputDStream or MqttStreamSource).
> I propose that the MQTTInputDStream and MqttStreamSource be extended to
> optionally allow setting the following mqtt options / parameters:
> * username
> * password
> * clientId
> * cleanSession
> * QoS
> * connectionTimeout
> * keepAliveInterval
> * mqttVersion
> If this proposal meets your approval I am willing to create a pull request
> with these changes implemented.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15409006#comment-15409006 ]

ASF GitHub Bot commented on BAHIR-39:
-------------------------------------

Github user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r73649345

    --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.bahir.sql.streaming.mqtt
    +
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.Calendar
    +import java.util.concurrent.CountDownLatch
    +
    +import scala.collection.concurrent.TrieMap
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.{Failure, Success, Try}
    +
    +import org.apache.bahir.utils.Logging
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
    +
    +import org.apache.spark.sql.{DataFrame, SQLContext}
    +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
    +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +
    +object MQTTStreamConstants {
    +
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    +
    +  val SCHEMA_DEFAULT = StructType(StructField("value", StringType)
    +    :: StructField("timestamp", TimestampType) :: Nil)
    +}
    +
    +class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence,
    +    topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp),
    +    sqlContext: SQLContext) extends Source with Logging {
    +
    +  override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT
    +
    +  private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf)
    +
    +  private val messages = new TrieMap[Int, (String, Timestamp)]
    +
    +  private val initLock = new CountDownLatch(1)
    +
    +  private var offset = 0
    +
    +  private var client: MqttClient = _
    +
    +  private def fetchLastProcessedOffset(): Int = {
    +    Try(store.maxProcessedOffset) match {
    +      case Success(x) =>
    +        log.info(s"Recovering from last stored offset $x")
    +        x
    +      case Failure(e) => 0
    +    }
    +  }
    +
    +  initialize()
    +  private def initialize(): Unit = {
    +
    +    client = new MqttClient(brokerUrl, clientId, persistence)
    +    val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
    +    mqttConnectOptions.setAutomaticReconnect(true)
    +    // This is required to support recovery. TODO: configurable ?
    +    mqttConnectOptions.setCleanSession(false)
    +
    +    val callback = new MqttCallbackExtended() {
    +
    +      override def messageArrived(topic_ : String, message: MqttMessage): Unit = synchronized {
    +        initLock.await() // Wait for initialization to complete.
    +        val temp = offset + 1
    +        messages.put(temp, messageParser(message.getPayload))
    +        offset = temp
    +        log.trace(s"Message arrived, $topic_ $message")
    +      }
    +
    +      override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
    --- End diff --

    This is never called in case of a receiver only client.

> MQTT as a streaming source for SQL Streaming.
> ---------------------------------------------
>
>                 Key: BAHIR-39
>                 URL: https://issues.apache.org/jira/browse/BAHIR-39
>             Project: Bahir
>          Issue Type: New Feature
>          Components: Spark SQL Data Sources
>    Affects Versions: 2.1.0
>            Reporter: Prashant Sharma
>
> MQTT compatible streaming source for Spark SQL Streaming.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15410031#comment-15410031 ] ASF GitHub Bot commented on BAHIR-39: - Github user lresende commented on the issue: https://github.com/apache/bahir/pull/13 LGTM. Merging if there are no more comments. > MQTT as a streaming source for SQL Streaming. > - > > Key: BAHIR-39 > URL: https://issues.apache.org/jira/browse/BAHIR-39 > Project: Bahir > Issue Type: New Feature > Components: Spark SQL Data Sources >Affects Versions: 2.1.0 >Reporter: Prashant Sharma > > MQTT compatible streaming source for Spark SQL Streaming. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15410011#comment-15410011 ] ASF GitHub Bot commented on BAHIR-39: - Github user lresende commented on the issue: https://github.com/apache/bahir/pull/13 Based on the discussion at [spark-dev](https://www.mail-archive.com/dev@spark.apache.org/msg15209.html) it seems that some of the issues raised by @frreiss are current limitations of Spark Structured Streaming. Based on that, I think we are good to merge this, and address enhancements with new jira/prs and we might need to coordinate with Spark runtime changes required... > MQTT as a streaming source for SQL Streaming. > - > > Key: BAHIR-39 > URL: https://issues.apache.org/jira/browse/BAHIR-39 > Project: Bahir > Issue Type: New Feature > Components: Spark SQL Data Sources >Affects Versions: 2.1.0 >Reporter: Prashant Sharma > > MQTT compatible streaming source for Spark SQL Streaming. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15410385#comment-15410385 ] ASF GitHub Bot commented on BAHIR-39: - Github user ScrapCodes commented on the issue: https://github.com/apache/bahir/pull/13 Hi Fred and Luciano, I have gone through the discussion on spark-dev. In Spark 2.0, they need only up to one previous batch. Can we use this information to do cleanup, e.g. of data older than two batches? Then we can say that this connector is Spark 2.0 only, due to the limitation we have (JIRA id ). > MQTT as a streaming source for SQL Streaming. > - > > Key: BAHIR-39 > URL: https://issues.apache.org/jira/browse/BAHIR-39 > Project: Bahir > Issue Type: New Feature > Components: Spark SQL Data Sources >Affects Versions: 2.1.0 >Reporter: Prashant Sharma > > MQTT compatible streaming source for Spark SQL Streaming. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
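The cleanup suggested above (drop buffered messages older than the last batch or two, since Spark 2.0 only re-reads up to one previous batch) could look like the following stand-alone sketch. The map layout and the `window` parameter are illustrative, not the PR's actual data structures.

```java
import java.util.concurrent.ConcurrentSkipListMap;

public class BatchPruner {
    // Buffered messages keyed by their monotonically increasing offset.
    private final ConcurrentSkipListMap<Integer, String> messages = new ConcurrentSkipListMap<>();

    void put(int offset, String message) {
        messages.put(offset, message);
    }

    // Drop everything strictly below (committed - window), keeping `window`
    // previous offsets as a safety margin for batch re-reads.
    void prune(int committed, int window) {
        messages.headMap(committed - window, false).clear();
    }

    int size() {
        return messages.size();
    }
}
```

`headMap(...).clear()` mutates the backing map, so the prune is a single bulk removal of the obsolete prefix.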
[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15410177#comment-15410177 ] ASF GitHub Bot commented on BAHIR-39: - Github user frreiss commented on the issue: https://github.com/apache/bahir/pull/13 LGTM. > MQTT as a streaming source for SQL Streaming. > - > > Key: BAHIR-39 > URL: https://issues.apache.org/jira/browse/BAHIR-39 > Project: Bahir > Issue Type: New Feature > Components: Spark SQL Data Sources >Affects Versions: 2.1.0 >Reporter: Prashant Sharma > > MQTT compatible streaming source for Spark SQL Streaming. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-46) Handle re-delivery of message in MQTT structured streaming source.
[ https://issues.apache.org/jira/browse/BAHIR-46?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15413387#comment-15413387 ] ASF GitHub Bot commented on BAHIR-46: - Github user ScrapCodes commented on the issue: https://github.com/apache/bahir/pull/19 @frreiss I have tried to handle the re-delivery case, and the message is now stored to disk as soon as it arrives. > Handle re-delivery of message in MQTT structured streaming source. > -- > > Key: BAHIR-46 > URL: https://issues.apache.org/jira/browse/BAHIR-46 > Project: Bahir > Issue Type: Bug >Reporter: Prashant Sharma > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-51) Add additional MQTT options/parameters to MQTTInputDStream and MqttStreamSource
[ https://issues.apache.org/jira/browse/BAHIR-51?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15422342#comment-15422342 ] ASF GitHub Bot commented on BAHIR-51: - Github user ScrapCodes commented on the issue: https://github.com/apache/bahir/pull/22 Thanks Luciano ! > Add additional MQTT options/parameters to MQTTInputDStream and > MqttStreamSource > --- > > Key: BAHIR-51 > URL: https://issues.apache.org/jira/browse/BAHIR-51 > Project: Bahir > Issue Type: Improvement > Components: Spark Streaming Connectors, Spark Structured Streaming > Connectors >Reporter: Sebastian Woehrl > > We are using Spark Streaming in the automotive IOT environment with MQTT as > the data source. > For security reasons our MQTT broker is protected by username and password > (as is default for these kind of environments). At the moment it is not > possible to set username/password when creating an MQTT Receiver > (MQTTInputDStream or MqttStreamSource). > I propose that the MQTTInputDStream and MqttStreamSource be extended to > optionally allow setting the following mqtt options / parameters: > * username > * password > * clientId > * cleanSession > * QoS > * connectionTimeout > * keepAliveInterval > * mqttVersion > If this proposal meets your approval I am willing to create a pull request > with these changes implemented. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-51) Add additional MQTT options/parameters to MQTTInputDStream and MqttStreamSource
[ https://issues.apache.org/jira/browse/BAHIR-51?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15421727#comment-15421727 ] ASF GitHub Bot commented on BAHIR-51: - Github user lresende commented on the issue: https://github.com/apache/bahir/pull/22 LGTM > Add additional MQTT options/parameters to MQTTInputDStream and > MqttStreamSource > --- > > Key: BAHIR-51 > URL: https://issues.apache.org/jira/browse/BAHIR-51 > Project: Bahir > Issue Type: Improvement > Components: Spark Streaming Connectors, Spark Structured Streaming > Connectors >Reporter: Sebastian Woehrl > > We are using Spark Streaming in the automotive IOT environment with MQTT as > the data source. > For security reasons our MQTT broker is protected by username and password > (as is default for these kind of environments). At the moment it is not > possible to set username/password when creating an MQTT Receiver > (MQTTInputDStream or MqttStreamSource). > I propose that the MQTTInputDStream and MqttStreamSource be extended to > optionally allow setting the following mqtt options / parameters: > * username > * password > * clientId > * cleanSession > * QoS > * connectionTimeout > * keepAliveInterval > * mqttVersion > If this proposal meets your approval I am willing to create a pull request > with these changes implemented. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-51) Add additional MQTT options/parameters to MQTTInputDStream and MqttStreamSource
[ https://issues.apache.org/jira/browse/BAHIR-51?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15421726#comment-15421726 ] ASF GitHub Bot commented on BAHIR-51: - Github user lresende commented on the issue: https://github.com/apache/bahir/pull/22 Never mind, #23 is taking care of these changes on the DStream MQTT extension. > Add additional MQTT options/parameters to MQTTInputDStream and > MqttStreamSource > --- > > Key: BAHIR-51 > URL: https://issues.apache.org/jira/browse/BAHIR-51 > Project: Bahir > Issue Type: Improvement > Components: Spark Streaming Connectors, Spark Structured Streaming > Connectors >Reporter: Sebastian Woehrl > > We are using Spark Streaming in the automotive IOT environment with MQTT as > the data source. > For security reasons our MQTT broker is protected by username and password > (as is default for these kind of environments). At the moment it is not > possible to set username/password when creating an MQTT Receiver > (MQTTInputDStream or MqttStreamSource). > I propose that the MQTTInputDStream and MqttStreamSource be extended to > optionally allow setting the following mqtt options / parameters: > * username > * password > * clientId > * cleanSession > * QoS > * connectionTimeout > * keepAliveInterval > * mqttVersion > If this proposal meets your approval I am willing to create a pull request > with these changes implemented. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-51) Add additional MQTT options/parameters to MQTTInputDStream and MqttStreamSource
[ https://issues.apache.org/jira/browse/BAHIR-51?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15421720#comment-15421720 ] ASF GitHub Bot commented on BAHIR-51: - Github user lresende commented on the issue: https://github.com/apache/bahir/pull/22 Would these also apply to the DStream MQTT extension ? if so, should we try to maintain the same level of MQTT supported functionality between the two ? > Add additional MQTT options/parameters to MQTTInputDStream and > MqttStreamSource > --- > > Key: BAHIR-51 > URL: https://issues.apache.org/jira/browse/BAHIR-51 > Project: Bahir > Issue Type: Improvement > Components: Spark Streaming Connectors, Spark Structured Streaming > Connectors >Reporter: Sebastian Woehrl > > We are using Spark Streaming in the automotive IOT environment with MQTT as > the data source. > For security reasons our MQTT broker is protected by username and password > (as is default for these kind of environments). At the moment it is not > possible to set username/password when creating an MQTT Receiver > (MQTTInputDStream or MqttStreamSource). > I propose that the MQTTInputDStream and MqttStreamSource be extended to > optionally allow setting the following mqtt options / parameters: > * username > * password > * clientId > * cleanSession > * QoS > * connectionTimeout > * keepAliveInterval > * mqttVersion > If this proposal meets your approval I am willing to create a pull request > with these changes implemented. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
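The options listed in the BAHIR-51 proposal (username, password, clientId, cleanSession, QoS, connectionTimeout, keepAliveInterval, mqttVersion) are plain connection parameters. A hedged sketch of how such options might be parsed from a source's option map with sensible defaults; the option names mirror the proposal, but the class, defaults, and wiring into Paho's `MqttConnectOptions` are illustrative, not the PR's actual code:

```java
import java.util.HashMap;
import java.util.Map;

public class MqttOptions {
    final String username;
    final String password;
    final String clientId;
    final boolean cleanSession;
    final int qos;

    // Parse user-supplied options, falling back to defaults when absent.
    MqttOptions(Map<String, String> opts) {
        username = opts.get("username");          // null means anonymous
        password = opts.get("password");
        clientId = opts.getOrDefault("clientId", "");
        cleanSession = Boolean.parseBoolean(opts.getOrDefault("cleanSession", "true"));
        qos = Integer.parseInt(opts.getOrDefault("QoS", "1"));
    }
}
```

In the real connector these values would then be applied to Paho via calls such as `MqttConnectOptions.setUserName(...)` and `setPassword(...)` before connecting.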
[jira] [Commented] (BAHIR-75) WebHDFS: Initial Code Delivery
[ https://issues.apache.org/jira/browse/BAHIR-75?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15848521#comment-15848521 ]

ASF GitHub Bot commented on BAHIR-75:
-------------------------------------

Github user sourav-mazumder commented on the issue:

    https://github.com/apache/bahir/pull/28

    @ckadner Here goes my response to your comments.

    > Can you elaborate on differences/limitations/advantages over Hadoop's default "webhdfs" scheme? i.e. the main problem you are working around is that the Hadoop WebHdfsFileSystem discards the Knox gateway path when creating the Http URL (principal motivation for this connector), which makes it impossible to use it with Knox

    Yes.

    > the Hadoop WebHdfsFileSystem implements additional interfaces like: DelegationTokenRenewer.Renewable, TokenAspect.TokenManagementDelegator

    In my understanding this is automatically taken care of by Apache Knox. One of the key goals of Apache Knox is to relieve Hadoop clients of the nitty-gritty of a Hadoop cluster's internal security implementation. So we don't need to handle this in client-level code if the webhdfs request passes through Apache Knox.

    > performance differences between your approach vs Hadoop's RemoteFS and WebHDFS

    Say a remote Spark cluster needs to read a file of size 2 GB and the Spark cluster spawns 16 connections in parallel to do so. In turn, 16 separate webhdfs calls are made to the remote hdfs. Though each call starts reading from a different offset, each of them reads until the end of the file: the first connection creates an input stream from byte 0 to the end of the file, the second from 128 MB to the end of the file, the third from 256 MB to the end of the file, and so on.

    As a result, the amount of data prepared on the server side, transferred over the wire, and read on the client side can be much more than the original file size (for this 2 GB example, potentially close to 17 GB). This overhead increases with the number of connections, and grows higher still for larger files.

    With the approach used in this PR, for the above example, the total volume of data read and transferred over the wire is always limited to 2 GB plus some extra KBs (for record boundary resolution). That overhead increases only slightly (still in the KB range) with more connections, and it does not depend on file size. So when a large file (GBs) is read with a high number of parallel connections, the amount of data processed on the server side, transferred over the wire, and read on the client side stays limited to the original file size plus some extra KBs (for record boundary resolution).

    > Configuration: some configuration parameters are specific to remote servers and should be specified per server, not on the connector level (some server-level settings may override connector-level ones), i.e. Server level: gateway path (assuming one Knox gateway per server), user name and password, authentication method (think Kerberos etc). Connector level: certificate validation options (maybe overridden by server-level props), trustStore path, webhdfs protocol version (maybe overridden by server-level props), buffer sizes and file chunk sizes, retry intervals etc.

    You are right. However, I would put the two levels as Server level and File level. Some parameters won't change from file to file: they are specific to a remote hdfs server and are therefore Server-level parameters. The value of other parameters can differ from file to file; these are File-level parameters.

    The Server-level parameters are gateway path, user name/password, webhdfs protocol version, and the certificate validation option (and its associated parameters). The File-level parameters are buffer sizes, file chunk sizes, etc., which can differ from file to file. I don't see the need for any property at the connector level (parameters that would be the same across the different remote hdfs servers accessed by the connector); all properties here relate either to the nature of the remote HDFS server's implementation or to the type of file being accessed. Let me know if I'm missing any aspect here.

    > Usability: given that users need to know about the remote Hadoop server configuration (security, gateway path, etc) for WebHDFS access, would it be nicer if users could separately configure server-specific properties in a config file or registry object and then in Spark jobs only use :/ without having to provide additional properties

    That's a good idea. We can have a set of default values for these parameters based on typical
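The bounded-read approach described above maps naturally onto WebHDFS's `OPEN` operation, which accepts optional `offset` and `length` query parameters, so each connection can request exactly its slice instead of streaming to the end of the file. A sketch of per-partition URL construction; the host, Knox gateway path, and file name below are placeholders, not values from the PR:

```java
public class WebHdfsRanges {
    // One OPEN request per partition, bounded by offset and length so the
    // server never streams past the partition boundary.
    static String openUrl(String base, String file, long offset, long length) {
        return base + file + "?op=OPEN&offset=" + offset + "&length=" + length;
    }

    public static void main(String[] args) {
        long fileSize = 2L * 1024 * 1024 * 1024; // 2 GB
        int partitions = 16;
        long chunk = fileSize / partitions;      // 128 MB per connection
        for (int i = 0; i < partitions; i++) {
            System.out.println(openUrl(
                "https://knox-host:8443/gateway/default/webhdfs/v1",
                "/data/input.csv", i * chunk, chunk));
        }
    }
}
```

With this bounding, the 16 connections together transfer the 2 GB file once, plus a small per-boundary overhead for record resolution as the comment explains.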
[jira] [Commented] (BAHIR-85) Redis Sink Connector should allow update of command without reinstatiation
[ https://issues.apache.org/jira/browse/BAHIR-85?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839511#comment-15839511 ] ASF GitHub Bot commented on BAHIR-85: - Github user rmetzger commented on the issue: https://github.com/apache/bahir-flink/pull/9 I don't think that this change will allow you to update the key while the redis sink is running. > Redis Sink Connector should allow update of command without reinstatiation > --- > > Key: BAHIR-85 > URL: https://issues.apache.org/jira/browse/BAHIR-85 > Project: Bahir > Issue Type: Improvement > Components: Flink Streaming Connectors >Reporter: Atharva Inamdar > > ref: FLINK-5478 > `getCommandDescription()` gets called when RedisSink is instantiated. This > happens only once and thus doesn't allow the command to be updated during run > time. > Use Case: > As a dev I want to store some data by day. So each key will have some date > specified. this will change over course of time. for example: > `counts_for_148426560` for 2017-01-13. This is not limited to any > particular command. > connector: > https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java#L114 > I wish `getCommandDescription()` could be called in `invoke()` so that the > key can be updated without having to restart. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-84) Build log flooded with test log messages
[ https://issues.apache.org/jira/browse/BAHIR-84?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15826989#comment-15826989 ] ASF GitHub Bot commented on BAHIR-84: - Github user lresende commented on the issue: https://github.com/apache/bahir/pull/33 @ckadner Could you please reduce the size of the pr title or commit title before you try to merge. > Build log flooded with test log messages > > > Key: BAHIR-84 > URL: https://issues.apache.org/jira/browse/BAHIR-84 > Project: Bahir > Issue Type: Test > Components: Spark Structured Streaming Connectors > Environment: Mac OS X >Reporter: Christian Kadner >Assignee: Christian Kadner >Priority: Minor > > The maven build log/console gets flooded with INFO messages from > {{org.apache.parquet.hadoop.*}} during the {{test}} phase of module > {{sql-streaming-mqtt}} . This makes it hard to find actual problems and test > results especially when the log messages intersect with build and test status > messages throwing off line breaks etc. > *Excerpt of build log:* > {code:title=$ mvn clean package} > ... > Discovery completed in 293 milliseconds. > Run starting. 
Expected test count is: 7 > BasicMQTTSourceSuite: > - basic usage > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig: > Compression: SNAPPY > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig: > Compression: SNAPPY > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig: > Compression: SNAPPY > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig: > Compression: SNAPPY > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet block size to 134217728 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet block size to 134217728 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet page size to 1048576 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet block size to 134217728 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet block size to 134217728 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet page size to 1048576 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet dictionary page size to 1048576 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet page size to 1048576 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Dictionary is on > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet dictionary page size to 1048576 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet page size to 1048576 > ... 
> Jan 11, 2017 11:06:03 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet block size to 134217728 > Jan 11, 2017 11:06:03 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet page size to 1048576 > Jan 11, 2017 11:06:03 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet dictionary page size to 1048576 > Jan 11, 2017 11:06:03 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Dictionary is on > Jan 11, 2017 11:06:03 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Validation is o- Send and receive 100 messages. > - no server up > - params not provided. > - Recovering offset from the last processed offset. !!! IGNORED !!! > StressTestMQTTSource: > - Send and receive messages of size 250MB. !!! IGNORED !!! > LocalMessageStoreSuite: > - serialize and deserialize > - Store and retreive > - Max offset stored > MQTTStreamSourceSuite: > Run completed in 20 seconds, 622 milliseconds. > Total number of tests run: 7 > Suites: completed 5, aborted 0 > Tests: succeeded 7, failed 0, canceled 0, ignored 2, pending 0 > All tests passed. > ff > Jan 11, 2017 11:06:03 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Writer version is: PARQUET_1_0 > Jan 11, 2017 11:06:03 PM INFO: > org.apache.parquet.hadoop.InternalParquetRecordWriter: Flushing mem > columnStore to file. allocated memory: 48 > Jan 11, 2017 11:06:03 PM INFO: > org.apache.parquet.hadoop.InternalParquetRecordWriter: Flushing mem > columnStore to file. allocated memory: 48 > Jan 11, 2017 11:06:03 PM INFO: > org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 109B for [value] > BINARY: 1 values, 34B raw, 36B comp, 1 pages, encodings: [RLE, PLAIN, > BIT_PACKED] > Jan 11, 2017 11:06:03 PM INFO: > org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 59B for > [timestamp] INT96: 1 values, 8B raw, 10B comp, 1 pages, encodings: [RLE, >
[jira] [Commented] (BAHIR-54) Create initial directory structure in bahir-flink.git
[ https://issues.apache.org/jira/browse/BAHIR-54?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428046#comment-15428046 ] ASF GitHub Bot commented on BAHIR-54: - GitHub user rmetzger opened a pull request: https://github.com/apache/bahir-flink/pull/1 [BAHIR-54][BAHIR-55] Add Redis connector from Flink - drop Scala support from the poms. All Flink connectors are implemented in Java. - Add `.travis.yml` file to automatically test pushes. See here: https://travis-ci.org/rmetzger/bahir-flink/builds/153536694 Once this PR is merged, I can ask INFRA to enable travis for the repository, so that new PRs and pushes are verified - Add `flink-redis-connector` to Bahir. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rmetzger/bahir-flink bahir55 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/bahir-flink/pull/1.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1 commit ce4bb80585a42522f6df732a8fee3281890513d9 Author: Robert Metzger Date: 2016-08-19T08:33:11Z [BAHIR-54] Add .travis.yml file commit 8851d6a165e2c89df8590a70f02d516c258cd399 Author: Robert Metzger Date: 2016-08-19T08:54:53Z [BAHIR-54] Remove scala support from parent pom commit 14d508a045a9e82679c7ded43eb3ed8ae199d6e3 Author: Robert Metzger Date: 2016-08-19T08:55:30Z [BAHIR-55] Add Redis connector from Flink commit 860890bff5c0aae1e492e6e908d0bc2377912d44 Author: Robert Metzger Date: 2016-08-19T08:58:19Z [BAHIR-54] Drop enforced maven version > Create initial directory structure in bahir-flink.git > - > > Key: BAHIR-54 > URL: https://issues.apache.org/jira/browse/BAHIR-54 > Project: Bahir > Issue Type: Task >Reporter: Robert Metzger > > As per INFRA-12440, the bahir-flink repository has been created. > We need to set up the initial directory structure in that repository (license > file, maven pom, ...) 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-54) Create initial directory structure in bahir-flink.git
[ https://issues.apache.org/jira/browse/BAHIR-54?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15430425#comment-15430425 ] ASF GitHub Bot commented on BAHIR-54: - Github user rmetzger commented on the issue: https://github.com/apache/bahir-flink/pull/1 Thank you for merging! I requested travis: https://issues.apache.org/jira/browse/INFRA-12465 > Create initial directory structure in bahir-flink.git > - > > Key: BAHIR-54 > URL: https://issues.apache.org/jira/browse/BAHIR-54 > Project: Bahir > Issue Type: Task > Components: Flink Streaming Connectors >Reporter: Robert Metzger >Assignee: Robert Metzger > Fix For: Flink-0.1 > > > As per INFRA-12440, the bahir-flink repository has been created. > We need to set up the initial directory structure in that repository (license > file, maven pom, ...) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-46) Handle re-delivery of message in MQTT structured streaming source.
[ https://issues.apache.org/jira/browse/BAHIR-46?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15431536#comment-15431536 ] ASF GitHub Bot commented on BAHIR-46: - Github user lresende commented on the issue: https://github.com/apache/bahir/pull/19 @ScrapCodes, Is this still a WIP? > Handle re-delivery of message in MQTT structured streaming source. > -- > > Key: BAHIR-46 > URL: https://issues.apache.org/jira/browse/BAHIR-46 > Project: Bahir > Issue Type: Bug >Reporter: Prashant Sharma > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-91) Upgrade bahir-flink to support Flink 1.2
[ https://issues.apache.org/jira/browse/BAHIR-91?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15889880#comment-15889880 ] ASF GitHub Bot commented on BAHIR-91: - Github user rmetzger commented on the issue: https://github.com/apache/bahir-flink/pull/11 I think the travis tests will fail because they are overwriting the Flink version. I have a constant there for the Flink version. My initial plan was to support multiple Flink versions at the same time. Do you think there is an easy way to still support 1.1 and 1.2 with the Bahir connectors? > Upgrade bahir-flink to support Flink 1.2 > > > Key: BAHIR-91 > URL: https://issues.apache.org/jira/browse/BAHIR-91 > Project: Bahir > Issue Type: New Feature > Components: Flink Streaming Connectors >Affects Versions: Flink-0.1 > Environment: Ubuntu Yakkety Yak >Reporter: Markus Müller > Fix For: Flink-0.1 > > > After working around https://issues.apache.org/jira/browse/FLINK-4813 by > specifying Flink version 1.3-SNAPSHOT instead of 1.2.0 in the bahir-flink pom > file, I get the following compiler errors for the ActiveMQ connector: > [ERROR] > /home/markus/src/bahir-flink/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java:[32,33] > error: cannot find symbol > [ERROR] symbol: class ForkableFlinkMiniCluster > location: package org.apache.flink.test.util > /home/markus/src/bahir-flink/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java:[54,19] > error: cannot find symbol > [ERROR] symbol: class ForkableFlinkMiniCluster > location: class ActiveMQConnectorITCase > /home/markus/src/bahir-flink/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java:[66,20] > error: cannot find symbol > [ERROR] symbol: class ForkableFlinkMiniCluster > location: class ActiveMQConnectorITCase > 
/home/markus/src/bahir-flink/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java:[216,22] > error: method snapshotState in class > MessageAcknowledgingSourceBase cannot be applied to given types; > [ERROR] required: FunctionSnapshotContext > found: long,long > reason: actual and formal argument lists differ in length > where Type,UId are type-variables: > Type extends Object declared in class MessageAcknowledgingSourceBase > UId extends Object declared in class MessageAcknowledgingSourceBase > /home/markus/src/bahir-flink/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java:[229,4] > error: ActiveMQConnectorITCase.TestSourceContext is not abstract and does > not override abstract method markAsTemporarilyIdle() in SourceContext > Excluding ActiveMQ from compilation leads to the next issue in the > Akka-Connector: > [ERROR] > /home/markus/src/bahir-flink/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java:[210,17] > error: DummySourceContext is not abstract and does not override abstract > method markAsTemporarilyIdle() in SourceContext > [INFO] 1 error -- This message was sent by Atlassian JIRA (v6.3.15#6346)
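Several of the errors above come from Flink 1.2 adding `markAsTemporarilyIdle()` to `SourceFunction.SourceContext`, so test contexts implementing that interface no longer compile until they override the new method. A simplified, self-contained sketch of the shape of the fix follows; the interface here is a two-method stand-in, not Flink's full SourceContext.

```java
// Stand-in for Flink's SourceFunction.SourceContext: Flink 1.2 added
// markAsTemporarilyIdle(), so any class implementing the interface must
// now override it or the build fails with the errors quoted above.
interface SourceContext<T> {
    void collect(T element);
    void markAsTemporarilyIdle(); // new abstract method in Flink 1.2
}

class DummySourceContext implements SourceContext<String> {
    @Override
    public void collect(String element) {
        // a test would record the element here
    }

    @Override
    public void markAsTemporarilyIdle() {
        // a no-op is sufficient for most connector tests
    }
}

public class IdleContextDemo {
    public static void main(String[] args) {
        DummySourceContext ctx = new DummySourceContext();
        ctx.collect("hello");
        ctx.markAsTemporarilyIdle(); // compiles now; missing override would not
        System.out.println("ok");
    }
}
```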
[jira] [Commented] (BAHIR-94) Apache Flink extension READMEs should include link to Flink's "Linking with Optional Modules" doc
[ https://issues.apache.org/jira/browse/BAHIR-94?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15889950#comment-15889950 ] ASF GitHub Bot commented on BAHIR-94: - Github user rmetzger commented on the issue: https://github.com/apache/bahir-flink/pull/12 I'm merging this one as well > Apache Flink extension READMEs should include link to Flink's "Linking with > Optional Modules" doc > - > > Key: BAHIR-94 > URL: https://issues.apache.org/jira/browse/BAHIR-94 > Project: Bahir > Issue Type: Improvement > Components: Flink Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai > > Without this link, Bahir users will most likely bump into questions when > trying to include the connector for Flink cluster execution. > We should also try to improve the overall doc quality of the Flink extensions > for the 1.0 release. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BAHIR-94) Apache Flink extension READMEs should include link to Flink's "Linking with Optional Modules" doc
[ https://issues.apache.org/jira/browse/BAHIR-94?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15889953#comment-15889953 ] ASF GitHub Bot commented on BAHIR-94: - Github user asfgit closed the pull request at: https://github.com/apache/bahir-flink/pull/12 > Apache Flink extension READMEs should include link to Flink's "Linking with > Optional Modules" doc > - > > Key: BAHIR-94 > URL: https://issues.apache.org/jira/browse/BAHIR-94 > Project: Bahir > Issue Type: Improvement > Components: Flink Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai > Fix For: Flink-0.1 > > > Without this link, Bahir users will most likely bump into questions when > trying to include the connector for Flink cluster execution. > We should also try to improve the overall doc quality of the Flink extensions > for the 1.0 release. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BAHIR-91) Upgrade bahir-flink to support Flink 1.2
[ https://issues.apache.org/jira/browse/BAHIR-91?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15890097#comment-15890097 ] ASF GitHub Bot commented on BAHIR-91: - Github user tzulitai commented on the issue: https://github.com/apache/bahir-flink/pull/11 @rmetzger I don't think there's an easy way to do this, unless we try to make the affected connectors backwards compatible. Just rebased on {{master}} to resolve conflicts. > Upgrade bahir-flink to support Flink 1.2 > > > Key: BAHIR-91 > URL: https://issues.apache.org/jira/browse/BAHIR-91 > Project: Bahir > Issue Type: New Feature > Components: Flink Streaming Connectors >Affects Versions: Flink-0.1 > Environment: Ubuntu Yakkety Yak >Reporter: Markus Müller > Fix For: Flink-0.1 > > > After working around https://issues.apache.org/jira/browse/FLINK-4813 by > specifying Flink version 1.3-SNAPSHOT instead of 1.2.0 in the bahir-flink pom > file, I get the following compiler errors for the ActiveMQ connector: > ERROR] > /home/markus/src/bahir-flink/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java:[32,33] > error: cannot find symbol > [ERROR] symbol: class ForkableFlinkMiniCluster > location: package org.apache.flink.test.util > /home/markus/src/bahir-flink/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java:[54,19] > error: cannot find symbol > [ERROR] symbol: class ForkableFlinkMiniCluster > location: class ActiveMQConnectorITCase > /home/markus/src/bahir-flink/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java:[66,20] > error: cannot find symbol > [ERROR] symbol: class ForkableFlinkMiniCluster > location: class ActiveMQConnectorITCase > /home/markus/src/bahir-flink/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java:[216,22] > error: method 
snapshotState in class > MessageAcknowledgingSourceBase cannot be applied to given types; > [ERROR] required: FunctionSnapshotContext > found: long,long > reason: actual and formal argument lists differ in length > where Type,UId are type-variables: > Type extends Object declared in class MessageAcknowledgingSourceBase > UId extends Object declared in class MessageAcknowledgingSourceBase > /home/markus/src/bahir-flink/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java:[229,4] > error: ActiveMQConnectorITCase.TestSourceContext is not abstract and does > not override abstract method markAsTemporarilyIdle() in SourceContext > Excluding ActiveMQ from compilation leads to the next issue in the > Akka-Connector: > [ERROR] > /home/markus/src/bahir-flink/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java:[210,17] > error: DummySourceContext is not abstract and does not override abstract > method markAsTemporarilyIdle() in SourceContext > [INFO] 1 error -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BAHIR-89) New API for subscribing from a list of MQTT topics and Return tuple of <Topic,Message> as output
[ https://issues.apache.org/jira/browse/BAHIR-89?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15885046#comment-15885046 ] ASF GitHub Bot commented on BAHIR-89: - GitHub user anntinutj opened a pull request: https://github.com/apache/bahir/pull/37 [BAHIR-89] Multi topic support API for streaming MQTT New API which accepts an array of MQTT topics as input and returns Tuple2 <Topic,Message> as output. Helps to consume from multiple MQTT topics with efficient use of resources. You can merge this pull request into a Git repository by running: $ git pull https://github.com/anntinutj/bahir bahir-89 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/bahir/pull/37.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #37 commit f87c0eb2a4747068d909b911e702f5506a8c3627 Author: Anntinu Date: 2017-02-27T02:07:07Z [BAHIR-89] Multi topic support API for streaming MQTT New API which accepts an array of MQTT topics as input and returns Tuple2 <Topic,Message> as output. Helps to consume from multiple MQTT topics with efficient use of resources. > New API for subscribing from a list of MQTT topics and Return tuple of > <Topic,Message> as output > > > Key: BAHIR-89 > URL: https://issues.apache.org/jira/browse/BAHIR-89 > Project: Bahir > Issue Type: New Feature > Components: Spark Streaming Connectors >Affects Versions: Not Applicable > Environment: Spark Streaming MQTT Connector >Reporter: Anntinu Josy > Labels: MQTT, SPARK > Fix For: Not Applicable > > Original Estimate: 168h > Remaining Estimate: 168h > > I am working on an IoT project. As part of MQTT-Kafka bridge program development > I used Bahir. I feel that it will be a good feature to provide a new API to > support a list of MQTT topics as input and output as a tuple of <Topic, > Message>. This will be useful to reduce resource usage in case of multiple > topic subscription. 
I had developed this feature and would like to integrate it. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
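The proposed record shape, where each message carries its originating topic, lets a single subscriber fan records back out per topic downstream. A stdlib sketch of that fan-out follows; the class and method names are hypothetical, since the actual PR returns a Spark `Tuple2` from a receiver rather than a plain Java pair.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TopicRouter {
    // Each record is a (topic, message) pair, mirroring the proposed
    // Tuple2 output of the multi-topic MQTT stream. Grouping by the topic
    // component recovers one message list per subscribed topic.
    public static Map<String, List<String>> groupByTopic(List<SimpleEntry<String, String>> records) {
        return records.stream().collect(Collectors.groupingBy(
                SimpleEntry::getKey,
                Collectors.mapping(SimpleEntry::getValue, Collectors.toList())));
    }
}
```

With one subscription per topic this grouping would require a separate receiver (and its resources) for each topic, which is the overhead the proposal avoids.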
[jira] [Commented] (BAHIR-93) Wrong version in README.md for several Apache Flink extensions
[ https://issues.apache.org/jira/browse/BAHIR-93?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15888657#comment-15888657 ] ASF GitHub Bot commented on BAHIR-93: - Github user rmetzger commented on the issue: https://github.com/apache/bahir-flink/pull/10 Thank you for opening a pull request. The change looks good +1 to merge. > Wrong version in README.md for several Apache Flink extensions > -- > > Key: BAHIR-93 > URL: https://issues.apache.org/jira/browse/BAHIR-93 > Project: Bahir > Issue Type: Bug > Components: Flink Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai > > In several Apache Flink extensions (redis, activemq, netty, etc.), the > current version is incorrectly {{1.0}} (missing "SNAPSHOT" tag). > Also, in the akka extension, the version is {{1.0.0-SNAPSHOT}} (From the > looks of https://www.mail-archive.com/dev@bahir.apache.org/msg00448.html, it > seems like the agreed version format is "major-minor" for the {{bahir-flink}} > module). > Therefore, we should uniformly correct them to be {{1.0-SNAPSHOT}}. I think > we should do this before creating the first {{bahir-flink}} release. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BAHIR-93) Wrong version in README.md for several Apache Flink extensions
[ https://issues.apache.org/jira/browse/BAHIR-93?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15888647#comment-15888647 ] ASF GitHub Bot commented on BAHIR-93: - GitHub user tzulitai opened a pull request: https://github.com/apache/bahir-flink/pull/10 [BAHIR-93] Correct all version strings to be 1.0-SNAPSHOT According to the discussion in https://www.mail-archive.com/dev@bahir.apache.org/msg00448.html, we will be following a "major.minor" versioning format for bahir-flink, starting from `1.0`. Therefore, the current state being pre-release of 1.0, the correct version in master should be `1.0-SNAPSHOT`. This PR corrects all version string occurrences in `bahir-flink`, to prepare it for the upcoming 1.0 release. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/bahir-flink BAHIR-93 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/bahir-flink/pull/10.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #10 commit 714d6736f241b7f7c67b4fadd56e40be3fcc4e27 Author: Tzu-Li (Gordon) Tai Date: 2017-02-28T17:38:58Z [BAHIR-93] Correct all version strings to be 1.0-SNAPSHOT > Wrong version in README.md for several Apache Flink extensions > -- > > Key: BAHIR-93 > URL: https://issues.apache.org/jira/browse/BAHIR-93 > Project: Bahir > Issue Type: Bug > Components: Flink Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai > > In several Apache Flink extensions (redis, activemq, netty, etc.), the > current version is incorrectly {{1.0}} (missing "SNAPSHOT" tag). > Also, in the akka extension, the version is {{1.0.0-SNAPSHOT}} (From the > looks of https://www.mail-archive.com/dev@bahir.apache.org/msg00448.html, it > seems like the agreed version format is "major-minor" for the {{bahir-flink}} > module). > Therefore, we should uniformly correct them to be {{1.0-SNAPSHOT}}. 
I think > we should do this before creating the first {{bahir-flink}} release. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BAHIR-91) Upgrade bahir-flink to support Flink 1.2
[ https://issues.apache.org/jira/browse/BAHIR-91?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15889619#comment-15889619 ] ASF GitHub Bot commented on BAHIR-91: - GitHub user tzulitai opened a pull request: https://github.com/apache/bahir-flink/pull/11 [BAHIR-91] Upgrade Flink version to 1.2.0 This required some Flink state API updates in the tests. We probably should keep an extra eye out for these kinds of Flink API changes; the source APIs may be stable, but API changes like these can also break code in bahir-flink. Note about the additional `maven-bundle-plugin` in the Redis connector: After the 1.2.0 upgrade, the project had a new dependency on Apacheds JDBM 2.0.0-M2, which packages using the "bundle" type. The plugin helps recognize the type when building. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/bahir-flink BAHIR-91 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/bahir-flink/pull/11.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #11 commit 9b9be286543fb8c972d48e715a724da261f71d24 Author: Tzu-Li (Gordon) Tai Date: 2017-03-01T06:41:21Z [BAHIR-91] Upgrade Flink version to 1.2.0 > Upgrade bahir-flink to support Flink 1.2 > > > Key: BAHIR-91 > URL: https://issues.apache.org/jira/browse/BAHIR-91 > Project: Bahir > Issue Type: New Feature > Components: Flink Streaming Connectors >Affects Versions: Flink-0.1 > Environment: Ubuntu Yakkety Yak >Reporter: Markus Müller > Fix For: Flink-0.1 > > > After working around https://issues.apache.org/jira/browse/FLINK-4813 by > specifying Flink version 1.3-SNAPSHOT instead of 1.2.0 in the bahir-flink pom > file, I get the following compiler errors for the ActiveMQ connector: > ERROR] > 
/home/markus/src/bahir-flink/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java:[32,33] > error: cannot find symbol > [ERROR] symbol: class ForkableFlinkMiniCluster > location: package org.apache.flink.test.util > /home/markus/src/bahir-flink/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java:[54,19] > error: cannot find symbol > [ERROR] symbol: class ForkableFlinkMiniCluster > location: class ActiveMQConnectorITCase > /home/markus/src/bahir-flink/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java:[66,20] > error: cannot find symbol > [ERROR] symbol: class ForkableFlinkMiniCluster > location: class ActiveMQConnectorITCase > /home/markus/src/bahir-flink/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java:[216,22] > error: method snapshotState in class > MessageAcknowledgingSourceBase cannot be applied to given types; > [ERROR] required: FunctionSnapshotContext > found: long,long > reason: actual and formal argument lists differ in length > where Type,UId are type-variables: > Type extends Object declared in class MessageAcknowledgingSourceBase > UId extends Object declared in class MessageAcknowledgingSourceBase > /home/markus/src/bahir-flink/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java:[229,4] > error: ActiveMQConnectorITCase.TestSourceContext is not abstract and does > not override abstract method markAsTemporarilyIdle() in SourceContext > Excluding ActiveMQ from compilation leads to the next issue in the > Akka-Connector: > [ERROR] > /home/markus/src/bahir-flink/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java:[210,17] > error: DummySourceContext is not abstract and does not override abstract > 
method markAsTemporarilyIdle() in SourceContext > [INFO] 1 error -- This message was sent by Atlassian JIRA (v6.3.15#6346)
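The `maven-bundle-plugin` note in the PR description refers to Maven's handling of dependencies with the "bundle" packaging type: registering the Felix plugin with extensions enabled lets the build resolve that type. A sketch of the plugin registration follows; the version is omitted and the exact configuration in the PR may differ.

```xml
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.felix</groupId>
      <artifactId>maven-bundle-plugin</artifactId>
      <!-- extensions=true teaches Maven the "bundle" packaging/type -->
      <extensions>true</extensions>
    </plugin>
  </plugins>
</build>
```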
[jira] [Commented] (BAHIR-46) Handle re-delivery of message in MQTT structured streaming source.
[ https://issues.apache.org/jira/browse/BAHIR-46?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434714#comment-15434714 ] ASF GitHub Bot commented on BAHIR-46: - Github user ScrapCodes commented on the issue: https://github.com/apache/bahir/pull/19 Yes, I am awaiting the fixing of SPARK-16963. > Handle re-delivery of message in MQTT structured streaming source. > -- > > Key: BAHIR-46 > URL: https://issues.apache.org/jira/browse/BAHIR-46 > Project: Bahir > Issue Type: Bug >Reporter: Prashant Sharma > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-59) Improve ActiveMQ Connector
[ https://issues.apache.org/jira/browse/BAHIR-59?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15468441#comment-15468441 ] ASF GitHub Bot commented on BAHIR-59: - GitHub user rmetzger opened a pull request: https://github.com/apache/bahir-flink/pull/4 [BAHIR-59][AMQ] Fix constructor visibility and error messages You can merge this pull request into a Git repository by running: $ git pull https://github.com/rmetzger/bahir-flink bahir59 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/bahir-flink/pull/4.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4 commit c6fef9d709ed64e0c69e8de54804ebfaf9665dad Author: Robert Metzger Date: 2016-09-06T20:23:26Z [BAHIR-59][AQM] Fix ctor visibility and error messages > Improve ActiveMQ Connector > -- > > Key: BAHIR-59 > URL: https://issues.apache.org/jira/browse/BAHIR-59 > Project: Bahir > Issue Type: Improvement > Components: Flink Streaming Connectors >Affects Versions: Flink-0.1 >Reporter: Robert Metzger > > While trying out the ActiveMQ connector for Flink, I found some issues that I > would like to address using this JIRA. > - Make AMQSource constructor public > - Improve error messages on wrong configuration -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-59) Improve ActiveMQ Connector
[ https://issues.apache.org/jira/browse/BAHIR-59?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15503771#comment-15503771 ] ASF GitHub Bot commented on BAHIR-59: - Github user tzulitai commented on the issue: https://github.com/apache/bahir-flink/pull/4 LGTM, +1 to merge > Improve ActiveMQ Connector > -- > > Key: BAHIR-59 > URL: https://issues.apache.org/jira/browse/BAHIR-59 > Project: Bahir > Issue Type: Improvement > Components: Flink Streaming Connectors >Affects Versions: Flink-0.1 >Reporter: Robert Metzger >Assignee: Robert Metzger > Fix For: Flink-0.1 > > > While trying out the ActiveMQ connector for Flink, I found some issues that I > would like to address using this JIRA. > - Make AMQSource constructor public > - Improve error messages on wrong configuration -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-59) Improve ActiveMQ Connector
[ https://issues.apache.org/jira/browse/BAHIR-59?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15503961#comment-15503961 ] ASF GitHub Bot commented on BAHIR-59: - Github user lresende commented on the issue: https://github.com/apache/bahir-flink/pull/4 Looks like this has already been merged by @tedyu in a105a7c3455b9851f951506ba91f1472002d323f @rmetzger Could you please close the pr. > Improve ActiveMQ Connector > -- > > Key: BAHIR-59 > URL: https://issues.apache.org/jira/browse/BAHIR-59 > Project: Bahir > Issue Type: Improvement > Components: Flink Streaming Connectors >Affects Versions: Flink-0.1 >Reporter: Robert Metzger >Assignee: Robert Metzger > Fix For: Flink-0.1 > > > While trying out the ActiveMQ connector for Flink, I found some issues that I > would like to address using this JIRA. > - Make AMQSource constructor public > - Improve error messages on wrong configuration -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-59) Improve ActiveMQ Connector
[ https://issues.apache.org/jira/browse/BAHIR-59?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15506546#comment-15506546 ] ASF GitHub Bot commented on BAHIR-59: - Github user rmetzger closed the pull request at: https://github.com/apache/bahir-flink/pull/4 > Improve ActiveMQ Connector > -- > > Key: BAHIR-59 > URL: https://issues.apache.org/jira/browse/BAHIR-59 > Project: Bahir > Issue Type: Improvement > Components: Flink Streaming Connectors >Affects Versions: Flink-0.1 >Reporter: Robert Metzger >Assignee: Robert Metzger > Fix For: Flink-0.1 > > > While trying out the ActiveMQ connector for Flink, I found some issues that I > would like to address using this JIRA. > - Make AMQSource constructor public > - Improve error messages on wrong configuration -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-51) Add additional MQTT options/parameters to MQTTInputDStream and MqttStreamSource
[ https://issues.apache.org/jira/browse/BAHIR-51?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15431191#comment-15431191 ] ASF GitHub Bot commented on BAHIR-51: - Github user asfgit closed the pull request at: https://github.com/apache/bahir/pull/22 > Add additional MQTT options/parameters to MQTTInputDStream and > MqttStreamSource > --- > > Key: BAHIR-51 > URL: https://issues.apache.org/jira/browse/BAHIR-51 > Project: Bahir > Issue Type: Improvement > Components: Spark Streaming Connectors, Spark Structured Streaming > Connectors >Reporter: Sebastian Woehrl > > We are using Spark Streaming in the automotive IOT environment with MQTT as > the data source. > For security reasons our MQTT broker is protected by username and password > (as is default for these kind of environments). At the moment it is not > possible to set username/password when creating an MQTT Receiver > (MQTTInputDStream or MqttStreamSource). > I propose that the MQTTInputDStream and MqttStreamSource be extended to > optionally allow setting the following mqtt options / parameters: > * username > * password > * clientId > * cleanSession > * QoS > * connectionTimeout > * keepAliveInterval > * mqttVersion > If this proposal meets your approval I am willing to create a pull request > with these changes implemented. > *Note*: The part for MqttInputDStream has been split off into BAHIR-53. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
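The option list proposed above maps naturally onto a defaults-plus-overrides scheme: the receiver applies sensible defaults and users override only what they need (e.g. username/password). A stdlib sketch follows; the option names follow the list above, while the default values are illustrative assumptions loosely based on Eclipse Paho's `MqttConnectOptions`, not the connector's actual defaults.

```java
import java.util.HashMap;
import java.util.Map;

public class MqttOptionDefaults {
    // Merges user-supplied MQTT options over a set of defaults. Options with
    // no sensible default (username, password, clientId) are simply absent
    // unless the user supplies them.
    public static Map<String, String> withDefaults(Map<String, String> userOpts) {
        Map<String, String> opts = new HashMap<>();
        opts.put("cleanSession", "true");        // illustrative defaults
        opts.put("qos", "1");
        opts.put("connectionTimeout", "30");
        opts.put("keepAliveInterval", "60");
        userOpts.forEach(opts::put);             // user overrides win
        return opts;
    }
}
```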
[jira] [Commented] (BAHIR-71) invalid jira link of bahir-flink README.md
[ https://issues.apache.org/jira/browse/BAHIR-71?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15601233#comment-15601233 ] ASF GitHub Bot commented on BAHIR-71: - GitHub user shijinkui opened a pull request: https://github.com/apache/bahir-flink/pull/6 [BAHIR-71] invalid jira link of bahir-flink README.md Changes [JIRA issue](issues.apache.org/jira/browse/BAHIR) to [JIRA issue](http://issues.apache.org/jira/browse/BAHIR) You can merge this pull request into a Git repository by running: $ git pull https://github.com/shijinkui/bahir-flink doc-link-invalid Alternatively you can review and apply these changes as the patch at: https://github.com/apache/bahir-flink/pull/6.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6 commit 564f82820e05c1eb763f94e073cbca9bcaf95c55 Author: shijinkui Date: 2016-10-24T07:25:17Z [BAHIR-71] invalid jira link of bahir-flink README.md > invalid jira link of bahir-flink README.md > --- > > Key: BAHIR-71 > URL: https://issues.apache.org/jira/browse/BAHIR-71 > Project: Bahir > Issue Type: Improvement > Components: Website >Reporter: shijinkui > > We ask contributors to first open a [JIRA > issue](issues.apache.org/jira/browse/BAHIR) describing the planned changes. > Please make sure to put "Flink Streaming Connector" in the "Component/s" > field. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-71) invalid jira link of bahir-flink README.md
[ https://issues.apache.org/jira/browse/BAHIR-71?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15601880#comment-15601880 ] ASF GitHub Bot commented on BAHIR-71: - Github user asfgit closed the pull request at: https://github.com/apache/bahir-flink/pull/6 > invalid jira link of bahir-flink README.md > --- > > Key: BAHIR-71 > URL: https://issues.apache.org/jira/browse/BAHIR-71 > Project: Bahir > Issue Type: Improvement > Components: Website >Reporter: shijinkui > > We ask contributors to first open a [JIRA > issue](issues.apache.org/jira/browse/BAHIR) describing the planned changes. > Please make sure to put "Flink Streaming Connector" in the "Component/s" > field. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-73) [bahir-flink] flink-streaming-akka source connector
[ https://issues.apache.org/jira/browse/BAHIR-73?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15689040#comment-15689040 ] ASF GitHub Bot commented on BAHIR-73: - Github user sbcd90 commented on the issue: https://github.com/apache/bahir-flink/pull/8 Hello @rmetzger, Thanks for the code review. I made the code changes to accommodate the following points: - Added a README. - Support user-specified configuration for the receiver actor system. - Whether to send acks can now be configured by the user. - Refactored the code to move the `receiverActorSystem.awaitTermination()` call & remove the `byte[]` data handling. > [bahir-flink] flink-streaming-akka source connector > --- > > Key: BAHIR-73 > URL: https://issues.apache.org/jira/browse/BAHIR-73 > Project: Bahir > Issue Type: New Feature > Components: Flink Streaming Connectors >Reporter: Subhobrata Dey > Fix For: Flink-0.1 > > > Hello, > This issue is created to propose the idea of having a flink-streaming-akka > source connector. > The source connector can be used to receive messages from an Akka feeder or > publisher actor & these messages can then be processed using flink streaming. > The source connector has the following features. > 1. It supports several different message formats like iterable data, > byte arrays & data with timestamps. > 2. It can send back acknowledgements to the feeder actor. > Thanks & regards, > Subhobrata -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-73) [bahir-flink] flink-streaming-akka source connector
[ https://issues.apache.org/jira/browse/BAHIR-73?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15692262#comment-15692262 ] ASF GitHub Bot commented on BAHIR-73: - Github user sbcd90 commented on the issue: https://github.com/apache/bahir-flink/pull/8 Hello @rmetzger, Thanks for the code review. I did the code refactoring to accommodate the following changes: - waitLock removed - configuration moved to constructor - the byte[] scenario removed. I did some basic tests in non-cluster mode using this:
```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

AkkaApp app = new AkkaApp();
Config feederActorConfig = ConfigFactory.parseFile(new File(app.getFeederConfigFile()));
ActorSystem feederActorSystem = ActorSystem.create("feederActorSystem", feederActorConfig);
feederActorSystem.actorOf(Props.create(FeederActor.class), "feederActor");

Config config = ConfigFactory.parseFile(new File(app.getReceiverConfigFile()));
String feederActorUrl = "akka.tcp://feederActorSystem@127.0.0.1:5156/user/feederActor";

DataStream source = env.addSource(new AkkaSource("receiverActor", feederActorUrl, config));
source.print();

env.execute();
```
I will do a test run in cluster mode to check the achievable throughput. > [bahir-flink] flink-streaming-akka source connector > --- > > Key: BAHIR-73 > URL: https://issues.apache.org/jira/browse/BAHIR-73 > Project: Bahir > Issue Type: New Feature > Components: Flink Streaming Connectors >Reporter: Subhobrata Dey > Fix For: Flink-0.1 > > > Hello, > This issue is created to propose the idea of having a flink-streaming-akka > source connector. > The source connector can be used to receive messages from an Akka feeder or > publisher actor & these messages can then be processed using flink streaming. > The source connector has the following features. > 1. It supports several different message formats like iterable data, > byte arrays & data with timestamps. > 2. It can send back acknowledgements to the feeder actor. > Thanks & regards, > Subhobrata -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-73) [bahir-flink] flink-streaming-akka source connector
[ https://issues.apache.org/jira/browse/BAHIR-73?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15689679#comment-15689679 ] ASF GitHub Bot commented on BAHIR-73: - Github user rmetzger commented on the issue: https://github.com/apache/bahir-flink/pull/8 Thank you for addressing the issues so quickly. I found some more issues; once they are addressed, we can merge the code. Did you do a test run of the code on a cluster? It would be interesting to see the throughput you can achieve with the source. > [bahir-flink] flink-streaming-akka source connector > --- > > Key: BAHIR-73 > URL: https://issues.apache.org/jira/browse/BAHIR-73 > Project: Bahir > Issue Type: New Feature > Components: Flink Streaming Connectors >Reporter: Subhobrata Dey > Fix For: Flink-0.1 > > > Hello, > This issue is created to propose the idea of having a flink-streaming-akka > source connector. > The source connector can be used to receive messages from an Akka feeder or > publisher actor & these messages can then be processed using flink streaming. > The source connector has the following features. > 1. It supports several different message formats like iterable data, > byte arrays & data with timestamps. > 2. It can send back acknowledgements to the feeder actor. > Thanks & regards, > Subhobrata -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-72) support netty: pushed tcp/http connector
[ https://issues.apache.org/jira/browse/BAHIR-72?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15689713#comment-15689713 ] ASF GitHub Bot commented on BAHIR-72: - Github user rmetzger commented on the issue: https://github.com/apache/bahir-flink/pull/7 Thanks a lot for the updates. I was traveling the last few weeks, which is why I didn't have time to review your changes earlier. I'll merge the PR. > support netty: pushed tcp/http connector > > > Key: BAHIR-72 > URL: https://issues.apache.org/jira/browse/BAHIR-72 > Project: Bahir > Issue Type: New Feature > Components: Flink Streaming Connectors >Reporter: shijinkui > > Also discussed on the Flink side: https://issues.apache.org/jira/browse/FLINK-4630 > When the source stream starts, it listens on a provided TCP port and receives stream > data from the user's data source. > This netty TCP source is kept alive end-to-end, i.e. it connects the business > system directly to the Flink worker. > Such a source service is genuinely needed in production. > The source in detail: > 1. The source runs as a netty TCP and HTTP server. > 2. The user provides a TCP port; if that port is in use, increment the port > number within the range 1024 to 65535. The source can run in parallel. > 3. Call back the provided URL to report the actual port being listened on. > 4. The user pushes streaming data to the netty server, which then collects the data into Flink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
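The port-increment behavior described in points 1–2 of the proposal can be sketched with plain `java.net.ServerSocket`; this is an illustrative stand-in (class and method names are hypothetical, and the actual connector uses netty), probing upward from the requested port within the user-port range:

```java
import java.io.IOException;
import java.net.ServerSocket;

public class PortProbe {
    // Probe upward from the preferred port, as the BAHIR-72 proposal
    // describes, staying within the user-port range 1024-65535.
    // Here the socket is closed again immediately and only the port
    // number is reported; a real server would keep listening on it.
    static int findAvailablePort(int preferred) {
        for (int port = Math.max(preferred, 1024); port <= 65535; port++) {
            try (ServerSocket socket = new ServerSocket(port)) {
                return socket.getLocalPort(); // free port found
            } catch (IOException alreadyInUse) {
                // port taken, try the next one
            }
        }
        throw new IllegalStateException("no free port in 1024-65535");
    }
}
```

The reported port is what point 3 of the proposal would then send to the user-provided callback URL.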
[jira] [Commented] (BAHIR-72) support netty: pushed tcp/http connector
[ https://issues.apache.org/jira/browse/BAHIR-72?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15689726#comment-15689726 ] ASF GitHub Bot commented on BAHIR-72: - Github user asfgit closed the pull request at: https://github.com/apache/bahir-flink/pull/7 > support netty: pushed tcp/http connector > > > Key: BAHIR-72 > URL: https://issues.apache.org/jira/browse/BAHIR-72 > Project: Bahir > Issue Type: New Feature > Components: Flink Streaming Connectors >Reporter: shijinkui > > Also discussed on the Flink side: https://issues.apache.org/jira/browse/FLINK-4630 > When the source stream starts, it listens on a provided TCP port and receives stream > data from the user's data source. > This netty TCP source is kept alive end-to-end, i.e. it connects the business > system directly to the Flink worker. > Such a source service is genuinely needed in production. > The source in detail: > 1. The source runs as a netty TCP and HTTP server. > 2. The user provides a TCP port; if that port is in use, increment the port > number within the range 1024 to 65535. The source can run in parallel. > 3. Call back the provided URL to report the actual port being listened on. > 4. The user pushes streaming data to the netty server, which then collects the data into Flink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-73) [bahir-flink] flink-streaming-akka source connector
[ https://issues.apache.org/jira/browse/BAHIR-73?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15689670#comment-15689670 ] ASF GitHub Bot commented on BAHIR-73: - Github user rmetzger commented on a diff in the pull request: https://github.com/apache/bahir-flink/pull/8#discussion_r89286832
--- Diff: flink-connector-akka/README.md ---
@@ -0,0 +1,45 @@
+# Flink Akka connector
+
+This connector provides a sink to [Akka](http://akka.io/) source actors in an ActorSystem.
+To use this connector, add the following dependency to your project:
+
+    <dependency>
+      <groupId>org.apache.bahir</groupId>
+      <artifactId>flink-connector-akka_2.11</artifactId>
+      <version>1.0.0-SNAPSHOT</version>
+    </dependency>
+
+*Version Compatibility*: This module is compatible with Akka 2.0+.
+
+## Configuration
+
+The configuration for the Receiver Actor System in the Flink Akka connector can be created using the `Configuration (org.apache.flink.configuration.Configuration)` object in Flink.
+
+To enable acknowledgements, the custom configuration `akka.remote.auto-ack` can be used.
+
+The user can set any of the default configuration options allowed by Akka as well as custom configuration options allowed by the connector.
+
+A sample configuration can be defined as follows:
+
+    Configuration configuration = new Configuration();
+    configuration.setString("akka.loglevel", "INFO");
+    configuration.setString("akka.actor.provider", "akka.remote.RemoteActorRefProvider");
+    configuration.setString("akka.remote.netty.tcp.hostname", "127.0.0.1");
+    configuration.setString("akka.remote.enabled-transports", "[akka.remote.netty.tcp]");
+    configuration.setString("akka.remote.netty.tcp.port", "5150");
+    configuration.setString("akka.remote.log-sent-messages", "on");
+    configuration.setString("akka.remote.log-received-messages", "on");
+    configuration.setString("akka.remote.auto-ack", "on");
--- End diff --
How can a user pass the `configuration` to the Akka source? Afaik it's not possible, because `open(Configuration c)` is not really supported in the DataStream API of Flink.
> [bahir-flink] flink-streaming-akka source connector > --- > > Key: BAHIR-73 > URL: https://issues.apache.org/jira/browse/BAHIR-73 > Project: Bahir > Issue Type: New Feature > Components: Flink Streaming Connectors >Reporter: Subhobrata Dey > Fix For: Flink-0.1 > > > Hello, > This issue is created to propose the idea of having a flink-streaming-akka > source connector. > The source connector can be used to receive messages from an Akka feeder or > publisher actor & these messages can then be processed using flink streaming. > The source connector has the following features. > 1. It supports several different message formats like iterable data, > byte arrays & data with timestamps. > 2. It can send back acknowledgements to the feeder actor. > Thanks & regards, > Subhobrata -- This message was sent by Atlassian JIRA (v6.3.4#6332)
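The review question above was later resolved by moving the configuration into the source's constructor (see the "configuration moved to constructor" note in the follow-up comment on this PR). A minimal, hypothetical sketch of that pattern — the class and method names here are illustrative, not the actual `AkkaSource` API — is a source that keeps its settings in a serializable field instead of expecting `open(Configuration)` to be populated by the runtime:

```java
import java.io.Serializable;
import java.util.Properties;

// Illustrative mini-version of constructor-based configuration: a
// DataStream source is serialized and shipped to the workers, so any
// settings it needs must travel with it as serializable state.
public class ConfiguredSource implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Properties settings;

    public ConfiguredSource(Properties settings) {
        this.settings = settings;
    }

    // Mirrors the connector's "akka.remote.auto-ack" = "on" check,
    // but reads it from the constructor-supplied settings.
    public boolean autoAckEnabled() {
        return "on".equals(settings.getProperty("akka.remote.auto-ack"));
    }
}
```

With this shape, the user builds the settings on the client and hands them to the source before `env.addSource(...)`, sidestepping the unsupported `open(Configuration)` path.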
[jira] [Commented] (BAHIR-73) [bahir-flink] flink-streaming-akka source connector
[ https://issues.apache.org/jira/browse/BAHIR-73?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15689669#comment-15689669 ] ASF GitHub Bot commented on BAHIR-73: - Github user rmetzger commented on a diff in the pull request: https://github.com/apache/bahir-flink/pull/8#discussion_r89286046
--- Diff: flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.akka;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.akka.utils.ReceiverActor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Implementation of {@link SourceFunction} specialized to read messages
+ * from Akka actors.
+ */
+public class AkkaSource extends RichSourceFunction
+    implements StoppableFunction {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AkkaSource.class);
+
+  private static final long serialVersionUID = 1L;
+
+  // --- Fields set by the constructor
+
+  private final Class classForActor;
+
+  private final String actorName;
+
+  private final String urlOfPublisher;
+
+  // --- Runtime fields
+  private transient ActorSystem receiverActorSystem;
+  private transient ActorRef receiverActor;
+  private transient Object waitLock;
+  private transient boolean running = true;
+
+  protected transient boolean autoAck;
+
+  /**
+   * Creates {@link AkkaSource} for Streaming
+   *
+   * @param actorName Receiver Actor name
+   * @param urlOfPublisher tcp url of the publisher or feeder actor
+   */
+  public AkkaSource(String actorName,
+                    String urlOfPublisher) {
+    super();
+    this.classForActor = ReceiverActor.class;
+    this.actorName = actorName;
+    this.urlOfPublisher = urlOfPublisher;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    Properties customProperties = new Properties();
+    parameters.addAllToProperties(customProperties);
+
+    waitLock = new Object();
+    receiverActorSystem = createDefaultActorSystem(customProperties);
+
+    if (customProperties.containsKey("akka.remote.auto-ack") &&
+        customProperties.getProperty("akka.remote.auto-ack").equals("on")) {
+      autoAck = true;
+    } else {
+      autoAck = false;
+    }
+  }
+
+  @Override
+  public void run(SourceFunction.SourceContext ctx) throws Exception {
+    LOG.info("Starting the Receiver actor {}", actorName);
+    receiverActor = receiverActorSystem.actorOf(
+        Props.create(classForActor, ctx, urlOfPublisher, autoAck), actorName);
+
+    running = true;
+    LOG.info("Started the Receiver actor {} successfully", actorName);
+    receiverActorSystem.awaitTermination();
+
+    while (running) {
+      synchronized (waitLock) {
+        waitLock.wait(100L);
--- End diff --
I don't think the wait lock is needed anymore when doing `awaitTermination()`. > [bahir-flink] flink-streaming-akka source connector > --- > > Key: BAHIR-73 > URL: https://issues.apache.org/jira/browse/BAHIR-73 > Project: Bahir > Issue Type: New Feature >
[jira] [Commented] (BAHIR-73) [bahir-flink] flink-streaming-akka source connector
[ https://issues.apache.org/jira/browse/BAHIR-73?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686042#comment-15686042 ] ASF GitHub Bot commented on BAHIR-73: - Github user rmetzger commented on a diff in the pull request: https://github.com/apache/bahir-flink/pull/8#discussion_r89061655
--- Diff: flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.akka;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.akka.utils.ReceiverActor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link SourceFunction} specialized to read messages
+ * from Akka actors.
+ */
+public class AkkaSource extends RichSourceFunction
+    implements StoppableFunction {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AkkaSource.class);
+
+  private static final long serialVersionUID = 1L;
+
+  // --- Fields set by the constructor
+
+  private final Class classForActor;
+
+  private final String actorName;
+
+  private final String urlOfPublisher;
+
+  // --- Runtime fields
+  private transient ActorSystem receiverActorSystem;
+  private transient ActorRef receiverActor;
+  private transient Object waitLock;
+  private transient boolean running = true;
+
+  protected transient boolean autoAck;
+
+  /**
+   * Creates {@link AkkaSource} for Streaming
+   *
+   * @param actorName Receiver Actor name
+   * @param urlOfPublisher tcp url of the publisher or feeder actor
+   */
+  public AkkaSource(String actorName,
+                    String urlOfPublisher) {
+    super();
+    this.classForActor = ReceiverActor.class;
+    this.actorName = actorName;
+    this.urlOfPublisher = urlOfPublisher;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    waitLock = new Object();
+    receiverActorSystem = createDefaultActorSystem();
+
+    RuntimeContext runtimeContext = getRuntimeContext();
+    if (runtimeContext instanceof StreamingRuntimeContext
+        && ((StreamingRuntimeContext) runtimeContext).isCheckpointingEnabled()) {
+      autoAck = false;
+    } else {
+      autoAck = true;
--- End diff --
Why is the acking dependent on the checkpointing? Maybe it would make sense to allow the user to configure this independently. > [bahir-flink] flink-streaming-akka source connector > --- > > Key: BAHIR-73 > URL: https://issues.apache.org/jira/browse/BAHIR-73 > Project: Bahir > Issue Type: New Feature > Components: Flink Streaming Connectors >Reporter: Subhobrata Dey > Fix For: Flink-0.1 > > > Hello, > This issue is created to propose the idea of having a flink-streaming-akka > source connector. > The source connector can be used to receive messages from an Akka feeder or > publisher actor & these messages can then be processed using flink streaming. > The source connector has the following features. > 1. It supports several different message formats like iterable data, > byte arrays &
[jira] [Commented] (BAHIR-73) [bahir-flink] flink-streaming-akka source connector
[ https://issues.apache.org/jira/browse/BAHIR-73?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15692677#comment-15692677 ] ASF GitHub Bot commented on BAHIR-73: - Github user asfgit closed the pull request at: https://github.com/apache/bahir-flink/pull/8 > [bahir-flink] flink-streaming-akka source connector > --- > > Key: BAHIR-73 > URL: https://issues.apache.org/jira/browse/BAHIR-73 > Project: Bahir > Issue Type: New Feature > Components: Flink Streaming Connectors >Reporter: Subhobrata Dey > Fix For: Flink-0.1 > > > Hello, > This issue is created to propose the idea of having a flink-streaming-akka > source connector. > The source connector can be used to receive messages from an Akka feeder or > publisher actor & these messages can then be processed using flink streaming. > The source connector has the following features. > 1. It supports several different message formats like iterable data, > byte arrays & data with timestamps. > 2. It can send back acknowledgements to the feeder actor. > Thanks & regards, > Subhobrata -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-73) [bahir-flink] flink-streaming-akka source connector
[ https://issues.apache.org/jira/browse/BAHIR-73?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15685996#comment-15685996 ] ASF GitHub Bot commented on BAHIR-73: - Github user rmetzger commented on the issue: https://github.com/apache/bahir-flink/pull/8 Thank you for opening this pull request. I'll review it soon. > [bahir-flink] flink-streaming-akka source connector > --- > > Key: BAHIR-73 > URL: https://issues.apache.org/jira/browse/BAHIR-73 > Project: Bahir > Issue Type: New Feature > Components: Flink Streaming Connectors >Reporter: Subhobrata Dey > Fix For: Flink-0.1 > > > Hello, > This issue is created to propose the idea of having a flink-streaming-akka > source connector. > The source connector can be used to receive messages from an Akka feeder or > publisher actor & these messages can then be processed using flink streaming. > The source connector has the following features. > 1. It supports several different message formats like iterable data, > byte arrays & data with timestamps. > 2. It can send back acknowledgements to the feeder actor. > Thanks & regards, > Subhobrata -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-73) [bahir-flink] flink-streaming-akka source connector
[ https://issues.apache.org/jira/browse/BAHIR-73?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686037#comment-15686037 ] ASF GitHub Bot commented on BAHIR-73: - Github user rmetzger commented on a diff in the pull request: https://github.com/apache/bahir-flink/pull/8#discussion_r89061030
--- Diff: flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/ReceiverActor.java ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.akka.utils;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.UntypedActor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+
+import java.util.Iterator;
+
+/**
+ * Generalized receiver actor which receives messages
+ * from the feeder or publisher actor.
+ */
+public class ReceiverActor extends UntypedActor {
+  // --- Fields set by the constructor
+  private final SourceContext ctx;
+
+  private final String urlOfPublisher;
+
+  private final boolean autoAck;
+
+  // --- Runtime fields
+  private ActorSelection remotePublisher;
+
+  public ReceiverActor(SourceContext ctx,
+                       String urlOfPublisher,
+                       boolean autoAck) {
+    this.ctx = ctx;
+    this.urlOfPublisher = urlOfPublisher;
+    this.autoAck = autoAck;
+  }
+
+  @Override
+  public void preStart() throws Exception {
+    remotePublisher = getContext().actorSelection(urlOfPublisher);
+    remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void onReceive(Object message)
+      throws Exception {
+    if (message instanceof Iterable) {
+      collect((Iterable) message);
+    } else if (message instanceof byte[]) {
+      byte[] messageBytes = (byte[]) message;
+      collect(messageBytes);
+    } else if (message instanceof Tuple2) {
+      Tuple2
[jira] [Commented] (BAHIR-75) Initial Code Delivery
[ https://issues.apache.org/jira/browse/BAHIR-75?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15671890#comment-15671890 ] ASF GitHub Bot commented on BAHIR-75: - Github user lresende commented on a diff in the pull request: https://github.com/apache/bahir/pull/27#discussion_r88347239
--- Diff: pom.xml ---
@@ -446,6 +447,7 @@
 .classpath
 .project
 **/dependency-reduced-pom.xml
+**/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
--- End diff --
Did we try just adding the license header to the file using the # character as a comment? It should work and avoid the RAT exclusion. > Initial Code Delivery > - > > Key: BAHIR-75 > URL: https://issues.apache.org/jira/browse/BAHIR-75 > Project: Bahir > Issue Type: Sub-task > Components: Spark SQL Data Sources >Reporter: Sourav Mazumder > Original Estimate: 504h > Remaining Estimate: 504h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-75) Initial Code Delivery
[ https://issues.apache.org/jira/browse/BAHIR-75?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15671891#comment-15671891 ] ASF GitHub Bot commented on BAHIR-75: - Github user lresende commented on a diff in the pull request: https://github.com/apache/bahir/pull/27#discussion_r88346116
--- Diff: datasource-webhdfs/src/main/scala/org/apache/bahir/datasource/webhdfs/DefaultSource.scala ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.datasource.webhdfs
+
+import java.text.SimpleDateFormat
+import java.sql.{Timestamp, Date}
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.sql.types.{DateType, TimestampType}
+
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.StructType
+import scala.annotation.switch
+
+import org.apache.bahir.datasource.webhdfs.util._
+import org.apache.bahir.datasource.webhdfs.csv._
+
+/**
+ * This class contains functions for reading/writing data from/to remote webhdfs server in Spark DataSource
+ * This function is written in line with the DataSource function in com.databricks.spark.csv.
+ */
+
+class DefaultSource
+  extends RelationProvider
+  with SchemaRelationProvider
+  with CreatableRelationProvider
+  with DataSourceRegister {
+
+  override def shortName() : String = "webhdfs"
+
+  private def checkPath(parameters: Map[String, String]): String = {
+    parameters.getOrElse("path", sys.error("'path' must be specified "))
+  }
+
+  /**
+   * Creates a new relation for data store in CSV given parameters.
+   * Parameters have to include 'path' and optionally 'delimiter', 'quote', and 'header'
+   */
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String]): BaseRelation = {
+    createRelation(sqlContext, parameters, null)
+  }
+
+  /**
+   * Creates a new relation for data store in CSV given parameters and user supported schema.
+   * Parameters have to include 'path' and optionally 'delimiter', 'quote', and 'header'
+   */
--- End diff --
What is the relation between webHDFS and CSV? Does it mean that we can only read CSV files from the remote webHDFS at this time?
> Initial Code Delivery > - > > Key: BAHIR-75 > URL: https://issues.apache.org/jira/browse/BAHIR-75 > Project: Bahir > Issue Type: Sub-task > Components: Spark SQL Data Sources >Reporter: Sourav Mazumder > Original Estimate: 504h > Remaining Estimate: 504h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-75) Initial Code Delivery
[ https://issues.apache.org/jira/browse/BAHIR-75?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15672143#comment-15672143 ] ASF GitHub Bot commented on BAHIR-75: - Github user ckadner commented on a diff in the pull request: https://github.com/apache/bahir/pull/27#discussion_r88360001
--- Diff: pom.xml ---
@@ -446,6 +447,7 @@
 .classpath
 .project
 **/dependency-reduced-pom.xml
+**/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
--- End diff --
@lresende even though [java.util.ServiceLoader](http://docs.oracle.com/javase/7/docs/api/java/util/ServiceLoader.html) ignores `#`-comment lines, I did not see any examples of it being used in DataSourceRegister files of other projects, i.e. Spark does not include license comments in its 3 DataSourceRegister service provider configuration files. What's wrong with excluding it from the RAT license checks? > Initial Code Delivery > - > > Key: BAHIR-75 > URL: https://issues.apache.org/jira/browse/BAHIR-75 > Project: Bahir > Issue Type: Sub-task > Components: Spark SQL Data Sources >Reporter: Sourav Mazumder > Original Estimate: 504h > Remaining Estimate: 504h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
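For context on the point being debated above: `java.util.ServiceLoader` reads a provider-configuration file by treating everything after a `#` on a line as a comment and skipping blank lines, which is why a `#`-prefixed license header would be legal. A minimal sketch of that parsing rule follows; the parser class and the provider name in the test are illustrative, not actual Bahir code:

```java
import java.util.ArrayList;
import java.util.List;

// Mirrors how ServiceLoader parses a META-INF/services file: strip
// everything after '#', trim whitespace, and skip blank lines, so
// only the provider class names survive.
public class ProviderFileParser {
    static List<String> providers(List<String> lines) {
        List<String> names = new ArrayList<>();
        for (String line : lines) {
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash); // drop the comment
            }
            line = line.trim();
            if (!line.isEmpty()) {
                names.add(line);
            }
        }
        return names;
    }
}
```

So either approach works: a `#` license header survives loading, or the file can simply be excluded from the RAT check as the PR does.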
[jira] [Commented] (BAHIR-75) Initial Code Delivery
[ https://issues.apache.org/jira/browse/BAHIR-75?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15671893#comment-15671893 ] ASF GitHub Bot commented on BAHIR-75: - Github user lresende commented on a diff in the pull request: https://github.com/apache/bahir/pull/27#discussion_r88345454 --- Diff: datasource-webhdfs/pom.xml --- @@ -0,0 +1,83 @@ + + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + 4.0.0 + +org.apache.bahir +bahir-parent_2.11 +2.1.0-SNAPSHOT +../pom.xml + + + org.apache.bahir + datasource-webhdfs_2.11 + +datasource-webhdfs + + jar + Apache Bahir - Spark DataSource WebHDFS + http://bahir.apache.org/ + + + + org.scalaj + scalaj-http_2.11 --- End diff -- Please use _${scala.binary.version} instead of _2.11 > Initial Code Delivery > - > > Key: BAHIR-75 > URL: https://issues.apache.org/jira/browse/BAHIR-75 > Project: Bahir > Issue Type: Sub-task > Components: Spark SQL Data Sources >Reporter: Sourav Mazumder > Original Estimate: 504h > Remaining Estimate: 504h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-64) Add test that Akka streaming connector can receive data
[ https://issues.apache.org/jira/browse/BAHIR-64?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15673322#comment-15673322 ] ASF GitHub Bot commented on BAHIR-64: - Github user lresende commented on a diff in the pull request: https://github.com/apache/bahir/pull/24#discussion_r86030626 --- Diff: NOTICE --- @@ -2,4 +2,17 @@ Apache Bahir Copyright (c) 2016 The Apache Software Foundation. This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). \ No newline at end of file +The Apache Software Foundation (http://www.apache.org/). + + +=== --- End diff -- Maybe this comment should just go in the header of the class. > Add test that Akka streaming connector can receive data > --- > > Key: BAHIR-64 > URL: https://issues.apache.org/jira/browse/BAHIR-64 > Project: Bahir > Issue Type: Sub-task > Components: Spark Streaming Connectors >Reporter: Christian Kadner >Assignee: Christian Kadner > Labels: test > > Add test cases that verify that the *Akka streaming connector* can receive > streaming data. > See [BAHIR-63|https://issues.apache.org/jira/browse/BAHIR-63] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-75) Initial Code Delivery
[ https://issues.apache.org/jira/browse/BAHIR-75?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15668929#comment-15668929 ] ASF GitHub Bot commented on BAHIR-75: - Github user ckadner commented on the issue: https://github.com/apache/bahir/pull/25 @sourav-mazumder, please close/delete this "place-holder" PR. We will continue the work in progress using the PR you just created (#27). > Initial Code Delivery > - > > Key: BAHIR-75 > URL: https://issues.apache.org/jira/browse/BAHIR-75 > Project: Bahir > Issue Type: Sub-task > Components: Spark SQL Data Sources >Reporter: Sourav Mazumder > Original Estimate: 504h > Remaining Estimate: 504h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-75) Initial Code Delivery
[ https://issues.apache.org/jira/browse/BAHIR-75?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15668778#comment-15668778 ] ASF GitHub Bot commented on BAHIR-75: - GitHub user sourav-mazumder opened a pull request: https://github.com/apache/bahir/pull/27 [BAHIR-75][WIP] Initital code delivery for WebHDFS data source Initial code delivery for webhdfs data source for Bahir. WE are still working on this. Comments are welcome. You can merge this pull request into a Git repository by running: $ git pull https://github.com/sourav-mazumder/bahir spark-webhdfs Alternatively you can review and apply these changes as the patch at: https://github.com/apache/bahir/pull/27.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #27 commit eee70dc6dac8aa1b3b9387aaa136250d340f2b46 Author: Sourav MazumderDate: 2016-11-15T23:34:14Z [BAHIR-75] Initital code delivery for WebHDFS data source > Initial Code Delivery > - > > Key: BAHIR-75 > URL: https://issues.apache.org/jira/browse/BAHIR-75 > Project: Bahir > Issue Type: Sub-task > Components: Spark SQL Data Sources >Reporter: Sourav Mazumder > Original Estimate: 504h > Remaining Estimate: 504h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-64) Add test that Akka streaming connector can receive data
[ https://issues.apache.org/jira/browse/BAHIR-64?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15630617#comment-15630617 ] ASF GitHub Bot commented on BAHIR-64: - Github user ckadner commented on a diff in the pull request: https://github.com/apache/bahir/pull/24#discussion_r86248567 --- Diff: streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala --- @@ -65,6 +65,7 @@ object ActorReceiver { val akkaConf = ConfigFactory.parseString( s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.enabled-transports = ["akka.remote.netty.tcp"] + |akka.remote.netty.tcp.port = "0" --- End diff -- we set it to `"0"` to have the port chosen automatically for the _Feeder_ actor and the _Receiver_ actor will "pick it up" when it subscribes to the _Feeder_ actor [Akka Remoting docs](http://doc.akka.io/docs/akka/2.3.11/scala/remoting.html#Preparing_your_ActorSystem_for_Remoting) > Add test that Akka streaming connector can receive data > --- > > Key: BAHIR-64 > URL: https://issues.apache.org/jira/browse/BAHIR-64 > Project: Bahir > Issue Type: Sub-task > Components: Spark Streaming Connectors >Reporter: Christian Kadner >Assignee: Christian Kadner > Labels: test > > Add test cases that verify that the *Akka streaming connector* can receive > streaming data. > See [BAHIR-63|https://issues.apache.org/jira/browse/BAHIR-63] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
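The behavior described above can be sketched as a standalone config snippet (same keys as in `ActorReceiver`): port `"0"` tells the netty transport to bind to an ephemeral port chosen by the OS.

```scala
import com.typesafe.config.ConfigFactory

// Port "0" lets the OS pick any free port for the Feeder actor's transport,
// avoiding collisions with ports already in use on the test machine.
val akkaConf = ConfigFactory.parseString(
  s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
     |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
     |akka.remote.netty.tcp.port = "0"
   """.stripMargin)

// The Receiver actor does not need to know this port up front: it "picks it up"
// when it subscribes to the Feeder actor, as described in the comment above.
```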
[jira] [Commented] (BAHIR-67) WebHDFS Data Source for Spark SQL
[ https://issues.apache.org/jira/browse/BAHIR-67?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15631140#comment-15631140 ] ASF GitHub Bot commented on BAHIR-67: - Github user ckadner commented on the issue: https://github.com/apache/bahir/pull/25 @sourav-mazumder please add some description to this PR which could include outstanding issues and tag the PR title with the JIRA key and add the tag `[WIP]` while work on this PR is ongoing... i.e. the title could look like `"[BAHIR-67][WIP] Create WebHDFS data source for Spark"` -- Thanks > WebHDFS Data Source for Spark SQL > - > > Key: BAHIR-67 > URL: https://issues.apache.org/jira/browse/BAHIR-67 > Project: Bahir > Issue Type: Improvement > Components: Spark SQL Data Sources >Reporter: Sourav Mazumder > Original Estimate: 336h > Remaining Estimate: 336h > > Ability to read/write data in Spark from/to HDFS of a remote Hadoop Cluster > In today's world of Analytics many use cases need capability to access data > from multiple remote data sources in Spark. Though Spark has great > integration with local Hadoop cluster it lacks heavily on capability for > connecting to a remote Hadoop cluster. However, in reality not all data of > enterprises in Hadoop and running Spark Cluster locally with Hadoop Cluster > is not always a solution. > In this improvement we propose to create a connector for accessing data (read > and write) from/to HDFS of a remote Hadoop cluster from Spark using webhdfs > api. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-84) Build log flooded with test log messages
[ https://issues.apache.org/jira/browse/BAHIR-84?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15820522#comment-15820522 ] ASF GitHub Bot commented on BAHIR-84: - Github user ApacheBahir commented on the issue: https://github.com/apache/bahir/pull/33 Refer to this link for build results (access rights to CI server needed): http://169.45.79.58:8080/job/Apache%20Bahir%20-%20Pull%20Request%20Builder/30/ > Build log flooded with test log messages > > > Key: BAHIR-84 > URL: https://issues.apache.org/jira/browse/BAHIR-84 > Project: Bahir > Issue Type: Test > Components: Spark Structured Streaming Connectors > Environment: Mac OS X >Reporter: Christian Kadner >Assignee: Christian Kadner >Priority: Minor > > The maven build log/console gets flooded with INFO messages from > {{org.apache.parquet.hadoop.*}} during the {{test}} phase of module > {{sql-streaming-mqtt}} . This makes it hard to find actual problems and test > results especially when the log messages intersect with build and test status > messages throwing off line breaks etc. > *Excerpt of build log:* > {code:title=$ mvn clean package} > ... > Discovery completed in 293 milliseconds. > Run starting. 
Expected test count is: 7 > BasicMQTTSourceSuite: > - basic usage > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig: > Compression: SNAPPY > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig: > Compression: SNAPPY > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig: > Compression: SNAPPY > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig: > Compression: SNAPPY > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet block size to 134217728 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet block size to 134217728 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet page size to 1048576 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet block size to 134217728 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet block size to 134217728 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet page size to 1048576 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet dictionary page size to 1048576 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet page size to 1048576 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Dictionary is on > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet dictionary page size to 1048576 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet page size to 1048576 > ... 
> Jan 11, 2017 11:06:03 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet block size to 134217728 > Jan 11, 2017 11:06:03 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet page size to 1048576 > Jan 11, 2017 11:06:03 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet dictionary page size to 1048576 > Jan 11, 2017 11:06:03 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Dictionary is on > Jan 11, 2017 11:06:03 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Validation is o- Send and receive 100 messages. > - no server up > - params not provided. > - Recovering offset from the last processed offset. !!! IGNORED !!! > StressTestMQTTSource: > - Send and receive messages of size 250MB. !!! IGNORED !!! > LocalMessageStoreSuite: > - serialize and deserialize > - Store and retreive > - Max offset stored > MQTTStreamSourceSuite: > Run completed in 20 seconds, 622 milliseconds. > Total number of tests run: 7 > Suites: completed 5, aborted 0 > Tests: succeeded 7, failed 0, canceled 0, ignored 2, pending 0 > All tests passed. > ff > Jan 11, 2017 11:06:03 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Writer version is: PARQUET_1_0 > Jan 11, 2017 11:06:03 PM INFO: > org.apache.parquet.hadoop.InternalParquetRecordWriter: Flushing mem > columnStore to file. allocated memory: 48 > Jan 11, 2017 11:06:03 PM INFO: > org.apache.parquet.hadoop.InternalParquetRecordWriter: Flushing mem > columnStore to file. allocated memory: 48 > Jan 11, 2017 11:06:03 PM INFO: > org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 109B for [value] > BINARY: 1 values, 34B raw, 36B comp, 1 pages, encodings: [RLE, PLAIN, > BIT_PACKED] > Jan 11, 2017 11:06:03 PM INFO: > org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 59B for > [timestamp] INT96: 1
[jira] [Commented] (BAHIR-84) Build log flooded with test log messages
[ https://issues.apache.org/jira/browse/BAHIR-84?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15820521#comment-15820521 ] ASF GitHub Bot commented on BAHIR-84: - Github user ApacheBahir commented on the issue: https://github.com/apache/bahir/pull/33 Build successful > Build log flooded with test log messages > > > Key: BAHIR-84 > URL: https://issues.apache.org/jira/browse/BAHIR-84 > Project: Bahir > Issue Type: Test > Components: Spark Structured Streaming Connectors > Environment: Mac OS X >Reporter: Christian Kadner >Assignee: Christian Kadner >Priority: Minor > > The maven build log/console gets flooded with INFO messages from > {{org.apache.parquet.hadoop.*}} during the {{test}} phase of module > {{sql-streaming-mqtt}} . This makes it hard to find actual problems and test > results especially when the log messages intersect with build and test status > messages throwing off line breaks etc. > *Excerpt of build log:* > {code:title=$ mvn clean package} > ... > Discovery completed in 293 milliseconds. > Run starting. 
Expected test count is: 7 > BasicMQTTSourceSuite: > - basic usage > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig: > Compression: SNAPPY > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig: > Compression: SNAPPY > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig: > Compression: SNAPPY > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig: > Compression: SNAPPY > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet block size to 134217728 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet block size to 134217728 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet page size to 1048576 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet block size to 134217728 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet block size to 134217728 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet page size to 1048576 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet dictionary page size to 1048576 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet page size to 1048576 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Dictionary is on > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet dictionary page size to 1048576 > Jan 11, 2017 11:05:54 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet page size to 1048576 > ... 
> Jan 11, 2017 11:06:03 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet block size to 134217728 > Jan 11, 2017 11:06:03 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet page size to 1048576 > Jan 11, 2017 11:06:03 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Parquet dictionary page size to 1048576 > Jan 11, 2017 11:06:03 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Dictionary is on > Jan 11, 2017 11:06:03 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Validation is o- Send and receive 100 messages. > - no server up > - params not provided. > - Recovering offset from the last processed offset. !!! IGNORED !!! > StressTestMQTTSource: > - Send and receive messages of size 250MB. !!! IGNORED !!! > LocalMessageStoreSuite: > - serialize and deserialize > - Store and retreive > - Max offset stored > MQTTStreamSourceSuite: > Run completed in 20 seconds, 622 milliseconds. > Total number of tests run: 7 > Suites: completed 5, aborted 0 > Tests: succeeded 7, failed 0, canceled 0, ignored 2, pending 0 > All tests passed. > ff > Jan 11, 2017 11:06:03 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: > Writer version is: PARQUET_1_0 > Jan 11, 2017 11:06:03 PM INFO: > org.apache.parquet.hadoop.InternalParquetRecordWriter: Flushing mem > columnStore to file. allocated memory: 48 > Jan 11, 2017 11:06:03 PM INFO: > org.apache.parquet.hadoop.InternalParquetRecordWriter: Flushing mem > columnStore to file. allocated memory: 48 > Jan 11, 2017 11:06:03 PM INFO: > org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 109B for [value] > BINARY: 1 values, 34B raw, 36B comp, 1 pages, encodings: [RLE, PLAIN, > BIT_PACKED] > Jan 11, 2017 11:06:03 PM INFO: > org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 59B for > [timestamp] INT96: 1 values, 8B raw, 10B comp, 1 pages, encodings: [RLE, > PLAIN_DICTIONARY, BIT_PACKED], dic { 1 entries,... > {code} -- This message was
[jira] [Commented] (BAHIR-75) WebHDFS: Initial Code Delivery
[ https://issues.apache.org/jira/browse/BAHIR-75?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15803454#comment-15803454 ] ASF GitHub Bot commented on BAHIR-75: - Github user ckadner commented on the issue: https://github.com/apache/bahir/pull/28 retest this please > WebHDFS: Initial Code Delivery > -- > > Key: BAHIR-75 > URL: https://issues.apache.org/jira/browse/BAHIR-75 > Project: Bahir > Issue Type: Sub-task > Components: Spark SQL Data Sources >Reporter: Sourav Mazumder > Original Estimate: 504h > Remaining Estimate: 504h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-75) WebHDFS: Initial Code Delivery
[ https://issues.apache.org/jira/browse/BAHIR-75?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15803459#comment-15803459 ] ASF GitHub Bot commented on BAHIR-75: - Github user akchinSTC commented on the issue: https://github.com/apache/bahir/pull/28 Build finished. > WebHDFS: Initial Code Delivery > -- > > Key: BAHIR-75 > URL: https://issues.apache.org/jira/browse/BAHIR-75 > Project: Bahir > Issue Type: Sub-task > Components: Spark SQL Data Sources >Reporter: Sourav Mazumder > Original Estimate: 504h > Remaining Estimate: 504h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-75) WebHDFS: Initial Code Delivery
[ https://issues.apache.org/jira/browse/BAHIR-75?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15803435#comment-15803435 ] ASF GitHub Bot commented on BAHIR-75: - Github user akchinSTC commented on the issue: https://github.com/apache/bahir/pull/28 Build finished. > WebHDFS: Initial Code Delivery > -- > > Key: BAHIR-75 > URL: https://issues.apache.org/jira/browse/BAHIR-75 > Project: Bahir > Issue Type: Sub-task > Components: Spark SQL Data Sources >Reporter: Sourav Mazumder > Original Estimate: 504h > Remaining Estimate: 504h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-75) WebHDFS: Initial Code Delivery
[ https://issues.apache.org/jira/browse/BAHIR-75?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15803430#comment-15803430 ] ASF GitHub Bot commented on BAHIR-75: - Github user ckadner commented on the issue: https://github.com/apache/bahir/pull/28 Test this Jenkins > WebHDFS: Initial Code Delivery > -- > > Key: BAHIR-75 > URL: https://issues.apache.org/jira/browse/BAHIR-75 > Project: Bahir > Issue Type: Sub-task > Components: Spark SQL Data Sources >Reporter: Sourav Mazumder > Original Estimate: 504h > Remaining Estimate: 504h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-75) WebHDFS: Initial Code Delivery
[ https://issues.apache.org/jira/browse/BAHIR-75?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15773942#comment-15773942 ] ASF GitHub Bot commented on BAHIR-75: - GitHub user sourav-mazumder opened a pull request: https://github.com/apache/bahir/pull/28 [BAHIR-75] [WIP] Remote HDFS connector for Apache Spark using webhdfs protocol with support for Apache Knox This component implements Hadoop FileSystem (org.apache.hadoop.fs.FileSystem) to provide an alternate mechanism (instead of using the 'webhdfs' or 'swebhdfs' file URI) for Spark to access (read/write) files from/to a remote Hadoop cluster using the webhdfs protocol. This component takes care of the following requirements related to accessing files (read/write) from/to a remote enterprise Hadoop cluster from a Spark cluster: 1. Support for Apache Knox. 2. Support for passing a user id/password different from that of the user who started the spark-shell/spark-submit process. 3. Support for SSL in three modes - ignoring certificate validation, certificate validation through a user-supplied trust store path and password, and automatic creation of the certificate using openssl and keytool. 4. An optimized way of reading data from remote HDFS where each connection fetches only its own part of the data. This component is not a full-fledged implementation of Hadoop FileSystem. It implements only those interfaces that are needed by Spark for reading data from remote HDFS and writing data back to remote HDFS. **Example Usage -** Step 1: Set the Hadoop configuration to define a custom URI of your choice and specify the class name BahirWebHdfsFileSystem. For example - `sc.hadoopConfiguration.set("fs.remoteHdfs.impl","org.apache.bahir.datasource.webhdfs.BahirWebHdfsFileSystem")`. You can use any name (apart from the standard URIs like hdfs, webhdfs, file, etc. already used by Spark) instead of 'remoteHdfs'; however, the same name must subsequently be used when loading (or writing) files. 
Step 2: Set the user name and password as below - `val userid = "biadmin"` `val password = "password"` `val userCred = userid + ":" + password` `sc.hadoopConfiguration.set("usrCredStr",userCred)` Step 3: Now you are ready to load any file from the remote Hadoop cluster using Spark's standard DataFrame/Dataset APIs. For example - `val filePath = "biginsights/spark-enablement/datasets/NewYorkCity311Service/311_Service_Requests_from_2010_to_Present.csv"` `val srvr = "ehaasp-577-mastermanager.bi.services.bluemix.net:8443"` `val knoxPath = "gateway/default"` `val webhdfsPath = "webhdfs/v1"` `val prtcl = "remoteHdfs"` `val fullPath = s"$prtcl://$srvr/$knoxPath/$webhdfsPath/$filePath"` `val df = spark.read.format("csv").option("header", "true").load(fullPath)` Please note the use of 'gateway/default' and 'webhdfs/v1' for specifying the server-specific information in the path. The first is specific to Apache Knox and the second to the webhdfs protocol. 
Step 4: To write data back to remote HDFS, the following steps can be used (using Spark's standard DataFrame writer) `val filePathWrite = "biginsights/spark-enablement/datasets/NewYorkCity311Service/Result.csv"` `val srvr = "ehaasp-577-mastermanager.bi.services.bluemix.net:8443"` `val knoxPath = "gateway/default"` `val webhdfsPath = "webhdfs/v1"` `val prtcl = "remoteHdfs"` `val fullPath = s"$prtcl://$srvr/$knoxPath/$webhdfsPath/$filePathWrite"` `df.write.format("csv").option("header", "true").save(fullPath)` **We are still working on the following -** - Unit testing - Code cleanup - Examples showcasing various configuration parameters - API documentation You can merge this pull request into a Git repository by running: $ git pull https://github.com/sourav-mazumder/bahir BAHIR-75-WebHdfsFileSystem Alternatively you can review and apply these changes as the patch at: https://github.com/apache/bahir/pull/28.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #28 commit eee70dc6dac8aa1b3b9387aaa136250d340f2b46 Author: Sourav Mazumder Date: 2016-11-15T23:34:14Z [BAHIR-75] Initital code delivery for WebHDFS data source commit c2d53fdd55eee69120cf00dc17869786945ed93a Author: Christian Kadner Date: 2016-11-16T01:00:39Z [BAHIR-75] - fix RAT excludes for DataSourceRegister commit af805e3226cbc31b2a9993aa635795a3d1fdd8c7 Author: Christian Kadner Date: 2016-11-16T01:27:08Z [BAHIR-75] - minor README fixes commit 78ff29c8885534935d43a911af9c27f667725989 Author: Christian
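Steps 1-4 above can be consolidated into one sketch. Server, credentials, and file paths below are illustrative placeholders taken from the example, not working endpoints:

```scala
// Consolidated sketch of the PR's steps 1-4; all endpoint values are placeholders.

// Step 1: register the custom scheme "remoteHdfs" against the Bahir FileSystem class.
sc.hadoopConfiguration.set(
  "fs.remoteHdfs.impl",
  "org.apache.bahir.datasource.webhdfs.BahirWebHdfsFileSystem")

// Step 2: pass the remote cluster credentials as "user:password".
sc.hadoopConfiguration.set("usrCredStr", "biadmin:password")

// Step 3: read through the Knox gateway ("gateway/default") and
// the webhdfs REST prefix ("webhdfs/v1").
val base = "remoteHdfs://host:8443/gateway/default/webhdfs/v1"
val df = spark.read.format("csv")
  .option("header", "true")
  .load(s"$base/path/to/input.csv")

// Step 4: write back through the same scheme.
df.write.format("csv")
  .option("header", "true")
  .save(s"$base/path/to/output.csv")
```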
[jira] [Commented] (BAHIR-75) WebHDFS: Initial Code Delivery
[ https://issues.apache.org/jira/browse/BAHIR-75?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15773971#comment-15773971 ] ASF GitHub Bot commented on BAHIR-75: - Github user sourav-mazumder closed the pull request at: https://github.com/apache/bahir/pull/27 > WebHDFS: Initial Code Delivery > -- > > Key: BAHIR-75 > URL: https://issues.apache.org/jira/browse/BAHIR-75 > Project: Bahir > Issue Type: Sub-task > Components: Spark SQL Data Sources >Reporter: Sourav Mazumder > Original Estimate: 504h > Remaining Estimate: 504h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-75) WebHDFS: Initial Code Delivery
[ https://issues.apache.org/jira/browse/BAHIR-75?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15773972#comment-15773972 ] ASF GitHub Bot commented on BAHIR-75: - Github user sourav-mazumder commented on the issue: https://github.com/apache/bahir/pull/27 This is now addressed in PR #28. The new approach uses an implementation of Hadoop FileSystem which can be accessed from Spark through a custom file system URI by setting SparkContext.hadoopConfiguration. This new approach works generically for any file format (csv, json, etc.) without any format-specific code. > WebHDFS: Initial Code Delivery > -- > > Key: BAHIR-75 > URL: https://issues.apache.org/jira/browse/BAHIR-75 > Project: Bahir > Issue Type: Sub-task > Components: Spark SQL Data Sources >Reporter: Sourav Mazumder > Original Estimate: 504h > Remaining Estimate: 504h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-97) Akka as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-97?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944689#comment-15944689 ] ASF GitHub Bot commented on BAHIR-97: - Github user ApacheBahir commented on the issue: https://github.com/apache/bahir/pull/38 Refer to this link for build results (access rights to CI server needed): http://169.45.79.58:8080/job/bahir_spark_pr_builder/38/ > Akka as a streaming source for SQL Streaming. > - > > Key: BAHIR-97 > URL: https://issues.apache.org/jira/browse/BAHIR-97 > Project: Bahir > Issue Type: New Feature > Components: Spark SQL Data Sources >Affects Versions: Spark-2.1.0 >Reporter: Subhobrata Dey > > Hello, > This issue is created to propose the addition of Akka compatible streaming > source for Spark SQL Streaming. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BAHIR-97) Akka as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-97?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944648#comment-15944648 ] ASF GitHub Bot commented on BAHIR-97: - Github user ckadner commented on the issue: https://github.com/apache/bahir/pull/38 @sbcd90 -- can you change your test suite to chose the `akka.remote.netty.tcp.port` dynamically? > Akka as a streaming source for SQL Streaming. > - > > Key: BAHIR-97 > URL: https://issues.apache.org/jira/browse/BAHIR-97 > Project: Bahir > Issue Type: New Feature > Components: Spark SQL Data Sources >Affects Versions: Spark-2.1.0 >Reporter: Subhobrata Dey > > Hello, > This issue is created to propose the addition of Akka compatible streaming > source for Spark SQL Streaming. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
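One way to honor the request above — choosing the `akka.remote.netty.tcp.port` dynamically — is to bind a probe `ServerSocket` to port 0 and feed the OS-assigned port into the test configuration. This is a sketch, not necessarily how the PR resolved it:

```scala
import java.net.ServerSocket

// Ask the OS for a free ephemeral port; closing the probe socket releases it
// so the actor system can bind to it immediately afterwards.
def chooseFreePort(): Int = {
  val probe = new ServerSocket(0)
  try probe.getLocalPort
  finally probe.close()
}

val port = chooseFreePort()
// The test suite could then build its config with the chosen port, e.g.:
// ConfigFactory.parseString(s"""akka.remote.netty.tcp.port = "$port"""")
```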
[jira] [Commented] (BAHIR-97) Akka as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-97?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15949610#comment-15949610 ] ASF GitHub Bot commented on BAHIR-97: - Github user sbcd90 commented on the issue: https://github.com/apache/bahir/pull/38 Hi @ckadner , the test cases are skipped..any reason for that? > Akka as a streaming source for SQL Streaming. > - > > Key: BAHIR-97 > URL: https://issues.apache.org/jira/browse/BAHIR-97 > Project: Bahir > Issue Type: New Feature > Components: Spark SQL Data Sources >Affects Versions: Spark-2.1.0 >Reporter: Subhobrata Dey > > Hello, > This issue is created to propose the addition of Akka compatible streaming > source for Spark SQL Streaming. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BAHIR-97) Akka as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-97?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950074#comment-15950074 ] ASF GitHub Bot commented on BAHIR-97: - Github user sbcd90 commented on the issue: https://github.com/apache/bahir/pull/38 retest this please > Akka as a streaming source for SQL Streaming. > - > > Key: BAHIR-97 > URL: https://issues.apache.org/jira/browse/BAHIR-97 > Project: Bahir > Issue Type: New Feature > Components: Spark SQL Data Sources >Affects Versions: Spark-2.1.0 >Reporter: Subhobrata Dey > > Hello, > This issue is created to propose the addition of Akka compatible streaming > source for Spark SQL Streaming. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BAHIR-97) Akka as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-97?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950075#comment-15950075 ] ASF GitHub Bot commented on BAHIR-97: - Github user sbcd90 commented on the issue: https://github.com/apache/bahir/pull/38 Hi @ckadner , Thanks for your reply. I have added `log4j.properties` in the test sources folder. > Akka as a streaming source for SQL Streaming. > - > > Key: BAHIR-97 > URL: https://issues.apache.org/jira/browse/BAHIR-97 > Project: Bahir > Issue Type: New Feature > Components: Spark SQL Data Sources >Affects Versions: Spark-2.1.0 >Reporter: Subhobrata Dey > > Hello, > This issue is created to propose the addition of Akka compatible streaming > source for Spark SQL Streaming. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BAHIR-97) Akka as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-97?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950081#comment-15950081 ] ASF GitHub Bot commented on BAHIR-97: - Github user ApacheBahir commented on the issue: https://github.com/apache/bahir/pull/38 Refer to this link for build results (access rights to CI server needed): http://169.45.79.58:8080/job/bahir_spark_pr_builder/42/ > Akka as a streaming source for SQL Streaming. > - > > Key: BAHIR-97 > URL: https://issues.apache.org/jira/browse/BAHIR-97 > Project: Bahir > Issue Type: New Feature > Components: Spark SQL Data Sources >Affects Versions: Spark-2.1.0 >Reporter: Subhobrata Dey > > Hello, > This issue is created to propose the addition of Akka compatible streaming > source for Spark SQL Streaming. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BAHIR-97) Akka as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-97?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950080#comment-15950080 ] ASF GitHub Bot commented on BAHIR-97: - Github user ApacheBahir commented on the issue: https://github.com/apache/bahir/pull/38 Build successful > Akka as a streaming source for SQL Streaming. > - > > Key: BAHIR-97 > URL: https://issues.apache.org/jira/browse/BAHIR-97 > Project: Bahir > Issue Type: New Feature > Components: Spark SQL Data Sources >Affects Versions: Spark-2.1.0 >Reporter: Subhobrata Dey > > Hello, > This issue is created to propose the addition of Akka compatible streaming > source for Spark SQL Streaming. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BAHIR-101) Add SparkSQL for Cloudant
[ https://issues.apache.org/jira/browse/BAHIR-101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15949985#comment-15949985 ]

ASF GitHub Bot commented on BAHIR-101:
--------------------------------------

Github user lresende commented on the issue:

    https://github.com/apache/bahir/pull/39

    ok to test

> Add SparkSQL for Cloudant
> -------------------------
>
>                 Key: BAHIR-101
>                 URL: https://issues.apache.org/jira/browse/BAHIR-101
>             Project: Bahir
>          Issue Type: Improvement
>            Reporter: Yang Lei
[jira] [Commented] (BAHIR-101) Add SparkSQL for Cloudant
[ https://issues.apache.org/jira/browse/BAHIR-101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15949991#comment-15949991 ]

ASF GitHub Bot commented on BAHIR-101:
--------------------------------------

Github user ApacheBahir commented on the issue:

    https://github.com/apache/bahir/pull/39

    Refer to this link for build results (access rights to CI server needed):
    http://169.45.79.58:8080/job/bahir_spark_pr_builder/41/
[jira] [Commented] (BAHIR-101) Add SparkSQL for Cloudant
[ https://issues.apache.org/jira/browse/BAHIR-101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15949990#comment-15949990 ]

ASF GitHub Bot commented on BAHIR-101:
--------------------------------------

Github user ApacheBahir commented on the issue:

    https://github.com/apache/bahir/pull/39

    Build successful
[jira] [Commented] (BAHIR-101) Add SparkSQL for Cloudant
[ https://issues.apache.org/jira/browse/BAHIR-101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947940#comment-15947940 ]

ASF GitHub Bot commented on BAHIR-101:
--------------------------------------

GitHub user yanglei99 opened a pull request:

    https://github.com/apache/bahir/pull/39

    [BAHIR-101] Initial code of SparkSQL for Cloudant

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yanglei99/bahir sql-cloudant

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/bahir/pull/39.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #39

----
commit 914c55cbe2732dc34e549082a4d67832c5c502c5
Author: Yang Lei
Date:   2017-03-29T21:32:51Z

    [BAHIR-101] Initial code of SparkSQL for Cloudant
[jira] [Commented] (BAHIR-101) Add SparkSQL for Cloudant
[ https://issues.apache.org/jira/browse/BAHIR-101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947941#comment-15947941 ]

ASF GitHub Bot commented on BAHIR-101:
--------------------------------------

Github user ApacheBahir commented on the issue:

    https://github.com/apache/bahir/pull/39

    Can one of the admins verify this patch?