Repository: cassandra
Updated Branches:
  refs/heads/trunk e0524c099 -> 63945228f


Use the correct IP/Port for Streaming when localAddress is left unbound

patch by Dinesh Joshi; reviewed by jasobrown for CASSANDRA-14389


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/63945228
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/63945228
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/63945228

Branch: refs/heads/trunk
Commit: 63945228fc0fabea2cfcf1f1b4d0a29ed3964107
Parents: e0524c0
Author: Dinesh A. Joshi <dinesh.jo...@apple.com>
Authored: Thu Apr 19 17:23:19 2018 -0700
Committer: Jason Brown <jasedbr...@gmail.com>
Committed: Sun Apr 22 16:35:47 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/dht/RangeStreamer.java |  4 +-
 .../apache/cassandra/net/MessagingService.java  | 10 +++
 .../cassandra/net/async/NettyFactory.java       |  1 -
 .../repair/AsymmetricLocalSyncTask.java         |  4 +-
 .../apache/cassandra/repair/LocalSyncTask.java  |  9 ++-
 .../cassandra/repair/StreamingRepairTask.java   | 12 ++--
 .../cassandra/service/StorageService.java       | 12 ++--
 .../cassandra/streaming/StreamCoordinator.java  | 21 +++---
 .../apache/cassandra/streaming/StreamPlan.java  | 39 ++----------
 .../cassandra/streaming/StreamResultFuture.java | 14 ++--
 .../cassandra/streaming/StreamSession.java      | 42 ++++++++----
 .../streaming/StreamingMessageSender.java       |  3 +
 .../async/NettyStreamingMessageSender.java      | 12 +++-
 .../streaming/CassandraStreamManagerTest.java   |  1 -
 .../cassandra/dht/StreamStateStoreTest.java     |  4 +-
 .../cassandra/net/MessagingServiceTest.java     | 51 ++++++++++++++-
 .../cassandra/repair/LocalSyncTaskTest.java     |  6 +-
 .../repair/StreamingRepairTaskTest.java         |  6 +-
 .../cassandra/streaming/StreamSessionTest.java  | 67 ++++++++++++++++++++
 .../streaming/StreamTransferTaskTest.java       |  4 +-
 .../streaming/StreamingTransferTest.java        |  2 +-
 .../async/NettyStreamingMessageSenderTest.java  |  2 +-
 .../async/StreamingInboundHandlerTest.java      |  2 +-
 24 files changed, 218 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0e15a8d..4cdd8ba 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Use the correct IP/Port for Streaming when localAddress is left unbound 
(CASSANDAR-14389)
  * nodetool listsnapshots is missing local system keyspace snapshots 
(CASSANDRA-14381)
  * Remove StreamCoordinator.streamExecutor thread pool (CASSANDRA-14402)
  * Rename nodetool --with-port to --print-port to disambiguate from --port 
(CASSANDRA-14392)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java 
b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index dfabac2..110fed6 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -34,7 +34,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.IFailureDetector;
@@ -428,7 +427,6 @@ public class RangeStreamer
         {
             String keyspace = entry.getKey();
             InetAddressAndPort source = entry.getValue().getKey();
-            InetAddressAndPort preferred = 
SystemKeyspace.getPreferredIP(source);
             Collection<Range<Token>> ranges = entry.getValue().getValue();
 
             // filter out already streamed ranges
@@ -441,7 +439,7 @@ public class RangeStreamer
             if (logger.isTraceEnabled())
                 logger.trace("{}ing from {} ranges {}", description, source, 
StringUtils.join(ranges, ", "));
             /* Send messages to respective folks to stream data over to me */
-            streamPlan.requestRanges(source, preferred, keyspace, ranges);
+            streamPlan.requestRanges(source, keyspace, ranges);
         }
 
         return streamPlan.execute();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index c6ef986..a590723 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -1594,6 +1594,16 @@ public final class MessagingService implements 
MessagingServiceMBean
                                                    
bounds.left.getPartitioner().getClass().getName()));
     }
 
