[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-09 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 13: Verified+1


--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 13
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Dan Hecht 
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Mostafa Mokhtar 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Thu, 09 Nov 2017 20:05:07 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-09 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has submitted this change and it was merged. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..

IMPALA-4856: Port data stream service to KRPC

This patch implements a new data stream service which utilizes KRPC.
As in the thrift RPC implementation, there are three major components
to the data stream service: KrpcDataStreamSender serializes and sends
row batches materialized by a fragment instance to a KrpcDataStreamRecvr,
and KrpcDataStreamMgr is responsible for routing an incoming row batch to
the appropriate receiver. The data stream service runs on the port
FLAGS_krpc_port, which is 29000 by default.

Unlike the thrift RPC implementation, KRPC provides an asynchronous
interface for invoking remote methods. As a result, KrpcDataStreamSender
doesn't need to create a thread per connection. There is one connection
between two Impalad nodes for each direction (i.e. client and server).
Multiple queries can multiplex over the same connection for transmitting
row batches between two Impalad nodes. The asynchronous interface also
avoids the possibility that a thread is stuck in the RPC code for an
extended amount of time without checking for cancellation. A TransmitData()
call with KRPC is in essence a trio of an RpcController, a serialized protobuf
request buffer and a protobuf response buffer. The call is invoked via a
DataStreamService proxy object. The serialized tuple offsets and row batches
are sent via "sidecars" in KRPC to avoid an extra copy into the serialized
request buffer.

Each impalad node creates a singleton DataStreamService object at start-up
time. All incoming calls are served by a service thread pool created as part
of DataStreamService. By default, the number of service threads equals the
number of logical cores. The service threads are shared across all queries, so
the RPC handler should avoid blocking as much as possible. In the thrift RPC
implementation, a thrift thread handling a TransmitData() RPC may block for an
extended period of time if the receiver has not yet been created when the call
arrives. In the KRPC implementation, TransmitData() or EndDataStream()
requests which arrive before the receiver is ready are stored in a per-receiver
early sender list in KrpcDataStreamMgr. These RPC calls are processed
and responded to when the receiver is created or when a timeout occurs.

Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr.
If adding a row batch to a queue in KrpcDataStreamRecvr would cause the buffer
limit to be exceeded, the request is stashed in a queue for deferred
processing. The stashed RPC requests are not responded to until they are
processed, so as to exert back pressure on the senders. An alternative would
be to reply with an error, in which case the request and row batches would
need to be sent again. That may end up consuming more network bandwidth than
the thrift RPC implementation. This change adopts the behavior of allowing one
stashed request per sender.

All RPC requests and responses are serialized using protobuf. The equivalent of
TRowBatch is ProtoRowBatch, which contains a serialized header describing the
metadata of the row batch and two Kudu Slice objects which contain pointers to
the actual data (i.e. tuple offsets and tuple data).

This patch is based on an abandoned patch by Henry Robinson.

TESTING
---

* Builds {exhaustive/debug, core/release, asan} passed with FLAGS_use_krpc=true.

TO DO
-

* Port some BE tests to KRPC services.

Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Reviewed-on: http://gerrit.cloudera.org:8080/8023
Reviewed-by: Michael Ho 
Tested-by: Impala Public Jenkins
---
M be/src/common/status.cc
M be/src/common/status.h
M be/src/exec/data-sink.cc
M be/src/exec/exchange-node.cc
M be/src/exec/kudu-util.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/rpc-mgr.cc
M be/src/rpc/rpc-mgr.h
M be/src/runtime/CMakeLists.txt
M be/src/runtime/data-stream-mgr-base.h
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.h
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/krpc-data-stream-mgr.cc
M be/src/runtime/krpc-data-stream-mgr.h
M be/src/runtime/krpc-data-stream-recvr.cc
M be/src/runtime/krpc-data-stream-recvr.h
A be/src/runtime/krpc-data-stream-sender.cc
A be/src/runtime/krpc-data-stream-sender.h
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/service/CMakeLists.txt
A be/src/service/data-stream-service.cc
A be/src/service/data-stream-service.h
M be/src/service/impala-server.cc
M cmake_modules/FindProtobuf.cmake
M common/protobuf/CMakeLists.txt
A common/protobuf/common.proto
A common/protobuf/data_stream_service.proto
A common/protobuf/row_batch.proto
M common/thrift/generate_error_codes.py
33 files changed, 3,155 insertions(+), 184 deletions(-)

Approvals:
  Michael 

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-09 Thread Dan Hecht (Code Review)
Dan Hecht has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 13:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/8023/13/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/13/be/src/runtime/krpc-data-stream-recvr.cc@163
PS13, Line 163: condition_variable_any
these should be changed to impala ConditionVariables now, but you can do that 
as a follow on



--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 13
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Dan Hecht 
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Mostafa Mokhtar 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Thu, 09 Nov 2017 18:24:52 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-09 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 13:

Build started: https://jenkins.impala.io/job/gerrit-verify-dryrun/1454/


--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 13
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Dan Hecht 
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Mostafa Mokhtar 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Thu, 09 Nov 2017 16:44:09 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-09 Thread Michael Ho (Code Review)
Michael Ho has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 13: Code-Review+2

Retested the new PS. Carry+2.


--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 13
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Dan Hecht 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Mostafa Mokhtar 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Thu, 09 Nov 2017 16:35:40 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-08 Thread Michael Ho (Code Review)
Michael Ho has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 13:

(5 comments)

Rebased. Removed a scoped timer which is racy, removed unneeded 
WARN_UNUSED_RESULTS and fixed some clang-tidy errors.

http://gerrit.cloudera.org:8080/#/c/8023/10/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/10/be/src/runtime/krpc-data-stream-recvr.cc@408
PS10, Line 408: // Dequeues the deferred batch an
> not needed.
Done


http://gerrit.cloudera.org:8080/#/c/8023/10/be/src/runtime/krpc-data-stream-recvr.cc@425
PS10, Line 425: deferred_rpcs_.push(move(ctx));
> can be done without holding the lock.
Done


http://gerrit.cloudera.org:8080/#/c/8023/10/be/src/runtime/krpc-data-stream-recvr.cc@562
PS10, Line 562: // Add all batches to the same queu
There is a subtle race condition here. receiver may be closed already at this 
point so we need to do something similar to CANCEL_SAFE_SCOPED_TIMER(). 
However, the race may still happen if that macro is not used under the sender 
queue's lock. Removed from this patch for now and it will be added back in a 
follow on patch which introduces more diagnostics information.


http://gerrit.cloudera.org:8080/#/c/8023/10/be/src/runtime/krpc-data-stream-recvr.cc@569
PS10, Line 569: sender_queues_[use_sender_id]->De
Same comment as AddBatch().


http://gerrit.cloudera.org:8080/#/c/8023/10/be/src/runtime/row-batch.h
File be/src/runtime/row-batch.h:

http://gerrit.cloudera.org:8080/#/c/8023/10/be/src/runtime/row-batch.h@145
PS10, Line 145:
> typo.
Done



--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 13
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Dan Hecht 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Mostafa Mokhtar 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Thu, 09 Nov 2017 00:42:11 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-08 Thread Michael Ho (Code Review)
Hello Sailesh Mukil, Mostafa Mokhtar, Dan Hecht,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/8023

to look at the new patch set (#13).

Change subject: IMPALA-4856: Port data stream service to KRPC
..

IMPALA-4856: Port data stream service to KRPC

This patch implements a new data stream service which utilizes KRPC.
As in the thrift RPC implementation, there are three major components
to the data stream service: KrpcDataStreamSender serializes and sends
row batches materialized by a fragment instance to a KrpcDataStreamRecvr,
and KrpcDataStreamMgr is responsible for routing an incoming row batch to
the appropriate receiver. The data stream service runs on the port
FLAGS_krpc_port, which is 29000 by default.

Unlike the thrift RPC implementation, KRPC provides an asynchronous
interface for invoking remote methods. As a result, KrpcDataStreamSender
doesn't need to create a thread per connection. There is one connection
between two Impalad nodes for each direction (i.e. client and server).
Multiple queries can multiplex over the same connection for transmitting
row batches between two Impalad nodes. The asynchronous interface also
avoids the possibility that a thread is stuck in the RPC code for an
extended amount of time without checking for cancellation. A TransmitData()
call with KRPC is in essence a trio of an RpcController, a serialized protobuf
request buffer and a protobuf response buffer. The call is invoked via a
DataStreamService proxy object. The serialized tuple offsets and row batches
are sent via "sidecars" in KRPC to avoid an extra copy into the serialized
request buffer.

Each impalad node creates a singleton DataStreamService object at start-up
time. All incoming calls are served by a service thread pool created as part
of DataStreamService. By default, the number of service threads equals the
number of logical cores. The service threads are shared across all queries, so
the RPC handler should avoid blocking as much as possible. In the thrift RPC
implementation, a thrift thread handling a TransmitData() RPC may block for an
extended period of time if the receiver has not yet been created when the call
arrives. In the KRPC implementation, TransmitData() or EndDataStream()
requests which arrive before the receiver is ready are stored in a per-receiver
early sender list in KrpcDataStreamMgr. These RPC calls are processed
and responded to when the receiver is created or when a timeout occurs.

Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr.
If adding a row batch to a queue in KrpcDataStreamRecvr would cause the buffer
limit to be exceeded, the request is stashed in a queue for deferred
processing. The stashed RPC requests are not responded to until they are
processed, so as to exert back pressure on the senders. An alternative would
be to reply with an error, in which case the request and row batches would
need to be sent again. That may end up consuming more network bandwidth than
the thrift RPC implementation. This change adopts the behavior of allowing one
stashed request per sender.

All RPC requests and responses are serialized using protobuf. The equivalent of
TRowBatch is ProtoRowBatch, which contains a serialized header describing the
metadata of the row batch and two Kudu Slice objects which contain pointers to
the actual data (i.e. tuple offsets and tuple data).

This patch is based on an abandoned patch by Henry Robinson.

TESTING
---

* Builds {exhaustive/debug, core/release, asan} passed with FLAGS_use_krpc=true.

TO DO
-

* Port some BE tests to KRPC services.

Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
---
M be/src/common/status.cc
M be/src/common/status.h
M be/src/exec/data-sink.cc
M be/src/exec/exchange-node.cc
M be/src/exec/kudu-util.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/rpc-mgr.cc
M be/src/rpc/rpc-mgr.h
M be/src/runtime/CMakeLists.txt
M be/src/runtime/data-stream-mgr-base.h
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.h
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/krpc-data-stream-mgr.cc
M be/src/runtime/krpc-data-stream-mgr.h
M be/src/runtime/krpc-data-stream-recvr.cc
M be/src/runtime/krpc-data-stream-recvr.h
A be/src/runtime/krpc-data-stream-sender.cc
A be/src/runtime/krpc-data-stream-sender.h
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/service/CMakeLists.txt
A be/src/service/data-stream-service.cc
A be/src/service/data-stream-service.h
M be/src/service/impala-server.cc
M cmake_modules/FindProtobuf.cmake
M common/protobuf/CMakeLists.txt
A common/protobuf/common.proto
A common/protobuf/data_stream_service.proto
A common/protobuf/row_batch.proto
M common/thrift/generate_error_codes.py
33 files changed, 3,155 insertions(+), 184 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/13
--

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-08 Thread Michael Ho (Code Review)
Hello Sailesh Mukil, Mostafa Mokhtar, Dan Hecht,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/8023

to look at the new patch set (#12).

Change subject: IMPALA-4856: Port data stream service to KRPC
..

IMPALA-4856: Port data stream service to KRPC

This patch implements a new data stream service which utilizes KRPC.
As in the thrift RPC implementation, there are three major components
to the data stream service: KrpcDataStreamSender serializes and sends
row batches materialized by a fragment instance to a KrpcDataStreamRecvr,
and KrpcDataStreamMgr is responsible for routing an incoming row batch to
the appropriate receiver. The data stream service runs on the port
FLAGS_krpc_port, which is 29000 by default.

Unlike the thrift RPC implementation, KRPC provides an asynchronous
interface for invoking remote methods. As a result, KrpcDataStreamSender
doesn't need to create a thread per connection. There is one connection
between two Impalad nodes for each direction (i.e. client and server).
Multiple queries can multiplex over the same connection for transmitting
row batches between two Impalad nodes. The asynchronous interface also
avoids the possibility that a thread is stuck in the RPC code for an
extended amount of time without checking for cancellation. A TransmitData()
call with KRPC is in essence a trio of an RpcController, a serialized protobuf
request buffer and a protobuf response buffer. The call is invoked via a
DataStreamService proxy object. The serialized tuple offsets and row batches
are sent via "sidecars" in KRPC to avoid an extra copy into the serialized
request buffer.

Each impalad node creates a singleton DataStreamService object at start-up
time. All incoming calls are served by a service thread pool created as part
of DataStreamService. By default, the number of service threads equals the
number of logical cores. The service threads are shared across all queries, so
the RPC handler should avoid blocking as much as possible. In the thrift RPC
implementation, a thrift thread handling a TransmitData() RPC may block for an
extended period of time if the receiver has not yet been created when the call
arrives. In the KRPC implementation, TransmitData() or EndDataStream()
requests which arrive before the receiver is ready are stored in a per-receiver
early sender list in KrpcDataStreamMgr. These RPC calls are processed
and responded to when the receiver is created or when a timeout occurs.

Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr.
If adding a row batch to a queue in KrpcDataStreamRecvr would cause the buffer
limit to be exceeded, the request is stashed in a queue for deferred
processing. The stashed RPC requests are not responded to until they are
processed, so as to exert back pressure on the senders. An alternative would
be to reply with an error, in which case the request and row batches would
need to be sent again. That may end up consuming more network bandwidth than
the thrift RPC implementation. This change adopts the behavior of allowing one
stashed request per sender.

All RPC requests and responses are serialized using protobuf. The equivalent of
TRowBatch is ProtoRowBatch, which contains a serialized header describing the
metadata of the row batch and two Kudu Slice objects which contain pointers to
the actual data (i.e. tuple offsets and tuple data).

This patch is based on an abandoned patch by Henry Robinson.

TESTING
---

* Builds {exhaustive/debug, core/release, asan} passed with FLAGS_use_krpc=true.

TO DO
-

* Port some BE tests to KRPC services.

Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
---
M be/src/common/status.cc
M be/src/common/status.h
M be/src/exec/data-sink.cc
M be/src/exec/exchange-node.cc
M be/src/exec/kudu-util.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/rpc-mgr.cc
M be/src/rpc/rpc-mgr.h
M be/src/runtime/CMakeLists.txt
M be/src/runtime/data-stream-mgr-base.h
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.h
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/krpc-data-stream-mgr.cc
M be/src/runtime/krpc-data-stream-mgr.h
M be/src/runtime/krpc-data-stream-recvr.cc
M be/src/runtime/krpc-data-stream-recvr.h
A be/src/runtime/krpc-data-stream-sender.cc
A be/src/runtime/krpc-data-stream-sender.h
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/service/CMakeLists.txt
A be/src/service/data-stream-service.cc
A be/src/service/data-stream-service.h
M be/src/service/impala-server.cc
M cmake_modules/FindProtobuf.cmake
M common/protobuf/CMakeLists.txt
A common/protobuf/common.proto
A common/protobuf/data_stream_service.proto
A common/protobuf/row_batch.proto
M common/thrift/generate_error_codes.py
33 files changed, 3,154 insertions(+), 184 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/12
--

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-08 Thread Michael Ho (Code Review)
Hello Sailesh Mukil, Mostafa Mokhtar, Dan Hecht,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/8023

to look at the new patch set (#11).

Change subject: IMPALA-4856: Port data stream service to KRPC
..

IMPALA-4856: Port data stream service to KRPC

This patch implements a new data stream service which utilizes KRPC.
As in the thrift RPC implementation, there are three major components
to the data stream service: KrpcDataStreamSender serializes and sends
row batches materialized by a fragment instance to a KrpcDataStreamRecvr,
and KrpcDataStreamMgr is responsible for routing an incoming row batch to
the appropriate receiver. The data stream service runs on the port
FLAGS_krpc_port, which is 29000 by default.

Unlike the thrift RPC implementation, KRPC provides an asynchronous
interface for invoking remote methods. As a result, KrpcDataStreamSender
doesn't need to create a thread per connection. There is one connection
between two Impalad nodes for each direction (i.e. client and server).
Multiple queries can multiplex over the same connection for transmitting
row batches between two Impalad nodes. The asynchronous interface also
avoids the possibility that a thread is stuck in the RPC code for an
extended amount of time without checking for cancellation. A TransmitData()
call with KRPC is in essence a trio of an RpcController, a serialized protobuf
request buffer and a protobuf response buffer. The call is invoked via a
DataStreamService proxy object. The serialized tuple offsets and row batches
are sent via "sidecars" in KRPC to avoid an extra copy into the serialized
request buffer.

Each impalad node creates a singleton DataStreamService object at start-up
time. All incoming calls are served by a service thread pool created as part
of DataStreamService. By default, the number of service threads equals the
number of logical cores. The service threads are shared across all queries, so
the RPC handler should avoid blocking as much as possible. In the thrift RPC
implementation, a thrift thread handling a TransmitData() RPC may block for an
extended period of time if the receiver has not yet been created when the call
arrives. In the KRPC implementation, TransmitData() or EndDataStream()
requests which arrive before the receiver is ready are stored in a per-receiver
early sender list in KrpcDataStreamMgr. These RPC calls are processed
and responded to when the receiver is created or when a timeout occurs.

Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr.
If adding a row batch to a queue in KrpcDataStreamRecvr would cause the buffer
limit to be exceeded, the request is stashed in a queue for deferred
processing. The stashed RPC requests are not responded to until they are
processed, so as to exert back pressure on the senders. An alternative would
be to reply with an error, in which case the request and row batches would
need to be sent again. That may end up consuming more network bandwidth than
the thrift RPC implementation. This change adopts the behavior of allowing one
stashed request per sender.

All RPC requests and responses are serialized using protobuf. The equivalent of
TRowBatch is ProtoRowBatch, which contains a serialized header describing the
metadata of the row batch and two Kudu Slice objects which contain pointers to
the actual data (i.e. tuple offsets and tuple data).

This patch is based on an abandoned patch by Henry Robinson.

TESTING
---

* Builds {exhaustive/debug, core/release, asan} passed with FLAGS_use_krpc=true.

TO DO
-

* Port some BE tests to KRPC services.

Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
---
M be/src/common/status.cc
M be/src/common/status.h
M be/src/exec/data-sink.cc
M be/src/exec/exchange-node.cc
M be/src/exec/kudu-util.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/rpc-mgr.cc
M be/src/rpc/rpc-mgr.h
M be/src/runtime/CMakeLists.txt
M be/src/runtime/data-stream-mgr-base.h
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.h
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/krpc-data-stream-mgr.cc
M be/src/runtime/krpc-data-stream-mgr.h
M be/src/runtime/krpc-data-stream-recvr.cc
M be/src/runtime/krpc-data-stream-recvr.h
A be/src/runtime/krpc-data-stream-sender.cc
A be/src/runtime/krpc-data-stream-sender.h
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/service/CMakeLists.txt
A be/src/service/data-stream-service.cc
A be/src/service/data-stream-service.h
M be/src/service/impala-server.cc
M cmake_modules/FindProtobuf.cmake
M common/protobuf/CMakeLists.txt
A common/protobuf/common.proto
A common/protobuf/data_stream_service.proto
A common/protobuf/row_batch.proto
M common/thrift/generate_error_codes.py
33 files changed, 3,151 insertions(+), 184 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/11
--

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-08 Thread Dan Hecht (Code Review)
Dan Hecht has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 10: Code-Review+2


--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 10
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Dan Hecht 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Mostafa Mokhtar 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Wed, 08 Nov 2017 20:22:06 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-08 Thread Michael Ho (Code Review)
Michael Ho has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 10:

(3 comments)

http://gerrit.cloudera.org:8080/#/c/8023/10/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/10/be/src/runtime/krpc-data-stream-recvr.cc@408
PS10, Line 408: deferred_rpcs_.front().release();
not needed.


http://gerrit.cloudera.org:8080/#/c/8023/10/be/src/runtime/krpc-data-stream-recvr.cc@425
PS10, Line 425: COUNTER_ADD(recvr_->num_deferred_batches_, 1);
can be done without holding the lock.


http://gerrit.cloudera.org:8080/#/c/8023/10/be/src/runtime/row-batch.h
File be/src/runtime/row-batch.h:

http://gerrit.cloudera.org:8080/#/c/8023/10/be/src/runtime/row-batch.h@145
PS10, Line 145: tuple_ofsets
typo.



--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 10
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Dan Hecht 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Mostafa Mokhtar 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Wed, 08 Nov 2017 19:32:05 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-07 Thread Michael Ho (Code Review)
Hello Sailesh Mukil, Mostafa Mokhtar, Dan Hecht,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/8023

to look at the new patch set (#10).

Change subject: IMPALA-4856: Port data stream service to KRPC
..

IMPALA-4856: Port data stream service to KRPC

This patch implements a new data stream service which utilizes KRPC.
As in the thrift RPC implementation, there are three major components
to the data stream service: KrpcDataStreamSender serializes and sends
row batches materialized by a fragment instance to a KrpcDataStreamRecvr,
and KrpcDataStreamMgr is responsible for routing an incoming row batch to
the appropriate receiver. The data stream service runs on the port
FLAGS_krpc_port, which is 29000 by default.

Unlike the thrift RPC implementation, KRPC provides an asynchronous
interface for invoking remote methods. As a result, KrpcDataStreamSender
doesn't need to create a thread per connection. There is one connection
between two Impalad nodes for each direction (i.e. client and server).
Multiple queries can multiplex over the same connection for transmitting
row batches between two Impalad nodes. The asynchronous interface also
avoids some issues with thrift RPC in which a thread may be stuck in
an RPC call without being able to check for cancellation. A TransmitData()
call with KRPC is in essence a trio of an RpcController, a serialized protobuf
request buffer and a protobuf response buffer. The call is invoked via a
DataStreamService proxy object. The serialized tuple offsets and row batches
are sent via "sidecars" in KRPC to avoid an extra copy into the serialized
request buffer.

Each impalad node creates a singleton DataStreamService object at start-up
time. All incoming calls are served by a service thread pool created as part
of DataStreamService. By default, there are 64 service threads. The service
threads are shared across all queries, so the RPC handler should avoid
blocking as much as possible. In the thrift RPC implementation, a thrift
thread handling a TransmitData() RPC may block for an extended period of time
if the receiver has not yet been created when the call arrives. In the KRPC
implementation, TransmitData() or EndDataStream() requests which arrive
before the receiver is ready are stored in a per-receiver early sender list
in KrpcDataStreamMgr. These RPC calls are processed and responded to
when the receiver is created or when a timeout occurs.

Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr.
If adding a row batch to a queue in KrpcDataStreamRecvr would cause the buffer
limit to be exceeded, the request is stashed in a blocked_sender_ queue to be
processed later. The stashed RPC request is not responded to until it is
processed, so as to exert back pressure on the client. An alternative would
be to reply with an error, in which case the request and row batches would
need to be sent again. That may end up consuming more network bandwidth than
the thrift RPC implementation. This change adopts the behavior of allowing one
stashed request per sender.

All RPC requests and responses are serialized using protobuf. The equivalent of
TRowBatch is ProtoRowBatch, which contains a serialized header describing the
metadata of the row batch and two Kudu Slice objects which contain pointers to
the actual data (i.e. tuple offsets and tuple data).

This patch is based on an abandoned patch by Henry Robinson.

TESTING
---

* Build passed with FLAGS_use_krpc=true.

TO DO
-

* Port some BE tests to KRPC services.

Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
---
M be/src/common/status.cc
M be/src/common/status.h
M be/src/exec/data-sink.cc
M be/src/exec/exchange-node.cc
M be/src/exec/kudu-util.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/rpc-mgr.cc
M be/src/rpc/rpc-mgr.h
M be/src/runtime/CMakeLists.txt
M be/src/runtime/data-stream-mgr-base.h
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.h
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/krpc-data-stream-mgr.cc
M be/src/runtime/krpc-data-stream-mgr.h
M be/src/runtime/krpc-data-stream-recvr.cc
M be/src/runtime/krpc-data-stream-recvr.h
A be/src/runtime/krpc-data-stream-sender.cc
A be/src/runtime/krpc-data-stream-sender.h
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/service/CMakeLists.txt
A be/src/service/data-stream-service.cc
A be/src/service/data-stream-service.h
M be/src/service/impala-server.cc
M cmake_modules/FindProtobuf.cmake
M common/protobuf/CMakeLists.txt
A common/protobuf/common.proto
A common/protobuf/data_stream_service.proto
A common/protobuf/row_batch.proto
M common/thrift/generate_error_codes.py
33 files changed, 3,159 insertions(+), 183 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/10
--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, 

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-07 Thread Michael Ho (Code Review)
Michael Ho has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 10:

(6 comments)

http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-mgr.cc@62
PS9, Line 62:   FLAGS_datastream_service_num_deserialization_threads, 1,
> why disallow the setting of this greater than num_cores? Given that it take
Yes. The assumption is that the critical section is small and most threads 
won't block for too long so no point in pushing above number of cores. To be 
consistent with other knobs we have (e.g. # service threads, reactor threads), 
we shouldn't impose an implicit upper bound.


http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.cc@67
PS9, Line 67: the
> the resources
Done


http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.cc@195
PS9, Line 195: current_ba
> current_batch_
Done


http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.cc@225
PS9, Line 225: DCHECK
> should we also DCHECK that num_pending_enqueue_ == 0 (otherwise, we could h
Done


http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.cc@248
PS9, Line 248: current_batch_.reset(resu
> this is part of the same operation as line 244 (both are adjusting the queu
Done


http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.cc@252
PS9, Line 252:   // It's important that the dequeuing of 'deferred_rpcs_' is 
done after the entry
> how about moving this to line 250 now that it's that scope that drops the l
Done



--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 10
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Dan Hecht 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Mostafa Mokhtar 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Wed, 08 Nov 2017 03:35:43 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-07 Thread Michael Ho (Code Review)
Michael Ho has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 9:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.h
File be/src/runtime/krpc-data-stream-recvr.h:

http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.h@128
PS9, Line 128: Takes over the RPC payload of an early sender to 
'deferred_rpcs_' queue of the
Takes over the RPC state of an early sender for deferred processing and kicks...



--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 9
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Dan Hecht 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Mostafa Mokhtar 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Wed, 08 Nov 2017 00:54:50 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-07 Thread Dan Hecht (Code Review)
Dan Hecht has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 9:

(6 comments)

http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-mgr.cc@62
PS9, Line 62:   min(FLAGS_datastream_service_num_deserialization_threads, 
CpuInfo::num_cores()),
why disallow the setting of this greater than num_cores? Given that it takes 
locks, there could be some benefit to doing so, right?


http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.cc@67
PS9, Line 67: data
the resources


http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.cc@195
PS9, Line 195: cur_batch_
current_batch_


http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.cc@225
PS9, Line 225: DCHECK
should we also DCHECK that num_pending_enqueue_ == 0 (otherwise, we could have 
acquired the lock while it was dropped for deserialization and there is still a 
row batch)?


http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.cc@248
PS9, Line 248: batch_queue_.pop_front();
this is part of the same operation as line 244 (both are adjusting the queue 
state), so mind moving it to be adjacent?


http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.cc@252
PS9, Line 252: // Don't hold lock when calling EnqueueDeserializeTask() as 
it may block.
how about moving this to line 250 now that it's that scope that drops the lock. 
Also, it's important that this happens after batch_queue_.pop_front(), right? 
maybe note that.



--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 9
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Dan Hecht 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Mostafa Mokhtar 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Wed, 08 Nov 2017 01:32:58 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-07 Thread Michael Ho (Code Review)
Hello Sailesh Mukil, Mostafa Mokhtar, Dan Hecht,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/8023

to look at the new patch set (#9).

Change subject: IMPALA-4856: Port data stream service to KRPC
..

IMPALA-4856: Port data stream service to KRPC

This patch implements a new data stream service which utilizes KRPC.
Similar to the thrift RPC implementation, there are 3 major components
to the data stream services: KrpcDataStreamSender serializes and sends
row batches materialized by a fragment instance to a KrpcDataStreamRecvr.
KrpcDataStreamMgr is responsible for routing an incoming row batch to
the appropriate receiver. The data stream service runs on the port
FLAGS_krpc_port which is 29000 by default.

Unlike the implementation with thrift RPC, KRPC provides an asynchronous
interface for invoking remote methods. As a result, KrpcDataStreamSender
doesn't need to create a thread per connection. There is one connection
between two Impalad nodes for each direction (i.e. client and server).
Multiple queries can multiplex on the same connection for transmitting
row batches between two Impalad nodes. The asynchronous interface also
prevents some issues with thrift RPC in which a thread may be stuck in
an RPC call without being able to check for cancellation. A TransmitData()
call with KRPC is in essence a trio of RpcController, a serialized protobuf
request buffer and a protobuf response buffer. The call is invoked via a
DataStreamService proxy object. The serialized tuple offsets and row batches
are sent via KRPC "sidecars" to avoid an extra copy into the serialized
request buffer.

Each impalad node creates a singleton DataStreamService object at start-up
time. All incoming calls are served by a service thread pool created as part
of DataStreamService. By default, there are 64 service threads. The service
threads are shared across all queries so the RPC handler should avoid
blocking as much as possible. In the thrift RPC implementation, the thrift
thread handling a TransmitData() RPC may block for an extended period of time
if the receiver has not yet been created when the call arrives. In the KRPC
implementation, TransmitData() or EndDataStream() requests which arrive
before the receiver is ready are stored in a per-receiver early sender list
in KrpcDataStreamMgr. These RPCs are processed and responded to when the
receiver is created or when a timeout occurs.

Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr.
If adding a row batch to a queue in KrpcDataStreamRecvr would cause the buffer
limit to be exceeded, the request is stashed in a blocked_sender_ queue to be
processed later. The stashed RPC request is not responded to until it has been
processed, so as to exert back pressure on the client. An alternative would be
to reply with an error and require the request / row batches to be sent again,
but that may consume more network bandwidth than the thrift RPC
implementation. This change adopts the behavior of allowing one stashed
request per sender.

All RPC requests and responses are serialized using protobuf. The equivalent of
TRowBatch is ProtoRowBatch, which contains a serialized header describing the
row batch's metadata and two Kudu Slice objects pointing to the actual data
(i.e. tuple offsets and tuple data).

This patch is based on an abandoned patch by Henry Robinson.

TESTING
---

* Build passed with FLAGS_use_krpc=true.

TO DO
-

* Port some BE tests to KRPC services.

Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
---
M be/src/common/status.cc
M be/src/common/status.h
M be/src/exec/data-sink.cc
M be/src/exec/exchange-node.cc
M be/src/exec/kudu-util.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/rpc-mgr.cc
M be/src/rpc/rpc-mgr.h
M be/src/runtime/CMakeLists.txt
M be/src/runtime/data-stream-mgr-base.h
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.h
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/krpc-data-stream-mgr.cc
M be/src/runtime/krpc-data-stream-mgr.h
M be/src/runtime/krpc-data-stream-recvr.cc
M be/src/runtime/krpc-data-stream-recvr.h
A be/src/runtime/krpc-data-stream-sender.cc
A be/src/runtime/krpc-data-stream-sender.h
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/service/CMakeLists.txt
A be/src/service/data-stream-service.cc
A be/src/service/data-stream-service.h
M be/src/service/impala-server.cc
M cmake_modules/FindProtobuf.cmake
M common/protobuf/CMakeLists.txt
A common/protobuf/common.proto
A common/protobuf/data_stream_service.proto
A common/protobuf/row_batch.proto
M common/thrift/generate_error_codes.py
33 files changed, 3,156 insertions(+), 183 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/9
--

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-07 Thread Michael Ho (Code Review)
Michael Ho has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 9:

(21 comments)

http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.h@435
PS8, Line 435: 'num_requ
> 'num_request' requests
Done


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.cc@62
PS8, Line 62: ds, C
> how was that chosen? do we have a test case that causes this queue to fill
Just a large enough number to reduce the chance of the queue filling up.  Yes, 
need to come up with a test case to fill up this queue.


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.cc@109
PS8, Line 109:   // Let the receiver take over the RPC payloads of early 
senders and process them
 :   // asynchronously.
 :   for (unique_ptr
> this comment seems out of place. this is more an implementation detail of t
Done


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h
File be/src/runtime/krpc-data-stream-recvr.h:

http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h@129
PS8, Line 129: n
> and start a deserialization task to process it asynchronously.
Done


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h@130
PS8, Line 130: ask to p
> this "transfer" is in the opposite direction of how our "Transfer" methods us
Renamed to TakeOverEarlySender().


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h@197
PS8, Line 197: eive
> cpu time or wall time?
wall-clock time.


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h@200
PS8, Line 200: _;
> same question
wall-clock time.


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@67
PS8, Line 67: data
> the resources
Done


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@72
PS8, Line 72: imit, the
> RPC state
Done


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@73
PS8, Line 73: nto 'deferred_
> we shouldn't normally refer to private fields in public class comments, but
Done


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@87
PS8, Line 87:   void TakeOverEarlySender(std::unique_ptr ctx);
> same comment as recvr header comment.
Done


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@195
PS8, Line 195:   // cur_batch_ must be replaced with the
> I don't think we need this loop. see other comments in this function.
Done


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@197
PS8, Line 197: atch = nullptr;
> nit: consider swapping the order of these so that the fast case comes first
Done


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@210
PS8, Line 210: }
> nit: i think we could do without some of the blank lines in this method to
I leave a blank line between each loop exiting conditions.


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@211
PS8, Line 211:
> Actually, I think the loop exiting condition is not quite right, which led
There is an invariant that EOS cannot be sent by a sender when there is 
outstanding TransmitData() RPC so we should be able to get by by just checking 
for the termination condition of: (num_remaining_senders_ == 0 && 
batch_queue_.empty())


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@218
PS8, Line 218:
> given that we just checked the other two loop exit conditions, isn't this d
Converted to DCHECK().


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@229
PS8, Line 229: d
> to parallelize the CPU bound deserialization work.
Done


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@239
PS8, Line 239: }
> once you get rid of the loop, I think you'll be able to eliminate this unlo
Done


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@389
PS8, Line 389:   status.ToProto(ctx->response->mutable_status());
> at this point, 'ctx' effectively takes ownership, right? we should add a co
Done


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@406
PS8, Line 406: const RowBatchHeaderPB& header = 

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-07 Thread Dan Hecht (Code Review)
Dan Hecht has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 8:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@211
PS8, Line 211: deferred_rpcs_.empty() && batch_queue_.empty()
> why not write this condition as:
Actually, I think the loop exiting condition is not quite right, which led to 
this confusing conditional. The loop exiting condition for "we're done" should 
check that there are no more senders, and that there's nothing left to drain 
from the deferred_rpcs_ queue and that there's no pending insert into 
batch_queue_.

So, the third wait loop condition should be something like:

(num_remaining_senders > 0 || !deferred_rpcs_.empty() || num_pending_enqueue_ > 
0)

and then this if-stmt conditional can just be:
if (batch_queue_.empty())

and then the DCHECK can be the negation of that third wait loop conditional:

// Wait loop is exited with an empty batch_queue_ only when there will be no 
more batches.
DCHECK(num_remaining_senders == 0 && deferred_rpcs_.empty() && 
num_pending_enqueue_ == 0);

And then you can get rid of the outer loop. That outer loop should be removed 
since it's effectively a busy wait (and I think we could get into a busy wait 
state in the previous patchsets).
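The proposed exit condition can be sketched as a standard condition-variable
wait (a simplified single-threaded model with invented names; the real
receiver guards these fields with its own lock and timers):

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>

// Model of the proposed wait loop: block while no batch is available AND
// more batches may still arrive, instead of looping in a busy wait.
struct RecvrStateSketch {
  std::mutex lock;
  std::condition_variable data_arrival;
  std::deque<int> batch_queue;
  std::deque<int> deferred_rpcs;
  int num_remaining_senders = 0;
  int num_pending_enqueue = 0;
  bool is_cancelled = false;

  // Returns true and pops a batch, or returns false once no more batches
  // will ever arrive (or on cancellation).
  bool GetBatch(int* out) {
    std::unique_lock<std::mutex> l(lock);
    data_arrival.wait(l, [this] {
      return is_cancelled || !batch_queue.empty() ||
             (num_remaining_senders == 0 && deferred_rpcs.empty() &&
              num_pending_enqueue == 0);
    });
    if (is_cancelled || batch_queue.empty()) return false;
    *out = batch_queue.front();
    batch_queue.pop_front();
    return true;
  }
};
```

Exiting the wait with an empty queue then implies the negation of the third
predicate term, which is exactly the DCHECK Dan suggests.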



--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 8
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Dan Hecht 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Mostafa Mokhtar 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Tue, 07 Nov 2017 15:58:21 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-06 Thread Michael Ho (Code Review)
Michael Ho has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 8:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-sender.cc@434
PS8, Line 434:   proxy_->TransmitDataAsync(req, _, _controller_,
May want to call resp_.Clear() too.



--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 8
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Dan Hecht 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Mostafa Mokhtar 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Mon, 06 Nov 2017 22:06:34 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-06 Thread Dan Hecht (Code Review)
Dan Hecht has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 8:

(23 comments)

http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.h@435
PS8, Line 435: a request
'num_request' requests


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.cc@62
PS8, Line 62: 1
how was that chosen? do we have a test case that causes this queue to fill up?


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.cc@109
PS8, Line 109:   // Transfer the early senders into 'deferred_rpcs_' queue 
of the corresponding
 :   // sender queue. This makes sure new incoming RPCs won't 
pass these early senders,
 :   // leading to starvation.
this comment seems out of place. this is more an implementation detail of the 
receiver and handled properly inside ProcessEarlySender().  You could 
incorporate this in the comment for ProcessEarlySender() (to motivate why it 
uses the deferred_rpcs_ queue).


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h
File be/src/runtime/krpc-data-stream-recvr.h:

http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h@129
PS8, Line 129: .
and start a deserialization task to process it asynchronously.


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h@130
PS8, Line 130: Transfer
this "transfer" is in the opposite direction of how our "Transfer" methods 
usually go (e.g. src->TransferResourcesOwnership(dest)). Maybe call this 
ProcessEarlySender() (though I don't love "process" either since it's so vague).


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h@197
PS8, Line 197: time
cpu time or wall time?


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h@200
PS8, Line 200: time
same question


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@67
PS8, Line 67: data
the resources


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@72
PS8, Line 72: 'payload'
RPC state


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@73
PS8, Line 73: deferred_rpcs_
we shouldn't normally refer to private fields in public class comments, but 
given this is an internal class to the recvr, we can leave this.


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@87
PS8, Line 87:   void TransferEarlySender(std::unique_ptr ctx);
same comment as recvr header comment.


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@195
PS8, Line 195: while (current_batch_.get() == nullptr) {
I don't think we need this loop. see other comments in this function.


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@197
PS8, Line 197: !is_cancelled_ && batch_queue_.empty()
nit: consider swapping the order of these so that the fast case comes first 
(!batch_queue_.empty()) but also to match the comment ("or we know we're done" 
corresponds to the is_cancelled_ and num_remaining_senders_ == 0 cases).


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@201
PS8, Line 201:   CANCEL_SAFE_SCOPED_TIMER(recvr_->data_arrival_timer_, 
_cancelled_);
 :   CANCEL_SAFE_SCOPED_TIMER(recvr_->inactive_timer_, 
_cancelled_);
 :   CANCEL_SAFE_SCOPED_TIMER(
 :   received_first_batch_ ? nullptr : 
recvr_->first_batch_wait_total_timer_,
 :   _cancelled_);
there's got to be a cleaner way to do this but ignore for now


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@210
PS8, Line 210:
nit: i think we could do without some of the blank lines in this method to make 
more code fit on a screen


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@211
PS8, Line 211: deferred_rpcs_.empty() && batch_queue_.empty()
why not write this condition as:
num_remaining_senders_ == 0

then, it's more clear that these three conditions correspond to the loop exit 
conditions.


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@218
PS8, Line 218: !batch_queue_.empty()
given that we just checked the other two loop exit conditions, isn't this 
definitely true? i.e. we 

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-05 Thread Michael Ho (Code Review)
Hello Sailesh Mukil, Mostafa Mokhtar, Dan Hecht,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/8023

to look at the new patch set (#8).

Change subject: IMPALA-4856: Port data stream service to KRPC
..

IMPALA-4856: Port data stream service to KRPC

This patch implements a new data stream service which utilizes KRPC.
Similar to the thrift RPC implementation, there are 3 major components
to the data stream services: KrpcDataStreamSender serializes and sends
row batches materialized by a fragment instance to a KrpcDataStreamRecvr.
KrpcDataStreamMgr is responsible for routing an incoming row batch to
the appropriate receiver. The data stream service runs on the port
FLAGS_krpc_port which is 29000 by default.

Unlike the implementation with thrift RPC, KRPC provides an asynchronous
interface for invoking remote methods. As a result, KrpcDataStreamSender
doesn't need to create a thread per connection. There is one connection
between two Impalad nodes for each direction (i.e. client and server).
Multiple queries can multiplex on the same connection for transmitting
row batches between two Impalad nodes. The asynchronous interface also
prevents some issues with thrift RPC in which a thread may be stuck in
an RPC call without being able to check for cancellation. A TransmitData()
call with KRPC is in essence a trio of RpcController, a serialized protobuf
request buffer and a protobuf response buffer. The call is invoked via a
DataStreamService proxy object. The serialized tuple offsets and row batches
are sent via KRPC "sidecars" to avoid an extra copy into the serialized
request buffer.

Each impalad node creates a singleton DataStreamService object at start-up
time. All incoming calls are served by a service thread pool created as part
of DataStreamService. By default, there are 64 service threads. The service
threads are shared across all queries so the RPC handler should avoid
blocking as much as possible. In the thrift RPC implementation, the thrift
thread handling a TransmitData() RPC may block for an extended period of time
if the receiver has not yet been created when the call arrives. In the KRPC
implementation, TransmitData() or EndDataStream() requests which arrive
before the receiver is ready are stored in a per-receiver early sender list
in KrpcDataStreamMgr. These RPCs are processed and responded to when the
receiver is created or when a timeout occurs.

Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr.
If adding a row batch to a queue in KrpcDataStreamRecvr would cause the buffer
limit to be exceeded, the request is stashed in a blocked_sender_ queue to be
processed later. The stashed RPC request is not responded to until it has been
processed, so as to exert back pressure on the client. An alternative would be
to reply with an error and require the request / row batches to be sent again,
but that may consume more network bandwidth than the thrift RPC
implementation. This change adopts the behavior of allowing one stashed
request per sender.

All RPC requests and responses are serialized using protobuf. The equivalent of
TRowBatch is ProtoRowBatch, which contains a serialized header describing the
row batch's metadata and two Kudu Slice objects pointing to the actual data
(i.e. tuple offsets and tuple data).

This patch is based on an abandoned patch by Henry Robinson.

TESTING
---

* Build passed with FLAGS_use_krpc=true.

TO DO
-

* Port some BE tests to KRPC services.

Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
---
M be/src/common/status.cc
M be/src/common/status.h
M be/src/exec/data-sink.cc
M be/src/exec/exchange-node.cc
M be/src/exec/kudu-util.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/rpc-mgr.cc
M be/src/rpc/rpc-mgr.h
M be/src/runtime/CMakeLists.txt
M be/src/runtime/data-stream-mgr-base.h
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.h
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/krpc-data-stream-mgr.cc
M be/src/runtime/krpc-data-stream-mgr.h
M be/src/runtime/krpc-data-stream-recvr.cc
M be/src/runtime/krpc-data-stream-recvr.h
A be/src/runtime/krpc-data-stream-sender.cc
A be/src/runtime/krpc-data-stream-sender.h
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/service/CMakeLists.txt
A be/src/service/data-stream-service.cc
A be/src/service/data-stream-service.h
M be/src/service/impala-server.cc
M cmake_modules/FindProtobuf.cmake
M common/protobuf/CMakeLists.txt
A common/protobuf/common.proto
A common/protobuf/data_stream_service.proto
A common/protobuf/row_batch.proto
M common/thrift/generate_error_codes.py
33 files changed, 3,138 insertions(+), 183 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/8
--

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-03 Thread Michael Ho (Code Review)
Michael Ho has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 7:

(2 comments)

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@199
PS7, Line 199:   DeserializeTask payload =
 :   {DeserializeTaskType::EARLY_SENDERS, finst_id, 
dest_node_id, 0};
 :   deserialize_pool_.Offer(move(payload));
> doesn't this mean we make early sender draining single threaded? should we
Yes. Actually, I just realized that early senders may also be overtaken by 
incoming row batches before they are deserialized by the deserialization thread 
pool, leading to extended response times for early senders.

A simpler scheme would be to put the early senders into the 
'deferred_batches' queue of the corresponding sender's queue so that new 
incoming row batches cannot overtake them.

Regarding the parallelism for draining the deferred_batches queue, one simple 
approach is to enqueue as many deserialization requests as there are entries 
in the 'deferred_batches' queue. The deserialization thread logic will simply 
peek at the first entry of the queue and try to insert it if there is space. An 
entry is popped off the queue only if it can be inserted. This may be wasteful 
if the 'batch_queue' fills up before all deserialization requests are 
drained, but hopefully the peeking logic shouldn't take too long. We could be 
fancier and record the deserialized size of each entry in deferred_batches_ to 
determine how many entries we can pop off the deferred_batches_ queue without 
going over the limit.
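The peek-and-insert drain scheme described above can be sketched roughly as
follows. This is an illustrative model only (DeferredQueue, TryDrainOne and
the byte accounting are invented names, not the actual Impala code), assuming
one deserialization request is enqueued per deferred entry:

```cpp
#include <cassert>
#include <cstdint>
#include <deque>

// Sketch of the drain scheme discussed above: each deserialization request
// peeks at the first deferred entry and pops it only if the batch queue has
// room, so a request is "wasted" (a no-op) when the queue filled up first.
struct DeferredQueue {
  std::deque<int64_t> deferred;  // deserialized size of each deferred batch
  int64_t queue_bytes = 0;       // bytes currently in the batch queue
  int64_t soft_limit;

  explicit DeferredQueue(int64_t limit) : soft_limit(limit) {}

  // Handles one deserialization request. Returns true if an entry was moved
  // into the batch queue, false if there was no space (or nothing deferred).
  bool TryDrainOne() {
    if (deferred.empty()) return false;
    int64_t size = deferred.front();                    // peek, don't pop yet
    if (queue_bytes + size > soft_limit) return false;  // queue is full
    deferred.pop_front();  // pop only once insertion is known to succeed
    queue_bytes += size;
    return true;
  }
};
```

A wasted request here simply returns false; the deferred entry stays at the
front and is retried by the next request once space frees up.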


http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@235
PS7, Line 235: for (const unique_ptr& ctx : 
early_senders.waiting_sender_ctxs) {
> shouldn't we process waiting_sender_ctxs before closed_sender_ctxs? Otherwi
Yes, it's impossible for the same sender to be in both queues at the same 
time, but I can switch the order if it's easier to understand.



--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 7
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Dan Hecht 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Mostafa Mokhtar 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Sat, 04 Nov 2017 04:00:00 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-03 Thread Dan Hecht (Code Review)
Dan Hecht has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 7:

(37 comments)

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h@102
PS7, Line 102: the batch is added
 : /// to the receiver's 'deferred_batches_'
it's not really the batch that's added, and it's not just a single structure for 
the receiver (it may go into one of many queues for a merging exchange). So how 
about saying:
... the RPC state is saved into a deferred queue.


http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h@104
PS7, Line 104: from the pending sender queue
how about: ... from a deferred RPC queue and the row batch is deserialized.


http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h@393
PS7, Line 393:
quick comment for why we define a move constructor and move operator=, since we 
don't typically want to define those.


http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@199
PS7, Line 199:   DeserializeTask payload =
 :   {DeserializeTaskType::EARLY_SENDERS, finst_id, 
dest_node_id, 0};
 :   deserialize_pool_.Offer(move(payload));
doesn't this mean we make early sender draining single threaded? should we 
instead use the sender_id in this case as well and offer work per sender? or do 
we think this doesn't matter?


http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@217
PS7, Line 217: already_unregistered
that shouldn't be possible in the DEFERRED_BATCHES case, right? so i'd probably 
move this DCHECK into the cases below so you can tighten it up.


http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@235
PS7, Line 235: for (const unique_ptr& ctx : 
early_senders.waiting_sender_ctxs) {
shouldn't we process waiting_sender_ctxs before closed_sender_ctxs? Otherwise, 
if the same sender is in both lists we'll process those RPCs out of order. I 
guess that can't really happen given the current implementation of not 
responding to early RPCs and that senders only let one in flight, but it still 
seems to make more sense to do it the other way around.


http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@236
PS7, Line 236: already_unregistered
why is this possible in the waiting_sender_ctxs case but not the 
closed_sender_ctxs case?


http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@247
PS7, Line 247: already_unregistered
why is that possible?


http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@248
PS7, Line 248: recvr->AddDeferredBatches(task.sender_id);
So I guess we no longer multithread within a single sender queue (and for 
non-merging, within a single receiver) doing it this way. I think that's okay 
but was it intentional?


http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.h
File be/src/runtime/krpc-data-stream-recvr.h:

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.h@78
PS7, Line 78: The caller must acquire data from the
:   /// returned batch
is that talking about calling TransferAllResources(), or can the caller do it 
directly?


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@127
PS6, Line 127:   // If true, the receiver fragment for this stream got 
cancelled.
> For the non-merging case, there is essentially only one queue.
As mentioned elsewhere, I'm not totally convinced yet that this is the right 
way to do it but, yes, we can think about it more and change it later if 
necessary.


http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@72
PS7, Line 72: s
typo


http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@77
PS7, Line 77: Adds as many deferred batches as possible
hmm I'm still not convinced this is the right thing to do (in the merging 
case). It seems like it's left up to chance as to the order that deferred 
batches are drained across the sender queues. But we can think about this more 
and address it later.


http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@97
PS7, Line 97: (1) 'batch_queue' is empty and there is no 

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-03 Thread Michael Ho (Code Review)
Michael Ho has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 7:

(5 comments)

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h@430
PS7, Line 430: i
id


http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h@436
PS7, Line 436: specifies
identifies


http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@61
PS7, Line 61:
nit: blank space


http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-sender.h
File be/src/runtime/krpc-data-stream-sender.h:

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-sender.h@136
PS7, Line 136: //
nit: ///


http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-sender.cc@182
PS7, Line 182:  It points to
delete.



--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 7
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Dan Hecht 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Mostafa Mokhtar 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Fri, 03 Nov 2017 18:13:57 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-03 Thread Michael Ho (Code Review)
Michael Ho has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 7:

(81 comments)

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/exec/kudu-util.h
File be/src/exec/kudu-util.h:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/exec/kudu-util.h@45
PS6, Line 45: .ok())
> nit: use same mangling between this and KUDU_RETURN_IF_ERROR (prepend seems
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/exec-env.cc
File be/src/runtime/exec-env.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/exec-env.cc@89
PS6, Line 89: "processing threads. If left at default value 0, it will be 
set to number of CPU "
> how are these defaults chosen?
I didn't really change these parameters from Henry's patch.

There are no good defaults for the service queue size anyway (IMPALA-6116), but 
1024 seems to be a reasonable bound for untracked memory consumption.

The latest PS sets num_svc_threads to match the number of cores on the system.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@97
PS6, Line 97: etion at any one
: /// time). The rate of transmission is controlled by the 
receiver: a sender will only
: /// schedule batch transmission when the previous transmissi
> i'm confused what the "two" cases are from this comment.  Also, I think it'
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@103
PS6, Line 103: batch que
> deserialized?
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@104
PS6, Line 104: moved from t
> what is this "respectively" referring to?
The two cases. Removed as this seems unnecessary.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@249
PS6, Line 249:  for Transm
> unclear what that means, maybe stale comment? And this should say something
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@258
PS6, Line 258:
> isn't that part of "memory pointed to by 'request'"? If so and you want to
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@294
PS6, Line 294:
> How about getting rid of this typedef? The code seems easier to understand
This data structure has been renamed to DeserializeTask and also changed in the 
latest patch.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@428
PS6, Line 428:
> a sender
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@314
PS3, Line 314: t
> Done
Oops..missed it. Will update in later patch.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.cc@59
PS6, Line 59:
> I don't have a strong preference either way, but it'd be nice to be consist
My general guideline is to use a lambda if it's one or two lines and use bind 
for larger functions. I guess there are many different opinions on this topic. 
I try to optimize for readability.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.cc@322
PS6, Line 322: tonicMillis() + STREAM_E
> shouldn't be plural
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.h
File be/src/runtime/krpc-data-stream-recvr.h:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.h@119
PS6, Line 119: row batch i
> row batch is deserialized
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@52
PS6, Line 52: total amount of
> it's not clear what that means just from reading the comment. It'd be nice
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@113
PS6, Line 113: erialized
> how about using unique_ptr since this owns the row batch (until it's transf
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@127
PS6, Line 127:   // If true, the receiver fragment for this stream got 
cancelled.
> given that a single soft limit is imposed across all sender queues, does it
For the non-merging case, there is essentially only one queue.

For the merging case, I can see that it's possible for a blocked row batch in 
one sender queue to be passed by either new incoming row batches or blocked row 
batches in another sender queue. 

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-03 Thread Michael Ho (Code Review)
Hello Sailesh Mukil, Mostafa Mokhtar, Dan Hecht,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/8023

to look at the new patch set (#7).

Change subject: IMPALA-4856: Port data stream service to KRPC
..

IMPALA-4856: Port data stream service to KRPC

This patch implements a new data stream service which utilizes KRPC.
Similar to the thrift RPC implementation, there are 3 major components
to the data stream service: KrpcDataStreamSender serializes and sends
row batches materialized by a fragment instance to a KrpcDataStreamRecvr.
KrpcDataStreamMgr is responsible for routing an incoming row batch to
the appropriate receiver. The data stream service runs on the port
FLAGS_krpc_port which is 29000 by default.

Unlike the implementation with thrift RPC, KRPC provides an asynchronous
interface for invoking remote methods. As a result, KrpcDataStreamSender
doesn't need to create a thread per connection. There is one connection
between two Impalad nodes for each direction (i.e. client and server).
Multiple queries can multiplex on the same connection for transmitting
row batches between two Impalad nodes. The asynchronous interface also
prevents some issues with thrift RPC in which a thread may be stuck in
an RPC call without being able to check for cancellation. A TransmitData()
call with KRPC is in essence a trio of RpcController, a serialized protobuf
request buffer and a protobuf response buffer. The call is invoked via a
DataStreamService proxy object. The serialized tuple offsets and row batches
are sent via "sidecars" in KRPC to avoid extra copy into the serialized
request buffer.
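The zero-copy role of sidecars can be pictured with a small sketch; Sidecar
and TransmitDataRequest below are stand-ins for kudu::Slice and the real
request type, not the actual KRPC API:

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Sketch: the serialized request carries only small metadata; the large
// tuple-offset and tuple-data buffers are attached by reference, so their
// bytes are never copied into the request buffer.
struct Sidecar {
  const uint8_t* data;
  size_t len;
};

struct TransmitDataRequest {
  int dest_node_id;
  std::vector<Sidecar> sidecars;  // e.g. tuple offsets and tuple data

  // Attach an existing buffer without copying it.
  void AttachSidecar(const std::string& buf) {
    sidecars.push_back(
        {reinterpret_cast<const uint8_t*>(buf.data()), buf.size()});
  }
};
```

A caller would attach the serialized tuple offsets and tuple data as two such
sidecars and send only the small metadata in the request proto.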

Each impalad node creates a singleton DataStreamService object at start-up
time. All incoming calls are served by a service thread pool created as part
of DataStreamService. By default, there are 64 service threads. The service
threads are shared across all queries so the RPC handler should avoid
blocking as much as possible. In the thrift RPC implementation, the thrift
thread handling a TransmitData() RPC may block for an extended period of time
if the receiver has not yet been created when the call arrives. In the KRPC
implementation, TransmitData() or EndDataStream() requests which arrive before
the receiver is ready are stored in a per-receiver early sender list in
KrpcDataStreamMgr. These RPC calls are processed and responded to when the
receiver is created or when a timeout occurs.
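The per-receiver early sender list can be modeled roughly as below;
EarlySenderList and its methods are hypothetical names for illustration, not
the actual KrpcDataStreamMgr interface:

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <utility>
#include <vector>

// Sketch: RPCs that arrive before their receiver exists are parked per
// receiver; when the receiver is created (or a timeout fires) the parked
// contexts are taken out, deserialized, and responded to.
struct EarlySenderList {
  struct Ctx {
    int sender_id;
    int64_t arrival_ms;  // used to respond with an error on timeout
  };
  std::map<int64_t, std::vector<Ctx>> waiting;  // keyed by destination node id

  void Park(int64_t dest_node_id, int sender_id, int64_t now_ms) {
    waiting[dest_node_id].push_back({sender_id, now_ms});
  }

  // Called when the receiver is created: hand back all parked contexts so
  // their row batches can be deserialized and the RPCs responded to.
  std::vector<Ctx> Take(int64_t dest_node_id) {
    std::vector<Ctx> out;
    auto it = waiting.find(dest_node_id);
    if (it != waiting.end()) {
      out = std::move(it->second);
      waiting.erase(it);
    }
    return out;
  }
};
```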

Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr.
If adding a row batch to a queue in KrpcDataStreamRecvr would cause the buffer
limit to be exceeded, the request is stashed in a blocked_sender_ queue to be
processed later. The stashed RPC request is not responded to until it is
processed, so as to exert back pressure on the client. An alternative would be
to reply with an error, forcing the request / row batches to be sent again;
that may end up consuming more network bandwidth than the thrift RPC
implementation. This change adopts the behavior of allowing one stashed
request per sender.
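The stashed-request back-pressure scheme can be sketched as follows;
SenderQueue here is an invented, simplified model, with a callback standing in
for responding to the RPC:

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <optional>
#include <utility>

// Sketch: when a batch would exceed the buffer limit, the RPC response is
// stashed (at most one per sender) instead of being sent, leaving the sender
// blocked until the receiver drains enough space.
struct SenderQueue {
  int64_t queued_bytes = 0;
  int64_t limit;
  std::optional<std::function<void()>> stashed_response;

  explicit SenderQueue(int64_t l) : limit(l) {}

  // Returns true if the RPC was responded to immediately.
  bool AddBatch(int64_t size, std::function<void()> respond) {
    if (queued_bytes + size > limit) {
      stashed_response = std::move(respond);  // defer: exerts back pressure
      return false;
    }
    queued_bytes += size;
    respond();  // respond right away; sender may transmit its next batch
    return true;
  }

  // Called when the consumer drains bytes; answer the stashed RPC, if any.
  void OnSpaceFreed(int64_t freed) {
    queued_bytes -= freed;
    if (stashed_response) {
      (*stashed_response)();
      stashed_response.reset();
    }
  }
};
```

Because the sender keeps only one TransmitData() in flight, withholding the
response is enough to stop it from sending more data.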

All rpc requests and responses are serialized using protobuf. The equivalent of
TRowBatch would be ProtoRowBatch which contains a serialized header about the
meta-data of the row batch and two Kudu Slice objects which contain pointers to
the actual data (i.e. tuple offsets and tuple data).

This patch is based on an abandoned patch by Henry Robinson.

TESTING
---

* Build passed with FLAGS_use_krpc=true.

TO DO
-

* Port some BE tests to KRPC services.

Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
---
M be/src/common/status.cc
M be/src/common/status.h
M be/src/exec/data-sink.cc
M be/src/exec/exchange-node.cc
M be/src/exec/kudu-util.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/rpc-mgr.cc
M be/src/rpc/rpc-mgr.h
M be/src/runtime/CMakeLists.txt
M be/src/runtime/data-stream-mgr-base.h
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.h
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/krpc-data-stream-mgr.cc
M be/src/runtime/krpc-data-stream-mgr.h
M be/src/runtime/krpc-data-stream-recvr.cc
M be/src/runtime/krpc-data-stream-recvr.h
A be/src/runtime/krpc-data-stream-sender.cc
A be/src/runtime/krpc-data-stream-sender.h
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/service/CMakeLists.txt
A be/src/service/data-stream-service.cc
A be/src/service/data-stream-service.h
M be/src/service/impala-server.cc
M cmake_modules/FindProtobuf.cmake
M common/protobuf/CMakeLists.txt
A common/protobuf/common.proto
A common/protobuf/data_stream_service.proto
A common/protobuf/row_batch.proto
M common/thrift/generate_error_codes.py
33 files changed, 3,124 insertions(+), 180 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/7
--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, 

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-01 Thread Dan Hecht (Code Review)
Dan Hecht has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 6:

(8 comments)

Here's my last set of comments for this round.

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.cc@59
PS6, Line 59: boost::bind
I don't have a strong preference either way, but it'd be nice to be consistent 
between either using bind or [], rather than both...


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.cc@322
PS6, Line 322: RespondToTimedOutSenders
shouldn't be plural


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@52
PS6, Line 52: overflows the queue
it's not clear what that means just from reading the comment. It'd be nice to 
briefly explain that this is talking about the soft limit of the number of 
bytes across all sender queues.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@105
PS6, Line 105:   // number of pending row batch insertion.
 :   int num_pending_enqueue_ = 0;
it's not clear what a "pending insertion" means or why we have this.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@113
PS6, Line 113: RowBatch*
how about using unique_ptr since this owns the row batch (until it's 
transferred to current_batch_)?


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@127
PS6, Line 127:   queue blocked_senders_;
given that a single soft limit is imposed across all sender queues, does it 
make sense that the blocked_senders_ are maintained per sender? Why don't we 
maintain a single blocked_senders_ list per datastream recvr?

Hmm, I guess we need to know if this sender has a blocked sender in GetBatch(). 
But given the single limit, it seems wrong that one sender's row batches can 
bypass another sender once we get into the blocked sender situation. i.e. the 
flow of batches across senders seems quite different depending on when the 
limit was reached.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@231
PS6, Line 231: proto_batch
update


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@322
PS6, Line 322: payload->rpc_context->RespondSuccess();
doing this in Close() goes against the paradigm that Close() is only about 
releasing (local) resources. We've been going that way because there might be 
nowhere to bubble up a status from Close().  At least RespondSuccess() doesn't 
return a status, I suppose.  But is there any place sooner we could do this? 
Does it make sense to do in during Cancel instead?



--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 6
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Dan Hecht 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Mostafa Mokhtar 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Thu, 02 Nov 2017 00:10:49 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-11-01 Thread Dan Hecht (Code Review)
Dan Hecht has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 6:

(14 comments)

Note to self: remaining files: krpc-data-stream-{mgr,recvr}.cc

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/exec-env.cc
File be/src/runtime/exec-env.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/exec-env.cc@89
PS6, Line 89: "Number of datastream service processing threads");
how are these defaults chosen?


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h
File be/src/runtime/row-batch.h:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@50
PS6, Line 50: outbound
I think we should say something about KRPC to at least give that hint. maybe:

A KRPC outbound row batch...


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@60
PS6, Line 60: sizeof(int32_t)
sizeof(tuple_offsets_[0]) seems clearer and more robust


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@354
PS6, Line 354:   /// it is ignored. This function does not Reset().
we should preserve this comment when removing the thrift variant. So you could 
just put the new decl here now so we don't forget that.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@424
PS6, Line 424:   ///
nit: i don't think we generally have all these line breaks between parameter 
comments.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@426
PS6, Line 426:  .
delete space


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@444
PS6, Line 444: nput_
delete


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@447
PS6, Line 447: input_
delete


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@537
PS6, Line 537:   std::string compression_scratch_;
this seems like a hack and we could do something simpler, but let's leave it 
alone for now.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.cc
File be/src/runtime/row-batch.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.cc@241
PS6, Line 241:   // as sidecars to the RpcController.
this comment was probably meant to be deleted?


http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/data_stream_service.proto
File common/protobuf/data_stream_service.proto:

http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/data_stream_service.proto@29
PS6, Line 29: fragment
isn't this the id of the instance?  The comment in KrpcDataStreamSender is 
clearer, let's copy that:
  /// Sender instance id, unique within a fragment.
  int sender_id_;


http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/data_stream_service.proto@59
PS6, Line 59:   // Id of this fragment in its role as a sender.
same


http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/row_batch.proto
File common/protobuf/row_batch.proto:

http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/row_batch.proto@32
PS3, Line 32: = 2;
> That's the tuple data sent as sidecar. Clarified in the new comments.
My point is that writing it like 'tuple_data' doesn't make sense since it's not 
a field in this struct. You should just write:
Size of the tuple data (sent as a sidecar) in bytes ...


http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/row_batch.proto
File common/protobuf/row_batch.proto:

http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/row_batch.proto@32
PS6, Line 32: epeated int32 row_tuples = 2;
why is this needed? i don't see it used. The size of it is used, though it 
seems like even that can be inferred from the descriptors.



--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 6
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Dan Hecht 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Mostafa Mokhtar 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Wed, 01 Nov 2017 21:48:21 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-10-31 Thread Dan Hecht (Code Review)
Dan Hecht has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 6:

(37 comments)

Next batch.

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@103
PS6, Line 103: processed
deserialized?


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@104
PS6, Line 104: respectively
what is this "respectively" referring to?


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@249
PS6, Line 249: proto batch
unclear what that means, maybe stale comment? And this should say something 
about the row batch being contained in 'request'.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@258
PS6, Line 258: the actual row batch
isn't that part of "memory pointed to by 'request'"? If so and you want to 
explicitly mention row batch, maybe say "including the serialized row batch"?


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@294
PS6, Line 294:   typedef std::unique_ptr DeserializeWorkItem;
How about getting rid of this typedef? The code seems easier to understand if 
the unique_ptr is visible in the fn decls. It's a bit harder than necessary to 
reason about DeserializeWorkItem& and &&, given that this is now directly a 
unique_ptr.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@428
PS6, Line 428: senders
a sender


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.h
File be/src/runtime/krpc-data-stream-recvr.h:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.h@119
PS6, Line 119: 'row_batch'
row batch is deserialized

('row_batch' isn't a variable in this context)


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.h
File be/src/runtime/krpc-data-stream-sender.h:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.h@57
PS6, Line 57: per_channel_buffer_size' is the buffer size in bytes allocated to 
each channel's
:   /// accumulated row batch.
still not clear what that means. This isn't really the size of a buffer, is it? 
How about something like:

... is a soft limit on the buffering, in bytes, into the channel's accumulating 
row batch before it will be sent.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.h@111
PS6, Line 111: cached protobuf
serialized


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.h@130
PS6, Line 130: cached proto
outbound row


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.h@133
PS6, Line 133: Two OutboundRowBatch reused across RPCs
Maybe say:
The outbound row batches are double-buffered.
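The double-buffering being suggested can be pictured with a tiny sketch
(illustrative only, not the KrpcDataStreamSender code): the channel serializes
into one OutboundRowBatch while the RPC for the other may still be in flight.

```cpp
#include <cassert>

// Sketch: two outbound batches, alternated across RPCs. The batch at
// current_idx is free for serialization; 1 - current_idx may be in flight.
struct DoubleBufferedChannel {
  int current_idx = 0;

  // Serialize the next row batch and kick off its RPC; returns the buffer
  // index used, then flips so the other buffer is reused next time.
  int SerializeAndSend() {
    int used = current_idx;
    current_idx = 1 - current_idx;
    return used;
  }
};
```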


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@67
PS6, Line 67: can
it looks like there's a third interface now: SerializeAndSendBatch() that takes 
an Impala rowbatch.

But now that we have the outbound batch on the sender, why not just use that 
for the RANDOM case and do SerializeBatch() then TransmitData(), so that we can 
simplify the abstraction?


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@81
PS6, Line 81: TearDown
Teardown()


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@82
PS6, Line 82: TearDown
Teardown()


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@126
PS6, Line 126: .
or if the preceding RPC failed.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@141
PS6, Line 141:   // Flushes any buffered row batches and sends the EOS RPC to 
close the channel.
Returns error status if...
Also indicate that it blocks until the EOS RPC is complete.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@157
PS6, Line 157: below
delete.

If that's all we need it for, maybe just remember the capacity?


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@166
PS6, Line 166:   RuntimeState* runtime_state_ = nullptr;
is that actually used?


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@176
PS6, Line 176: current_outbound_batch_
the name of this is confusing because it's so similar to current_batch_idx_, 
but it means something different (and often the opposite).  How about calling 
this:

rpc_outbound_batch_ or rpc_in_flight_batch_?

You could even get rid of the 

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-10-30 Thread Dan Hecht (Code Review)
Dan Hecht has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 6:

(5 comments)

Some initial comments for this round.

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/exec/kudu-util.h
File be/src/exec/kudu-util.h:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/exec/kudu-util.h@45
PS6, Line 45: status_
nit: use the same mangling between this and KUDU_RETURN_IF_ERROR (prepending
seems better too since it's not a member of a class and status_ is a common
member name).


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@97
PS6, Line 97: two things:
: /// process it immediately, add it to a fixed-size 'batch queue' 
for later processing,
: /// or defer processing the RPC if the 'batch-queue' is full
I'm confused about what the "two" cases are from this comment. Also, I think
it's kind of confusing what "processing" means. Should it read something like:

two things: deserialize it immediately, adding it to the 'batch queue', or
defer deserializing and respond to the RPC later if the 'batch queue' is full.

Also, the batch queue isn't really "fixed size", right?
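For what it's worth, the two cases under discussion can be sketched as a small model. All names below are illustrative stand-ins, not Impala's actual receiver API:

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical model of the receiver-side decision: deserialize an
// arriving batch immediately into the batch queue, or defer responding
// to the RPC when the queue is over its byte limit.
struct FakeRpc {
  std::string payload;
  bool responded = false;
};

class RecvrSketch {
 public:
  explicit RecvrSketch(size_t soft_limit_bytes) : soft_limit_(soft_limit_bytes) {}

  // Returns true if the batch was enqueued (the RPC can be responded to
  // now); false if the RPC was parked because the queue is over its limit.
  bool AddBatch(FakeRpc* rpc) {
    if (!queue_.empty() && queued_bytes_ + rpc->payload.size() > soft_limit_) {
      deferred_.push_back(rpc);  // respond later: back pressure on the sender
      return false;
    }
    queue_.push_back(rpc->payload);  // "deserialize" immediately
    queued_bytes_ += rpc->payload.size();
    rpc->responded = true;
    return true;
  }

  size_t deferred_count() const { return deferred_.size(); }

 private:
  size_t soft_limit_;
  size_t queued_bytes_ = 0;
  std::deque<std::string> queue_;   // stands in for batch_queue_
  std::deque<FakeRpc*> deferred_;   // stands in for the deferred-RPC list
};
```

Note the limit here is a soft one (the queue always accepts at least one batch), which matches the observation that the queue isn't really "fixed size".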


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@171
PS6, Line 171: if (!blocked_senders_.empty()) {
we talked about this in person, but want to note it so I don't forget: this 
loop will dequeue a "random" number of blocked senders, until the first one
happens to finish deserializing and shows up in batch_queue_. That seems wrong.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@220
PS6, Line 220: DCHECK_GT(num_remaining_senders_, 0);
why do we have this DCHECK (i.e. why is this condition important here), and 
could it be violated with out of order RPCs?


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@237
PS6, Line 237: recvr_->ExceedsLimit(batch_size)
it seems like we should be ORing that with !blocked_senders_.empty(), or
something. Otherwise, stuff that's been in the blocked_senders_ queue for a
while can be passed by new stuff, i.e. blocked_senders_ senders can get
starved -- maybe that explains the longer than expected RPC times we've seen
in some cases?
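The starvation concern can be illustrated with a toy admission predicate (hypothetical names; not the actual receiver code). Without also checking the blocked-senders queue, a new arrival is admitted ahead of senders that have been waiting:

```cpp
#include <cassert>
#include <deque>

// Toy model of the admission decision. 'fair' == false models deferring
// only on the memory limit; 'fair' == true also defers whenever older
// senders are already blocked, preserving FIFO order among senders.
// Returns the admitted sender id, or -1 if the sender was deferred.
int Admit(std::deque<int>* blocked, bool exceeds_limit, int sender_id, bool fair) {
  bool defer = exceeds_limit || (fair && !blocked->empty());
  if (defer) {
    blocked->push_back(sender_id);
    return -1;
  }
  return sender_id;  // goes straight into the batch queue
}
```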



--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 6
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Dan Hecht 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Mostafa Mokhtar 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Mon, 30 Oct 2017 19:35:45 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-10-27 Thread Michael Ho (Code Review)
Michael Ho has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 6:

(15 comments)

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@123
PS6, Line 123: until
 :   // any in-flight RPC completes
if the preceding RPC is still in-flight.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@135
PS6, Line 135: free
frees


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@258
PS6, Line 258: stack
code


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@261
PS6, Line 261: stack
code


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@264
PS6, Line 264: thread
threads


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@279
PS6, Line 279: thread
threads


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h
File be/src/runtime/row-batch.h:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@104
PS6, Line 104: CachedProtobufRowBatch
OutboundRowBatch


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@144
PS6, Line 144:   /// Populate a row batch from a serialized protobuf 
input_batch by copying
 :   /// input_batch's tuple_data into the row batch's mempool and 
converting all
 :   /// offsets in the data back into pointers.
stale


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@455
PS6, Line 455: tuple_offsets
input_*


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@531
PS6, Line 531: CachedProtobufRowBatch
OutboundProtoRowBatch


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/util/network-util.cc
File be/src/util/network-util.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/util/network-util.cc@41
PS6, Line 41: using kudu::Sockaddr;
undo. Bad rebase.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/util/network-util.cc@120
PS6, Line 120: Sockaddr sock;
undo


http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/data_stream_service.proto
File common/protobuf/data_stream_service.proto:

http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/data_stream_service.proto@38
PS6, Line 38: 'tuple_offsets'
tuple offsets' buffer


http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/data_stream_service.proto@43
PS6, Line 43: 'tuple_data'
tuple data's buffer


http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/row_batch.proto
File common/protobuf/row_batch.proto:

http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/row_batch.proto@25
PS6, Line 25: The indices of the sidecars are included in the header below.
delete



--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 6
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Dan Hecht 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Mostafa Mokhtar 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Fri, 27 Oct 2017 22:41:44 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-10-27 Thread Michael Ho (Code Review)
Michael Ho has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 6:

(6 comments)

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@75
PS3, Line 75: g two
> see comment in row-batch.h about this terminology.
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@178
PS3, Line 178:   // The two OutboundRowBatch which are re-used across multiple 
RPCs. Each entry contains
 :   // a RowBatchHeaderPB and the buffers for the serialized tuple 
offsets and data. When
 :   // one is being used for an in-flight RPC, the execution 
thread continues to run and
 :   // serializes another row batch into the other entry. 
'current_batch_idx_' is the index
 :   // of the entry being used by the in-flight or last completed 
RPC.
 :   //
 :   // TODO: replace this with an ac
> We need to access these fields from the callback (e.g. due to retry).
req_ removed in PS5.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h
File be/src/runtime/row-batch.h:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h@52
PS3, Line 52: class OutboundRowBatch {
> ProtoRowBatch is a conceptual representation of a serialized row batch in b
Removed in PS5.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h@89
PS3, Line 89:
> Had hard time coming up with a good name to indicate the re-use of the vect
Renamed to OutboundRowBatch in PS5.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h@431
PS3, Line 431:   ///
 :   /// 'uncompressed_size': Updated with the uncompressed size of 
'tuple_data'.
 :   ///
 :   /// 'is_compressed': Sets to true if compression is applied on 
'tuple_data'.
 :   /// False otherwise.
 :   ///
 :   /// Returns error status if serialization failed. Returns OK 
otherwise.
 :   /// TODO: clean this up once the thrift RPC implementation is 
removed.
 :   Status Serialize(bool full_dedup, vector<int32_t>* tuple_offsets,
 :   string* tuple_data, int64_t* uncompressed_size, bool* is_compressed);
 :
 :   /// Shared implementation between thrift and protobuf to 
deserialize a row batch.
 :   ///
 :   /// 'input_tuple_offsets': an int32_t array of tuples; offsets 
into 'input_tuple_data'.
 :   /// Used for populating the tuples in the row batch with 
actual pointers.
 :   ///
 :   /// 'input_tuple_data': contains pointer and size of tuples' 
data buffer.
 :   /// If 'is_compressed' is true, the data is compressed.
 :   ///
 :   /// 'uncompressed_size': the uncompressed size of 
'input_tuple_data' if it's compressed.
 :   ///
 :   /// 'is_compressed': True if 'input_tuple_data' is compressed.
 :   ///
 :   /// TODO: clean this up once the thrift RPC implementation is 
removed.
 :   void Deserialize(const kudu::Slice& tuple_offsets, const 
kudu::Slice& tuple_data,
 :   int64_t uncompressed_size, bool is_compressed);
 :
 :   typedef FixedSizeHashTable DedupMap;
 :
 :   /// The total size of all data represented in this row batch 
(tuples and referenced
 :   /// string and collection data). This is the size of the row 
batch after removing all
 :   /// gaps in the auxiliary and deduplicated tuples (i.e. the 
smallest footprint for the
 :   /// row batch). If the distinct_tuples argument is non-null, 
full deduplication is
 :   /// enabled. The distinct_tuples map must be empty.
 :   int64_t TotalByteSize(DedupMap* distinct_tuples);
 :
 :   void SerializeInternal(int64_t size, DedupMap* distinct_tuples,
 :   vector<int32_t>* tuple_offsets, string* tuple_data);
 :
 :   /// All members below need to be handled in R
> let's leave a TODO about cleaning this all up once we can remove the thrift
TODO added. Cannot file a JIRA as the Apache JIRA is being re-indexed.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/service/data-stream-service.cc
File be/src/service/data-stream-service.cc:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/service/data-stream-service.cc@64
PS3, Line 64:
:
:
:
:
:
:
:
:
> see comment in row-batch.h.  I think we should do this later 

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-10-27 Thread Michael Ho (Code Review)
Hello Sailesh Mukil, Mostafa Mokhtar, Dan Hecht,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/8023

to look at the new patch set (#6).

Change subject: IMPALA-4856: Port data stream service to KRPC
..

IMPALA-4856: Port data stream service to KRPC

This patch implements a new data stream service which utilizes KRPC.
Similar to the thrift RPC implementation, there are 3 major components
to the data stream services: KrpcDataStreamSender serializes and sends
row batches materialized by a fragment instance to a KrpcDataStreamRecvr.
KrpcDataStreamMgr is responsible for routing an incoming row batch to
the appropriate receiver. The data stream service runs on the port
FLAGS_krpc_port which is 29000 by default.

Unlike the implementation with thrift RPC, KRPC provides an asynchronous
interface for invoking remote methods. As a result, KrpcDataStreamSender
doesn't need to create a thread per connection. There is one connection
between two Impalad nodes for each direction (i.e. client and server).
Multiple queries can multiplex on the same connection for transmitting
row batches between two Impalad nodes. The asynchronous interface also
prevents some issues with thrift RPC in which a thread may be stuck in
an RPC call without being able to check for cancellation. A TransmitData()
call with KRPC is in essence a trio of RpcController, a serialized protobuf
request buffer and a protobuf response buffer. The call is invoked via a
DataStreamService proxy object. The serialized tuple offsets and row batches
are sent via "sidecars" in KRPC to avoid an extra copy into the serialized
request buffer.
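The shape of such an asynchronous call can be sketched in plain C++. Controller, TransmitDataRequest, and TransmitDataResponse below are stand-ins for the corresponding KRPC types, and the proxy completes inline for simplicity; this is not the real kudu::rpc API:

```cpp
#include <cassert>
#include <functional>
#include <string>

// Stand-ins for the trio described above: a controller, a request
// carrying the serialized batch, and a response buffer.
struct Controller { bool in_flight = false; };
struct TransmitDataRequest { std::string tuple_offsets, tuple_data; };
struct TransmitDataResponse { int status = 0; };  // 0 == OK

class ProxySketch {
 public:
  // Asynchronous in spirit: the caller supplies a completion callback and
  // is free to continue (e.g. serialize the next batch) in the meantime.
  void TransmitDataAsync(Controller* ctrl, const TransmitDataRequest& req,
                         TransmitDataResponse* resp, std::function<void()> done) {
    ctrl->in_flight = true;
    // A real implementation would attach req.tuple_offsets/req.tuple_data
    // as sidecars and hand off to the RPC layer; here we complete inline.
    resp->status = req.tuple_data.empty() ? 1 : 0;
    ctrl->in_flight = false;
    done();
  }
};
```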

Each impalad node creates a singleton DataStreamService object at start-up
time. All incoming calls are served by a service thread pool created as part
of DataStreamService. By default, there are 64 service threads. The service
threads are shared across all queries so the RPC handler should avoid
blocking as much as possible. In the thrift RPC implementation, the thrift
thread handling a TransmitData() RPC may block for an extended period of time
if the receiver has not yet been created when the call arrives. In the KRPC
implementation, TransmitData() or EndDataStream() requests which arrive before
the receiver is ready are stored in a per-receiver early sender list in
KrpcDataStreamMgr. These RPC calls are processed and responded to when the
receiver is created or when a timeout occurs.
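A minimal sketch of the early-senders bookkeeping (illustrative types only; the real KrpcDataStreamMgr tracks richer per-RPC state and timeouts):

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Batches addressed to a receiver that does not exist yet are parked in a
// per-receiver early-senders list and drained when the receiver registers.
class StreamMgrSketch {
 public:
  // Returns true if delivered to a live receiver, false if parked.
  bool Transmit(const std::string& recvr_id, const std::string& batch) {
    auto it = receivers_.find(recvr_id);
    if (it == receivers_.end()) {
      early_senders_[recvr_id].push_back(batch);  // respond to the RPC later
      return false;
    }
    it->second.push_back(batch);
    return true;
  }

  // Registers the receiver and drains any parked early senders into its
  // queue; returns the number of batches now queued.
  size_t CreateRecvr(const std::string& recvr_id) {
    auto& queue = receivers_[recvr_id];
    auto es = early_senders_.find(recvr_id);
    if (es != early_senders_.end()) {
      queue.insert(queue.end(), es->second.begin(), es->second.end());
      early_senders_.erase(es);
    }
    return queue.size();
  }

 private:
  std::map<std::string, std::vector<std::string>> receivers_;
  std::map<std::string, std::vector<std::string>> early_senders_;
};
```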

Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr.
If adding a row batch to a queue in KrpcDataStreamRecvr would exceed the buffer
limit, the request is stashed in a blocked_sender_ queue to be processed later.
The stashed RPC request is not responded to until it is processed, so as to
exert back pressure on the client. An alternative would be to reply with an
error and require the request / row batches to be sent again, which may end up
consuming more network bandwidth than the thrift RPC implementation. This change
adopts the behavior of allowing one stashed request per sender.

All RPC requests and responses are serialized using protobuf. The equivalent of
TRowBatch would be ProtoRowBatch, which contains a serialized header describing
the metadata of the row batch and two Kudu Slice objects which contain pointers
to the actual data (i.e. tuple offsets and tuple data).

This patch is based on an abandoned patch by Henry Robinson.

TESTING
---

* Build passed with FLAGS_use_krpc=true.

TO DO
-

* Port some BE tests to KRPC services.

Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
---
M be/src/common/status.cc
M be/src/common/status.h
M be/src/exec/data-sink.cc
M be/src/exec/exchange-node.cc
M be/src/exec/kudu-util.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/rpc-mgr.cc
M be/src/rpc/rpc-mgr.h
M be/src/runtime/CMakeLists.txt
M be/src/runtime/data-stream-mgr-base.h
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.h
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/krpc-data-stream-mgr.cc
M be/src/runtime/krpc-data-stream-mgr.h
M be/src/runtime/krpc-data-stream-recvr.cc
M be/src/runtime/krpc-data-stream-recvr.h
A be/src/runtime/krpc-data-stream-sender.cc
A be/src/runtime/krpc-data-stream-sender.h
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/service/CMakeLists.txt
A be/src/service/data-stream-service.cc
A be/src/service/data-stream-service.h
M be/src/service/impala-server.cc
M be/src/util/network-util.cc
M cmake_modules/FindProtobuf.cmake
M common/protobuf/CMakeLists.txt
A common/protobuf/common.proto
A common/protobuf/data_stream_service.proto
A common/protobuf/row_batch.proto
M common/thrift/generate_error_codes.py
34 files changed, 2,932 insertions(+), 175 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/6
--
To view, visit 

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-10-27 Thread Michael Ho (Code Review)
Hello Sailesh Mukil, Mostafa Mokhtar, Dan Hecht,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/8023

to look at the new patch set (#5).

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
---
34 files changed, 2,929 insertions(+), 175 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/5
--
To view, visit 

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-10-25 Thread Michael Ho (Code Review)
Michael Ho has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 4:

(86 comments)

Replying to some of the comments for now. Will look into removing ProtoRowBatch
next. Will not rebase until the next version of the patch is pushed.

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/common/status.cc
File be/src/common/status.cc:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/common/status.cc@246
PS3, Line 246: void Status::FromProto(const StatusPB& status) {
> this is the same as FromThrift() effectively, right? Can we make the two lo
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/common/status.cc@262
PS3, Line 262: void Status::FreeMessage() noexcept {
> same comment. let's make this and ToThrift look the same so it's obvious th
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/exec/exchange-node.cc
File be/src/exec/exchange-node.cc:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/exec/exchange-node.cc@109
PS3, Line 109:   RETURN_IF_CANCELLED(state);
> why do we do this in some Open() but not all? Should we just do it in ExecN
Actually, I noticed similar patterns in other exec nodes. Let me keep this
change for now and do the refactoring in another change.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/rpc/rpc-mgr.h
File be/src/rpc/rpc-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/rpc/rpc-mgr.h@154
PS3, Line 154:   ~RpcMgr() {
> nit: one-liner?
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/rpc/rpc-mgr.cc
File be/src/rpc/rpc-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/rpc/rpc-mgr.cc@77
PS3, Line 77:   VLOG_QUERY << "Registered KRPC service: " << 
service_pool->service_name();
> Should we add a log message stating which services we registered with KRPC?
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@63
PS3, Line 63: tiple RPCs. The logical connection between a pair of client
> I don't think that's accurate. see questions in krpc-data-stream-recvr.h ab
Comment rephrased.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@93
PS3, Line 93: After the first batch has been received, a sender continues to 
send batches, one
> XXX check whether these are really different
Rephrased.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@94
PS3, Line 94: () RPC
> what buffer? do you mean queue?
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@108
PS3, Line 108:
> what does that mean?  Is it saying that during unordinary operation, a send
It means the fragment instance completes without hitting any error. If a
fragment instance ends early, it may end up not calling the EOS() RPC. For
instance, if there is any cancellation, the stream will just be torn down
without sending EOS, as it's expected that the receivers' fragments will be
cancelled too.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@140
PS3, Line 140: RPCs, and may be cancell
> what are "it" and "its" here? "the sender" and "the RPC's"?
the result will be dropped silently by the RPC layer.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@141
PS3, Line 141: /// time. If an in-flight RPC is cancelled at the sender side, 
the reply from the receiver
> is that still true now that we have cancellation of RPCs?
Yup. If an RPC is cancelled before the result arrives, the KRPC code will just 
ignore the result.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@153
PS3, Line 153:
> sending fragment?
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@164
PS3, Line 164:  structure is const
> is that because the recvr hasn't showed up yet, or because the stream's que
both. Comments updated.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@166
PS3, Line 166:
> is that talking about the 'request' field below, or something different?
Yes.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@175
PS3, Line 175:   kudu::rpc::RpcContext* rpc_context;
> what's the relationship between this and proto_batch?
proto_batch is the inbound row_batch populated from information in 'request' 
and 'rpc_context'. I agree that it's not strictly necessary to keep it in 
TransmitDataCtx.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@178
PS3, Line 178:   /// such as the destination finst ID, plan node ID and the row 
batch header.
> who owns it?
'context'. Comments added.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@235
PS3, 

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-10-25 Thread Michael Ho (Code Review)
Michael Ho has uploaded a new patch set (#4). ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
---
34 files changed, 2,969 insertions(+), 175 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/4
--

Gerrit-Project: 

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-10-25 Thread Dan Hecht (Code Review)
Dan Hecht has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 3:

(16 comments)

Some more comments, still going through.

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@75
PS3, Line 75: cached
see comment in row-batch.h about this terminology.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@89
PS3, Line 89: // safe to free them once the callback has been invoked.
I think we should add a reference to KUDU-2011 somewhere here like:

Note that due to KUDU-2011, timeout cannot be used with outbound sidecars. The 
client has no idea when it is safe to reclaim the sidecar buffer (~RpcSidecar() 
should be the right place, except that's currently called too early).  
RpcController::Cancel(), however, ensures that the callback is called only 
after the RPC layer no longer references the sidecar buffer.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@92
PS3, Line 92: query
query? does it mean fragment instance?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@124
PS3, Line 124: Shutdown the RPC thread
is that still accurate?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@139
PS3, Line 139:   int buffer_size_;
that could use a comment.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@141
PS3, Line 141:   const TNetworkAddress address_;
 :   TUniqueId fragment_instance_id_;
 :   PlanNodeId dest_node_id_;
those could be commented together to say they identify the destination. it's a 
little odd that plan node id is prefixed "dest" when the others are not.

it also seems weird that we need both these and the req_ field, since shouldn't 
they just be stored there? Or it seems we should get rid of req_ and just 
generate it when sending.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@159
PS3, Line 159: num_cached_proto_batches_
caps since constant?
also, can this ever be something other than 2 without writing the code? i.e. 
doesn't the code assume this value is 2 in various ways?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@175
PS3, Line 175: proxy
proxy_


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@197
PS3, Line 197:   bool remote_recvr_closed_ = false;
why is that needed now?
also, shouldn't we do something different, at a higher level, in that case 
(like cancel this instance)?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@200
PS3, Line 200:   Status rpc_status_;
I think it would help to associate this with rpc_in_flight_ - move them 
adjacent and say that rpc_status_ is valid only when rpc_in_flight_ is false, 
or something.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@218
PS3, Line 218: 2
if we have the constant, shouldn't that use it?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@334
PS3, Line 334:   auto pred = [this]() -> bool { return !rpc_in_flight_ || ShouldTerminate(); };
 :   auto timeout = std::chrono::system_clock::now() + milliseconds(50);
 :   while (!rpc_done_cv_.wait_until(*lock, timeout, pred)) {
 : timeout = system_clock::now() + milliseconds(50);
 :   }
seems simpler to just write:

while (rpc_in_flight_ && !ShouldTerminate()) {
  auto timeout = std::chrono::system_clock::now() + milliseconds(50);
  rpc_done_cv_.wait_until(*lock, timeout);
}

Or even better, use wait_for(), which takes the relative timeout.
Or should we use our ConditionVariable wrapper? Especially if we want to start 
instrumenting these things better. But if it's work to switch it over, it's 
okay to keep it as a condition_variable, but let's at least make the code more 
straightforward.
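The simplified loop suggested above can be sketched as a self-contained program. Everything here uses hypothetical stand-ins (the member names rpc_in_flight_, rpc_done_cv_, ShouldTerminate() mimic the patch's channel, but this is not the patch's code):

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-ins for the sender-channel members discussed above.
std::mutex lock_;
std::condition_variable rpc_done_cv_;
bool rpc_in_flight_ = true;
bool should_terminate_ = false;

bool ShouldTerminate() { return should_terminate_; }

// The simplified loop: re-check the exit condition on every wakeup and
// use a relative 50ms timeout (wait_for) instead of recomputing a deadline
// for wait_until on each iteration.
void WaitForRpc(std::unique_lock<std::mutex>* lock) {
  while (rpc_in_flight_ && !ShouldTerminate()) {
    rpc_done_cv_.wait_for(*lock, std::chrono::milliseconds(50));
  }
}

// Demo: another thread completes the "RPC" after ~100ms; the waiter
// wakes up and observes rpc_in_flight_ == false.
bool DemoWaitCompletes() {
  std::thread completer([] {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    { std::lock_guard<std::mutex> g(lock_); rpc_in_flight_ = false; }
    rpc_done_cv_.notify_one();
  });
  std::unique_lock<std::mutex> l(lock_);
  WaitForRpc(&l);
  bool done = !rpc_in_flight_;
  l.unlock();
  completer.join();
  return done;
}
```

Because the predicate is re-checked after every timed wait, spurious wakeups and the 50ms timeout are both harmless; the loop only exits when the RPC completes or termination is requested.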


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h
File be/src/runtime/row-batch.h:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h@52
PS3, Line 52: struct ProtoRowBatch {
I think we should get rid of this structure altogether. IMO, the indirection 
just adds confusion.

On the receive side, it seems we can just get the header and sidecars directly 
from the request, which is already threaded through the RPC handler anyway.  
Pulling it into a ProtoRowBatch just makes it unclear where the not yet 
deserialized rowbatch comes from.

On the send side, I think we should just work directly on CachedProtoRowBatch 
(but rename that thing, see below). The 

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-10-12 Thread Tim Armstrong (Code Review)
Tim Armstrong has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 3:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@414
PS3, Line 414: Status
> I think that status is not getting checked by the caller. I thought Tim mad
GCC 4.9.2 doesn't support [[nodiscard]] - we need to upgrade GCC to get this in 
GCC builds. Any clang build (including clang-tidy) will catch this - LMK if it 
doesn't.



--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 3
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Dan Hecht 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Mostafa Mokhtar 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Reviewer: Tim Armstrong 
Gerrit-Comment-Date: Thu, 12 Oct 2017 21:06:02 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-10-11 Thread Dan Hecht (Code Review)
Dan Hecht has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 3:

(34 comments)

Another batch of comments...

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@63
PS3, Line 63: ongoing transmission from a client to a server as a 'stream'
I don't think that's accurate. see questions in krpc-data-stream-recvr.h about 
stream definition.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@93
PS3, Line 93: process it immediately, add it to a fixed-size 'batch queue' for 
later processing
XXX check whether these are really different


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@164
PS3, Line 164: deferred processing
is that because the recvr hasn't showed up yet, or because the stream's queue 
is full, or both?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@235
PS3, Line 235: node_id
dest_node_id?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@239
PS3, Line 239: Ownership of the receiver is shared between this DataStream mgr 
instance and the
 :   /// caller.
that seems unnecessary but don't change it now.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@246
PS3, Line 246:
'proto_batch'?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@248
PS3, Line 248: .
'request'.

Also document what 'response' and 'context' are.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@266
PS3, Line 266: Notifies the receiver
is this an RPC handler? I think we should just be explicit about which of these 
methods are RPC handlers.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@267
PS3, Line 267: The RPC
what RPC is this talking about? If this is a handler, then it's clear.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@274
PS3, Line 274: Closes
Does it close or cancel? (or is there no difference?)


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@284
PS3, Line 284: RPCs which were buffered
To be consistent with terminology used in class comment, maybe say "deferred 
RPCs"


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@340
PS3, Line 340: ragment instance id, 0
what is that saying? is that a misplaced comma or am I reading this wrong?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@341
PS3, Line 341: instance id changes
I don't understand this.  it kinda sounds like we're trying to be able to find 
all instances of this fragment, but then wouldn't we iterate until the fragment 
id changes (not the instance id)?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@349
PS3, Line 349:   struct EarlySendersList {
hmm, I guess we need this now that we can't block the RPC thread?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@358
PS3, Line 358: Time
Monotonic time


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@374
PS3, Line 374: time
monotonic time


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@382
PS3, Line 382:   boost::unordered_set closed_stream_cache_;
all this parallel startup stuff really needs to be revisited (but not for this 
change). it's too complex and brittle.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@386
PS3, Line 386: Deserialize
maybe call it DeserializeDeferred() or DeserializeWorker() to make it clearer 
that this is only for the deferred (slow) path, since the normal path will also 
have to deserialize (but doesn't use this code).


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@404
PS3, Line 404:   void EnqueueRowBatch(DeserializeWorkItem&& payload);
how about grouping this with Deferred function above since it's related. Also, 
I think the name should be less generic. Like maybe EnqueueDeferredBatch() or 
EnqueueDeferredRpc() (does the response happen before or after this deferred 
work?)


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@413
PS3, Line 413: block
what's that?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@414
PS3, Line 414: Status
I think that status is not getting checked by the caller. I thought Tim made 
Status warn on unused result -- why is it not catching this? (Or do we still 
need to annotate each method?).



[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-10-11 Thread Sailesh Mukil (Code Review)
Sailesh Mukil has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 3:

(23 comments)

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/rpc/rpc-mgr.cc
File be/src/rpc/rpc-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/rpc/rpc-mgr.cc@77
PS3, Line 77:
Should we add a log message stating which services we registered with KRPC?

It might be useful later on as we add more services, while trying to debug user 
issues to know which services are on KRPC and which are on thrift. Granted 
there are other ways to find that, but this is easily accessible and 
straightforward.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@314
PS3, Line 314: w
super-nit: Capital 'W'


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@198
PS1, Line 198: deseria
> Deserialization pool's purpose is to avoid executing deserialization in lin
Hmm, this seems like it would be a nice thing to have. Is the absence of a 
MemTracker the only hindrance to early deserialization?

Is there some way we could add this to the process MemTracker if we can't 
attribute it to a query? If it's too complicated for now, let's track this with 
a JIRA and write down some ideas there for the next KRPC milestone.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@227
PS1, Line 227: // to make sure that the close operation is performed so add 
to per-recvr list of
 : // pending closes. It's possible for a sender to issue EOS 
RPC without sending any
 : // rows if no rows are m
> This may be a bit subtle but this is equivalent to the logic in the non-KRP
Ah you're right. Case 2 is what changed in IMPALA-5199, but it looks like 
that's automatically fixed here.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc@112
PS3, Line 112:   for (unique_ptr<TransmitDataCtx>& ctx : early_senders.waiting_sender_ctxs) {
 :  EnqueueRowBatch({recvr->fragment_instance_id(), move(ctx)});
 :  num_senders_waiting_->Increment(-1);
 :   }
 :   for (unique_ptr<EndDataStreamCtx>& ctx : early_senders.closed_sender_ctxs) {
 :  recvr->RemoveSender(ctx->request->sender_id());
 :  Status::OK().ToProto(ctx->response->mutable_status());
 :  ctx->context->RespondSuccess();
 :  num_senders_waiting_->Increment(-1);
 :   }
It's not possible for the same sender to be in waiting_senders_ctxs and 
closed_sender_ctxs for a given receiver right?

Because if it would, it would make more sense to service the 
'closed_sender_ctxs' before the 'waiting_sender_ctxs' since we may as well 
close the receiver instead of wasting CPU processing those RPCs for a while.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc@140
PS3, Line 140:   while (range.first != range.second) {
 : shared_ptr<KrpcDataStreamRecvr> recvr = range.first->second;
 : if (recvr->fragment_instance_id() == finst_id &&
 : recvr->dest_node_id() == node_id) {
 :   return recvr;
 : }
 : ++range.first;
 :   }
I'm thinking it makes sense to prioritize finding the receiver with the 
assumption that we will find it in the receiver_map_, rather than assume that 
it most likely will already be unregistered.

In other words, I think it may be more beneficial CPU-wise for general 
workloads to look in the 'receiver_map_' before looking into the 
'closed_stream_cache_'.

What do you think?
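The probe order being proposed might look roughly like this. The key and container types below (RecvrKey as a plain pair of ints, a std::map and std::set) are hypothetical simplifications, chosen only to show the lookup order, not Impala's actual data structures:

```cpp
#include <map>
#include <memory>
#include <set>
#include <utility>

// Hypothetical simplification: a receiver keyed by
// (fragment instance id, dest node id).
using RecvrKey = std::pair<int, int>;
struct Recvr {};

std::map<RecvrKey, std::shared_ptr<Recvr>> receiver_map_;
std::set<RecvrKey> closed_stream_cache_;

enum class Lookup { kFound, kClosed, kUnknown };

// Probe the live receiver map first (the common case for general
// workloads); only consult the closed-stream cache on a miss.
Lookup FindRecvr(const RecvrKey& key, std::shared_ptr<Recvr>* out) {
  auto it = receiver_map_.find(key);
  if (it != receiver_map_.end()) {
    *out = it->second;
    return Lookup::kFound;
  }
  if (closed_stream_cache_.count(key) > 0) return Lookup::kClosed;
  return Lookup::kUnknown;
}

// Demo: one live receiver, one already-closed stream, one unknown stream.
bool DemoLookup() {
  receiver_map_[{1, 2}] = std::make_shared<Recvr>();
  closed_stream_cache_.insert({3, 4});
  std::shared_ptr<Recvr> r;
  return FindRecvr({1, 2}, &r) == Lookup::kFound &&
         FindRecvr({3, 4}, &r) == Lookup::kClosed &&
         FindRecvr({5, 6}, &r) == Lookup::kUnknown;
}
```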


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc@151
PS3, Line 151: KrpcDataStreamMgr::AddEarlySender
We could merge the implementations of AddEarlySender() and 
AddEarlyClosedSender() by using templates and some extra params, but maybe the 
code complexity isn't worth it.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc@174
PS3, Line 174: // that it is still preparing, so add payload to 
per-receiver list.
Add comment "In the worst case, this RPC is so late that the receiver is 
already unregistered and removed from the closed_stream_cache_, in which case 
it will be responded to by the Maintenance thread after 
FLAGS_datastream_sender_timeout_ms."


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc@242
PS3, Line 242:  {
   

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-10-11 Thread Mostafa Mokhtar (Code Review)
Mostafa Mokhtar has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 3:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@644
PS3, Line 644: for (int i = 0; i < channels_.size(); ++i) {
The RowBatch is serialized once per channel which is very wasteful.
IMPALA-6041.
Compare to 
https://github.com/michaelhkw/incubator-impala/blob/krpc-testing-hung/be/src/runtime/data-stream-sender.cc#L429



--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 3
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Dan Hecht 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Mostafa Mokhtar 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Wed, 11 Oct 2017 19:14:14 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-10-10 Thread Dan Hecht (Code Review)
Dan Hecht has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 3:

(17 comments)

Some initial comments.

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/common/status.cc
File be/src/common/status.cc:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/common/status.cc@246
PS3, Line 246:   }
this is the same as FromThrift() effectively, right? Can we make the two look 
the same to make that obvious?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/common/status.cc@262
PS3, Line 262:
same comment. let's make this and ToThrift look the same so it's obvious they 
do the same things.
nit: also, could we order the functions consistently? We currently have 
ToThrift, FromThrift, FromProto, ToProto, and that ordering just makes it 
slightly slower to read through since it doesn't follow a pattern.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/exec/exchange-node.cc
File be/src/exec/exchange-node.cc:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/exec/exchange-node.cc@109
PS3, Line 109:   RETURN_IF_CANCELLED(state);
why do we do this in some Open() but not all? Should we just do it in 
ExecNode::Open() and remove the ones in the derived classes?  okay to do 
separately from this patch.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/rpc/rpc-mgr.h
File be/src/rpc/rpc-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/rpc/rpc-mgr.h@154
PS3, Line 154:   }
nit: one-liner?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@94
PS3, Line 94: buffer
what buffer? do you mean queue?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@108
PS3, Line 108: During ordinary operation
what does that mean?  Is it saying that during unordinary operation, a sender 
can have both a TransmitData() and EndDataStream() call in-flight 
simultaneously?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@140
PS3, Line 140: it will quietly drop its
what are "it" and "its" here? "the sender" and "the RPC's"?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@141
PS3, Line 141: /// it returns.
is that still true now that we have cancellation of RPCs?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@153
PS3, Line 153: fragment
sending fragment?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@166
PS3, Line 166: request
is that talking about the 'request' field below, or something different?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@175
PS3, Line 175:   const TransmitDataRequestPB* request;
what's the relationship between this and proto_batch?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@178
PS3, Line 178:   /// responded to.
who owns it?


http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/data_stream_service.proto
File common/protobuf/data_stream_service.proto:

http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/data_stream_service.proto@29
PS3, Line 29:   optional int32 sender_id = 2;
:
:   optional int32 dest_node_id = 3;
what are "IDs" in these cases? let's improve the documentation here. Especially 
since the type is no longer PlanNodeId (and why is that?).


http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/row_batch.proto
File common/protobuf/row_batch.proto:

http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/row_batch.proto@30
PS3, Line 30: int32
in thrift we had TTupleId. Is there a reason we aren't defining those types as 
well to make the structure clearer?


http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/row_batch.proto@32
PS3, Line 32: tuple_data
what's tuple_data? not a field in this structure...


http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/row_batch.proto@39
PS3, Line 39: Size
size of what?


http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/row_batch.proto@42
PS3, Line 42: (TODO(KRPC): native enum)
do we plan to fix that?



--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 3
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Dan Hecht 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Tue, 10 Oct 2017 20:15:20 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-10-09 Thread Michael Ho (Code Review)
Michael Ho has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 3:

(3 comments)

http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-sender.cc@103
PS2, Line 103: fragment_instance_id_(fragment_instance_id),
> address_(destination)
Done


http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-sender.cc@141
PS2, Line 141:  const TNetworkAddress add
> const TNetworkAddress address_;
Done


http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-sender.cc@485
PS2, Line 485:   batch_->CommitLastRow();
 :   return Status::OK();
 : }
 :
 : void
> This is broken if the RPC was rejected for FLAGS_backend_client_connection_
Changed to using async RPC for this in the new patch. This ensures that we can 
check for cancellation while waiting for replies from the remote server.



--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 3
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Mon, 09 Oct 2017 18:10:45 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-10-09 Thread Michael Ho (Code Review)
Hello Sailesh Mukil,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/8023

to look at the new patch set (#3).

Change subject: IMPALA-4856: Port data stream service to KRPC
..

IMPALA-4856: Port data stream service to KRPC

This patch implements a new data stream service which utilizes KRPC.
Similar to the thrift RPC implementation, there are 3 major components
to the data stream services: KrpcDataStreamSender serializes and sends
row batches materialized by a fragment instance to a KrpcDataStreamRecvr.
KrpcDataStreamMgr is responsible for routing an incoming row batch to
the appropriate receiver. The data stream service runs on the port
FLAGS_krpc_port which is 29000 by default.

Unlike the implementation with thrift RPC, KRPC provides an asynchronous
interface for invoking remote methods. As a result, KrpcDataStreamSender
doesn't need to create a thread per connection. There is one connection
between two Impalad nodes for each direction (i.e. client and server).
Multiple queries can multiplex over the same connection for transmitting
row batches between two Impalad nodes. The asynchronous interface also
prevents some issues seen with thrift RPC in which a thread may be stuck in
an RPC call without being able to check for cancellation. A TransmitData()
call with KRPC is in essence a trio of an RpcController, a serialized protobuf
request buffer, and a protobuf response buffer. The call is invoked via a
DataStreamService proxy object. The serialized tuple offsets and row batches
are sent via "sidecars" in KRPC to avoid an extra copy into the serialized
request buffer.

Each impalad node creates a singleton DataStreamService object at start-up
time. All incoming calls are served by a service thread pool created as part
of DataStreamService. By default, there are 64 service threads. The service
threads are shared across all queries, so the RPC handler should avoid
blocking as much as possible. In the thrift RPC implementation, a thrift
thread handling a TransmitData() RPC may block for an extended period of
time if the receiver has not yet been created when the call arrives. In the
KRPC implementation, TransmitData() or EndDataStream() requests which arrive
before the receiver is ready are stored in a per-receiver early sender list
in KrpcDataStreamMgr. These RPC calls are processed and responded to when
the receiver is created or when a timeout occurs.
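The early-sender scheme described above can be sketched as follows. All names here (StreamMgr, EarlyRpc, the plain-int fragment instance id) are hypothetical simplifications, not the patch's actual classes:

```cpp
#include <functional>
#include <map>
#include <set>
#include <utility>
#include <vector>

// Hypothetical simplification of a deferred RPC awaiting its receiver.
struct EarlyRpc {
  int sender_id;
  std::function<void(bool ok)> respond;  // completes the parked RPC
};

class StreamMgr {
 public:
  // RPC handler path: if the receiver isn't ready, park the request
  // instead of blocking a service thread.
  void Transmit(int finst_id, EarlyRpc rpc) {
    if (ready_.count(finst_id) == 0) {
      early_senders_[finst_id].push_back(std::move(rpc));
      return;
    }
    rpc.respond(true);
  }

  // Receiver shows up: replay everything parked for it.
  void CreateRecvr(int finst_id) {
    ready_.insert(finst_id);
    for (EarlyRpc& rpc : early_senders_[finst_id]) rpc.respond(true);
    early_senders_.erase(finst_id);
  }

  // Maintenance path: the receiver never arrived, so fail the parked RPCs.
  void Timeout(int finst_id) {
    for (EarlyRpc& rpc : early_senders_[finst_id]) rpc.respond(false);
    early_senders_.erase(finst_id);
  }

 private:
  std::set<int> ready_;
  std::map<int, std::vector<EarlyRpc>> early_senders_;
};

// Demo: one sender is replayed on receiver creation, another times out.
bool DemoEarlySender() {
  StreamMgr mgr;
  int accepted = 0, rejected = 0;
  auto count = [&](bool ok) { ok ? ++accepted : ++rejected; };
  mgr.Transmit(7, {0, count});  // receiver 7 not ready: parked
  bool parked = (accepted == 0 && rejected == 0);
  mgr.CreateRecvr(7);           // replays the parked RPC
  mgr.Transmit(9, {1, count});  // receiver 9 never shows up
  mgr.Timeout(9);
  return parked && accepted == 1 && rejected == 1;
}
```

The key design point is that parking the callback instead of blocking a thread keeps all 64 service threads available for other queries.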

Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr.
If adding a row batch to a queue in KrpcDataStreamRecvr would cause the buffer
limit to be exceeded, the request is stashed in a blocked_sender_ queue to be
processed later. The stashed RPC request will not be responded to until it is
processed, so as to exert back pressure on the client. An alternative would be
to reply with an error, in which case the request / row batches would need to
be sent again. That may end up consuming more network bandwidth than the thrift
RPC implementation. This change adopts the behavior of allowing one stashed
request per sender.
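A minimal sketch of this stash-and-defer back pressure, using a hypothetical RecvrQueue with a byte limit (none of these names come from the patch):

```cpp
#include <deque>
#include <functional>
#include <utility>

// Hypothetical names throughout; this is not the patch's actual code.
struct Request {
  int batch_size;                  // serialized row-batch bytes
  std::function<void()> respond;   // completes the sender's RPC
};

class RecvrQueue {
 public:
  explicit RecvrQueue(int limit) : limit_(limit) {}

  // Called from the RPC handler: either accept-and-ack, or park the
  // request without responding, which stalls that sender.
  void Add(Request req) {
    if (buffered_ + req.batch_size > limit_) {
      deferred_.push_back(std::move(req));  // stash; no response yet
      return;
    }
    buffered_ += req.batch_size;
    req.respond();
  }

  // Called when the receiving fragment consumes a batch: retry one
  // deferred sender now that space has been freed.
  void Drain(int bytes) {
    buffered_ -= bytes;
    if (!deferred_.empty()) {
      Request req = std::move(deferred_.front());
      deferred_.pop_front();
      buffered_ += req.batch_size;
      req.respond();
    }
  }

  int num_deferred() const { return static_cast<int>(deferred_.size()); }

 private:
  int limit_;
  int buffered_ = 0;
  std::deque<Request> deferred_;
};

// Demo: the second batch exceeds the limit and is only acked after a drain.
bool DemoBackPressure() {
  RecvrQueue q(100);
  int acks = 0;
  q.Add({80, [&] { ++acks; }});   // fits: acked immediately
  q.Add({50, [&] { ++acks; }});   // 130 > 100: deferred, sender stalls
  bool deferred = (acks == 1 && q.num_deferred() == 1);
  q.Drain(80);                    // consumer frees space; deferred RPC acked
  return deferred && acks == 2 && q.num_deferred() == 0;
}
```

Because the sender gets no response while its request sits in the deferred queue, it cannot send the next batch, which is exactly the back-pressure behavior the commit message describes.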

All RPC requests and responses are serialized using protobuf. The equivalent of
TRowBatch is ProtoRowBatch, which contains a serialized header describing the
metadata of the row batch and two Kudu Slice objects which contain pointers to
the actual data (i.e. tuple offsets and tuple data).

This patch is based on an abandoned patch by Henry Robinson.

TESTING
---

* Build passed with FLAGS_use_krpc=true.

TO DO
-

* Port some BE tests to KRPC services.

Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
---
M be/src/common/status.cc
M be/src/common/status.h
M be/src/exec/data-sink.cc
M be/src/exec/exchange-node.cc
M be/src/rpc/CMakeLists.txt
M be/src/rpc/rpc-mgr.cc
M be/src/rpc/rpc-mgr.h
M be/src/runtime/CMakeLists.txt
M be/src/runtime/data-stream-mgr-base.h
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/krpc-data-stream-mgr.cc
M be/src/runtime/krpc-data-stream-mgr.h
M be/src/runtime/krpc-data-stream-recvr.cc
M be/src/runtime/krpc-data-stream-recvr.h
A be/src/runtime/krpc-data-stream-sender.cc
A be/src/runtime/krpc-data-stream-sender.h
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/service/CMakeLists.txt
A be/src/service/data-stream-service.cc
A be/src/service/data-stream-service.h
M be/src/service/impala-server.cc
M be/src/util/network-util.cc
M cmake_modules/FindProtobuf.cmake
M common/protobuf/CMakeLists.txt
A common/protobuf/common.proto
A common/protobuf/data_stream_service.proto
A common/protobuf/row_batch.proto
M common/thrift/generate_error_codes.py
32 files changed, 2,867 insertions(+), 166 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/3
--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: 

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-10-06 Thread Michael Ho (Code Review)
Michael Ho has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 2:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-recvr.cc@141
PS2, Line 141:   while (true) {
 : // wait until something shows up or we know we're done
 : while (!is_cancelled_ && batch_queue_.empty() && blocked_senders_.empty()
 : && num_remaining_senders_ > 0) {
 :   VLOG_ROW << "wait arrival fragment_instance_id=" << recvr_->fragment_instance_id()
 :            << " node=" << recvr_->dest_node_id();
 :   // Don't count time spent waiting on the sender as active time.
 :   CANCEL_SAFE_SCOPED_TIMER(recvr_->data_arrival_timer_, &is_cancelled_);
 :   CANCEL_SAFE_SCOPED_TIMER(recvr_->inactive_timer_, &is_cancelled_);
 :   CANCEL_SAFE_SCOPED_TIMER(
 :   received_first_batch_ ? nullptr : recvr_->first_batch_wait_total_timer_,
 :   &is_cancelled_);
 :   data_arrival_cv_.wait(l);
 : }
 :
 : if (is_cancelled_) return Status::CANCELLED;
 :
 : if (blocked_senders_.empty() && batch_queue_.empty()) {
 :   DCHECK_EQ(num_remaining_senders_, 0);
 :   return Status::OK();
 : }
 :
 : received_first_batch_ = true;
 :
 : // Either we'll consume a row batch from batch_queue_, or it's empty. In either case,
 : // take a blocked sender and retry delivering their batch. There is a window between
 : // which a deferred batch is dequeued from blocked_senders_ queue and when it's
 : // inserted into batch_queue_. However, a receiver won't respond to the sender until
 : // the deferred row batch has been inserted. The sender will wait for all in-flight
 : // RPCs to complete before sending EOS RPC so num_remaining_senders_ should be > 0.
 : if (!blocked_senders_.empty()) {
 :   recvr_->mgr_->EnqueueRowBatch(
 :   {recvr_->fragment_instance_id(), move(blocked_senders_.front())});
 :   blocked_senders_.pop();
 : }
 :
 : if (!batch_queue_.empty()) {
 :   RowBatch* result = batch_queue_.front().second;
 :   recvr_->num_buffered_bytes_.Add(-batch_queue_.front().first);
 :   VLOG_ROW << "fetched #rows=" << result->num_rows();
 :   current_batch_.reset(result);
 :   *next_batch = current_batch_.get();
 :   batch_queue_.pop_front();
 :   return Status::OK();
 : }
> This loop may lead to live lock in the rare case in which blocked_senders_
Actually, I misread the thing in the heat of debugging. If both queues are 
empty, we may return early in line 160 above if num_remaining_senders == 0. So, 
we shouldn't spin forever. Otherwise, the thread should sleep and wait in line 
153. This loop tends to have the unfortunate behavior of popping all entries 
off blocked_senders_ first before dropping the lock and sleeping on line 153.

Although there is a window in which both queues are empty when a row batch is 
deserialized and moved from blocked_senders_ to batch_queue_, it should be 
impossible for num_remaining_senders_ to reach 0 in that window. The reason is 
that the sender of that row batch will not be responded to until after the row 
batch has been inserted into batch_queue_ (after it has been popped from 
blocked_senders_). In which case, batch_queue_ will become non-empty first 
before the remote sender gets a reply.



--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 2
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Fri, 06 Oct 2017 22:03:51 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-10-06 Thread Michael Ho (Code Review)
Michael Ho has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 2:

(2 comments)

http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-recvr.cc@141
PS2, Line 141:   while (true) {
 : // wait until something shows up or we know we're done
 : while (!is_cancelled_ && batch_queue_.empty() && blocked_senders_.empty()
 : && num_remaining_senders_ > 0) {
 :   VLOG_ROW << "wait arrival fragment_instance_id=" << recvr_->fragment_instance_id()
 :<< " node=" << recvr_->dest_node_id();
 :   // Don't count time spent waiting on the sender as active time.
 :   CANCEL_SAFE_SCOPED_TIMER(recvr_->data_arrival_timer_, &is_cancelled_);
 :   CANCEL_SAFE_SCOPED_TIMER(recvr_->inactive_timer_, &is_cancelled_);
 :   CANCEL_SAFE_SCOPED_TIMER(
 :   received_first_batch_ ? nullptr : recvr_->first_batch_wait_total_timer_,
 :   &is_cancelled_);
 :   data_arrival_cv_.wait(l);
 : }
 :
 : if (is_cancelled_) return Status::CANCELLED;
 :
 : if (blocked_senders_.empty() && batch_queue_.empty()) {
 :   DCHECK_EQ(num_remaining_senders_, 0);
 :   return Status::OK();
 : }
 :
 : received_first_batch_ = true;
 :
 : // Either we'll consume a row batch from batch_queue_, or it's empty. In either case,
 : // take a blocked sender and retry delivering their batch. There is a window between
 : // which a deferred batch is dequeued from blocked_senders_ queue and when it's
 : // inserted into batch_queue_. However, a receiver won't respond to the sender until
 : // the deferred row batch has been inserted. The sender will wait for all in-flight
 : // RPCs to complete before sending EOS RPC so num_remaining_senders_ should be > 0.
 : if (!blocked_senders_.empty()) {
 :   recvr_->mgr_->EnqueueRowBatch(
 :   {recvr_->fragment_instance_id(), move(blocked_senders_.front())});
 :   blocked_senders_.pop();
 : }
 :
 : if (!batch_queue_.empty()) {
 :   RowBatch* result = batch_queue_.front().second;
 :   recvr_->num_buffered_bytes_.Add(-batch_queue_.front().first);
 :   VLOG_ROW << "fetched #rows=" << result->num_rows();
 :   current_batch_.reset(result);
 :   *next_batch = current_batch_.get();
 :   batch_queue_.pop_front();
 :   return Status::OK();
 : }
This loop may lead to live lock in the rare case in which blocked_senders_ and 
batch_queue_ are both empty in the window in which the deserialization threads 
are still working on inserting the row batches in blocked_senders_ into 
batch_queue_ after batch_queue_ has been exhausted. Spinning here forever will 
not drop the lock, causing the deserialization threads to wait forever to 
insert into batch_queue_.


http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-sender.cc@485
PS2, Line 485: // Sleep for sometime before retrying.
 : if (RpcMgr::IsServerTooBusy(rpc_controller_)) {
 :   SleepForMs(FLAGS_rpc_retry_interval_ms);
 :   continue;
 : }
This is broken if the RPC was rejected FLAGS_backend_client_connection_num_retries 
times in a row, in which case we will break out of the loop and return 
Status::OK(). This can leave the remote receiver hanging forever, as it still 
thinks there are active senders.
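
A minimal sketch of the fix being asked for here. This is illustrative only (Status, the retry helper, and the "server too busy" probe are hypothetical stand-ins, not Impala's actual API): the point is that exhausting the retry budget must surface an error rather than falling out of the loop and reporting success.

```cpp
#include <string>

// Hypothetical sketch, not Impala code. Retry while the server reports
// "too busy", but convert retry exhaustion into an error status instead
// of silently returning OK.
struct Status {
  std::string msg;
  bool ok() const { return msg.empty(); }
  static Status OK() { return Status{""}; }
  static Status Error(const std::string& m) { return Status{m}; }
};

// 'busy_count' simulates how many initial attempts the (hypothetical)
// IsServerTooBusy() probe would reject.
Status TransmitWithRetry(int max_retries, int busy_count) {
  for (int attempt = 0; attempt < max_retries; ++attempt) {
    bool server_too_busy = attempt < busy_count;
    if (server_too_busy) {
      // Real code would sleep (e.g. FLAGS_rpc_retry_interval_ms) here.
      continue;
    }
    return Status::OK();  // RPC accepted by the server.
  }
  // All attempts rejected: report failure so the caller can tear down
  // the stream, instead of leaving the receiver waiting forever.
  return Status::Error("RPC rejected: server too busy after retries");
}
```

With this shape, the receiver-hang described above cannot happen: either the RPC eventually goes through, or the sender's fragment fails with an explicit error.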



--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 2
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Fri, 06 Oct 2017 17:58:27 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-09-29 Thread Michael Ho (Code Review)
Michael Ho has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 2:

(2 comments)

http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-sender.cc@103
PS2, Line 103: address_(MakeNetworkAddress(destination.hostname, destination.port)),
address_(destination)


http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-sender.cc@141
PS2, Line 141:  TNetworkAddress address_;
const TNetworkAddress address_;



--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 2
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Fri, 29 Sep 2017 17:49:44 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-09-28 Thread Michael Ho (Code Review)
Michael Ho has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 1:

(20 comments)

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@110
PS1, Line 110: sent
> nit: received
Done


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@110
PS1, Line 110: no TransmitData() RPCs will successfully deliver their
 : /// payload.
> Why would there be a TransmitData() RPC if EndDataStream() has already been
It's the expectation that the sender will not send any TransmitData() RPC after 
EndDataStream() RPC. Comments updated.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@129
PS1, Line 129: /// In exceptional circumstances, the data stream manager will garbage-collect the closed
Comments added.

 > sending its last batch is bounded. That seems possible to solve
 > with sender-side state if the receiver notifies the sender that the
 > receiver was not present and the sender can infer it was closed
 > cleanly.

I'm not sure I follow the proposed solution. The sender can track whether it 
has ever successfully sent a row batch to the receiver. However, in the example 
above, instance 2 of F3 has never sent a row batch to the receiver before it 
hits the limit and closes, so it's not clear how the sender can differentiate 
between an early-sender case (i.e. the receiver is still being prepared) and a 
closed receiver.

It seems a more fool-proof solution is for coordinator to notify all backend 
nodes about completed/aborted/cancelled queries and that seems to be the 
absolutely safe point to remove closed stream entries. Alternately, we can use 
statestore update to broadcast this information and make the maintenance thread 
in DataStreamMgr remove the receiver entries based on the updates.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@133
PS1, Line 133: /// period expires.
> As per Tim's comment above, I would also reference IMPALA-3990 as a TODO he
Done


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@159
PS1, Line 159: Consider tracking, on the sender, whether a batch has been successfully sent or
 : ///   not. That's enough state to realise that a receiver has failed (rather than not
 : ///   prepared yet), and the data stream mgr can use that to fail an RPC fast, rather than
 : ///   having the closed-stream list.
> It would be nice to have a JIRA for this and reference it here.
Actually, now that I think about it, this idea may not work due to examples 
such as IMPALA-3990 above. The problem is that the receiver may have been 
closed (legitimately) even before a particular sender managed to send a batch 
to it, in which case the sender would falsely assume that the receiver has 
failed. Similarly, if no rows were ever materialized on the sender side, we 
still need the closed-stream cache to differentiate between a closed receiver 
and a receiver which is still being prepared.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@353
PS1, Line 353: waiting_senders
> This is a little confusing to follow in the .cc file, since when I see "wai
Done


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@356
PS1, Line 356: closed_senders
> Similarly, we could call this 'closed_senders_ctxs'.
Done


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@168
PS1, Line 168:   num_senders_waiting_->Increment(1);
 :   total_senders_waited_->Increment(1);
 :   RecvrId recvr_id = make_pair(fragment_instance_id, request->dest_node_id());
 :   auto payload =
 :   make_unique<TransmitDataCtx>(proto_batch, context, request, response);
 :   early_senders_map_[recvr_id].waiting_senders.push_back(move(payload));
> I'm wondering if it makes sense to add simple inline functions that encapsu
I don't find it too unreadable inline, but I guess it's less distracting if 
the logic is encapsulated in a function.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@198
PS1, Line 198: AddData
> Isn't the point of the deserialize pool to deserialize the payload early?
The deserialization pool's purpose is to avoid executing deserialization 
inline in the main thread for early or blocked senders. For instance, if there 
are multiple row batches (from multiple early senders) for a given receiver, 
the deserialization thread 

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-09-28 Thread Michael Ho (Code Review)
Hello Sailesh Mukil,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/8023

to look at the new patch set (#2).

Change subject: IMPALA-4856: Port data stream service to KRPC
..

IMPALA-4856: Port data stream service to KRPC

This patch implements a new data stream service which utilizes KRPC.
Similar to the thrift RPC implementation, there are 3 major components
to the data stream services: KrpcDataStreamSender serializes and sends
row batches materialized by a fragment instance to a KrpcDataStreamRecvr.
KrpcDataStreamMgr is responsible for routing an incoming row batch to
the appropriate receiver. The data stream service runs on the port
FLAGS_krpc_port which is 29000 by default.

Unlike the implementation with thrift RPC, KRPC provides an asynchronous
interface for invoking remote methods. As a result, KrpcDataStreamSender
doesn't need to create a thread per connection. There is one connection
between two Impalad nodes for each direction (i.e. client and server).
Multiple queries can multiplex on the same connection for transmitting
row batches between two Impalad nodes. The asynchronous interface also
prevents some issues with thrift RPC in which a thread may be stuck in
an RPC call without being able to check for cancellation. A TransmitData()
call with KRPC is in essence a trio of RpcController, a serialized protobuf
request buffer and a protobuf response buffer. The call is invoked via a
DataStreamService proxy object. The serialized tuple offsets and row batches
are sent via "sidecars" in KRPC to avoid extra copy into the serialized
request buffer.

Each impalad node creates a singleton DataStreamService object at start-up
time. All incoming calls are served by a service thread pool created as part
of DataStreamService. By default, there are 64 service threads. The service
threads are shared across all queries so the RPC handler should avoid
blocking as much as possible. In the thrift RPC implementation, a thrift
thread handling a TransmitData() RPC may block for an extended period of time
if the receiver has not yet been created when the call arrives. In the KRPC
implementation, we store TransmitData() or EndDataStream() requests
which arrive before the receiver is ready in a per-receiver early sender list
stored in KrpcDataStreamMgr. These RPC calls will be processed and responded to
when the receiver is created or when timeout occurs.

Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr.
If adding a row batch to a queue in KrpcDataStreamRecvr would cause the buffer
limit to be exceeded, the request is stashed in a blocked_sender_ queue to be
processed later. The stashed RPC request is not responded to until it is
processed, so as to exert back pressure on the client. An alternative would be
to reply with an error, requiring the request / row batches to be sent again.
This may end up consuming more network bandwidth than the thrift RPC
implementation. This change adopts the behavior of allowing one stashed
request per sender.
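
The back-pressure mechanism described above can be sketched as follows. This is a simplified, hypothetical model (RecvrQueueSketch and its members are illustrative, not the actual KrpcDataStreamRecvr code, and it tracks sender ids where the real code stashes whole RPC contexts): a batch over the byte budget is deferred unanswered, and a deferred sender is only retried when the consumer drains a batch.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <queue>

// Illustrative sketch of receiver-side back pressure, not Impala code.
class RecvrQueueSketch {
 public:
  explicit RecvrQueueSketch(int64_t limit) : limit_(limit) {}

  // Returns true if the batch was enqueued, false if the request was
  // deferred (left unanswered) to block the sender.
  bool AddBatch(int sender_id, int64_t batch_bytes) {
    if (buffered_bytes_ + batch_bytes > limit_ && !batch_queue_.empty()) {
      blocked_senders_.push(sender_id);  // stash: respond later
      return false;
    }
    batch_queue_.push_back(batch_bytes);
    buffered_bytes_ += batch_bytes;
    return true;
  }

  // Consumer pops one batch. Returns the id of the deferred sender whose
  // stashed request would now be retried (and finally responded to), or
  // -1 if no sender was blocked.
  int PopBatch() {
    int64_t bytes = batch_queue_.front();
    batch_queue_.pop_front();
    buffered_bytes_ -= bytes;
    int retried = -1;
    if (!blocked_senders_.empty()) {
      retried = blocked_senders_.front();
      blocked_senders_.pop();
    }
    return retried;
  }

  std::size_t num_blocked() const { return blocked_senders_.size(); }

 private:
  int64_t limit_;
  int64_t buffered_bytes_ = 0;
  std::deque<int64_t> batch_queue_;    // admitted batch sizes
  std::queue<int> blocked_senders_;    // deferred (unanswered) senders
};
```

Because the stashed RPC is simply not answered, the sender cannot issue its next TransmitData() and no payload is retransmitted, which is the bandwidth advantage over replying with an error.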

All rpc requests and responses are serialized using protobuf. The equivalent of
TRowBatch would be ProtoRowBatch which contains a serialized header about the
meta-data of the row batch and two Kudu Slice objects which contain pointers to
the actual data (i.e. tuple offsets and tuple data).
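
The split described above (a small serialized header plus out-of-band data) might look roughly like the following protobuf sketch. The message and field names here are illustrative guesses, not the actual definitions in common/protobuf/row_batch.proto:

```proto
// Illustrative sketch only. The header carries row-batch metadata, while
// the tuple offsets and tuple data travel as KRPC sidecars (referenced by
// sidecar index) rather than being embedded in the request buffer.
message RowBatchHeaderSketch {
  optional int32 num_rows = 1;
  optional int64 uncompressed_size = 2;
  optional int32 compression_type = 3;
  // Indices of the KRPC sidecars holding tuple offsets and tuple data.
  optional int32 tuple_offsets_sidecar_idx = 4;
  optional int32 tuple_data_sidecar_idx = 5;
}
```

Keeping the bulk data in sidecars is what lets the sender hand Kudu Slice pointers to the serialized row batch without copying it into the protobuf request.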

This patch is based on an abandoned patch by Henry Robinson.

TESTING
---

* Build passed with FLAGS_use_krpc=true.

TO DO
-

* Port some BE tests to KRPC services.

Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
---
M be/src/common/status.cc
M be/src/common/status.h
M be/src/exec/data-sink.cc
M be/src/exec/exchange-node.cc
M be/src/rpc/CMakeLists.txt
M be/src/rpc/rpc-mgr.cc
M be/src/rpc/rpc-mgr.h
M be/src/runtime/CMakeLists.txt
M be/src/runtime/data-stream-mgr-base.h
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/krpc-data-stream-mgr.cc
M be/src/runtime/krpc-data-stream-mgr.h
M be/src/runtime/krpc-data-stream-recvr.cc
M be/src/runtime/krpc-data-stream-recvr.h
A be/src/runtime/krpc-data-stream-sender.cc
A be/src/runtime/krpc-data-stream-sender.h
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/service/CMakeLists.txt
A be/src/service/data-stream-service.cc
A be/src/service/data-stream-service.h
M be/src/service/impala-server.cc
M common/protobuf/CMakeLists.txt
A common/protobuf/common.proto
A common/protobuf/data_stream_service.proto
A common/protobuf/row_batch.proto
M common/thrift/generate_error_codes.py
30 files changed, 2,822 insertions(+), 163 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/2
--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-09-26 Thread Sailesh Mukil (Code Review)
Sailesh Mukil has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 1:

(5 comments)

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@198
PS1, Line 198: AddData
Isn't the point of the deserialize pool to deserialize the payload early?
Here, we're just calling AddData() on the payloads for early senders after the 
corresponding receiver has been created.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@165
PS1, Line 165: Either we'll consume a row batch from batch_queue_, or it's empty
Shouldn't there always be something in the batch_queue_ if there's something in 
the blocked_senders_ list? Since we fill the blocked_senders_ only if the queue 
is at its limit.

And we also logically service the batches from batch_queue_ first before 
servicing the batches from the blocked_senders_ list.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@166
PS1, Line 166: There is a window
Just to make things clearer, could you specify what there's a window for?


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@225
PS1, Line 225:
There is a problem here. When we release lock_ here, an arbitrary number of 
senders could call AddBatch(), and all their batches would get enqueued even 
though ExceedsLimit() would be true. This breaks the guarantee that the queue 
is not over-committed by more than a single batch.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@284
PS1, Line 284:   for (const auto& queue_entry: batch_queue_) delete queue_entry.second;
batch_queue_.clear() ?



--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 1
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-Comment-Date: Tue, 26 Sep 2017 18:11:25 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-09-25 Thread Sailesh Mukil (Code Review)
Sailesh Mukil has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 1:

(14 comments)

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@110
PS1, Line 110: sent
nit: received


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@110
PS1, Line 110: no TransmitData() RPCs will successfully deliver their
 : /// payload.
Why would there be a TransmitData() RPC if EndDataStream() has already been 
sent? Doesn't the sender send it only if it knows all its TransmitData() RPCs 
have been processed?


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@133
PS1, Line 133: /// period expires.
As per Tim's comment above, I would also reference IMPALA-3990 as a TODO here 
for later fixing.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@159
PS1, Line 159: Consider tracking, on the sender, whether a batch has been successfully sent or
 : ///   not. That's enough state to realise that a receiver has failed (rather than not
 : ///   prepared yet), and the data stream mgr can use that to fail an RPC fast, rather than
 : ///   having the closed-stream list.
It would be nice to have a JIRA for this and reference it here.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@353
PS1, Line 353: waiting_senders
This is a little confusing to follow in the .cc file, since when I see 
"waiting_senders", I expect it to be a set of some unique identifiers for a 
Sender ID.

Although this is unique to a specific sender, it would be a little clearer to 
call this 'waiting_senders_ctxs'.

Let me know what you think.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@356
PS1, Line 356: closed_senders
Similarly, we could call this 'closed_senders_ctxs'.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@168
PS1, Line 168:   num_senders_waiting_->Increment(1);
 :   total_senders_waited_->Increment(1);
 :   RecvrId recvr_id = make_pair(fragment_instance_id, request->dest_node_id());
 :   auto payload =
 :   make_unique<TransmitDataCtx>(proto_batch, context, request, response);
 :   early_senders_map_[recvr_id].waiting_senders.push_back(move(payload));
I'm wondering if it makes sense to add simple inline functions that encapsulate 
this functionality; for the sake of readability.

Eg: AddEarlyWaitingSender(), AddEarlyClosedSender()


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@213
PS1, Line 213: If no receiver found, but not in the closed stream cache
nit: If no receiver is found, and the receiver is not in the closed stream 
cache as well, we still need...


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@218
PS1, Line 218:   RecvrId recvr_id = make_pair(fragment_instance_id, dest_node_id);
 :   auto payload = make_unique<EndDataStreamCtx>(context, request, response);
 :   early_senders_map_[recvr_id].closed_senders.emplace_back(move(payload));
 :   num_senders_waiting_->Increment(1);
 :   total_senders_waited_->Increment(1);
AddEarlyClosedSender() as per comment above, if you agree.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@227
PS1, Line 227:   if (LIKELY(recvr != nullptr)) recvr->RemoveSender(request->sender_id());
 :   Status::OK().ToProto(response->mutable_status());
 :   context->RespondSuccess();
This may need some modification based on the recent commit for IMPALA-5199:
https://github.com/apache/incubator-impala/commit/5119ced50c0e0c4001621c9d4da598c187bdb580


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@75
PS1, Line 75: ("new data")
I'm having some trouble understanding what this means. Could you please clarify?


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@271
PS1, Line 271: data_arrival_cv_.notify_all();
Shouldn't this notify be done while holding the lock_ ?


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@285
PS1, Line 285:   while (!blocked_senders_.empty()) {
nit: Add comment: Respond to blocked senders' RPCs



[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-09-11 Thread Tim Armstrong (Code Review)
Tim Armstrong has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 1:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

Line 129: /// In exceptional circumstances, the data stream manager will garbage-collect the closed
There's a pre-existing flaw in the reasoning here that we should call out. 
"Exceptional circumstances" is vague and I think hides a distinction between an 
unhealthy cluster with extreme delays and the expected behaviour of certain 
long-running queries. I think the problem is an invalid assumption that the 
sender sends batches on a regular cadence with a bounded delay before the 
first batch is sent and when each subsequent batch is sent. That assumption is 
incorrect. I think we should call it out in this comment so that readers 
understand the current flaw. Here's an example where it's wrong.

Consider a plan with three fragments.

  F1 (long-running)
   |
   V
  F2 (limit = 1 on exchange)
   |
   V
  F3 (long-running selective scan)

1. The fragments all start up.
2. Instance 1 of F3 immediately finds and returns a matching row, which is sent 
to F2.
3. This causes F2 to hit its limit, close its exchange and tear itself down.
4. Let's assume F1 also has a lot of work to do and won't finish for 20 minutes
5. Instance 2 of F3 is still churning away on the scan. After 10 minutes it 
finally finds a matching row.
6. F3 tries to send the row, can't find the receiver after a timeout and 
returns an error to the coordinator
7. The coordinator cancels the query and returns an error

There are two problems here:
1. The query failed when it shouldn't have
2. F3 wasn't cancelled when it was no longer needed and used lots of resources 
unnecessarily.

The JIRA is IMPALA-3990. I believe that the main reason we haven't seen this in 
practice is that it can only occur when there's a limit without order in a 
subquery. Most queries with that property are non-deterministic and it doesn't 
really make a lot of sense to have a long-running query that returns 
non-deterministic results.

But this actually blocked me from implementing early-close for joins with empty 
build sides, which is a nice optimisation.

There may also be a slightly different invalid assumption that the time between 
the receiver closing the exchange and the sender sending its last batch is 
bounded. That seems possible to solve with sender-side state if the receiver 
notifies the sender that the receiver was not present and the sender can infer 
it was closed cleanly.


-- 
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-PatchSet: 1
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Michael Ho 
Gerrit-Reviewer: Tim Armstrong 
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-09-09 Thread Michael Ho (Code Review)
Michael Ho has uploaded a new change for review.

  http://gerrit.cloudera.org:8080/8023

Change subject: IMPALA-4856: Port data stream service to KRPC
..

IMPALA-4856: Port data stream service to KRPC

This patch implements a new data stream service which utilizes KRPC.
Similar to the thrift RPC implementation, there are 3 major components
to the data stream services: KrpcDataStreamSender serializes and sends
row batches materialized by a fragment instance to a KrpcDataStreamRecvr.
KrpcDataStreamMgr is responsible for routing an incoming row batch to
the appropriate receiver. The data stream service runs on the port
FLAGS_krpc_port which is 29000 by default.

Unlike the implementation with thrift RPC, KRPC provides an asynchronous
interface for invoking remote methods. As a result, KrpcDataStreamSender
doesn't need to create a thread per connection. There is one connection
between two Impalad nodes for each direction (i.e. client and server).
Multiple queries can multiplex on the same connection for transmitting
row batches between two Impalad nodes. The asynchronous interface also
prevents some issues with thrift RPC in which a thread may be stuck in
an RPC call without being able to check for cancellation. A TransmitData()
call with KRPC is in essence a trio of RpcController, a serialized protobuf
request buffer and a protobuf response buffer. The call is invoked via a
DataStreamService proxy object. The serialized tuple offsets and row batches
are sent via "sidecars" in KRPC to avoid extra copy into the serialized
request buffer.

Each impalad node creates a singleton DataStreamService object at start-up
time. All incoming calls are served by a service thread pool created as part
of DataStreamService. By default, there are 64 service threads. The service
threads are shared across all queries so the RPC handler should avoid
blocking as much as possible. In the thrift RPC implementation, a thrift
thread handling a TransmitData() RPC may block for an extended period of time
if the receiver has not yet been created when the call arrives. In the KRPC
implementation, we store TransmitData() or EndDataStream() requests
which arrive before the receiver is ready in a per-receiver early sender list
stored in KrpcDataStreamMgr. These RPC calls will be processed and responded to
when the receiver is created or when timeout occurs.

Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr.
If adding a row batch to a queue in KrpcDataStreamRecvr would cause the buffer
limit to be exceeded, the request is stashed in a blocked_sender_ queue to be
processed later. The stashed RPC request is not responded to until it is
processed, so as to exert back pressure on the client. An alternative would be
to reply with an error, requiring the request / row batches to be sent again.
This may end up consuming more network bandwidth than the thrift RPC
implementation. This change adopts the behavior of allowing one stashed
request per sender.

All rpc requests and responses are serialized using protobuf. The equivalent of
TRowBatch would be ProtoRowBatch which contains a serialized header about the
meta-data of the row batch and two Kudu Slice objects which contain pointers to
the actual data (i.e. tuple offsets and tuple data).

This patch is based on an abandoned patch by Henry Robinson.

TESTING
---

* Build passed with FLAGS_use_krpc=true.

TO DO
-

* Port some BE tests to KRPC services.

Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
---
M be/src/common/status.cc
M be/src/common/status.h
M be/src/exec/data-sink.cc
M be/src/exec/exchange-node.cc
M be/src/rpc/CMakeLists.txt
M be/src/rpc/rpc-mgr.cc
M be/src/rpc/rpc-mgr.h
M be/src/runtime/CMakeLists.txt
M be/src/runtime/data-stream-mgr-base.h
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/krpc-data-stream-mgr.cc
M be/src/runtime/krpc-data-stream-mgr.h
M be/src/runtime/krpc-data-stream-recvr.cc
M be/src/runtime/krpc-data-stream-recvr.h
A be/src/runtime/krpc-data-stream-sender.cc
A be/src/runtime/krpc-data-stream-sender.h
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/service/CMakeLists.txt
A be/src/service/data-stream-service.cc
A be/src/service/data-stream-service.h
M be/src/service/impala-server.cc
M common/protobuf/CMakeLists.txt
A common/protobuf/common.proto
A common/protobuf/data_stream_service.proto
A common/protobuf/row_batch.proto
29 files changed, 2,783 insertions(+), 163 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/1
-- 
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-PatchSet: 1
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Michael Ho 

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-08-11 Thread Henry Robinson (Code Review)
Henry Robinson has abandoned this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Abandoned

-- 
To view, visit http://gerrit.cloudera.org:8080/7103
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: abandon
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 6
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson 
Gerrit-Reviewer: Henry Robinson 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Mostafa Mokhtar 
Gerrit-Reviewer: Sailesh Mukil 


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-07-14 Thread Sailesh Mukil (Code Review)
Sailesh Mukil has uploaded a new patch set (#6).

Change subject: IMPALA-4856: Port data stream service to KRPC
..

IMPALA-4856: Port data stream service to KRPC

This patch ports the data-flow parts of ImpalaInternalService to KRPC.

* ImpalaInternalService is split into two services. The first,
  ImpalaInternalService, deals with control messages for plan fragment
  instance execution, cancellation and reporting, and remains
  implemented in Thrift for now. The second, DataStreamService, handles
  large-payload RPCs for transmitting runtime filters and row batches
  between hosts.

* In the DataStreamService, all RPCs use 'native' protobuf. The
  DataStreamService starts on the port previously reserved for the
  StatestoreSubscriberService (which is also a KRPC service), to avoid
  having to configure another port when starting Impala. When the
  ImpalaInternalService is ported to KRPC, all services will run on one
  port.

* To support needing to address two different backend services, a data
  service port has been added to TBackendDescriptor.

* This patch adds support for asynchronous RPCs to the RpcMgr and Rpc
  classes. Previously, Impala used fixed size thread pools + synchronous
  RPCs to achieve some parallelism for 'broadcast' RPCs like filter
  propagation, or a dedicated per-sender+receiver pair thread on the
  sender side in the DataStreamSender case. In this patch, the
  PublishFilter() and TransmitData() RPCs are sent asynchronously using
  KRPC's thread pools.

* The TransmitData() protocol has changed to adapt to asynchronous
  RPCs. The full details are in data-stream-mgr.h.

* As a result, DataStreamSender no longer creates a
  thread-per-connection on the sender side.

* Both tuple transmission and runtime filter publication use sidecars to
  minimise the number of copies and serialization steps required.

* Also include a fix for KUDU-2011 that properly allows sidecars to be
  shared between KRPC and the RPC caller (fixing IMPALA-5093, a
  corruption bug).

* A large portion of this patch is the replacement of TRowBatch with its
  Protobuf equivalent, RowBatchPb. The replacement is a literal port of
  the data structure, and row-batch-test, row-batch-list-test and
  row-batch-serialize-benchmark continue to execute without major logic
  changes.

* Simplify FindRecvr() logic in DataStreamManager. No longer need to
  handle blocking sender-side, so no need for complex promise-based
  machinery. Instead, all senders with no receiver are added to a
  per-receiver list, which is processed when the receiver arrives. If it
  does not arrive promptly, the DataStreamManager cleans them up after
  FLAGS_datastream_sender_timeout_ms.
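The early-sender bookkeeping described in this bullet can be sketched roughly as follows. This is an illustrative standalone sketch only; all names (EarlySenderMap, AddEarlySender, TakeForReceiver, Cleanup) are hypothetical, not the patch's actual classes:

```cpp
#include <algorithm>
#include <chrono>
#include <map>
#include <utility>
#include <vector>

using Clock = std::chrono::steady_clock;

struct EarlySender {
  int sender_id;
  Clock::time_point arrival;
};

class EarlySenderMap {
 public:
  // A sender whose receiver hasn't registered yet is queued under the
  // receiver's id, with its arrival time recorded for later cleanup.
  void AddEarlySender(int recvr_id, int sender_id) {
    pending_[recvr_id].push_back({sender_id, Clock::now()});
  }

  // Called when the receiver finally registers: drain its queue.
  std::vector<EarlySender> TakeForReceiver(int recvr_id) {
    auto it = pending_.find(recvr_id);
    if (it == pending_.end()) return {};
    std::vector<EarlySender> senders = std::move(it->second);
    pending_.erase(it);
    return senders;
  }

  // Maintenance path: drop queued senders whose receiver never arrived
  // within 'timeout'. Returns the number of senders cleaned up.
  int Cleanup(Clock::duration timeout) {
    int dropped = 0;
    const Clock::time_point now = Clock::now();
    for (auto it = pending_.begin(); it != pending_.end();) {
      std::vector<EarlySender>& v = it->second;
      v.erase(std::remove_if(v.begin(), v.end(),
                  [&](const EarlySender& s) {
                    const bool expired = now - s.arrival > timeout;
                    dropped += expired ? 1 : 0;
                    return expired;
                  }),
          v.end());
      it = v.empty() ? pending_.erase(it) : ++it;
    }
    return dropped;
  }

 private:
  std::map<int, std::vector<EarlySender>> pending_;
};
```

The key property is that no promise machinery is needed: senders simply park in a per-receiver list, and either the receiver drains them or the periodic cleanup discards them.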

* This patch also begins a clean-up of how ImpalaServer instances are
  created (by removing CreateImpalaServer), and clarifying the
  relationship between ExecEnv and ImpalaServer. ImpalaServer now
  follows the standard construct->Init()->Start()->Join() lifecycle that
  we use for other services.

* Ensure that all addresses used for KRPCs are fully resolved, avoiding
  the need to resolve them for each RPC.

TESTING
---

* New tests added to rpc-mgr-test.

TO DO
-

* Re-enable throughput and latency measurements per data-stream sender
  when that information is exposed from KRPC (KUDU-1738).

* TLS and Kerberos are still not supported by KRPC in this patch.

Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
---
M .clang-format
M CMakeLists.txt
M be/CMakeLists.txt
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/benchmarks/row-batch-serialize-benchmark.cc
M be/src/common/init.cc
M be/src/common/status.cc
M be/src/common/status.h
M be/src/exprs/expr-test.cc
M be/src/kudu/rpc/rpc_sidecar.cc
M be/src/kudu/rpc/rpc_sidecar.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/TAcceptQueueServer.cpp
M be/src/rpc/common.proto
M be/src/rpc/rpc-mgr-test.cc
M be/src/rpc/rpc-mgr.h
M be/src/rpc/rpc-mgr.inline.h
M be/src/rpc/rpc.h
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache-types.h
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-mgr.cc
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.cc
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.cc
M be/src/runtime/data-stream-sender.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/row-batch-serialize-test.cc
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/scheduling/backend-config-test.cc
M 

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-06-21 Thread Michael Ho (Code Review)
Michael Ho has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 3:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/data-stream-sender.cc
File be/src/runtime/data-stream-sender.cc:

PS3, Line 252: batch->compressed_tuple_data
> The ownership is shared with the batch object. AddSidecar() internally move
I see. I am still getting used to this subtlety of passing a shared_ptr as an argument.


-- 
To view, visit http://gerrit.cloudera.org:8080/7103
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 3
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson 
Gerrit-Reviewer: Henry Robinson 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-06-21 Thread Henry Robinson (Code Review)
Henry Robinson has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 5:

(22 comments)

PS4 is a rebase. PS5 includes the review responses (so diff 4->5 if you want to 
see what changed).

http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/rpc/rpc.h
File be/src/rpc/rpc.h:

PS3, Line 106: Ownership is
 :   // shared by the caller, and the RPC subsystem
> Doesn't std::move transfer the ownership so the caller no longer shares the
The shared_ptr is copied in. It's the copy that is then moved into the sidecar 
list.


PS3, Line 143: are owned by the caller
> the ownership is temporarily transferred to the RPC call when this function
I don't think so - the RPC call has pointers, but doesn't have ownership in the 
sense that it has no responsibility for managing a reference count or freeing 
the memory.


PS3, Line 147: 
> Having the names 'func', 'cb' and 'cb_wrapper' all close by each other make
Done


PS3, Line 153: 
> Does this move mean that the params_ member is invalid after this call? If 
Done


PS3, Line 327: 
> Maybe name this 'completion_cb'.
Done


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/coordinator-backend-state.cc
File be/src/runtime/coordinator-backend-state.cc:

PS3, Line 389: equest->mutable_bloom_filter()->set_log_heap_space(0);
 : 
request->mutable_bloom_filter()->set_directory_sidecar_idx(-1);
 :   }
> Why wouldn't a move capture ensure the same thing?
proto_filter is a const shared_ptr&. You can't move from it. Instead, we could 
have the argument be shared_ptr, and move from it here; 
they're basically equivalent, it's just a question of where you make the copy.
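As an illustrative aside (names here are hypothetical, not the patch's actual API), the two points in this exchange can be seen in a standalone snippet: a const shared_ptr& cannot actually be moved from, whereas taking the argument by value makes the copy explicit and lets the callee move that copy:

```cpp
#include <memory>
#include <utility>
#include <vector>

std::vector<std::shared_ptr<int>> sidecars;  // stand-in for the sidecar list

// Taking the argument by value makes the refcount bump explicit at the call
// site; std::move() then transfers that copy's reference into the list.
void AddSidecarByValue(std::shared_ptr<int> s) {
  sidecars.push_back(std::move(s));
}

// With a const reference, std::move() silently degrades to a copy: you
// cannot steal from a const object, so the caller's refcount still goes up.
void AddSidecarByConstRef(const std::shared_ptr<int>& s) {
  sidecars.push_back(std::move(s));  // compiles, but copies
}
```

Either way the list ends up holding its own reference; the only question, as the reply says, is where the copy is made.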


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

PS3, Line 1207:   VLOG_QUERY << "Not enough memory to allocate filter: "
  :  << PrettyPrinter::Print(heap_space, TUnit::BYTES)
  :  << " (query: " << coord->query_id() << ")";
  :   // Disable, as one missing update means a correct filter cannot be
> I would add this to the commit message. This means we would take double the
I don't think so - because params.directory is a sidecar I don't think it's 
been copied since it arrived on the socket. In the Thrift case, the bytestream 
had to be deserialized into a TBloomFilter. That's what's happening here - the 
equivalent 'deserialization' step.

This path should only get taken the first time a filter arrives, and it does 
briefly keep two filters around (the sidecar should get destroyed as soon as 
the RPC is responded to).


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/data-stream-recvr.cc
File be/src/runtime/data-stream-recvr.cc:

PS3, Line 280: blocked_senders_.front()
> Is this the right way to dispose of a unique_ptr?
Good point - release() is clearer, and get() may have been a benign bug.
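The release()-vs-get() distinction behind this fix can be shown in isolation (Payload and HandOff are hypothetical names for illustration):

```cpp
#include <memory>

struct Payload { int x = 0; };

// Hands the object off to a manually-managed consumer. release() returns the
// raw pointer AND gives up ownership, so the unique_ptr won't delete it.
// Using get() here would leave the unique_ptr as a second owner, risking a
// double delete or a dangling pointer once the unique_ptr is destroyed.
Payload* HandOff(std::unique_ptr<Payload>& owner) {
  return owner.release();  // owner becomes null; caller must delete
}
```

With get(), the hand-off may appear to work until the unique_ptr's destructor runs, which is why it can look like a "benign" bug for a while.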


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/data-stream-sender.cc
File be/src/runtime/data-stream-sender.cc:

PS3, Line 58: 
> Not used.
Done


PS3, Line 60: 
> Not used.
Done


PS3, Line 133: scoped_ptr batch_;
> No one really calls GetNumDataBytesSent() (except from our BE test). So, I'
We're gaining correctness - so worth doing (otherwise if someone decides to use 
it in the future, they might run into problems).


PS3, Line 148: 
> A reader of this code might not immediately understand why this class needs
I expanded the comment here.


PS3, Line 170: 
> Why is this set in Init()? Wouldn't it ideally be set in the constructor
Moved to c'tor.


PS3, Line 175: proto_batch_idx_
> Just want to make sure that this will increase the shared_ptr refcount? It 
Yep - this was a mistake. Removed auto to make it more explicit.


PS3, Line 203: co
> Prefer a more descriptive name "rpc_completion_cb" or something similar.
Done


PS3, Line 214: ck_guard
> channel == nullptr
Done


PS3, Line 252: batch->tuple_data, );
> Is this transferring the ownership to the RPC subsystem ? AddSideCar() inte
The ownership is shared with the batch object. AddSidecar() internally moves 
from the argument, which is a copy (i.e. its own reference).


PS3, Line 266: .release(), rpc_complete_callback);
> This is a subtle change in behavior from previous Impala version. In partic
Any reasonably conservative timeout runs the risk of false negatives if a 
sender is blocked.

I agree with your analysis about this being a change in behaviour. In practice, 
though, here's what I hope will happen: if one write to a node is slow enough 
to previously trigger the timeout, I would expect the statestore RPCs to also 
go slow (and they will time out); the node will be marked as offline and the 
query will be cancelled. 

If there is a situation where this RPC only is slow in writing (but all other 
RPCs to the server are ok), then I agree 

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-06-21 Thread Henry Robinson (Code Review)
Henry Robinson has uploaded a new patch set (#5).

Change subject: IMPALA-4856: Port data stream service to KRPC
..

IMPALA-4856: Port data stream service to KRPC

This patch ports the data-flow parts of ImpalaInternalService to KRPC.

* ImpalaInternalService is split into two services. The first,
  ImpalaInternalService, deals with control messages for plan fragment
  instance execution, cancellation and reporting, and remains
  implemented in Thrift for now. The second, DataStreamService, handles
  large-payload RPCs for transmitting runtime filters and row batches
  between hosts.

* In the DataStreamService, all RPCs use 'native' protobuf. The
  DataStreamService starts on the port previously reserved for the
  StatestoreSubscriberService (which is also a KRPC service), to avoid
  having to configure another port when starting Impala. When the
  ImpalaInternalService is ported to KRPC, all services will run on one
  port.

* To support needing to address two different backend services, a data
  service port has been added to TBackendDescriptor.

* This patch adds support for asynchronous RPCs to the RpcMgr and Rpc
  classes. Previously, Impala used fixed size thread pools + synchronous
  RPCs to achieve some parallelism for 'broadcast' RPCs like filter
  propagation, or a dedicated per-sender+receiver pair thread on the
  sender side in the DataStreamSender case. In this patch, the
  PublishFilter() and TransmitData() RPCs are sent asynchronously using
  KRPC's thread pools.

* The TransmitData() protocol has changed to adapt to asynchronous
  RPCs. The full details are in data-stream-mgr.h.

* As a result, DataStreamSender no longer creates a
  thread-per-connection on the sender side.

* Both tuple transmission and runtime filter publication use sidecars to
  minimise the number of copies and serialization steps required.

* Also include a fix for KUDU-2011 that properly allows sidecars to be
  shared between KRPC and the RPC caller (fixing IMPALA-5093, a
  corruption bug).

* A large portion of this patch is the replacement of TRowBatch with its
  Protobuf equivalent, RowBatchPb. The replacement is a literal port of
  the data structure, and row-batch-test, row-batch-list-test and
  row-batch-serialize-benchmark continue to execute without major logic
  changes.

* Simplify FindRecvr() logic in DataStreamManager. No longer need to
  handle blocking sender-side, so no need for complex promise-based
  machinery. Instead, all senders with no receiver are added to a
  per-receiver list, which is processed when the receiver arrives. If it
  does not arrive promptly, the DataStreamManager cleans them up after
  FLAGS_datastream_sender_timeout_ms.

* This patch also begins a clean-up of how ImpalaServer instances are
  created (by removing CreateImpalaServer), and clarifying the
  relationship between ExecEnv and ImpalaServer. ImpalaServer now
  follows the standard construct->Init()->Start()->Join() lifecycle that
  we use for other services.

* Ensure that all addresses used for KRPCs are fully resolved, avoiding
  the need to resolve them for each RPC.

TESTING
---

* New tests added to rpc-mgr-test.

TO DO
-

* Re-enable throughput and latency measurements per data-stream sender
  when that information is exposed from KRPC (KUDU-1738).

* TLS and Kerberos are still not supported by KRPC in this patch.

Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
---
M .clang-format
M CMakeLists.txt
M be/CMakeLists.txt
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/benchmarks/row-batch-serialize-benchmark.cc
M be/src/common/init.cc
M be/src/common/status.cc
M be/src/common/status.h
M be/src/exprs/expr-test.cc
M be/src/kudu/rpc/rpc_sidecar.cc
M be/src/kudu/rpc/rpc_sidecar.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/TAcceptQueueServer.cpp
M be/src/rpc/common.proto
M be/src/rpc/rpc-mgr-test.cc
M be/src/rpc/rpc-mgr.h
M be/src/rpc/rpc-mgr.inline.h
M be/src/rpc/rpc.h
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache-types.h
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-mgr.cc
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.cc
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.cc
M be/src/runtime/data-stream-sender.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/row-batch-serialize-test.cc
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/scheduling/backend-config-test.cc
M 

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-06-21 Thread Henry Robinson (Code Review)
Henry Robinson has uploaded a new patch set (#4).

Change subject: IMPALA-4856: Port data stream service to KRPC
..

IMPALA-4856: Port data stream service to KRPC

This patch ports the data-flow parts of ImpalaInternalService to KRPC.

* ImpalaInternalService is split into two services. The first,
  ImpalaInternalService, deals with control messages for plan fragment
  instance execution, cancellation and reporting, and remains
  implemented in Thrift for now. The second, DataStreamService, handles
  large-payload RPCs for transmitting runtime filters and row batches
  between hosts.

* In the DataStreamService, all RPCs use 'native' protobuf. The
  DataStreamService starts on the port previously reserved for the
  StatestoreSubscriberService (which is also a KRPC service), to avoid
  having to configure another port when starting Impala. When the
  ImpalaInternalService is ported to KRPC, all services will run on one
  port.

* To support needing to address two different backend services, a data
  service port has been added to TBackendDescriptor.

* This patch adds support for asynchronous RPCs to the RpcMgr and Rpc
  classes. Previously, Impala used fixed size thread pools + synchronous
  RPCs to achieve some parallelism for 'broadcast' RPCs like filter
  propagation, or a dedicated per-sender+receiver pair thread on the
  sender side in the DataStreamSender case. In this patch, the
  PublishFilter() and TransmitData() RPCs are sent asynchronously using
  KRPC's thread pools.

* The TransmitData() protocol has changed to adapt to asynchronous
  RPCs. The full details are in data-stream-mgr.h.

* As a result, DataStreamSender no longer creates a
  thread-per-connection on the sender side.

* Both tuple transmission and runtime filter publication use sidecars to
  minimise the number of copies and serialization steps required.

* Also include a fix for KUDU-2011 that properly allows sidecars to be
  shared between KRPC and the RPC caller (fixing IMPALA-5093, a
  corruption bug).

* A large portion of this patch is the replacement of TRowBatch with its
  Protobuf equivalent, RowBatchPb. The replacement is a literal port of
  the data structure, and row-batch-test, row-batch-list-test and
  row-batch-serialize-benchmark continue to execute without major logic
  changes.

* Simplify FindRecvr() logic in DataStreamManager. No longer need to
  handle blocking sender-side, so no need for complex promise-based
  machinery. Instead, all senders with no receiver are added to a
  per-receiver list, which is processed when the receiver arrives. If it
  does not arrive promptly, the DataStreamManager cleans them up after
  FLAGS_datastream_sender_timeout_ms.

* This patch also begins a clean-up of how ImpalaServer instances are
  created (by removing CreateImpalaServer), and clarifying the
  relationship between ExecEnv and ImpalaServer. ImpalaServer now
  follows the standard construct->Init()->Start()->Join() lifecycle that
  we use for other services.

* Ensure that all addresses used for KRPCs are fully resolved, avoiding
  the need to resolve them for each RPC.

TESTING
---

* New tests added to rpc-mgr-test.

TO DO
-

* Re-enable throughput and latency measurements per data-stream sender
  when that information is exposed from KRPC (KUDU-1738).

* TLS and Kerberos are still not supported by KRPC in this patch.

Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
---
M .clang-format
M CMakeLists.txt
M be/CMakeLists.txt
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/benchmarks/row-batch-serialize-benchmark.cc
M be/src/common/init.cc
M be/src/common/status.cc
M be/src/common/status.h
M be/src/exprs/expr-test.cc
M be/src/kudu/rpc/rpc_sidecar.cc
M be/src/kudu/rpc/rpc_sidecar.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/TAcceptQueueServer.cpp
M be/src/rpc/common.proto
M be/src/rpc/rpc-mgr-test.cc
M be/src/rpc/rpc-mgr.h
M be/src/rpc/rpc-mgr.inline.h
M be/src/rpc/rpc.h
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache-types.h
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-mgr.cc
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.cc
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.cc
M be/src/runtime/data-stream-sender.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/row-batch-serialize-test.cc
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/scheduling/backend-config-test.cc
M 

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-06-19 Thread Michael Ho (Code Review)
Michael Ho has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 3:

(6 comments)

Some more comments. Still going through the patch.

http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/rpc/rpc.h
File be/src/rpc/rpc.h:

PS3, Line 106: Ownership is
 :   // shared by the caller, and the RPC subsystem
Doesn't std::move transfer the ownership, so the caller no longer shares it?


PS3, Line 143: are owned by the caller
The ownership is temporarily transferred to the RPC call when this function is
invoked, right?


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/data-stream-sender.cc
File be/src/runtime/data-stream-sender.cc:

PS3, Line 214: !channel
channel == nullptr


PS3, Line 252: batch->compressed_tuple_data
Is this transferring the ownership to the RPC subsystem ? AddSideCar() 
internally uses std::move(). This seems subtle enough to warrant a comment.


PS3, Line 266: MonoDelta::FromMilliseconds(numeric_limits::max())
This is a subtle change in behavior from previous Impala versions. In
particular, FLAGS_backend_client_rpc_timeout_ms sets the timeout for a socket
write if a thrift thread gets stuck writing to the socket.

Given that KRPC sockets are asynchronous, the DSS may stay blocked for quite a
while until the query gets cancelled. Should we impose some reasonably
conservative timeout here?


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/row-batch.cc
File be/src/runtime/row-batch.cc:

PS3, Line 117: DCHECK(
DCHECK_EQ


-- 
To view, visit http://gerrit.cloudera.org:8080/7103
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 3
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson 
Gerrit-Reviewer: Henry Robinson 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-06-16 Thread Sailesh Mukil (Code Review)
Sailesh Mukil has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 3:

(17 comments)

http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/rpc/rpc.h
File be/src/rpc/rpc.h:

PS3, Line 147: func
Having the names 'func', 'cb' and 'cb_wrapper' all close by each other makes 
some of this slightly more complicated to read.

I would opt for renaming 'func' to 'rpc_method' or something, so that it's 
crystal clear that it may not be a callback itself.


PS3, Line 153: std::move(params_)
Does this move mean that the params_ member is invalid after this call? If so, 
it would be good to add a comment where it's declared.


PS3, Line 327: cb
Maybe name this 'completion_cb'.


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/coordinator-backend-state.cc
File be/src/runtime/coordinator-backend-state.cc:

PS3, Line 389: Copying proto_filter here ensures that its lifetime will last at 
least until this
 :   // callback completes.
 :   auto cb = [proto_filter]
Why wouldn't a move capture ensure the same thing?


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

PS3, Line 1207:   // Do an explicit copy of the directory: 'params' may 
have come from an RPC so we
  :   // can't assume ownership of its directory.
  :   bloom_filter_ =
  :   make_unique(params.header, 
params.directory);
I would add this to the commit message. This means we would take double the 
memory cost every time we merge filters right?


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/data-stream-recvr.cc
File be/src/runtime/data-stream-recvr.cc:

PS3, Line 280: blocked_senders_.front()
Is this the right way to dispose of a unique_ptr?


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/data-stream-sender.cc
File be/src/runtime/data-stream-sender.cc:

PS3, Line 58: DECLARE_int32(datastream_sender_timeout_ms);
Not used.


PS3, Line 60: DECLARE_int32(state_store_subscriber_port);
Not used.


PS3, Line 133: AtomicInt64 num_data_bytes_sent_
No one really calls GetNumDataBytesSent() (except from our BE test). So, I'm 
not sure we're gaining anything by making this atomic.
Not a big deal, but it might be cheaper to leave it as an int64. Your call 
though.


PS3, Line 148: self_
A reader of this code might not immediately understand why this class needs to 
always have a reference to itself. It would be good to explicitly mention this 
member name in the header where the explanation is given.


PS3, Line 170: self_ = shared_from_this();
Why is this set in Init()? Wouldn't it ideally be set in the constructor?


PS3, Line 175: auto proto_batch
Just want to make sure that this will increase the shared_ptr refcount? It 
should, because it makes an underlying copy of the pointer, but I just want 
to confirm.
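The refcount behaviour being asked about here is easy to confirm standalone. This sketch uses placeholder names, not the patch's actual types:

```cpp
#include <memory>

// Confirms the reviewer's expectation: copy-initialization from a shared_ptr
// (even via 'auto') bumps the refcount, so the copy keeps the payload alive
// independently of the original.
bool SharedPtrCopyBumpsRefcount() {
  std::shared_ptr<int> batch = std::make_shared<int>(1);
  if (batch.use_count() != 1) return false;
  auto proto_batch = batch;  // a copy, not an alias: refcount 1 -> 2
  if (batch.use_count() != 2) return false;
  batch.reset();             // original dropped; the copy still owns the data
  return proto_batch.use_count() == 1 && *proto_batch == 1;
}
```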


PS3, Line 203: cb
Prefer a more descriptive name "rpc_completion_cb" or something similar.


Line 336:   batch_.reset();
DCHECK(self_.get())


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/row-batch.cc
File be/src/runtime/row-batch.cc:

Line 216:   
output_batch->header.set_compression_type(THdfsCompression::LZ4);
Do you think it would be good to add a comment explaining why we don't free 
the 'tuple_data' buffer here? Presumably so we can reuse the memory when the 
RowBatch is recycled?


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/row-batch.h
File be/src/runtime/row-batch.h:

PS3, Line 73: == THdfsCompression::LZ4
!= THdfsCompression::NONE

Functionally same, more readable.


PS3, Line 441: FlushMode flush_ = FlushMode::NO_FLUSH_RESOURCES
Why not initialize this and the other args below with the default values in the 
constructor member initialization list?
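The two styles being contrasted in this comment are equivalent for defaulted fields; the in-class form keeps the default next to the declaration and applies to every constructor automatically. A minimal illustration (struct names hypothetical, loosely echoing the patch's FlushMode member):

```cpp
enum class FlushMode { NO_FLUSH_RESOURCES, FLUSH_RESOURCES };

// Style 1: in-class default member initializers, as in the patch.
struct RowBatchHeaderInClass {
  FlushMode flush_ = FlushMode::NO_FLUSH_RESOURCES;  // default at declaration
  int num_rows_ = 0;
};

// Style 2: defaults set in the constructor's member initialization list.
struct RowBatchHeaderCtorList {
  RowBatchHeaderCtorList()
    : flush_(FlushMode::NO_FLUSH_RESOURCES), num_rows_(0) {}
  FlushMode flush_;
  int num_rows_;
};
```

A constructor's init list can still override an in-class default, so Style 1 loses no flexibility when extra constructors are added later.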


-- 
To view, visit http://gerrit.cloudera.org:8080/7103
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 3
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson 
Gerrit-Reviewer: Henry Robinson 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-06-13 Thread Henry Robinson (Code Review)
Henry Robinson has uploaded a new patch set (#3).

Change subject: IMPALA-4856: Port data stream service to KRPC
..

IMPALA-4856: Port data stream service to KRPC

This patch ports the data-flow parts of ImpalaInternalService to KRPC.

* ImpalaInternalService is split into two services. The first,
  ImpalaInternalService, deals with control messages for plan fragment
  instance execution, cancellation and reporting, and remains
  implemented in Thrift for now. The second, DataStreamService, handles
  large-payload RPCs for transmitting runtime filters and row batches
  between hosts.

* In the DataStreamService, all RPCs use 'native' protobuf. The
  DataStreamService starts on the port previously reserved for the
  StatestoreSubscriberService (which is also a KRPC service), to avoid
  having to configure another port when starting Impala. When the
  ImpalaInternalService is ported to KRPC, all services will run on one
  port.

* To support needing to address two different backend services, a data
  service port has been added to TBackendDescriptor.

* This patch adds support for asynchronous RPCs to the RpcMgr and Rpc
  classes. Previously, Impala used fixed size thread pools + synchronous
  RPCs to achieve some parallelism for 'broadcast' RPCs like filter
  propagation, or a dedicated per-sender+receiver pair thread on the
  sender side in the DataStreamSender case. In this patch, the
  PublishFilter() and TransmitData() RPCs are sent asynchronously using
  KRPC's thread pools.

* The TransmitData() protocol has changed to adapt to asynchronous
  RPCs. The full details are in data-stream-mgr.h.

* As a result, DataStreamSender no longer creates a
  thread-per-connection on the sender side.

* Both tuple transmission and runtime filter publication use sidecars to
  minimise the number of copies and serialization steps required.

* Also include a fix for KUDU-2011 that properly allows sidecars to be
  shared between KRPC and the RPC caller (fixing IMPALA-5093, a
  corruption bug).

* A large portion of this patch is the replacement of TRowBatch with its
  Protobuf equivalent, RowBatchPb. The replacement is a literal port of
  the data structure, and row-batch-test, row-batch-list-test and
  row-batch-serialize-benchmark continue to execute without major logic
  changes.

* Simplify FindRecvr() logic in DataStreamManager. No longer need to
  handle blocking sender-side, so no need for complex promise-based
  machinery. Instead, all senders with no receiver are added to a
  per-receiver list, which is processed when the receiver arrives. If it
  does not arrive promptly, the DataStreamManager cleans them up after
  FLAGS_datastream_sender_timeout_ms.

* This patch also begins a clean-up of how ImpalaServer instances are
  created (by removing CreateImpalaServer), and clarifying the
  relationship between ExecEnv and ImpalaServer. ImpalaServer now
  follows the standard construct->Init()->Start()->Join() lifecycle that
  we use for other services.

* Ensure that all addresses used for KRPCs are fully resolved, avoiding
  the need to resolve them for each RPC.

TESTING
---

* New tests added to rpc-mgr-test.

TO DO
-

* Re-enable throughput and latency measurements per data-stream sender
  when that information is exposed from KRPC (KUDU-1738).

* TLS and Kerberos are still not supported by KRPC in this patch.

Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
---
M .clang-format
M CMakeLists.txt
M be/CMakeLists.txt
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/benchmarks/row-batch-serialize-benchmark.cc
M be/src/common/init.cc
M be/src/common/status.cc
M be/src/common/status.h
M be/src/exprs/expr-test.cc
M be/src/kudu/rpc/rpc_sidecar.cc
M be/src/kudu/rpc/rpc_sidecar.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/TAcceptQueueServer.cpp
M be/src/rpc/common.proto
M be/src/rpc/rpc-mgr-test.cc
M be/src/rpc/rpc-mgr.h
M be/src/rpc/rpc-mgr.inline.h
M be/src/rpc/rpc.h
D be/src/runtime/backend-client.h
M be/src/runtime/client-cache-types.h
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-mgr.cc
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.cc
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.cc
M be/src/runtime/data-stream-sender.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/row-batch-serialize-test.cc
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-state.cc
M be/src/runtime/runtime-state.h

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-06-13 Thread Henry Robinson (Code Review)
Henry Robinson has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 2:

(8 comments)

http://gerrit.cloudera.org:8080/#/c/7103/2/be/src/rpc/rpc.h
File be/src/rpc/rpc.h:

PS2, Line 126: 'cb'
> Can you please state the context in which 'cb' is called from (e.g. reactor
Done


PS2, Line 133: aattempted
> typo
Done


PS2, Line 230: //
> ///
Done


PS2, Line 319: Retries
> Is there any way to write a be-test to exercise the retry path ?
Yep - see rpc-mgr-test.cc, RetryAsyncTest. That injects ERROR_SERVER_TOO_BUSY 
into an RPC response which triggers the retry logic.
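For reference, the retry path that RetryAsyncTest exercises reduces to a loop like the one below. This is a simplified, self-contained sketch, not Impala code: `RpcResult` and `InvokeWithRetry` are illustrative stand-ins, and the sleep between attempts is elided.

```cpp
#include <functional>

// Illustrative stand-in for the subset of RPC outcomes relevant here.
enum class RpcResult { OK, SERVER_TOO_BUSY, FATAL };

// Retries 'invoke' while the server reports it is too busy, up to
// 'max_attempts' tries in total; any other outcome ends the loop.
// Mirrors the injected ERROR_SERVER_TOO_BUSY test in rpc-mgr-test.cc.
RpcResult InvokeWithRetry(const std::function<RpcResult()>& invoke,
    int max_attempts) {
  RpcResult result = RpcResult::FATAL;
  for (int attempt = 0; attempt < max_attempts; ++attempt) {
    result = invoke();
    if (result != RpcResult::SERVER_TOO_BUSY) break;  // Only 'busy' is retried.
    // The real helper sleeps for params->retry_interval_ms between attempts.
  }
  return result;
}
```

In the actual Rpc<> helper the interval comes from kudu::MonoDelta::FromMilliseconds(params->retry_interval_ms), as quoted below.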


PS2, Line 322: auto cb_wrapper = [params = std::move(params), mgr, func, req, resp,
 : cb = std::move(cb), controller_ptr = controller.release(), num_attempts]()
 : mutable {
> An alternative to this lambda implementation would be to define a separate 
It would, yeah. My preference is for using a lambda here partly because it's 
very clear about how the arguments are copied, and how ownership is managed (I 
find the copying behaviour of bind() a bit inscrutable).
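To make the point about explicit copying concrete: with a C++14 init-capture the ownership transfer is written out at the capture site, whereas bind() copies (or refuses to compile) behind the scenes. A minimal sketch, not Impala code:

```cpp
#include <memory>
#include <utility>

// Moves a unique_ptr into a closure via an init-capture and reads the value
// back out. The move is spelled out at the capture site, which is the clarity
// argument made above; bind() would hide this decision.
inline int RunMoveCaptureDemo() {
  auto payload = std::make_unique<int>(42);
  auto cb = [p = std::move(payload)]() { return *p; };
  // 'payload' is now null: it was moved into the closure, not copied.
  return payload == nullptr ? cb() : -1;
}
```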


PS2, Line 332: cb(Status::OK(), req, resp, controller_ptr);
> Re-reading the comments above, status seems to indicate whether status was 
Right - I was in two minds about using different statuses, or merging together 
the one from the RpcController and the one that we must provide somehow. I 
think this is the simplest way to pass both statuses, but let me know if you 
have an idea! I added a comment for now.


Line 337:   kudu::MonoDelta retry_interval = kudu::MonoDelta::FromMilliseconds(params->retry_interval_ms);
> long line
Done


http://gerrit.cloudera.org:8080/#/c/7103/2/be/src/runtime/data-stream-sender.cc
File be/src/runtime/data-stream-sender.cc:

PS2, Line 203:   auto cb = [self_ptr = weak_ptr(self_),
 :   instance_id = fragment_instance_id_, proto_batch = batch]
 :   (const Status& status, TransmitDataRequestPb* request,
 :   TransmitDataResponsePb* response, RpcController* controller) {
 :
 : // Ensure that request and response get deleted when this callback returns.
 : auto request_container = unique_ptr<TransmitDataRequestPb>(request);
 : auto response_container = unique_ptr<TransmitDataResponsePb>(response);
 :
 : // Check if this channel still exists.
 : auto channel = self_ptr.lock();
 : if (!channel) return;
 : {
 :   lock_guard l(channel->lock_);
 :   Status rpc_status = status.ok() ? FromKuduStatus(controller->status()) : status;
 :
 :   int32_t status_code = response->status().status_code();
 :   channel->recvr_gone_ = status_code == TErrorCode::DATASTREAM_RECVR_ALREADY_GONE;
 :
 :   if (!rpc_status.ok()) {
 : channel->last_rpc_status_ = rpc_status;
 :   } else if (!channel->recvr_gone_) {
 : if (status_code != TErrorCode::OK) {
 :   // Don't bubble up the 'receiver gone' status, because it's not an error.
 :   channel->last_rpc_status_ = Status(response->status());
 : } else {
 :   int size = proto_batch->GetSize();
 :   channel->num_data_bytes_sent_.Add(size);
 :   VLOG_ROW << "incremented #data_bytes_sent="
 :       << channel->num_data_bytes_sent_.Load();
 : }
 :   }
 :   channel->rpc_in_flight_ = false;
 : }
 : channel->rpc_done_cv_.notify_one();
 :   };
> I am no C++ expert so this question may be stupid: can we not write this as
We could write this as a method, and use bind(), or we could create a struct 
with one method (this one) that captures the context upon construction. The 
latter is what a lambda compiles down to, and I prefer the syntax sugar a 
lambda gives you. The former uses bind(), which I am not a great fan of. 

In my experience, gdb handles this just fine, and the stack is IMHO cleaner 
than using bind() (I've broken in this method lots of times!).
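The "a lambda compiles down to a struct with one method" point can be made concrete; the two forms below behave identically (names are illustrative, not from the patch):

```cpp
// Hand-written equivalent of a capturing lambda: a struct that stashes its
// context at construction and exposes a single call operator.
struct SendCompleteCb {
  int bytes_sent;  // Context captured at construction.
  int operator()() const { return bytes_sent * 2; }
};

// Returns true iff the sugared lambda and the explicit struct agree.
inline bool LambdaEqualsStruct() {
  int bytes_sent = 21;
  auto lambda = [bytes_sent]() { return bytes_sent * 2; };  // Same thing, sugared.
  return lambda() == SendCompleteCb{bytes_sent}();
}
```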


-- 
To view, visit http://gerrit.cloudera.org:8080/7103
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 2
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson 
Gerrit-Reviewer: Henry Robinson 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-06-13 Thread Michael Ho (Code Review)
Michael Ho has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 2:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/7103/2/be/src/rpc/rpc.h
File be/src/rpc/rpc.h:

PS2, Line 332: cb(Status::OK(), req, resp, controller_ptr);
> Why do we pass Status::OK() for non-retryable error or after exceeding the 
Re-reading the comments above, 'status' seems to indicate whether the RPC was 
successfully attempted so it may be okay. The assumption is that cb will check 
for remote error from controller_ptr.


-- 
To view, visit http://gerrit.cloudera.org:8080/7103
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 2
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson 
Gerrit-Reviewer: Henry Robinson 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-06-13 Thread Michael Ho (Code Review)
Michael Ho has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 2:

(11 comments)

http://gerrit.cloudera.org:8080/#/c/7103/2/be/src/rpc/rpc.h
File be/src/rpc/rpc.h:

PS2, Line 126: 'cb'
Can you please state the context in which 'cb' is called from (e.g. reactor 
thread) and also add the precaution caller should take when implementing 'cb' 
(e.g. no blocking etc) ?


PS2, Line 133: aattempted
typo


PS2, Line 230: //
///

Same below.


PS2, Line 319: Retries
Is there any way to write a be-test to exercise the retry path ?


PS2, Line 322: auto cb_wrapper = [params = std::move(params), mgr, func, req, resp,
 : cb = std::move(cb), controller_ptr = controller.release(), num_attempts]()
 : mutable {
An alternative to this lambda implementation would be to define a separate 
function and uses boost::bind() to stash the arguments, right ?


PS2, Line 332: cb(Status::OK(), req, resp, controller_ptr);
Why do we pass Status::OK() for non-retryable error or after exceeding the 
maximum number of retries ?


Line 337:   kudu::MonoDelta retry_interval = kudu::MonoDelta::FromMilliseconds(params->retry_interval_ms);
long line


http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/runtime/data-stream-sender.cc
File be/src/runtime/data-stream-sender.cc:

PS1, Line 265: 10
Mind commenting above what 10 stands for ?


PS1, Line 266: numeric_limits::max()
Why is this not FLAGS_datastream_sender_timeout_ms ?


http://gerrit.cloudera.org:8080/#/c/7103/2/be/src/runtime/data-stream-sender.cc
File be/src/runtime/data-stream-sender.cc:

PS2, Line 203:   auto cb = [self_ptr = weak_ptr(self_),
 :   instance_id = fragment_instance_id_, proto_batch = batch]
 :   (const Status& status, TransmitDataRequestPb* request,
 :   TransmitDataResponsePb* response, RpcController* controller) {
 :
 : // Ensure that request and response get deleted when this callback returns.
 : auto request_container = unique_ptr<TransmitDataRequestPb>(request);
 : auto response_container = unique_ptr<TransmitDataResponsePb>(response);
 :
 : // Check if this channel still exists.
 : auto channel = self_ptr.lock();
 : if (!channel) return;
 : {
 :   lock_guard l(channel->lock_);
 :   Status rpc_status = status.ok() ? FromKuduStatus(controller->status()) : status;
 :
 :   int32_t status_code = response->status().status_code();
 :   channel->recvr_gone_ = status_code == TErrorCode::DATASTREAM_RECVR_ALREADY_GONE;
 :
 :   if (!rpc_status.ok()) {
 : channel->last_rpc_status_ = rpc_status;
 :   } else if (!channel->recvr_gone_) {
 : if (status_code != TErrorCode::OK) {
 :   // Don't bubble up the 'receiver gone' status, because it's not an error.
 :   channel->last_rpc_status_ = Status(response->status());
 : } else {
 :   int size = proto_batch->GetSize();
 :   channel->num_data_bytes_sent_.Add(size);
 :   VLOG_ROW << "incremented #data_bytes_sent="
 :       << channel->num_data_bytes_sent_.Load();
 : }
 :   }
 :   channel->rpc_in_flight_ = false;
 : }
 : channel->rpc_done_cv_.notify_one();
 :   };
I am no C++ expert so this question may be stupid: can we not write this as a 
lambda function ? I am not sure how well gdb can handle lambda functions when 
compiled with optimization and this callback seems important enough that one 
may want to inspect its states in a core dump if necessary.


http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/service/impala-internal-service.cc
File be/src/service/impala-internal-service.cc:

PS1, Line 63: DataStreamService
nit: Just wondering why we didn't put DataStreamService in 
data-stream-service.cc ?


-- 
To view, visit http://gerrit.cloudera.org:8080/7103
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 2
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson 
Gerrit-Reviewer: Henry Robinson 
Gerrit-Reviewer: Michael Ho 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-06-12 Thread Henry Robinson (Code Review)
Henry Robinson has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 1:

(10 comments)

http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/runtime/data-stream-mgr.cc
File be/src/runtime/data-stream-mgr.cc:

Line 61:   bool unused = false;
> Doesn't the contract for FindRecvr() state that we need to hold 'lock_' bef
Done


Line 105:   EarlySendersList waiters;
> Add brief comment:
Done


PS1, Line 123: for (int32_t sender_id: waiters.closing_senders) recvr->RemoveSender(sender_id);
> According to the header comment in data-stream-mgr.h, a sender shouldn't be
Done


PS1, Line 300: early_senders_
> Assume the following case:
The sender fragment instance would fail, and then the coordinator should cancel 
the receiver. 

I believe there's an outstanding issue where, if the coordinator fails to 
cancel a fragment instance, the fragment instance will not fail itself. I'm 
going to file a JIRA for that, but it's unrelated to KRPC.


Line 321: // Wait for 10s
> Add a brief comment stating that this is to check if the DataStreamMgr is b
Done


http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/runtime/data-stream-mgr.h
File be/src/runtime/data-stream-mgr.h:

PS1, Line 81: will do one of three things
> nit: would be nice to format them as bullet points.
Done


PS1, Line 83: if the buffer is full
> "if the batch queues are full"?
Done


PS1, Line 87: the sender
> "the sender along with its payload" ?
Done


Line 224:   /// has not yet prepared 'payload' is queued until it arrives, or 
is timed out. If the
> nit: been prepared,
Done


http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/service/impala-server.h
File be/src/service/impala-server.h:

PS1, Line 255: void UpdateFilter
> Leave a TODO stating that this should move to query-state.h/cc after IMPALA
I'm going to leave that for now, since I don't want to make design decisions 
for IMPALA-3825 in this patch.


-- 
To view, visit http://gerrit.cloudera.org:8080/7103
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 1
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson 
Gerrit-Reviewer: Henry Robinson 
Gerrit-Reviewer: Sailesh Mukil 
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-06-12 Thread Henry Robinson (Code Review)
Henry Robinson has uploaded a new patch set (#2).

Change subject: IMPALA-4856: Port data stream service to KRPC
..

IMPALA-4856: Port data stream service to KRPC

This patch ports the data-flow parts of ImpalaInternalService to KRPC.

* ImpalaInternalService is split into two services. The first,
  ImpalaInternalService, deals with control messages for plan fragment
  instance execution, cancellation and reporting, and remains
  implemented in Thrift for now. The second, DataStreamService, handles
  large-payload RPCs for transmitting runtime filters and row batches
  between hosts.

* In the DataStreamService, all RPCs use 'native' protobuf. The
  DataStreamService starts on the port previously reserved for the
  StatestoreSubscriberService (which is also a KRPC service), to avoid
  having to configure another port when starting Impala. When the
  ImpalaInternalService is ported to KRPC, all services will run on one
  port.

* To support needing to address two different backend services, a data
  service port has been added to TBackendDescriptor.

* This patch adds support for asynchronous RPCs to the RpcMgr and Rpc
  classes. Previously, Impala used fixed size thread pools + synchronous
  RPCs to achieve some parallelism for 'broadcast' RPCs like filter
  propagation, or a dedicated per-sender+receiver pair thread on the
  sender side in the DataStreamSender case. In this patch, the
  PublishFilter() and TransmitData() RPCs are sent asynchronously using
  KRPC's thread pools.

* The TransmitData() protocol has changed to adapt to asynchronous
  RPCs. The full details are in data-stream-mgr.h.

* As a result, DataStreamSender no longer creates a
  thread-per-connection on the sender side.

* Both tuple transmission and runtime filter publication use sidecars to
  minimise the number of copies and serialization steps required.

* Also include a fix for KUDU-2011 that properly allows sidecars to be
  shared between KRPC and the RPC caller (fixing IMPALA-5093, a
  corruption bug).

* A large portion of this patch is the replacement of TRowBatch with its
  Protobuf equivalent, RowBatchPb. The replacement is a literal port of
  the data structure, and row-batch-test, row-batch-list-test and
  row-batch-serialize-benchmark continue to execute without major logic
  changes.

* Simplify FindRecvr() logic in DataStreamManager. No longer need to
  handle blocking sender-side, so no need for complex promise-based
  machinery. Instead, all senders with no receiver are added to a
  per-receiver list, which is processed when the receiver arrives. If it
  does not arrive promptly, the DataStreamManager cleans them up after
  FLAGS_datastream_sender_timeout_ms.
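The per-receiver early-sender list described above can be sketched roughly as follows. This is a simplified model, not the actual DataStreamMgr API: the real code also tracks closing senders and arrival times, and a reaper evicts entries older than FLAGS_datastream_sender_timeout_ms, which is omitted here.

```cpp
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Simplified model of the early-senders scheme: batches that arrive before
// their receiver is registered are parked per receiver id, then drained when
// the receiver shows up.
class EarlySenderModel {
 public:
  // Returns true if the batch was delivered, false if it was parked.
  bool Transmit(const std::string& recvr_id, int batch) {
    if (registered_.count(recvr_id) == 0) {
      early_batches_[recvr_id].push_back(batch);
      return false;
    }
    delivered_[recvr_id].push_back(batch);
    return true;
  }

  // Registers the receiver, drains any early senders in arrival order, and
  // returns everything delivered to it so far.
  std::vector<int> RegisterRecvr(const std::string& recvr_id) {
    registered_.insert(recvr_id);
    std::vector<int> waiting = std::move(early_batches_[recvr_id]);
    early_batches_.erase(recvr_id);
    for (int b : waiting) delivered_[recvr_id].push_back(b);
    return delivered_[recvr_id];
  }

 private:
  std::set<std::string> registered_;
  std::map<std::string, std::vector<int>> early_batches_;
  std::map<std::string, std::vector<int>> delivered_;
};
```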

* This patch also begins a clean-up of how ImpalaServer instances are
  created (by removing CreateImpalaServer), and clarifying the
  relationship between ExecEnv and ImpalaServer. ImpalaServer now
  follows the standard construct->Init()->Start()->Join() lifecycle that
  we use for other services.

* Ensure that all addresses used for KRPCs are fully resolved, avoiding
  the need to resolve them for each RPC.

TESTING
---

* New tests added to rpc-mgr-test.

TO DO
-

* Re-enable throughput and latency measurements per data-stream sender
  when that information is exposed from KRPC (KUDU-1738).

* TLS and Kerberos are still not supported by KRPC in this patch.

Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
---
M .clang-format
M CMakeLists.txt
M be/CMakeLists.txt
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/benchmarks/row-batch-serialize-benchmark.cc
M be/src/common/init.cc
M be/src/common/status.cc
M be/src/common/status.h
M be/src/exprs/expr-test.cc
M be/src/kudu/rpc/rpc_sidecar.cc
M be/src/kudu/rpc/rpc_sidecar.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/TAcceptQueueServer.cpp
M be/src/rpc/common.proto
M be/src/rpc/rpc-mgr-test.cc
M be/src/rpc/rpc-mgr.h
M be/src/rpc/rpc-mgr.inline.h
M be/src/rpc/rpc.h
D be/src/runtime/backend-client.h
M be/src/runtime/client-cache-types.h
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-mgr.cc
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.cc
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.cc
M be/src/runtime/data-stream-sender.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/row-batch-serialize-test.cc
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-state.cc
M 

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-06-12 Thread Sailesh Mukil (Code Review)
Sailesh Mukil has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 1:

(12 comments)

> This patch passes core, exhaustive and ASAN tests. It can execute
 > 32 concurrent streams of TPCDS-Q17 @ scale factor 3 on a
 > 138-node cluster with Kerberos enabled. (I don't believe the
 > previous implementation could do this effectively because of the
 > number of Thrift connections required).

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-06-10 Thread Henry Robinson (Code Review)
Henry Robinson has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
..


Patch Set 1:

This patch passes core, exhaustive and ASAN tests. It can execute 32 concurrent 
streams of TPCDS-Q17 @ scale factor 3 on a 138-node cluster with Kerberos 
enabled. (I don't believe the previous implementation could do this effectively 
because of the number of Thrift connections required). 

Some perf results from a 20-node cluster:

++------------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
| Workload   | Query    | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Num Clients | Iters |
+------------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
| TPCH(_300) | TPCH-Q3  | parquet / none / none | 32.55  | 28.18       | +15.51%    | 4.71%     | 1.17%          | 1           | 3     |
| TPCH(_300) | TPCH-Q13 | parquet / none / none | 24.43  | 22.21       | +9.99%     | 0.61%     | 0.70%          | 1           | 3     |
| TPCH(_300) | TPCH-Q8  | parquet / none / none | 7.53   | 7.05        | +6.69%     | 1.70%     | 2.09%          | 1           | 3     |
| TPCH(_300) | TPCH-Q22 | parquet / none / none | 6.35   | 6.04        | +5.19%     | 0.37%     | 0.76%          | 1           | 3     |
| TPCH(_300) | TPCH-Q14 | parquet / none / none | 4.28   | 4.10        | +4.36%     | 0.03%     | 0.73%          | 1           | 3     |
| TPCH(_300) | TPCH-Q15 | parquet / none / none | 3.53   | 3.41        | +3.69%     | 0.61%     | 1.42%          | 1           | 3     |
| TPCH(_300) | TPCH-Q16 | parquet / none / none | 6.09   | 5.87        | +3.63%     | 0.15%     | 1.78%          | 1           | 3     |
| TPCH(_300) | TPCH-Q11 | parquet / none / none | 1.73   | 1.70        | +2.22%     | 0.10%     | 0.95%          | 1           | 3     |
| TPCH(_300) | TPCH-Q21 | parquet / none / none | 105.84 | 103.71      | +2.06%     | 0.57%     | 0.44%          | 1           | 3     |
| TPCH(_300) | TPCH-Q9  | parquet / none / none | 30.76  | 30.46       | +1.00%     | 2.57%     | 1.22%          | 1           | 3     |
| TPCH(_300) | TPCH-Q1  | parquet / none / none | 22.14  | 21.94       | +0.91%     | 0.81%     | 0.86%          | 1           | 3     |
| TPCH(_300) | TPCH-Q4  | parquet / none / none | 5.09   | 5.05        | +0.79%     | 0.48%     | 2.54%          | 1           | 3     |
| TPCH(_300) | TPCH-Q18 | parquet / none / none | 31.76  | 32.54       | -2.39%     | 0.44%     | 0.03%          | 1           | 3     |
| TPCH(_300) | TPCH-Q2  | parquet / none / none | 1.98   | 2.04        | -2.74%     | 7.17%     | 7.41%          | 1           | 3     |
| TPCH(_300) | TPCH-Q5  | parquet / none / none | 47.62  | 48.98       | -2.79%     | 0.51%     | 0.16%          | 1           | 3     |
| TPCH(_300) | TPCH-Q20 | parquet / none / none | 3.18   | 3.27        | -2.89%     | 1.34%     | 1.98%          | 1           | 3     |
| TPCH(_300) | TPCH-Q6  | parquet / none / none | 1.32   | 1.37        | -3.72%     | 0.03%     | 4.00%          | 1           | 3     |
| TPCH(_300) | TPCH-Q10 | parquet / none / none | 9.00   | 9.48        | -5.06%     | 0.16%     | 0.69%          | 1           | 3     |
| TPCH(_300) | TPCH-Q17 | parquet / none / none | 5.16   | 5.75        | -10.18%    | 6.44%     | 2.63%          | 1           | 3     |
| TPCH(_300) | TPCH-Q12 | parquet / none / none | 3.01   | 3.39        | -11.38%    | 2.43%     | 0.06%          | 1           | 3     |
| TPCH(_300) | TPCH-Q19 | parquet / none / none | 25.20  | 28.82       | I -12.57%  | 0.01%     | 0.75%          | 1           | 3     |
| TPCH(_300) | TPCH-Q7  | parquet / none / none | 45.32  | 61.16       | I -25.91%  | 0.55%     | 2.22%          | 1           | 3     |
+------------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+

Primitives (note the significant regression in many_independent_fragments, that 
needs further attention)

+---------------------+--------------------------------------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
| Workload            | Query                                | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Num Clients | Iters |
+---------------------+--------------------------------------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
| TARGETED-PERF(_300) | primitive_many_independent_fragments | parquet / none / none | 377.69 | 189.40      | R +99.42%  | 0.32%     | 0.22%          | 1           | 

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

2017-06-07 Thread Henry Robinson (Code Review)
Henry Robinson has uploaded a new change for review.

  http://gerrit.cloudera.org:8080/7103

Change subject: IMPALA-4856: Port data stream service to KRPC
..

IMPALA-4856: Port data stream service to KRPC

This patch ports the data-flow parts of ImpalaInternalService to KRPC.

* ImpalaInternalService is split into two services. The first,
  ImpalaInternalService, deals with control messages for plan fragment
  instance execution, cancellation and reporting, and remains
  implemented in Thrift for now. The second, DataStreamService, handles
  large-payload RPCs for transmitting runtime filters and row batches
  between hosts.

* In the DataStreamService, all RPCs use 'native' protobuf. The
  DataStreamService starts on the port previously reserved for the
  StatestoreSubscriberService (which is also a KRPC service), to avoid
  having to configure another port when starting Impala. When the
  ImpalaInternalService is ported to KRPC, all services will run on one
  port.

* To support needing to address two different backend services, a data
  service port has been added to TBackendDescriptor.

* This patch adds support for asynchronous RPCs to the RpcMgr and Rpc
  classes. Previously, Impala used fixed size thread pools + synchronous
  RPCs to achieve some parallelism for 'broadcast' RPCs like filter
  propagation, or a dedicated per-sender+receiver pair thread on the
  sender side in the DataStreamSender case. In this patch, the
  PublishFilter() and TransmitData() RPCs are sent asynchronously using
  KRPC's thread pools.

* The TransmitData() protocol has changed to adapt to asynchronous
  RPCs. The full details are in data-stream-mgr.h.

* As a result, DataStreamSender no longer creates a
  thread-per-connection on the sender side.

* Both tuple transmission and runtime filter publication use sidecars to
  minimise the number of copies and serialization steps required.

* Also include a fix for KUDU-2011 that properly allows sidecars to be
  shared between KRPC and the RPC caller (fixing IMPALA-5093, a
  corruption bug).

* A large portion of this patch is the replacement of TRowBatch with its
  Protobuf equivalent, RowBatchPb. The replacement is a literal port of
  the data structure, and row-batch-test, row-batch-list-test and
  row-batch-serialize-benchmark continue to execute without major logic
  changes.

* Simplify FindRecvr() logic in DataStreamManager. No longer need to
  handle blocking sender-side, so no need for complex promise-based
  machinery. Instead, all senders with no receiver are added to a
  per-receiver list, which is processed when the receiver arrives. If it
  does not arrive promptly, the DataStreamManager cleans them up after
  FLAGS_datastream_sender_timeout_ms.

* This patch also begins a clean-up of how ImpalaServer instances are
  created (by removing CreateImpalaServer), and clarifying the
  relationship between ExecEnv and ImpalaServer. ImpalaServer now
  follows the standard construct->Init()->Start()->Join() lifecycle that
  we use for other services.

* Ensure that all addresses used for KRPCs are fully resolved, avoiding
  the need to resolve them for each RPC.

TESTING
---

* New tests added to rpc-mgr-test.

TO DO
-

* Re-enable throughput and latency measurements per data-stream sender
  when that information is exposed from KRPC (KUDU-1738).

* TLS and Kerberos are still not supported by KRPC in this patch.

Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
---
M .clang-format
M CMakeLists.txt
M be/CMakeLists.txt
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/benchmarks/row-batch-serialize-benchmark.cc
M be/src/common/init.cc
M be/src/common/status.cc
M be/src/common/status.h
M be/src/exprs/expr-test.cc
M be/src/kudu/rpc/rpc_sidecar.cc
M be/src/kudu/rpc/rpc_sidecar.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/TAcceptQueueServer.cpp
M be/src/rpc/common.proto
M be/src/rpc/rpc-mgr-test.cc
M be/src/rpc/rpc-mgr.h
M be/src/rpc/rpc-mgr.inline.h
M be/src/rpc/rpc.h
D be/src/runtime/backend-client.h
M be/src/runtime/client-cache-types.h
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-mgr.cc
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.cc
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.cc
M be/src/runtime/data-stream-sender.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/row-batch-serialize-test.cc
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M