Kyle,

Another thought here that I don't see mentioned yet is how NiFi's site-to-site protocol works. Because the Spark receiver does not know whether the NiFi it's talking to is a standalone instance or a cluster, it will first ask for a list of all NiFi nodes. Assuming that you've got a standalone instance running, it responds with whatever it believes its hostname is.

It's possible that what you're seeing here is that NiFi responds and says "here's the hostname you can connect to," which is not localhost but rather whatever is configured in your OS. You can explicitly override this and tell NiFi which hostname to use by setting the "nifi.remote.input.socket.host" property in conf/nifi.properties. So I would try setting that explicitly to "localhost" or to whatever hostname you want to communicate over. If you change that property, though, you'll have to restart NiFi in order for the change to take effect.
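For example (the values here are just an illustration; use whatever hostname fits your setup), the site-to-site section of conf/nifi.properties would look something like:

# Site to Site properties
nifi.remote.input.socket.host=localhost
nifi.remote.input.socket.port=9099
nifi.remote.input.secure=false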
Thanks
-Mark

> On Feb 22, 2016, at 1:20 PM, Bryan Bende <[email protected]> wrote:
> 
> Hi Kyle,
> 
> It seems like the stack trace is suggesting that Spark is trying to download
> dependencies; the frame that gives it away references Executor.updateDependencies:
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L391
> 
> Any chance you are behind some kind of firewall preventing this?
> 
> I'm not that familiar with Spark streaming, but I also noticed in one of the
> tutorials that it did something like this:
> 
> spark.driver.extraClassPath
> /opt/spark-receiver/nifi-spark-receiver-0.4.1.jar:/opt/spark-receiver/nifi-site-to-site-client-0.4.1.jar:/opt/nifi-1.1.1.0-12/lib/nifi-api-1.1.1.0-12.jar:/opt/nifi-1.1.1.0-12/lib/bootstrap/nifi-utils-1.1.1.0-12.jar:/opt/nifi-1.1.1.0-12/work/nar/framework/nifi-framework-nar-1.1.1.0-12.nar-unpacked/META-INF/bundled-dependencies/nifi-client-dto-1.1.1.0-12.jar
> 
> Which I would think means it wouldn't have to go out and download the NiFi
> dependencies if they are being provided on the class path, but again I'm not
> really sure.
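> If it helps, here's roughly how I'd expect that to be set at submit time; the
> class name and jar paths below are placeholders for wherever your copies live:
> 
>   spark-submit \
>     --conf spark.driver.extraClassPath=/path/to/nifi-spark-receiver-0.5.0.jar:/path/to/nifi-site-to-site-client-0.5.0.jar \
>     --class com.example.MyStreamingApp my-streaming-app.jar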
> 
> -Bryan
> 
> 
> On Mon, Feb 22, 2016 at 1:09 PM, Kyle Burke <[email protected]> wrote:
> Joe,
>    I'm not sure what to do with Bryan's comment. The Spark code I'm running
> has no problem reading from a Kafka receiver. I only get the error when
> trying to read from a NiFi receiver. I get the issue when I create a NiFi
> flow that reads from the same Kafka stream and sends the data to an output
> port.
> 
> Respectfully,
> 
> Kyle Burke | Data Science Engineer
> IgnitionOne - Marketing Technology. Simplified.
> Office: 1545 Peachtree St NE, Suite 500 | Atlanta, GA | 30309
> Direct: 404.961.3918
> 
> 
> On 2/22/16, 1:00 PM, "Joe Witt" <[email protected]> wrote:
> 
> >Kyle,
> >
> >Did you get a chance to look into what Bryan mentioned? He made a
> >great point in that the stack trace doesn't seem to have any
> >relationship to NiFi or NiFi's site-to-site code.
> >
> >Thanks
> >Joe
> >
> >On Mon, Feb 22, 2016 at 12:58 PM, Kyle Burke <[email protected]> wrote:
> >> Telnet leads me to believe the port is open. (I upgraded to 0.5.0 today in
> >> hopes that it would help, but no luck.)
> >>
> >> From Telnet:
> >>
> >> 12:50:11 [~/Dev/nifi/nifi-0.5.0] $ telnet localhost 8080
> >> Trying ::1...
> >> Connected to localhost.
> >> Escape character is '^]'.
> >>
> >> Respectfully,
> >>
> >> Kyle Burke | Data Science Engineer
> >> IgnitionOne - Marketing Technology. Simplified.
> >> Office: 1545 Peachtree St NE, Suite 500 | Atlanta, GA | 30309
> >> Direct: 404.961.3918
> >>
> >>
> >> From: Joe Witt
> >> Reply-To: "[email protected]"
> >> Date: Saturday, February 20, 2016 at 5:16 PM
> >> To: "[email protected]"
> >> Subject: Re: Connecting Spark to Nifi 0.4.0
> >>
> >> Kyle
> >>
> >> Can you try connecting to that NiFi port using telnet and see if you are
> >> able? Use the same host and port as you are in your Spark job.
> >>
> >> Thanks
> >> Joe
> >>
> >> On Feb 20, 2016 4:55 PM, "Kyle Burke" <[email protected]> wrote:
> >>>
> >>> All,
> >>>    I'm attempting to connect Spark to NiFi, but I'm getting a "connect
> >>> timed out" error when Spark tries to pull records from the output port. I
> >>> don't understand why I'm getting the issue, because NiFi and Spark are
> >>> both running on my local laptop. Any suggestions about how to get around
> >>> the issue?
> >>>
> >>> It appears that NiFi is listening on the port, because I see the
> >>> following when running the lsof command:
> >>>
> >>> java 31455 kyle.burke 1054u IPv4 0x1024ddd67a640091 0t0 TCP *:9099 (LISTEN)
> >>>
> >>> I've been following the instructions given in these two articles:
> >>> https://blogs.apache.org/nifi/entry/stream_processing_nifi_and_spark
> >>> https://community.hortonworks.com/articles/12708/nifi-feeding-data-to-spark-streaming.html
> >>>
> >>> Here is how I have my nifi.properties settings:
> >>>
> >>> # Site to Site properties
> >>> nifi.remote.input.socket.host=
> >>> nifi.remote.input.socket.port=9099
> >>> nifi.remote.input.secure=false
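> >>>
> >>> My receiver setup is essentially the example from those articles.
> >>> Roughly (the URL and port name below stand in for my actual values):
> >>>
> >>> import org.apache.nifi.remote.client.SiteToSiteClient;
> >>> import org.apache.nifi.remote.client.SiteToSiteClientConfig;
> >>> import org.apache.nifi.spark.NiFiDataPacket;
> >>> import org.apache.nifi.spark.NiFiReceiver;
> >>> import org.apache.spark.SparkConf;
> >>> import org.apache.spark.storage.StorageLevel;
> >>> import org.apache.spark.streaming.Duration;
> >>> import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
> >>> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> >>>
> >>> // Point the site-to-site client at the local NiFi instance and the
> >>> // output port that Spark should pull from.
> >>> SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
> >>>         .url("http://localhost:8080/nifi")
> >>>         .portName("Data For Spark")
> >>>         .buildConfig();
> >>>
> >>> SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark Streaming");
> >>> JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000L));
> >>>
> >>> // Each NiFiDataPacket carries one flow file's content and attributes.
> >>> JavaReceiverInputDStream<NiFiDataPacket> packetStream =
> >>>         ssc.receiverStream(new NiFiReceiver(config, StorageLevel.MEMORY_ONLY()));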
> >>>
> >>> Below is the full error stack:
> >>>
> >>> 16/02/20 16:34:45 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
> >>> java.net.SocketTimeoutException: connect timed out
> >>>     at java.net.PlainSocketImpl.socketConnect(Native Method)
> >>>     at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
> >>>     at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
> >>>     at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
> >>>     at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
> >>>     at java.net.Socket.connect(Socket.java:589)
> >>>     at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
> >>>     at sun.net.www.http.HttpClient.openServer(HttpClient.java:432)
> >>>     at sun.net.www.http.HttpClient.openServer(HttpClient.java:527)
> >>>     at sun.net.www.http.HttpClient.<init>(HttpClient.java:211)
> >>>     at sun.net.www.http.HttpClient.New(HttpClient.java:308)
> >>>     at sun.net.www.http.HttpClient.New(HttpClient.java:326)
> >>>     at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1168)
> >>>     at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1104)
> >>>     at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:998)
> >>>     at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:932)
> >>>     at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:555)
> >>>     at org.apache.spark.util.Utils$.fetchFile(Utils.scala:369)
> >>>     at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:405)
> >>>     at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:397)
> >>>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> >>>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> >>>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> >>>     at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> >>>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> >>>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> >>>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> >>>     at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:397)
> >>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
> >>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >>>     at java.lang.Thread.run(Thread.java:745)
> >>>
> >>> Respectfully,
> >>>
> >>> Kyle Burke | Data Science Engineer
> >>> IgnitionOne - Marketing Technology. Simplified.