+    /**
+     * This method is used to determine the preferred IP & Port of a peer 
using the
+     * {@link OutboundMessagingPool} and SystemKeyspace.
+     */
+    public InetAddressAndPort getPreferredRemoteAddr(InetAddressAndPort to)
+    {
+        OutboundMessagingPool pool = channelManagers.get(to);
+        return pool != null ? pool.getPreferredRemoteAddr() : 
SystemKeyspace.getPreferredIP(to);
+    }
+
     private OutboundMessagingPool getMessagingConnection(InetAddressAndPort to)
     {
         OutboundMessagingPool pool = channelManagers.get(to);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/net/async/NettyFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/NettyFactory.java 
b/src/java/org/apache/cassandra/net/async/NettyFactory.java
index 86ed4e7..5bbac45 100644
--- a/src/java/org/apache/cassandra/net/async/NettyFactory.java
+++ b/src/java/org/apache/cassandra/net/async/NettyFactory.java
@@ -334,7 +334,6 @@ public final class NettyFactory
                               .option(ChannelOption.TCP_NODELAY, 
params.tcpNoDelay)
                               .option(ChannelOption.WRITE_BUFFER_WATER_MARK, 
params.waterMark)
                               .handler(new OutboundInitializer(params));
-        bootstrap.localAddress(params.connectionId.local().address, 0);
         InetAddressAndPort remoteAddress = 
params.connectionId.connectionAddress();
         bootstrap.remoteAddress(new InetSocketAddress(remoteAddress.address, 
remoteAddress.port));
         return bootstrap;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java 
b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
index 8d58673..2ca524f 100644
--- a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.repair;
 import java.util.List;
 import java.util.UUID;
 
-import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -49,7 +48,6 @@ public class AsymmetricLocalSyncTask extends 
AsymmetricSyncTask implements Strea
 
     public void startSync(List<Range<Token>> rangesToFetch)
     {
-        InetAddressAndPort preferred = 
SystemKeyspace.getPreferredIP(fetchFrom);
         StreamPlan plan = new StreamPlan(StreamOperation.REPAIR,
                                          1, false,
                                          pendingRepair,
@@ -57,7 +55,7 @@ public class AsymmetricLocalSyncTask extends 
AsymmetricSyncTask implements Strea
                           .listeners(this)
                           .flushBeforeTransfer(pendingRepair == null)
                           // request ranges from the remote node
-                          .requestRanges(fetchFrom, preferred, desc.keyspace, 
rangesToFetch, desc.columnFamily);
+                          .requestRanges(fetchFrom, desc.keyspace, 
rangesToFetch, desc.columnFamily);
         plan.execute();
 
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java 
b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index 3901c75..d7e0387 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -59,16 +59,16 @@ public class LocalSyncTask extends SyncTask implements 
StreamEventHandler
     }
 
     @VisibleForTesting
-    StreamPlan createStreamPlan(InetAddressAndPort dst, InetAddressAndPort 
preferred, List<Range<Token>> differences)
+    StreamPlan createStreamPlan(InetAddressAndPort dst, List<Range<Token>> 
differences)
     {
         StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, 
pendingRepair, previewKind)
                           .listeners(this)
                           .flushBeforeTransfer(pendingRepair == null)
-                          .requestRanges(dst, preferred, desc.keyspace, 
differences, desc.columnFamily);  // request ranges from the remote node
+                          .requestRanges(dst, desc.keyspace, differences, 
desc.columnFamily);  // request ranges from the remote node
         if (!pullRepair)
         {
             // send ranges to the remote node if we are not performing a pull 
repair
-            plan.transferRanges(dst, preferred, desc.keyspace, differences, 
desc.columnFamily);
+            plan.transferRanges(dst, desc.keyspace, differences, 
desc.columnFamily);
         }
 
         return plan;
@@ -84,13 +84,12 @@ public class LocalSyncTask extends SyncTask implements 
StreamEventHandler
         InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
         // We can take anyone of the node as source or destination, however if 
one is localhost, we put at source to avoid a forwarding
         InetAddressAndPort dst = r2.endpoint.equals(local) ? r1.endpoint : 
r2.endpoint;
-        InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(dst);
 
         String message = String.format("Performing streaming repair of %d 
ranges with %s", differences.size(), dst);
         logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
         Tracing.traceRepair(message);
 
-        createStreamPlan(dst, preferred, differences).execute();
+        createStreamPlan(dst, differences).execute();
     }
 
     public void handleStreamEvent(StreamEvent event)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java 
b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 725e84d..5d2b2ec 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -25,11 +25,9 @@ import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.SyncComplete;
 import org.apache.cassandra.streaming.PreviewKind;
@@ -70,21 +68,19 @@ public class StreamingRepairTask implements Runnable, 
StreamEventHandler
 
     public void run()
     {
-        InetAddressAndPort dest = dst;
-        InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(dest);
         logger.info("[streaming task #{}] Performing streaming repair of {} 
ranges with {}", desc.sessionId, ranges.size(), dst);
-        createStreamPlan(dest, preferred).execute();
+        createStreamPlan(dst).execute();
     }
 
     @VisibleForTesting
-    StreamPlan createStreamPlan(InetAddressAndPort dest, InetAddressAndPort 
preferred)
+    StreamPlan createStreamPlan(InetAddressAndPort dest)
     {
         StreamPlan sp = new StreamPlan(StreamOperation.REPAIR, 1, false, 
pendingRepair, previewKind)
                .listeners(this)
                .flushBeforeTransfer(pendingRepair == null) // sstables are 
isolated at the beginning of an incremental repair session, so flushing isn't 
neccessary
-               .requestRanges(dest, preferred, desc.keyspace, ranges, 
desc.columnFamily); // request ranges from the remote node
+               .requestRanges(dest, desc.keyspace, ranges, desc.columnFamily); 
// request ranges from the remote node
         if (!asymmetric)
-            sp.transferRanges(dest, preferred, desc.keyspace, ranges, 
desc.columnFamily); // send ranges to the remote node
+            sp.transferRanges(dest, desc.keyspace, ranges, desc.columnFamily); 
// send ranges to the remote node
         return sp;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index de5d62b..5ea8c75 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2805,11 +2805,10 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry 
: rangesToFetch.get(keyspaceName))
             {
                 InetAddressAndPort source = entry.getKey();
-                InetAddressAndPort preferred = 
SystemKeyspace.getPreferredIP(source);
                 Collection<Range<Token>> ranges = entry.getValue();
                 if (logger.isDebugEnabled())
                     logger.debug("Requesting from {} ranges {}", source, 
StringUtils.join(ranges, ", "));
-                stream.requestRanges(source, preferred, keyspaceName, ranges);
+                stream.requestRanges(source, keyspaceName, ranges);
             }
         }
         StreamResultFuture future = stream.execute();
@@ -4327,8 +4326,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                     for (InetAddressAndPort address : endpointRanges.keySet())
                     {
                         logger.debug("Will stream range {} of keyspace {} to 
endpoint {}", endpointRanges.get(address), keyspace, address);
-                        InetAddressAndPort preferred = 
SystemKeyspace.getPreferredIP(address);
-                        streamPlan.transferRanges(address, preferred, 
keyspace, endpointRanges.get(address));
+                        streamPlan.transferRanges(address, keyspace, 
endpointRanges.get(address));
                     }
 
                     // stream requests
@@ -4336,8 +4334,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                     for (InetAddressAndPort address : workMap.keySet())
                     {
                         logger.debug("Will request range {} of keyspace {} 
from endpoint {}", workMap.get(address), keyspace, address);
-                        InetAddressAndPort preferred = 
SystemKeyspace.getPreferredIP(address);
-                        streamPlan.requestRanges(address, preferred, keyspace, 
workMap.get(address));
+                        streamPlan.requestRanges(address, keyspace, 
workMap.get(address));
                     }
 
                     logger.debug("Keyspace {}: work map {}.", keyspace, 
workMap);
@@ -5107,10 +5104,9 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             {
                 List<Range<Token>> ranges = rangesEntry.getValue();
                 InetAddressAndPort newEndpoint = rangesEntry.getKey();
-                InetAddressAndPort preferred = 
SystemKeyspace.getPreferredIP(newEndpoint);
 
                 // TODO each call to transferRanges re-flushes, this is 
potentially a lot of waste
-                streamPlan.transferRanges(newEndpoint, preferred, 
keyspaceName, ranges);
+                streamPlan.transferRanges(newEndpoint, keyspaceName, ranges);
             }
         }
         return streamPlan.execute();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java 
b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 6b92dfe..6ea8e00 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -147,14 +147,14 @@ public class StreamCoordinator
         return new HashSet<>(peerSessions.keySet());
     }
 
-    public synchronized StreamSession 
getOrCreateNextSession(InetAddressAndPort peer, InetAddressAndPort connecting)
+    public synchronized StreamSession 
getOrCreateNextSession(InetAddressAndPort peer)
     {
-        return getOrCreateHostData(peer).getOrCreateNextSession(peer, 
connecting);
+        return getOrCreateHostData(peer).getOrCreateNextSession(peer);
     }
 
-    public synchronized StreamSession 
getOrCreateSessionById(InetAddressAndPort peer, int id, InetAddressAndPort 
connecting)
+    public synchronized StreamSession 
getOrCreateSessionById(InetAddressAndPort peer, int id)
     {
-        return getOrCreateHostData(peer).getOrCreateSessionById(peer, id, 
connecting);
+        return getOrCreateHostData(peer).getOrCreateSessionById(peer, id);
     }
 
     public StreamSession getSessionById(InetAddressAndPort peer, int id)
@@ -193,13 +193,13 @@ public class StreamCoordinator
 
             for (Collection<OutgoingStream> bucket : buckets)
             {
-                StreamSession session = sessionList.getOrCreateNextSession(to, 
to);
+                StreamSession session = sessionList.getOrCreateNextSession(to);
                 session.addTransferStreams(bucket);
             }
         }
         else
         {
-            StreamSession session = sessionList.getOrCreateNextSession(to, to);
+            StreamSession session = sessionList.getOrCreateNextSession(to);
             session.addTransferStreams(streams);
         }
     }
@@ -230,6 +230,7 @@ public class StreamCoordinator
     private HostStreamingData getHostData(InetAddressAndPort peer)
     {
         HostStreamingData data = peerSessions.get(peer);
+
         if (data == null)
             throw new IllegalArgumentException("Unknown peer requested: " + 
peer);
         return data;
@@ -275,12 +276,12 @@ public class StreamCoordinator
             return false;
         }
 
-        public StreamSession getOrCreateNextSession(InetAddressAndPort peer, 
InetAddressAndPort connecting)
+        public StreamSession getOrCreateNextSession(InetAddressAndPort peer)
         {
             // create
             if (streamSessions.size() < connectionsPerHost)
             {
-                StreamSession session = new StreamSession(streamOperation, 
peer, connecting, factory, streamSessions.size(), pendingRepair, previewKind);
+                StreamSession session = new StreamSession(streamOperation, 
peer, factory, streamSessions.size(), pendingRepair, previewKind);
                 streamSessions.put(++lastReturned, session);
                 return session;
             }
@@ -307,12 +308,12 @@ public class StreamCoordinator
             return Collections.unmodifiableCollection(streamSessions.values());
         }
 
-        public StreamSession getOrCreateSessionById(InetAddressAndPort peer, 
int id, InetAddressAndPort connecting)
+        public StreamSession getOrCreateSessionById(InetAddressAndPort peer, 
int id)
         {
             StreamSession session = streamSessions.get(id);
             if (session == null)
             {
-                session = new StreamSession(streamOperation, peer, connecting, 
factory, id, pendingRepair, previewKind);
+                session = new StreamSession(streamOperation, peer, factory, 
id, pendingRepair, previewKind);
                 streamSessions.put(id, session);
             }
             return session;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java 
b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 98d68ce..b56f165 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -68,29 +68,27 @@ public class StreamPlan
      * Request data in {@code keyspace} and {@code ranges} from specific node.
      *
      * @param from endpoint address to fetch data from.
-     * @param connecting Actual connecting address for the endpoint
      * @param keyspace name of keyspace
      * @param ranges ranges to fetch
      * @return this object for chaining
      */
-    public StreamPlan requestRanges(InetAddressAndPort from, 
InetAddressAndPort connecting, String keyspace, Collection<Range<Token>> ranges)
+    public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, 
Collection<Range<Token>> ranges)
     {
-        return requestRanges(from, connecting, keyspace, ranges, 
EMPTY_COLUMN_FAMILIES);
+        return requestRanges(from, keyspace, ranges, EMPTY_COLUMN_FAMILIES);
     }
 
     /**
      * Request data in {@code columnFamilies} under {@code keyspace} and 
{@code ranges} from specific node.
      *
      * @param from endpoint address to fetch data from.
-     * @param connecting Actual connecting address for the endpoint
      * @param keyspace name of keyspace
      * @param ranges ranges to fetch
      * @param columnFamilies specific column families
      * @return this object for chaining
      */
-    public StreamPlan requestRanges(InetAddressAndPort from, 
InetAddressAndPort connecting, String keyspace, Collection<Range<Token>> 
ranges, String... columnFamilies)
+    public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, 
Collection<Range<Token>> ranges, String... columnFamilies)
     {
-        StreamSession session = coordinator.getOrCreateNextSession(from, 
connecting);
+        StreamSession session = coordinator.getOrCreateNextSession(from);
         session.addStreamRequest(keyspace, ranges, 
Arrays.asList(columnFamilies));
         return this;
     }
@@ -98,40 +96,15 @@ public class StreamPlan
     /**
      * Add transfer task to send data of specific {@code columnFamilies} under 
{@code keyspace} and {@code ranges}.
      *
-     * @see #transferRanges(InetAddressAndPort, InetAddressAndPort, String, 
java.util.Collection, String...)
-     */
-    public StreamPlan transferRanges(InetAddressAndPort to, String keyspace, 
Collection<Range<Token>> ranges, String... columnFamilies)
-    {
-        return transferRanges(to, to, keyspace, ranges, columnFamilies);
-    }
-
-    /**
-     * Add transfer task to send data of specific keyspace and ranges.
-     *
      * @param to endpoint address of receiver
-     * @param connecting Actual connecting address of the endpoint
-     * @param keyspace name of keyspace
-     * @param ranges ranges to send
-     * @return this object for chaining
-     */
-    public StreamPlan transferRanges(InetAddressAndPort to, InetAddressAndPort 
connecting, String keyspace, Collection<Range<Token>> ranges)
-    {
-        return transferRanges(to, connecting, keyspace, ranges, 
EMPTY_COLUMN_FAMILIES);
-    }
-
-    /**
-     * Add transfer task to send data of specific {@code columnFamilies} under 
{@code keyspace} and {@code ranges}.
-     *
-     * @param to endpoint address of receiver
-     * @param connecting Actual connecting address of the endpoint
      * @param keyspace name of keyspace
      * @param ranges ranges to send
      * @param columnFamilies specific column families
      * @return this object for chaining
      */
-    public StreamPlan transferRanges(InetAddressAndPort to, InetAddressAndPort 
connecting, String keyspace, Collection<Range<Token>> ranges, String... 
columnFamilies)
+    public StreamPlan transferRanges(InetAddressAndPort to, String keyspace, 
Collection<Range<Token>> ranges, String... columnFamilies)
     {
-        StreamSession session = coordinator.getOrCreateNextSession(to, 
connecting);
+        StreamSession session = coordinator.getOrCreateNextSession(to);
         session.addTransferRanges(keyspace, ranges, 
Arrays.asList(columnFamilies), flushBeforeTransfer);
         return this;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java 
b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 3b11fb6..ef8976d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -112,14 +112,16 @@ public final class StreamResultFuture extends 
AbstractFuture<StreamState>
         StreamResultFuture future = 
StreamManager.instance.getReceivingStream(planId);
         if (future == null)
         {
-            logger.info("[Stream #{} ID#{}] Creating new streaming plan for 
{}", planId, sessionIndex, streamOperation.getDescription());
+            logger.info("[Stream #{} ID#{}] Creating new streaming plan for {} 
from {} channel.remote {} channel.local {} channel.id {}",
+                        planId, sessionIndex, 
streamOperation.getDescription(), from, channel.remoteAddress(), 
channel.localAddress(), channel.id());
 
             // The main reason we create a StreamResultFuture on the receiving 
side is for JMX exposure.
             future = new StreamResultFuture(planId, streamOperation, 
pendingRepair, previewKind);
             StreamManager.instance.registerReceiving(future);
         }
         future.attachConnection(from, sessionIndex, channel);
-        logger.info("[Stream #{}, ID#{}] Received streaming plan for {}", 
planId, sessionIndex, streamOperation.getDescription());
+        logger.info("[Stream #{}, ID#{}] Received streaming plan for {} from 
{} channel.remote {} channel.local {} channel.id {}",
+                    planId, sessionIndex, streamOperation.getDescription(), 
from, channel.remoteAddress(), channel.localAddress(), channel.id());
         return future;
     }
 
@@ -137,13 +139,7 @@ public final class StreamResultFuture extends 
AbstractFuture<StreamState>
 
     private void attachConnection(InetAddressAndPort from, int sessionIndex, 
Channel channel)
     {
-        SocketAddress addr = channel.remoteAddress();
-        //In the case of unit tests, if you use the EmbeddedChannel, 
channel.remoteAddress()
-        //does not return an InetSocketAddress, but an EmbeddedSocketAddress. 
Hence why we need the type check here
-        InetAddress connecting = (addr instanceof InetSocketAddress ? 
((InetSocketAddress) addr).getAddress() : from.address);
-        //Need to turn connecting into a InetAddressAndPort with the correct 
port. I think getting the port from "from"
-        //Will work since we don't actually have ports diverge across network 
interfaces
-        StreamSession session = coordinator.getOrCreateSessionById(from, 
sessionIndex, InetAddressAndPort.getByAddressOverrideDefaults(connecting, 
from.port));
+        StreamSession session = coordinator.getOrCreateSessionById(from, 
sessionIndex);
         session.init(this);
         session.attach(channel);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java 
b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 42d1d97..c56616e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -21,6 +21,7 @@ import java.net.SocketTimeoutException;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.*;
@@ -124,6 +125,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
     private static final Logger logger = 
LoggerFactory.getLogger(StreamSession.class);
 
     private final StreamOperation streamOperation;
+
     /**
      * Streaming endpoint.
      *
@@ -131,10 +133,12 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
      */
     public final InetAddressAndPort peer;
 
-    private final int index;
+    /**
+     * Preferred IP Address/Port of the peer; this is the address that will be 
connect to. Can be the same as {@linkplain #peer}.
+     */
+    private final InetAddressAndPort preferredPeerInetAddressAndPort;
 
-    /** Actual connecting address. Can be the same as {@linkplain #peer}. */
-    public final InetAddressAndPort connecting;
+    private final int index;
 
     // should not be null when session is started
     private StreamResultFuture streamResult;
@@ -172,23 +176,33 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
 
     /**
      * Create new streaming session with the peer.
-     * @param streamOperation
-     * @param peer Address of streaming peer
-     * @param connecting Actual connecting address
      */
-    public StreamSession(StreamOperation streamOperation, InetAddressAndPort 
peer, InetAddressAndPort connecting, StreamConnectionFactory factory, int 
index, UUID pendingRepair, PreviewKind previewKind)
+    public StreamSession(StreamOperation streamOperation, InetAddressAndPort 
peer, StreamConnectionFactory factory,
+                         int index, UUID pendingRepair, PreviewKind 
previewKind)
+    {
+        this(streamOperation, peer, factory, index, pendingRepair, 
previewKind, MessagingService.instance()::getPreferredRemoteAddr);
+    }
+
+    @VisibleForTesting
+    public StreamSession(StreamOperation streamOperation, InetAddressAndPort 
peer, StreamConnectionFactory factory,
+                         int index, UUID pendingRepair, PreviewKind 
previewKind,
+                         Function<InetAddressAndPort, InetAddressAndPort> 
preferredIpMapper)
     {
         this.streamOperation = streamOperation;
         this.peer = peer;
-        this.connecting = connecting;
         this.index = index;
+        InetAddressAndPort preferredPeerEndpoint = 
preferredIpMapper.apply(peer);
+        this.preferredPeerInetAddressAndPort = (preferredPeerEndpoint == null) 
? peer : preferredPeerEndpoint;
 
         OutboundConnectionIdentifier id = 
OutboundConnectionIdentifier.stream(InetAddressAndPort.getByAddressOverrideDefaults(FBUtilities.getJustLocalAddress(),
 0),
-                                                                              
InetAddressAndPort.getByAddressOverrideDefaults(connecting.address, 
MessagingService.instance().portFor(connecting)));
+                                                                              
preferredPeerInetAddressAndPort);
+
         this.messageSender = new NettyStreamingMessageSender(this, id, 
factory, StreamMessage.CURRENT_VERSION, previewKind.isPreview());
-        this.metrics = StreamingMetrics.get(connecting);
+        this.metrics = StreamingMetrics.get(preferredPeerInetAddressAndPort);
         this.pendingRepair = pendingRepair;
         this.previewKind = previewKind;
+
+        logger.debug("Creating stream session peer={} 
preferredPeerInetAddressAndPort={}", peer, preferredPeerInetAddressAndPort);
     }
 
     public UUID planId()
@@ -267,7 +281,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         {
             logger.info("[Stream #{}] Starting streaming to {}{}", planId(),
                                                                    peer,
-                                                                   
peer.equals(connecting) ? "" : " through " + connecting);
+                                                                   
peer.equals(preferredPeerInetAddressAndPort) ? "" : " through " + 
preferredPeerInetAddressAndPort);
             messageSender.initialize();
             onInitializationComplete();
         }
@@ -515,7 +529,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
             logger.error("[Stream #{}] Did not receive response from peer {}{} 
for {} secs. Is peer down? " +
                          "If not, maybe try increasing 
streaming_keep_alive_period_in_secs.", planId(),
                          peer.getHostAddress(true),
-                         peer.equals(connecting) ? "" : " through " + 
connecting.getHostAddress(true),
+                         peer.equals(preferredPeerInetAddressAndPort) ? "" : " 
through " + preferredPeerInetAddressAndPort.getHostAddress(true),
                          2 * DatabaseDescriptor.getStreamingKeepAlivePeriod(),
                          e);
         }
@@ -523,7 +537,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         {
             logger.error("[Stream #{}] Streaming error occurred on session 
with peer {}{}", planId(),
                          peer.getHostAddress(true),
-                         peer.equals(connecting) ? "" : " through " + 
connecting.getHostAddress(true),
+                         peer.equals(preferredPeerInetAddressAndPort) ? "" : " 
through " + preferredPeerInetAddressAndPort.getHostAddress(true),
                          e);
         }
     }
@@ -677,7 +691,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         List<StreamSummary> transferSummaries = Lists.newArrayList();
         for (StreamTask transfer : transfers.values())
             transferSummaries.add(transfer.getSummary());
-        return new SessionInfo(peer, index, connecting, receivingSummaries, 
transferSummaries, state);
+        return new SessionInfo(peer, index, preferredPeerInetAddressAndPort, 
receivingSummaries, transferSummaries, state);
     }
 
     public synchronized void taskCompleted(StreamReceiveTask completedTask)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/streaming/StreamingMessageSender.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/StreamingMessageSender.java 
b/src/java/org/apache/cassandra/streaming/StreamingMessageSender.java
index 9562981..accf554 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingMessageSender.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingMessageSender.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.streaming;
 
 import java.io.IOException;
 
+import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
 import org.apache.cassandra.streaming.messages.StreamMessage;
 
 public interface StreamingMessageSender
@@ -28,6 +29,8 @@ public interface StreamingMessageSender
 
     void sendMessage(StreamMessage message) throws IOException;
 
+    OutboundConnectionIdentifier getConnectionId();
+
     boolean connected();
 
     void close();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
 
b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
index bbc451d..1bcb013 100644
--- 
a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
+++ 
b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
@@ -135,7 +135,7 @@ public class NettyStreamingMessageSender implements 
StreamingMessageSender
     }
 
     @Override
-    public void initialize() throws IOException
+    public void initialize()
     {
         StreamInitMessage message = new 
StreamInitMessage(FBUtilities.getBroadcastAddressAndPort(),
                                                           
session.sessionIndex(),
@@ -184,6 +184,7 @@ public class NettyStreamingMessageSender implements 
StreamingMessageSender
         ChannelPipeline pipeline = channel.pipeline();
         pipeline.addLast(NettyFactory.instance.streamingGroup, 
NettyFactory.INBOUND_STREAM_HANDLER_NAME, new 
StreamingInboundHandler(connectionId.remote(), protocolVersion, session));
         channel.attr(TRANSFERRING_FILE_ATTR).set(Boolean.FALSE);
+        logger.debug("Creating channel id {} local {} remote {}", 
channel.id(), channel.localAddress(), channel.remoteAddress());
         return channel;
     }
 
@@ -495,7 +496,8 @@ public class NettyStreamingMessageSender implements 
StreamingMessageSender
     {
         closed = true;
         logger.debug("{} Closing stream connection channels on {}", 
createLogTag(session, null), connectionId);
-        channelKeepAlives.stream().map(scheduledFuture -> 
scheduledFuture.cancel(false));
+        for (ScheduledFuture<?> future : channelKeepAlives)
+            future.cancel(false);
         channelKeepAlives.clear();
 
         List<Future<Void>> futures = new 
ArrayList<>(threadToChannelMap.size());
@@ -508,4 +510,10 @@ public class NettyStreamingMessageSender implements 
StreamingMessageSender
         if (controlMessageChannel != null)
             controlMessageChannel.close();
     }
+
+    @Override
+    public OutboundConnectionIdentifier getConnectionId()
+    {
+        return connectionId;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java 
b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
index 8497e71..80e4bfb 100644
--- 
a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
+++ 
b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
@@ -96,7 +96,6 @@ public class CassandraStreamManagerTest
         {
             return new StreamSession(StreamOperation.REPAIR,
                                      InetAddressAndPort.getByName("127.0.0.1"),
-                                     InetAddressAndPort.getByName("127.0.0.2"),
                                      connectionFactory,
                                      0,
                                      pendingRepair,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java 
b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
index 8f0a407..bf71c09 100644
--- a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
+++ b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
@@ -52,7 +52,7 @@ public class StreamStateStoreTest
         Range<Token> range = new Range<>(factory.fromString("0"), 
factory.fromString("100"));
 
         InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
-        StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, 
local, local, new DefaultConnectionFactory(), 0, null, PreviewKind.NONE);
+        StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, 
local, new DefaultConnectionFactory(), 0, null, PreviewKind.NONE);
         session.addStreamRequest("keyspace1", Collections.singleton(range), 
Collections.singleton("cf"));
 
         StreamStateStore store = new StreamStateStore();
@@ -73,7 +73,7 @@ public class StreamStateStoreTest
 
         // add different range within the same keyspace
         Range<Token> range2 = new Range<>(factory.fromString("100"), 
factory.fromString("200"));
-        session = new StreamSession(StreamOperation.BOOTSTRAP, local, local, 
new DefaultConnectionFactory(), 0, null, PreviewKind.NONE);
+        session = new StreamSession(StreamOperation.BOOTSTRAP, local, new 
DefaultConnectionFactory(), 0, null, PreviewKind.NONE);
         session.addStreamRequest("keyspace1", Collections.singleton(range2), 
Collections.singleton("cf"));
         session.state(StreamSession.State.COMPLETE);
         store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java 
b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
index c3ebe32..b56cd62 100644
--- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -309,7 +309,7 @@ public class MessagingServiceTest
 
     private static void addDCLatency(long sentAt, long nowTime) throws 
IOException
     {
-        MessageIn.deriveConstructionTime(InetAddressAndPort.getLocalHost(), 
(int)sentAt, nowTime);
+        MessageIn.deriveConstructionTime(InetAddressAndPort.getLocalHost(), 
(int) sentAt, nowTime);
     }
 
     public static class MockBackPressureStrategy implements 
BackPressureStrategy<MockBackPressureStrategy.MockBackPressureState>
@@ -422,6 +422,7 @@ public class MessagingServiceTest
     /**
      * Make sure that if internode authenticatino fails for an outbound 
connection that all the code that relies
      * on getting the connection pool handles the null return
+     *
      * @throws Exception
      */
     @Test
@@ -660,4 +661,52 @@ public class MessagingServiceTest
             Assert.assertEquals(0, messagingService.serverChannels.size());
         }
     }
+
+
+    @Test
+    public void getPreferredRemoteAddrUsesPrivateIp() throws 
UnknownHostException
+    {
+        MessagingService ms = MessagingService.instance();
+        InetAddressAndPort local = 
InetAddressAndPort.getByNameOverrideDefaults("127.0.0.4", 7000);
+        InetAddressAndPort remote = 
InetAddressAndPort.getByNameOverrideDefaults("127.0.0.151", 7000);
+        InetAddressAndPort privateIp = 
InetAddressAndPort.getByName("127.0.0.6");
+
+        OutboundMessagingPool pool = new OutboundMessagingPool(privateIp, 
local, null,
+                                                               new 
MockBackPressureStrategy(null).newState(remote),
+                                                               
ALLOW_NOTHING_AUTHENTICATOR);
+        ms.channelManagers.put(remote, pool);
+
+        Assert.assertEquals(privateIp, ms.getPreferredRemoteAddr(remote));
+    }
+
+    @Test
+    public void getPreferredRemoteAddrUsesPreferredIp() throws 
UnknownHostException
+    {
+        MessagingService ms = MessagingService.instance();
+        InetAddressAndPort remote = 
InetAddressAndPort.getByNameOverrideDefaults("127.0.0.115", 7000);
+
+        InetAddressAndPort preferredIp = 
InetAddressAndPort.getByName("127.0.0.16");
+        SystemKeyspace.updatePreferredIP(remote, preferredIp);
+
+        Assert.assertEquals(preferredIp, ms.getPreferredRemoteAddr(remote));
+    }
+
+    @Test
+    public void getPreferredRemoteAddrUsesPrivateIpOverridesPreferredIp() 
throws UnknownHostException
+    {
+        MessagingService ms = MessagingService.instance();
+        InetAddressAndPort local = 
InetAddressAndPort.getByNameOverrideDefaults("127.0.0.4", 7000);
+        InetAddressAndPort remote = 
InetAddressAndPort.getByNameOverrideDefaults("127.0.0.105", 7000);
+        InetAddressAndPort privateIp = 
InetAddressAndPort.getByName("127.0.0.6");
+
+        OutboundMessagingPool pool = new OutboundMessagingPool(privateIp, 
local, null,
+                                                               new 
MockBackPressureStrategy(null).newState(remote),
+                                                               
ALLOW_NOTHING_AUTHENTICATOR);
+        ms.channelManagers.put(remote, pool);
+
+        InetAddressAndPort preferredIp = 
InetAddressAndPort.getByName("127.0.0.16");
+        SystemKeyspace.updatePreferredIP(remote, preferredIp);
+
+        Assert.assertEquals(privateIp, ms.getPreferredRemoteAddr(remote));
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java 
b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index 95046bd..802a673 100644
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@ -61,7 +61,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest
     public static ColumnFamilyStore cfs;
 
     @BeforeClass
-    public static void defineSchema() throws Exception
+    public static void defineSchema()
     {
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE1,
@@ -148,7 +148,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest
         TreeResponse r2 = new TreeResponse(PARTICIPANT2, 
createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
 
         LocalSyncTask task = new LocalSyncTask(desc, r1, r2, 
NO_PENDING_REPAIR, false, PreviewKind.NONE);
-        StreamPlan plan = task.createStreamPlan(PARTICIPANT1, PARTICIPANT2, 
Lists.newArrayList(RANGE1));
+        StreamPlan plan = task.createStreamPlan(PARTICIPANT1, 
Lists.newArrayList(RANGE1));
 
         assertEquals(NO_PENDING_REPAIR, plan.getPendingRepair());
         assertTrue(plan.getFlushBeforeTransfer());
@@ -165,7 +165,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest
         TreeResponse r2 = new TreeResponse(PARTICIPANT2, 
createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
 
         LocalSyncTask task = new LocalSyncTask(desc, r1, r2, 
desc.parentSessionId, false, PreviewKind.NONE);
-        StreamPlan plan = task.createStreamPlan(PARTICIPANT1, PARTICIPANT2, 
Lists.newArrayList(RANGE1));
+        StreamPlan plan = task.createStreamPlan(PARTICIPANT1, 
Lists.newArrayList(RANGE1));
 
         assertEquals(desc.parentSessionId, plan.getPendingRepair());
         assertFalse(plan.getFlushBeforeTransfer());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java 
b/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
index f0ff0e0..b845e93 100644
--- a/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
@@ -60,7 +60,7 @@ public class StreamingRepairTaskTest extends 
AbstractRepairTest
     }
 
     @Test
-    public void incrementalStreamPlan() throws Exception
+    public void incrementalStreamPlan()
     {
         UUID sessionID = registerSession(cfs, true, true);
         ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(sessionID);
@@ -69,7 +69,7 @@ public class StreamingRepairTaskTest extends 
AbstractRepairTest
         SyncRequest request = new SyncRequest(desc, PARTICIPANT1, 
PARTICIPANT2, PARTICIPANT3, prs.getRanges(), PreviewKind.NONE);
         StreamingRepairTask task = new StreamingRepairTask(desc, 
request.initiator, request.src, request.dst, request.ranges, desc.sessionId, 
PreviewKind.NONE, false);
 
-        StreamPlan plan = task.createStreamPlan(request.src, request.dst);
+        StreamPlan plan = task.createStreamPlan(request.dst);
         Assert.assertFalse(plan.getFlushBeforeTransfer());
     }
 
@@ -82,7 +82,7 @@ public class StreamingRepairTaskTest extends 
AbstractRepairTest
         SyncRequest request = new SyncRequest(desc, PARTICIPANT1, 
PARTICIPANT2, PARTICIPANT3, prs.getRanges(), PreviewKind.NONE);
         StreamingRepairTask task = new StreamingRepairTask(desc, 
request.initiator, request.src, request.dst, request.ranges, null, 
PreviewKind.NONE, false);
 
-        StreamPlan plan = task.createStreamPlan(request.src, request.dst);
+        StreamPlan plan = task.createStreamPlan(request.dst);
         Assert.assertTrue(plan.getFlushBeforeTransfer());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java 
b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java
new file mode 100644
index 0000000..7ea09ea
--- /dev/null
+++ b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.UUID;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessagingServiceTest;
+
+import static org.junit.Assert.assertEquals;
+
+public class StreamSessionTest
+{
+    @BeforeClass
+    public static void beforeClass() throws UnknownHostException
+    {
+        DatabaseDescriptor.daemonInitialization();
+        DatabaseDescriptor.setBackPressureStrategy(new 
MessagingServiceTest.MockBackPressureStrategy(Collections.emptyMap()));
+        
DatabaseDescriptor.setBroadcastAddress(InetAddress.getByName("127.0.0.3"));
+    }
+
+    @Test
+    public void testStreamSessionUsesCorrectRemoteIp_Succeeds() throws 
UnknownHostException
+    {
+        InetAddressAndPort localAddr = 
InetAddressAndPort.getByName("127.0.0.1:7000");
+        InetAddressAndPort preferredAddr = 
InetAddressAndPort.getByName("127.0.0.2:7000");
+        StreamSession streamSession = new 
StreamSession(StreamOperation.BOOTSTRAP, localAddr,
+                          new DefaultConnectionFactory(), 0, 
UUID.randomUUID(), PreviewKind.ALL,
+                          inetAddressAndPort -> preferredAddr);
+
+        assertEquals(preferredAddr, 
streamSession.getMessageSender().getConnectionId().connectionAddress());
+    }
+
+    @Test
+    public void testStreamSessionUsesCorrectRemoteIpNullMapper_Succeeds() 
throws UnknownHostException
+    {
+        InetAddressAndPort localAddr = 
InetAddressAndPort.getByName("127.0.0.1:7000");
+
+        StreamSession streamSession = new 
StreamSession(StreamOperation.BOOTSTRAP, localAddr,
+                          new DefaultConnectionFactory(), 0, 
UUID.randomUUID(), PreviewKind.ALL, (peer) -> null);
+
+        assertEquals(localAddr, 
streamSession.getMessageSender().getConnectionId().connectionAddress());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java 
b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 45c917a..63dcfba 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -76,7 +76,7 @@ public class StreamTransferTaskTest
     public void testScheduleTimeout() throws Exception
     {
         InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
-        StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, 
peer, peer, (connectionId, protocolVersion) -> new EmbeddedChannel(), 0, 
UUID.randomUUID(), PreviewKind.ALL);
+        StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, 
peer, (connectionId, protocolVersion) -> new EmbeddedChannel(), 0, 
UUID.randomUUID(), PreviewKind.ALL);
         ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
 
         // create two sstables
@@ -124,7 +124,7 @@ public class StreamTransferTaskTest
         InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
         StreamCoordinator streamCoordinator = new 
StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), 
false, null, PreviewKind.NONE);
         StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), 
StreamOperation.OTHER, Collections.<StreamEventHandler>emptyList(), 
streamCoordinator);
-        StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, 
peer, peer, null, 0, null, PreviewKind.NONE);
+        StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, 
peer, null, 0, null, PreviewKind.NONE);
         session.init(future);
         ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java 
b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 575200a..2143903 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -144,7 +144,7 @@ public class StreamingTransferTest
         ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key2")), 
p.getMinimumToken()));
 
         StreamResultFuture futureResult = new StreamPlan(StreamOperation.OTHER)
-                                                  .requestRanges(LOCAL, LOCAL, 
KEYSPACE2, ranges)
+                                                  .requestRanges(LOCAL, 
KEYSPACE2, ranges)
                                                   .execute();
 
         UUID planId = futureResult.planId;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
 
b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
index fd22a65..52f097a 100644
--- 
a/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
+++ 
b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
@@ -63,7 +63,7 @@ public class NettyStreamingMessageSenderTest
         channel = new EmbeddedChannel();
         
channel.attr(NettyStreamingMessageSender.TRANSFERRING_FILE_ATTR).set(Boolean.FALSE);
         UUID pendingRepair = UUID.randomUUID();
-        session = new StreamSession(StreamOperation.BOOTSTRAP, REMOTE_ADDR, 
REMOTE_ADDR, (connectionId, protocolVersion) -> null, 0, pendingRepair, 
PreviewKind.ALL);
+        session = new StreamSession(StreamOperation.BOOTSTRAP, REMOTE_ADDR, 
(connectionId, protocolVersion) -> null, 0, pendingRepair, PreviewKind.ALL);
         StreamResultFuture future = StreamResultFuture.initReceivingSide(0, 
UUID.randomUUID(), StreamOperation.REPAIR, REMOTE_ADDR, channel, pendingRepair, 
session.getPreviewKind());
         session.init(future);
         sender = session.getMessageSender();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
 
b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
index 78a7879..0a13596 100644
--- 
a/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
+++ 
b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
@@ -132,7 +132,7 @@ public class StreamingInboundHandlerTest
 
     private StreamSession createSession(SessionIdentifier sid)
     {
-        return new StreamSession(StreamOperation.BOOTSTRAP, sid.from, 
sid.from, (connectionId, protocolVersion) -> null, sid.sessionIndex, 
UUID.randomUUID(), PreviewKind.ALL);
+        return new StreamSession(StreamOperation.BOOTSTRAP, sid.from, 
(connectionId, protocolVersion) -> null, sid.sessionIndex, UUID.randomUUID(), 
PreviewKind.ALL);
     }
 
     @Test (expected = IllegalStateException.class)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to