IMPALA-7006: Add KRPC folders from kudu@334ecafd cp -a ~/checkout/kudu/src/kudu/{rpc,util,security} be/src/kudu/
Change-Id: I232db2b4ccf5df9aca87b21dea31bfb2735d1ab7 Reviewed-on: http://gerrit.cloudera.org:8080/10757 Reviewed-by: Lars Volker <l...@cloudera.com> Tested-by: Lars Volker <l...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/fcf190c4 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/fcf190c4 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/fcf190c4 Branch: refs/heads/master Commit: fcf190c4de1fcc291a5356634fd7cd12efa64852 Parents: 39870d4 Author: Lars Volker <l...@cloudera.com> Authored: Tue Jul 3 15:10:52 2018 -0700 Committer: Lars Volker <l...@cloudera.com> Committed: Thu Jul 12 21:35:42 2018 +0000 ---------------------------------------------------------------------- be/src/kudu/rpc/CMakeLists.txt | 138 + be/src/kudu/rpc/acceptor_pool.cc | 175 ++ be/src/kudu/rpc/acceptor_pool.h | 84 + be/src/kudu/rpc/blocking_ops.cc | 126 + be/src/kudu/rpc/blocking_ops.h | 58 + be/src/kudu/rpc/client_negotiation.cc | 853 ++++++ be/src/kudu/rpc/client_negotiation.h | 263 ++ be/src/kudu/rpc/connection.cc | 767 ++++++ be/src/kudu/rpc/connection.h | 391 +++ be/src/kudu/rpc/connection_id.cc | 85 + be/src/kudu/rpc/connection_id.h | 84 + be/src/kudu/rpc/constants.cc | 37 + be/src/kudu/rpc/constants.h | 60 + be/src/kudu/rpc/exactly_once_rpc-test.cc | 629 +++++ be/src/kudu/rpc/inbound_call.cc | 345 +++ be/src/kudu/rpc/inbound_call.h | 286 ++ be/src/kudu/rpc/messenger.cc | 502 ++++ be/src/kudu/rpc/messenger.h | 460 ++++ be/src/kudu/rpc/mt-rpc-test.cc | 318 +++ be/src/kudu/rpc/negotiation-test.cc | 1346 ++++++++++ be/src/kudu/rpc/negotiation.cc | 327 +++ be/src/kudu/rpc/negotiation.h | 58 + be/src/kudu/rpc/outbound_call.cc | 531 ++++ be/src/kudu/rpc/outbound_call.h | 348 +++ be/src/kudu/rpc/periodic-test.cc | 295 +++ be/src/kudu/rpc/periodic.cc | 219 ++ be/src/kudu/rpc/periodic.h | 215 ++ be/src/kudu/rpc/protoc-gen-krpc.cc | 691 +++++ be/src/kudu/rpc/proxy.cc | 116 + be/src/kudu/rpc/proxy.h | 126 + be/src/kudu/rpc/reactor-test.cc | 112 + be/src/kudu/rpc/reactor.cc | 918 +++++++ be/src/kudu/rpc/reactor.h | 427 +++ be/src/kudu/rpc/remote_method.cc | 53 + be/src/kudu/rpc/remote_method.h | 51 + be/src/kudu/rpc/remote_user.cc | 40 + be/src/kudu/rpc/remote_user.h | 99 + be/src/kudu/rpc/request_tracker-test.cc | 86 + be/src/kudu/rpc/request_tracker.cc | 55 + be/src/kudu/rpc/request_tracker.h | 87 + be/src/kudu/rpc/response_callback.h | 31 + be/src/kudu/rpc/result_tracker.cc | 595 +++++ be/src/kudu/rpc/result_tracker.h | 401 +++ be/src/kudu/rpc/retriable_rpc.h | 296 +++ be/src/kudu/rpc/rpc-bench.cc | 298 +++ be/src/kudu/rpc/rpc-test-base.h | 661 +++++ be/src/kudu/rpc/rpc-test.cc | 1364 ++++++++++ be/src/kudu/rpc/rpc.cc | 101 + be/src/kudu/rpc/rpc.h | 221 ++ be/src/kudu/rpc/rpc_context.cc | 217 ++ be/src/kudu/rpc/rpc_context.h | 245 ++ be/src/kudu/rpc/rpc_controller.cc | 177 ++ be/src/kudu/rpc/rpc_controller.h | 282 ++ be/src/kudu/rpc/rpc_header.proto | 365 +++ be/src/kudu/rpc/rpc_introspection.proto | 110 + be/src/kudu/rpc/rpc_service.h | 47 + be/src/kudu/rpc/rpc_sidecar.cc | 115 + be/src/kudu/rpc/rpc_sidecar.h | 73 + be/src/kudu/rpc/rpc_stub-test.cc | 726 ++++++ be/src/kudu/rpc/rpcz_store.cc | 272 ++ be/src/kudu/rpc/rpcz_store.h | 74 + be/src/kudu/rpc/rtest.proto | 160 ++ be/src/kudu/rpc/rtest_diff_package.proto | 26 + be/src/kudu/rpc/sasl_common.cc | 470 ++++ be/src/kudu/rpc/sasl_common.h | 158 ++ be/src/kudu/rpc/sasl_helper.cc | 134 + be/src/kudu/rpc/sasl_helper.h | 109 + be/src/kudu/rpc/serialization.cc | 223 ++ be/src/kudu/rpc/serialization.h | 88 + be/src/kudu/rpc/server_negotiation.cc | 989 +++++++ be/src/kudu/rpc/server_negotiation.h | 259 ++ be/src/kudu/rpc/service_if.cc | 160 ++ be/src/kudu/rpc/service_if.h | 134 + be/src/kudu/rpc/service_pool.cc | 234 ++ be/src/kudu/rpc/service_pool.h | 117 + be/src/kudu/rpc/service_queue-test.cc | 151 ++ be/src/kudu/rpc/service_queue.cc | 145 ++ be/src/kudu/rpc/service_queue.h | 225 ++ be/src/kudu/rpc/transfer.cc | 283 ++ be/src/kudu/rpc/transfer.h | 212 ++ be/src/kudu/rpc/user_credentials.cc | 64 + be/src/kudu/rpc/user_credentials.h | 53 + be/src/kudu/security/CMakeLists.txt | 141 + be/src/kudu/security/ca/cert_management-test.cc | 294 +++ be/src/kudu/security/ca/cert_management.cc | 423 +++ be/src/kudu/security/ca/cert_management.h | 226 ++ be/src/kudu/security/cert-test.cc | 165 ++ be/src/kudu/security/cert.cc | 301 +++ be/src/kudu/security/cert.h | 119 + be/src/kudu/security/crypto-test.cc | 257 ++ be/src/kudu/security/crypto.cc | 276 ++ be/src/kudu/security/crypto.h | 103 + be/src/kudu/security/init.cc | 465 ++++ be/src/kudu/security/init.h | 84 + be/src/kudu/security/kerberos_util.cc | 37 + be/src/kudu/security/kerberos_util.h | 29 + be/src/kudu/security/krb5_realm_override.cc | 105 + be/src/kudu/security/openssl_util.cc | 322 +++ be/src/kudu/security/openssl_util.h | 217 ++ be/src/kudu/security/openssl_util_bio.h | 129 + be/src/kudu/security/security-test-util.cc | 103 + be/src/kudu/security/security-test-util.h | 56 + be/src/kudu/security/security_flags.cc | 42 + be/src/kudu/security/security_flags.h | 36 + be/src/kudu/security/simple_acl.cc | 89 + be/src/kudu/security/simple_acl.h | 58 + be/src/kudu/security/test/mini_kdc-test.cc | 144 ++ be/src/kudu/security/test/mini_kdc.cc | 315 +++ be/src/kudu/security/test/mini_kdc.h | 134 + be/src/kudu/security/test/test_certs.cc | 969 +++++++ be/src/kudu/security/test/test_certs.h | 86 + be/src/kudu/security/test/test_pass.cc | 40 + be/src/kudu/security/test/test_pass.h | 37 + be/src/kudu/security/tls_context.cc | 520 ++++ be/src/kudu/security/tls_context.h | 202 ++ be/src/kudu/security/tls_handshake-test.cc | 390 +++ be/src/kudu/security/tls_handshake.cc | 274 ++ be/src/kudu/security/tls_handshake.h | 171 ++ be/src/kudu/security/tls_socket-test.cc | 366 +++ be/src/kudu/security/tls_socket.cc | 185 ++ be/src/kudu/security/tls_socket.h | 60 + be/src/kudu/security/token-test.cc | 677 +++++ be/src/kudu/security/token.proto | 97 + be/src/kudu/security/token_signer.cc | 299 +++ be/src/kudu/security/token_signer.h | 316 +++ be/src/kudu/security/token_signing_key.cc | 110 + be/src/kudu/security/token_signing_key.h | 103 + be/src/kudu/security/token_verifier.cc | 173 ++ be/src/kudu/security/token_verifier.h | 126 + be/src/kudu/security/x509_check_host.cc | 439 ++++ be/src/kudu/security/x509_check_host.h | 50 + be/src/kudu/util/CMakeLists.txt | 482 ++++ be/src/kudu/util/alignment.h | 28 + be/src/kudu/util/array_view.h | 133 + be/src/kudu/util/async_logger.cc | 151 ++ be/src/kudu/util/async_logger.h | 206 ++ be/src/kudu/util/async_util-test.cc | 129 + be/src/kudu/util/async_util.h | 99 + be/src/kudu/util/atomic-test.cc | 135 + be/src/kudu/util/atomic.cc | 56 + be/src/kudu/util/atomic.h | 322 +++ be/src/kudu/util/auto_release_pool.h | 99 + be/src/kudu/util/barrier.h | 68 + be/src/kudu/util/bit-stream-utils.h | 150 ++ be/src/kudu/util/bit-stream-utils.inline.h | 211 ++ be/src/kudu/util/bit-util-test.cc | 45 + be/src/kudu/util/bit-util.h | 57 + be/src/kudu/util/bitmap-test.cc | 230 ++ be/src/kudu/util/bitmap.cc | 136 + be/src/kudu/util/bitmap.h | 219 ++ be/src/kudu/util/blocking_queue-test.cc | 249 ++ be/src/kudu/util/blocking_queue.h | 256 ++ be/src/kudu/util/bloom_filter-test.cc | 92 + be/src/kudu/util/bloom_filter.cc | 89 + be/src/kudu/util/bloom_filter.h | 254 ++ be/src/kudu/util/boost_mutex_utils.h | 45 + be/src/kudu/util/cache-bench.cc | 191 ++ be/src/kudu/util/cache-test.cc | 246 ++ be/src/kudu/util/cache.cc | 572 ++++ be/src/kudu/util/cache.h | 216 ++ be/src/kudu/util/cache_metrics.cc | 69 + be/src/kudu/util/cache_metrics.h | 42 + be/src/kudu/util/callback_bind-test.cc | 119 + be/src/kudu/util/coding-inl.h | 120 + be/src/kudu/util/coding.cc | 142 + be/src/kudu/util/coding.h | 113 + .../kudu/util/compression/compression-test.cc | 90 + be/src/kudu/util/compression/compression.proto | 29 + .../kudu/util/compression/compression_codec.cc | 286 ++ .../kudu/util/compression/compression_codec.h | 78 + be/src/kudu/util/condition_variable.cc | 142 + be/src/kudu/util/condition_variable.h | 118 + be/src/kudu/util/countdown_latch-test.cc | 74 + be/src/kudu/util/countdown_latch.h | 137 + be/src/kudu/util/cow_object.cc | 34 + be/src/kudu/util/cow_object.h | 437 ++++ be/src/kudu/util/crc-test.cc | 112 + be/src/kudu/util/crc.cc | 56 + be/src/kudu/util/crc.h | 43 + be/src/kudu/util/curl_util.cc | 130 + be/src/kudu/util/curl_util.h | 92 + be/src/kudu/util/debug-util-test.cc | 458 ++++ be/src/kudu/util/debug-util.cc | 800 ++++++ be/src/kudu/util/debug-util.h | 321 +++ be/src/kudu/util/debug/leak_annotations.h | 84 + be/src/kudu/util/debug/leakcheck_disabler.h | 48 + be/src/kudu/util/debug/sanitizer_scopes.h | 47 + be/src/kudu/util/debug/trace_event.h | 1501 +++++++++++ be/src/kudu/util/debug/trace_event_impl.cc | 2436 ++++++++++++++++++ be/src/kudu/util/debug/trace_event_impl.h | 726 ++++++ .../util/debug/trace_event_impl_constants.cc | 14 + be/src/kudu/util/debug/trace_event_memory.h | 28 + .../util/debug/trace_event_synthetic_delay.cc | 238 ++ .../util/debug/trace_event_synthetic_delay.h | 166 ++ be/src/kudu/util/debug/trace_logging.h | 132 + be/src/kudu/util/debug/unwind_safeness.cc | 164 ++ be/src/kudu/util/debug/unwind_safeness.h | 29 + be/src/kudu/util/debug_ref_counted.h | 56 + be/src/kudu/util/decimal_util-test.cc | 81 + be/src/kudu/util/decimal_util.cc | 89 + be/src/kudu/util/decimal_util.h | 69 + be/src/kudu/util/easy_json-test.cc | 106 + be/src/kudu/util/easy_json.cc | 212 ++ be/src/kudu/util/easy_json.h | 190 ++ be/src/kudu/util/env-test.cc | 1173 +++++++++ be/src/kudu/util/env.cc | 93 + be/src/kudu/util/env.h | 681 +++++ be/src/kudu/util/env_posix.cc | 1852 +++++++++++++ be/src/kudu/util/env_util-test.cc | 192 ++ be/src/kudu/util/env_util.cc | 320 +++ be/src/kudu/util/env_util.h | 112 + be/src/kudu/util/errno-test.cc | 50 + be/src/kudu/util/errno.cc | 52 + be/src/kudu/util/errno.h | 36 + be/src/kudu/util/faststring-test.cc | 65 + be/src/kudu/util/faststring.cc | 72 + be/src/kudu/util/faststring.h | 259 ++ be/src/kudu/util/fault_injection.cc | 78 + be/src/kudu/util/fault_injection.h | 98 + be/src/kudu/util/file_cache-stress-test.cc | 402 +++ be/src/kudu/util/file_cache-test-util.h | 92 + be/src/kudu/util/file_cache-test.cc | 361 +++ be/src/kudu/util/file_cache.cc | 654 +++++ be/src/kudu/util/file_cache.h | 209 ++ be/src/kudu/util/flag_tags-test.cc | 135 + be/src/kudu/util/flag_tags.cc | 91 + be/src/kudu/util/flag_tags.h | 169 ++ be/src/kudu/util/flag_validators-test.cc | 252 ++ be/src/kudu/util/flag_validators.cc | 67 + be/src/kudu/util/flag_validators.h | 102 + be/src/kudu/util/flags-test.cc | 109 + be/src/kudu/util/flags.cc | 604 +++++ be/src/kudu/util/flags.h | 89 + be/src/kudu/util/group_varint-inl.h | 294 +++ be/src/kudu/util/group_varint-test.cc | 144 ++ be/src/kudu/util/group_varint.cc | 81 + be/src/kudu/util/hash_util-test.cc | 42 + be/src/kudu/util/hash_util.h | 71 + be/src/kudu/util/hdr_histogram-test.cc | 116 + be/src/kudu/util/hdr_histogram.cc | 501 ++++ be/src/kudu/util/hdr_histogram.h | 351 +++ be/src/kudu/util/hexdump.cc | 85 + be/src/kudu/util/hexdump.h | 34 + be/src/kudu/util/high_water_mark.h | 85 + be/src/kudu/util/histogram.proto | 48 + be/src/kudu/util/init.cc | 89 + be/src/kudu/util/init.h | 33 + be/src/kudu/util/inline_slice-test.cc | 88 + be/src/kudu/util/inline_slice.h | 181 ++ be/src/kudu/util/int128-test.cc | 69 + be/src/kudu/util/int128.h | 46 + be/src/kudu/util/int128_util.h | 39 + be/src/kudu/util/interval_tree-inl.h | 444 ++++ be/src/kudu/util/interval_tree-test.cc | 353 +++ be/src/kudu/util/interval_tree.h | 158 ++ be/src/kudu/util/jsonreader-test.cc | 193 ++ be/src/kudu/util/jsonreader.cc | 141 + be/src/kudu/util/jsonreader.h | 92 + be/src/kudu/util/jsonwriter-test.cc | 216 ++ be/src/kudu/util/jsonwriter.cc | 352 +++ be/src/kudu/util/jsonwriter.h | 102 + be/src/kudu/util/jsonwriter_test.proto | 79 + be/src/kudu/util/kernel_stack_watchdog.cc | 256 ++ be/src/kudu/util/kernel_stack_watchdog.h | 290 +++ be/src/kudu/util/knapsack_solver-test.cc | 172 ++ be/src/kudu/util/knapsack_solver.h | 269 ++ be/src/kudu/util/locks.cc | 47 + be/src/kudu/util/locks.h | 294 +++ be/src/kudu/util/logging-test.cc | 249 ++ be/src/kudu/util/logging.cc | 413 +++ be/src/kudu/util/logging.h | 367 +++ be/src/kudu/util/logging_callback.h | 46 + be/src/kudu/util/logging_test_util.h | 60 + be/src/kudu/util/maintenance_manager-test.cc | 369 +++ be/src/kudu/util/maintenance_manager.cc | 550 ++++ be/src/kudu/util/maintenance_manager.h | 361 +++ be/src/kudu/util/maintenance_manager.proto | 54 + be/src/kudu/util/make_shared.h | 64 + be/src/kudu/util/malloc.cc | 35 + be/src/kudu/util/malloc.h | 32 + be/src/kudu/util/map-util-test.cc | 116 + be/src/kudu/util/mem_tracker-test.cc | 285 ++ be/src/kudu/util/mem_tracker.cc | 296 +++ be/src/kudu/util/mem_tracker.h | 272 ++ be/src/kudu/util/memcmpable_varint-test.cc | 220 ++ be/src/kudu/util/memcmpable_varint.cc | 257 ++ be/src/kudu/util/memcmpable_varint.h | 45 + be/src/kudu/util/memory/arena-test.cc | 205 ++ be/src/kudu/util/memory/arena.cc | 167 ++ be/src/kudu/util/memory/arena.h | 501 ++++ be/src/kudu/util/memory/memory.cc | 339 +++ be/src/kudu/util/memory/memory.h | 970 +++++++ be/src/kudu/util/memory/overwrite.cc | 42 + be/src/kudu/util/memory/overwrite.h | 33 + be/src/kudu/util/metrics-test.cc | 388 +++ be/src/kudu/util/metrics.cc | 746 ++++++ be/src/kudu/util/metrics.h | 1195 +++++++++ be/src/kudu/util/minidump-test.cc | 149 ++ be/src/kudu/util/minidump.cc | 382 +++ be/src/kudu/util/minidump.h | 104 + be/src/kudu/util/monotime-test.cc | 424 +++ be/src/kudu/util/monotime.cc | 334 +++ be/src/kudu/util/monotime.h | 421 +++ be/src/kudu/util/mt-hdr_histogram-test.cc | 116 + be/src/kudu/util/mt-metrics-test.cc | 128 + be/src/kudu/util/mt-threadlocal-test.cc | 357 +++ be/src/kudu/util/mutex.cc | 164 ++ be/src/kudu/util/mutex.h | 142 + be/src/kudu/util/net/dns_resolver-test.cc | 59 + be/src/kudu/util/net/dns_resolver.cc | 65 + be/src/kudu/util/net/dns_resolver.h | 62 + be/src/kudu/util/net/net_util-test.cc | 170 ++ be/src/kudu/util/net/net_util.cc | 402 +++ be/src/kudu/util/net/net_util.h | 166 ++ be/src/kudu/util/net/sockaddr.cc | 136 + be/src/kudu/util/net/sockaddr.h | 94 + be/src/kudu/util/net/socket-test.cc | 89 + be/src/kudu/util/net/socket.cc | 590 +++++ be/src/kudu/util/net/socket.h | 178 ++ be/src/kudu/util/nvm_cache.cc | 577 +++++ be/src/kudu/util/nvm_cache.h | 31 + be/src/kudu/util/object_pool-test.cc | 86 + be/src/kudu/util/object_pool.h | 166 ++ be/src/kudu/util/oid_generator-test.cc | 52 + be/src/kudu/util/oid_generator.cc | 65 + be/src/kudu/util/oid_generator.h | 63 + be/src/kudu/util/once-test.cc | 113 + be/src/kudu/util/once.cc | 32 + be/src/kudu/util/once.h | 116 + be/src/kudu/util/os-util-test.cc | 62 + be/src/kudu/util/os-util.cc | 185 ++ be/src/kudu/util/os-util.h | 72 + be/src/kudu/util/path_util-test.cc | 77 + be/src/kudu/util/path_util.cc | 122 + be/src/kudu/util/path_util.h | 63 + be/src/kudu/util/pb_util-internal.cc | 105 + be/src/kudu/util/pb_util-internal.h | 136 + be/src/kudu/util/pb_util-test.cc | 661 +++++ be/src/kudu/util/pb_util.cc | 1088 ++++++++ be/src/kudu/util/pb_util.h | 513 ++++ be/src/kudu/util/pb_util.proto | 45 + be/src/kudu/util/pb_util_test.proto | 29 + be/src/kudu/util/process_memory-test.cc | 75 + be/src/kudu/util/process_memory.cc | 287 +++ be/src/kudu/util/process_memory.h | 62 + be/src/kudu/util/promise.h | 79 + be/src/kudu/util/proto_container_test.proto | 25 + be/src/kudu/util/proto_container_test2.proto | 29 + be/src/kudu/util/proto_container_test3.proto | 33 + be/src/kudu/util/protobuf-annotations.h | 33 + be/src/kudu/util/protobuf_util.h | 39 + be/src/kudu/util/protoc-gen-insertions.cc | 77 + be/src/kudu/util/pstack_watcher-test.cc | 100 + be/src/kudu/util/pstack_watcher.cc | 249 ++ be/src/kudu/util/pstack_watcher.h | 101 + be/src/kudu/util/random-test.cc | 171 ++ be/src/kudu/util/random.h | 252 ++ be/src/kudu/util/random_util-test.cc | 75 + be/src/kudu/util/random_util.cc | 65 + be/src/kudu/util/random_util.h | 44 + be/src/kudu/util/rle-encoding.h | 523 ++++ be/src/kudu/util/rle-test.cc | 546 ++++ be/src/kudu/util/rolling_log-test.cc | 147 ++ be/src/kudu/util/rolling_log.cc | 285 ++ be/src/kudu/util/rolling_log.h | 128 + be/src/kudu/util/rw_mutex-test.cc | 185 ++ be/src/kudu/util/rw_mutex.cc | 207 ++ be/src/kudu/util/rw_mutex.h | 123 + be/src/kudu/util/rw_semaphore-test.cc | 94 + be/src/kudu/util/rw_semaphore.h | 206 ++ be/src/kudu/util/rwc_lock-test.cc | 147 ++ be/src/kudu/util/rwc_lock.cc | 136 + be/src/kudu/util/rwc_lock.h | 142 + be/src/kudu/util/safe_math-test.cc | 56 + be/src/kudu/util/safe_math.h | 69 + be/src/kudu/util/scoped_cleanup-test.cc | 56 + be/src/kudu/util/scoped_cleanup.h | 67 + be/src/kudu/util/semaphore.cc | 105 + be/src/kudu/util/semaphore.h | 77 + be/src/kudu/util/semaphore_macosx.cc | 75 + be/src/kudu/util/signal.cc | 47 + be/src/kudu/util/signal.h | 42 + be/src/kudu/util/slice-test.cc | 61 + be/src/kudu/util/slice.cc | 97 + be/src/kudu/util/slice.h | 332 +++ .../util/sorted_disjoint_interval_list-test.cc | 98 + .../kudu/util/sorted_disjoint_interval_list.h | 95 + be/src/kudu/util/spinlock_profiling-test.cc | 81 + be/src/kudu/util/spinlock_profiling.cc | 308 +++ be/src/kudu/util/spinlock_profiling.h | 72 + be/src/kudu/util/stack_watchdog-test.cc | 152 ++ be/src/kudu/util/status-test.cc | 119 + be/src/kudu/util/status.cc | 170 ++ be/src/kudu/util/status.h | 493 ++++ be/src/kudu/util/status_callback.cc | 41 + be/src/kudu/util/status_callback.h | 54 + be/src/kudu/util/stopwatch.h | 364 +++ be/src/kudu/util/string_case-test.cc | 65 + be/src/kudu/util/string_case.cc | 76 + be/src/kudu/util/string_case.h | 48 + be/src/kudu/util/striped64-test.cc | 163 ++ be/src/kudu/util/striped64.cc | 191 ++ be/src/kudu/util/striped64.h | 168 ++ be/src/kudu/util/subprocess-test.cc | 381 +++ be/src/kudu/util/subprocess.cc | 815 ++++++ be/src/kudu/util/subprocess.h | 219 ++ be/src/kudu/util/test_graph.cc | 121 + be/src/kudu/util/test_graph.h | 90 + be/src/kudu/util/test_macros.h | 123 + be/src/kudu/util/test_main.cc | 109 + be/src/kudu/util/test_util.cc | 446 ++++ be/src/kudu/util/test_util.h | 146 ++ be/src/kudu/util/test_util_prod.cc | 28 + be/src/kudu/util/test_util_prod.h | 32 + be/src/kudu/util/thread-test.cc | 160 ++ be/src/kudu/util/thread.cc | 628 +++++ be/src/kudu/util/thread.h | 373 +++ be/src/kudu/util/thread_restrictions.cc | 87 + be/src/kudu/util/thread_restrictions.h | 121 + be/src/kudu/util/threadlocal.cc | 89 + be/src/kudu/util/threadlocal.h | 128 + be/src/kudu/util/threadlocal_cache.h | 110 + be/src/kudu/util/threadpool-test.cc | 941 +++++++ be/src/kudu/util/threadpool.cc | 766 ++++++ be/src/kudu/util/threadpool.h | 505 ++++ be/src/kudu/util/throttler-test.cc | 76 + be/src/kudu/util/throttler.cc | 67 + be/src/kudu/util/throttler.h | 62 + be/src/kudu/util/trace-test.cc | 891 +++++++ be/src/kudu/util/trace.cc | 259 ++ be/src/kudu/util/trace.h | 292 +++ be/src/kudu/util/trace_metrics.cc | 74 + be/src/kudu/util/trace_metrics.h | 89 + be/src/kudu/util/url-coding-test.cc | 112 + be/src/kudu/util/url-coding.cc | 208 ++ be/src/kudu/util/url-coding.h | 69 + be/src/kudu/util/user-test.cc | 44 + be/src/kudu/util/user.cc | 90 + be/src/kudu/util/user.h | 32 + be/src/kudu/util/version_info.cc | 84 + be/src/kudu/util/version_info.h | 51 + be/src/kudu/util/version_info.proto | 32 + be/src/kudu/util/version_util-test.cc | 66 + be/src/kudu/util/version_util.cc | 83 + be/src/kudu/util/version_util.h | 58 + be/src/kudu/util/web_callback_registry.h | 129 + be/src/kudu/util/website_util.cc | 43 + be/src/kudu/util/website_util.h | 35 + be/src/kudu/util/zlib.cc | 127 + be/src/kudu/util/zlib.h | 39 + 450 files changed, 99139 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/CMakeLists.txt b/be/src/kudu/rpc/CMakeLists.txt new file mode 100644 index 0000000..f8cdb02 --- /dev/null +++ b/be/src/kudu/rpc/CMakeLists.txt @@ -0,0 +1,138 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +#### Global header protobufs +PROTOBUF_GENERATE_CPP( + RPC_HEADER_PROTO_SRCS RPC_HEADER_PROTO_HDRS RPC_HEADER_PROTO_TGTS + SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../.. + BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../.. + PROTO_FILES rpc_header.proto) +ADD_EXPORTABLE_LIBRARY(rpc_header_proto + SRCS ${RPC_HEADER_PROTO_SRCS} + DEPS protobuf pb_util_proto token_proto + NONLINK_DEPS ${RPC_HEADER_PROTO_TGTS}) + +PROTOBUF_GENERATE_CPP( + RPC_INTROSPECTION_PROTO_SRCS RPC_INTROSPECTION_PROTO_HDRS RPC_INTROSPECTION_PROTO_TGTS + SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../.. + BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../.. + PROTO_FILES rpc_introspection.proto) +set(RPC_INTROSPECTION_PROTO_LIBS + rpc_header_proto + protobuf) +ADD_EXPORTABLE_LIBRARY(rpc_introspection_proto + SRCS ${RPC_INTROSPECTION_PROTO_SRCS} + DEPS ${RPC_INTROSPECTION_PROTO_LIBS} + NONLINK_DEPS ${RPC_INTROSPECTION_PROTO_TGTS}) + +### RPC library +set(KRPC_SRCS + acceptor_pool.cc + blocking_ops.cc + client_negotiation.cc + connection.cc + connection_id.cc + constants.cc + inbound_call.cc + messenger.cc + negotiation.cc + outbound_call.cc + periodic.cc + proxy.cc + reactor.cc + remote_method.cc + remote_user.cc + request_tracker.cc + result_tracker.cc + rpc.cc + rpc_context.cc + rpc_controller.cc + rpc_sidecar.cc + rpcz_store.cc + sasl_common.cc + sasl_helper.cc + serialization.cc + server_negotiation.cc + service_if.cc + service_pool.cc + service_queue.cc + user_credentials.cc + transfer.cc +) + +set(KRPC_LIBS + cyrus_sasl + gssapi_krb5 + gutil + kudu_util + libev + rpc_header_proto + rpc_introspection_proto + security) + +ADD_EXPORTABLE_LIBRARY(krpc + SRCS ${KRPC_SRCS} + DEPS ${KRPC_LIBS}) + +### RPC generator tool +add_executable(protoc-gen-krpc protoc-gen-krpc.cc) +target_link_libraries(protoc-gen-krpc + ${KUDU_BASE_LIBS} + rpc_header_proto + protoc + protobuf + gutil + kudu_util) + +#### RPC test +PROTOBUF_GENERATE_CPP( + RPC_TEST_DIFF_PACKAGE_SRCS RPC_TEST_DIFF_PACKAGE_HDRS RPC_TEST_DIFF_PACKAGE_TGTS + SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../.. + BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../.. + PROTO_FILES rtest_diff_package.proto) +add_library(rtest_diff_package_proto ${RPC_TEST_DIFF_PACKAGE_SRCS} ${RPC_TEST_DIFF_PACKAGE_HDRS}) +target_link_libraries(rtest_diff_package_proto rpc_header_proto) + +KRPC_GENERATE( + RTEST_KRPC_SRCS RTEST_KRPC_HDRS RTEST_KRPC_TGTS + SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../.. + BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../.. + PROTO_FILES rtest.proto) +add_library(rtest_krpc ${RTEST_KRPC_SRCS} ${RTEST_KRPC_HDRS}) +target_link_libraries(rtest_krpc + krpc + rpc_header_proto + rtest_diff_package_proto) + +# Tests +set(KUDU_TEST_LINK_LIBS + krpc + mini_kdc + rpc_header_proto + rtest_krpc + security_test_util + ${KUDU_MIN_TEST_LIBS}) +ADD_KUDU_TEST(exactly_once_rpc-test PROCESSORS 10) +ADD_KUDU_TEST(mt-rpc-test RUN_SERIAL true) +ADD_KUDU_TEST(negotiation-test) +ADD_KUDU_TEST(periodic-test) +ADD_KUDU_TEST(reactor-test) +ADD_KUDU_TEST(request_tracker-test) +ADD_KUDU_TEST(rpc-bench RUN_SERIAL true) +ADD_KUDU_TEST(rpc-test) +ADD_KUDU_TEST(rpc_stub-test) +ADD_KUDU_TEST(service_queue-test RUN_SERIAL true) http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/acceptor_pool.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/acceptor_pool.cc b/be/src/kudu/rpc/acceptor_pool.cc new file mode 100644 index 0000000..e4bcbd1 --- /dev/null +++ b/be/src/kudu/rpc/acceptor_pool.cc @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/rpc/acceptor_pool.h" + +#include <string> +#include <ostream> +#include <vector> + +#include <gflags/gflags.h> +#include <glog/logging.h> + +#include "kudu/gutil/basictypes.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/rpc/messenger.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/logging.h" +#include "kudu/util/metrics.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/net/socket.h" +#include "kudu/util/status.h" +#include "kudu/util/thread.h" + +namespace google { +namespace protobuf { + +class Message; + +} +} + +using google::protobuf::Message; +using std::string; + +METRIC_DEFINE_counter(server, rpc_connections_accepted, + "RPC Connections Accepted", + kudu::MetricUnit::kConnections, + "Number of incoming TCP connections made to the RPC server"); + +DEFINE_int32(rpc_acceptor_listen_backlog, 128, + "Socket backlog parameter used when listening for RPC connections. " + "This defines the maximum length to which the queue of pending " + "TCP connections inbound to the RPC server may grow. If a connection " + "request arrives when the queue is full, the client may receive " + "an error. Higher values may help the server ride over bursts of " + "new inbound connection requests."); +TAG_FLAG(rpc_acceptor_listen_backlog, advanced); + +namespace kudu { +namespace rpc { + +AcceptorPool::AcceptorPool(Messenger* messenger, Socket* socket, + Sockaddr bind_address) + : messenger_(messenger), + socket_(socket->Release()), + bind_address_(bind_address), + rpc_connections_accepted_(METRIC_rpc_connections_accepted.Instantiate( + messenger->metric_entity())), + closing_(false) {} + +AcceptorPool::~AcceptorPool() { + Shutdown(); +} + +Status AcceptorPool::Start(int num_threads) { + RETURN_NOT_OK(socket_.Listen(FLAGS_rpc_acceptor_listen_backlog)); + + for (int i = 0; i < num_threads; i++) { + scoped_refptr<kudu::Thread> new_thread; + Status s = kudu::Thread::Create("acceptor pool", "acceptor", + &AcceptorPool::RunThread, this, &new_thread); + if (!s.ok()) { + Shutdown(); + return s; + } + threads_.push_back(new_thread); + } + return Status::OK(); +} + +void AcceptorPool::Shutdown() { + if (Acquire_CompareAndSwap(&closing_, false, true) != false) { + VLOG(2) << "Acceptor Pool on " << bind_address_.ToString() + << " already shut down"; + return; + } + +#if defined(__linux__) + // Closing the socket will break us out of accept() if we're in it, and + // prevent future accepts. + WARN_NOT_OK(socket_.Shutdown(true, true), + strings::Substitute("Could not shut down acceptor socket on $0", + bind_address_.ToString())); +#else + // Calling shutdown on an accepting (non-connected) socket is illegal on most + // platforms (but not Linux). Instead, the accepting threads are interrupted + // forcefully. + for (const scoped_refptr<kudu::Thread>& thread : threads_) { + pthread_cancel(thread.get()->pthread_id()); + } +#endif + + for (const scoped_refptr<kudu::Thread>& thread : threads_) { + CHECK_OK(ThreadJoiner(thread.get()).Join()); + } + threads_.clear(); + + // Close the socket: keeping the descriptor open and, possibly, receiving late + // not-to-be-read messages from the peer does not make much sense. The + // Socket::Close() method is called upon destruction of the aggregated socket_ + // object as well. However, the typical ownership pattern of an AcceptorPool + // object includes two references wrapped via a shared_ptr smart pointer: one + // is held by Messenger, another by RpcServer. If not calling Socket::Close() + // here, it would necessary to wait until Messenger::Shutdown() is called for + // the corresponding messenger object to close this socket. + ignore_result(socket_.Close()); +} + +Sockaddr AcceptorPool::bind_address() const { + return bind_address_; +} + +Status AcceptorPool::GetBoundAddress(Sockaddr* addr) const { + return socket_.GetSocketAddress(addr); +} + +int64_t AcceptorPool::num_rpc_connections_accepted() const { + return rpc_connections_accepted_->value(); +} + +void AcceptorPool::RunThread() { + while (true) { + Socket new_sock; + Sockaddr remote; + VLOG(2) << "calling accept() on socket " << socket_.GetFd() + << " listening on " << bind_address_.ToString(); + Status s = socket_.Accept(&new_sock, &remote, Socket::FLAG_NONBLOCKING); + if (!s.ok()) { + if (Release_Load(&closing_)) { + break; + } + KLOG_EVERY_N_SECS(WARNING, 1) << "AcceptorPool: accept failed: " << s.ToString() + << THROTTLE_MSG; + continue; + } + s = new_sock.SetNoDelay(true); + if (!s.ok()) { + KLOG_EVERY_N_SECS(WARNING, 1) << "Acceptor with remote = " << remote.ToString() + << " failed to set TCP_NODELAY on a newly accepted socket: " + << s.ToString() << THROTTLE_MSG; + continue; + } + rpc_connections_accepted_->Increment(); + messenger_->RegisterInboundSocket(&new_sock, remote); + } + VLOG(1) << "AcceptorPool shutting down."; +} + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/acceptor_pool.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/acceptor_pool.h b/be/src/kudu/rpc/acceptor_pool.h new file mode 100644 index 0000000..ba1996a --- /dev/null +++ b/be/src/kudu/rpc/acceptor_pool.h @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef KUDU_RPC_ACCEPTOR_POOL_H +#define KUDU_RPC_ACCEPTOR_POOL_H + +#include <stdint.h> +#include <vector> + +#include "kudu/gutil/atomicops.h" +#include "kudu/gutil/macros.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/net/socket.h" +#include "kudu/util/status.h" + +namespace kudu { + +class Counter; +class Thread; + +namespace rpc { + +class Messenger; + +// A pool of threads calling accept() to create new connections. +// Acceptor pool threads terminate when they notice that the messenger has been +// shut down, if Shutdown() is called, or if the pool object is destructed. +class AcceptorPool { + public: + // Create a new acceptor pool. Calls socket::Release to take ownership of the + // socket. + // 'socket' must be already bound, but should not yet be listening. + AcceptorPool(Messenger *messenger, Socket *socket, Sockaddr bind_address); + ~AcceptorPool(); + + // Start listening and accepting connections. + Status Start(int num_threads); + void Shutdown(); + + // Return the address that the pool is bound to. If the port is specified as + // 0, then this will always return port 0. + Sockaddr bind_address() const; + + // Return the address that the pool is bound to. This only works while the + // socket is open, and if the specified port is 0 then this will return the + // actual port that was bound. + Status GetBoundAddress(Sockaddr* addr) const; + + // Return the number of connections accepted by this messenger. Thread-safe. + int64_t num_rpc_connections_accepted() const; + + private: + void RunThread(); + + Messenger *messenger_; + Socket socket_; + Sockaddr bind_address_; + std::vector<scoped_refptr<kudu::Thread> > threads_; + + scoped_refptr<Counter> rpc_connections_accepted_; + + Atomic32 closing_; + + DISALLOW_COPY_AND_ASSIGN(AcceptorPool); +}; + +} // namespace rpc +} // namespace kudu +#endif http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/blocking_ops.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/blocking_ops.cc b/be/src/kudu/rpc/blocking_ops.cc new file mode 100644 index 0000000..f5cd644 --- /dev/null +++ b/be/src/kudu/rpc/blocking_ops.cc @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/rpc/blocking_ops.h" + +#include <cstdint> +#include <cstring> +#include <ostream> + +#include <glog/logging.h> +#include <google/protobuf/message_lite.h> + +#include "kudu/gutil/endian.h" +#include "kudu/gutil/port.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/rpc/constants.h" +#include "kudu/rpc/serialization.h" +#include "kudu/rpc/transfer.h" +#include "kudu/util/faststring.h" +#include "kudu/util/net/socket.h" +#include "kudu/util/slice.h" +#include "kudu/util/status.h" + +namespace kudu { +namespace rpc { + +using google::protobuf::MessageLite; + +const char kHTTPHeader[] = "HTTP"; + +Status CheckInBlockingMode(const Socket* sock) { + bool is_nonblocking; + RETURN_NOT_OK(sock->IsNonBlocking(&is_nonblocking)); + if (is_nonblocking) { + static const char* const kErrMsg = "socket is not in blocking mode"; + LOG(DFATAL) << kErrMsg; + return Status::IllegalState(kErrMsg); + } + return Status::OK(); +} + +Status SendFramedMessageBlocking(Socket* sock, const MessageLite& header, const MessageLite& msg, + const MonoTime& deadline) { + DCHECK(sock != nullptr); + DCHECK(header.IsInitialized()) << "header protobuf must be initialized"; + DCHECK(msg.IsInitialized()) << "msg protobuf must be initialized"; + + // Ensure we are in blocking mode. + // These blocking calls are typically not in the fast path, so doing this for all build types. + RETURN_NOT_OK(CheckInBlockingMode(sock)); + + // Serialize message + faststring param_buf; + serialization::SerializeMessage(msg, ¶m_buf); + + // Serialize header and initial length + faststring header_buf; + serialization::SerializeHeader(header, param_buf.size(), &header_buf); + + // Write header & param to stream + size_t nsent; + RETURN_NOT_OK(sock->BlockingWrite(header_buf.data(), header_buf.size(), &nsent, deadline)); + RETURN_NOT_OK(sock->BlockingWrite(param_buf.data(), param_buf.size(), &nsent, deadline)); + + return Status::OK(); +} + +Status ReceiveFramedMessageBlocking(Socket* sock, faststring* recv_buf, + MessageLite* header, Slice* param_buf, const MonoTime& deadline) { + DCHECK(sock != nullptr); + DCHECK(recv_buf != nullptr); + DCHECK(header != nullptr); + DCHECK(param_buf != nullptr); + + RETURN_NOT_OK(CheckInBlockingMode(sock)); + + // Read the message prefix, which specifies the length of the payload. + recv_buf->clear(); + recv_buf->resize(kMsgLengthPrefixLength); + size_t recvd = 0; + RETURN_NOT_OK(sock->BlockingRecv(recv_buf->data(), kMsgLengthPrefixLength, &recvd, deadline)); + uint32_t payload_len = NetworkByteOrder::Load32(recv_buf->data()); + + // Verify that the payload size isn't out of bounds. + // This can happen because of network corruption, or a naughty client. + if (PREDICT_FALSE(payload_len > FLAGS_rpc_max_message_size)) { + // A common user mistake is to try to speak the Kudu RPC protocol to an + // HTTP endpoint, or vice versa. + if (memcmp(recv_buf->data(), kHTTPHeader, strlen(kHTTPHeader)) == 0) { + return Status::IOError( + "received invalid RPC message which appears to be an HTTP response. " + "Verify that you have specified a valid RPC port and not an HTTP port."); + } + + return Status::IOError( + strings::Substitute( + "received invalid message of size $0 which exceeds" + " the rpc_max_message_size of $1 bytes", + payload_len, FLAGS_rpc_max_message_size)); + } + + // Read the message payload. + recvd = 0; + recv_buf->resize(payload_len + kMsgLengthPrefixLength); + RETURN_NOT_OK(sock->BlockingRecv(recv_buf->data() + kMsgLengthPrefixLength, + payload_len, &recvd, deadline)); + RETURN_NOT_OK(serialization::ParseMessage(Slice(*recv_buf), header, param_buf)); + return Status::OK(); +} + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/blocking_ops.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/blocking_ops.h b/be/src/kudu/rpc/blocking_ops.h new file mode 100644 index 0000000..b305ba7 --- /dev/null +++ b/be/src/kudu/rpc/blocking_ops.h @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef KUDU_RPC_BLOCKING_OPS_H +#define KUDU_RPC_BLOCKING_OPS_H + +namespace google { +namespace protobuf { +class MessageLite; +} // namespace protobuf +} // namespace google + +namespace kudu { + +class faststring; +class MonoTime; +class Slice; +class Socket; +class Status; + +namespace rpc { + +// Returns OK if socket is in blocking mode. Otherwise, returns an error. +Status CheckInBlockingMode(const Socket* sock); + +// Encode and send a message over a socket. +// header: Request or Response header protobuf. +// msg: Protobuf message to send. This message must be fully initialized. +// deadline: Latest time allowed for receive to complete before timeout. +Status SendFramedMessageBlocking(Socket* sock, const google::protobuf::MessageLite& header, + const google::protobuf::MessageLite& msg, const MonoTime& deadline); + +// Receive a full message frame from the server. +// recv_buf: buffer to use for reading the data from the socket. +// header: Request or Response header protobuf. +// param_buf: Slice into recv_buf containing unparsed RPC param protobuf data. +// deadline: Latest time allowed for receive to complete before timeout. +Status ReceiveFramedMessageBlocking(Socket* sock, faststring* recv_buf, + google::protobuf::MessageLite* header, Slice* param_buf, const MonoTime& deadline); + +} // namespace rpc +} // namespace kudu + +#endif // KUDU_RPC_BLOCKING_OPS_H http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/client_negotiation.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/client_negotiation.cc b/be/src/kudu/rpc/client_negotiation.cc new file mode 100644 index 0000000..02175f6 --- /dev/null +++ b/be/src/kudu/rpc/client_negotiation.cc @@ -0,0 +1,853 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/rpc/client_negotiation.h" + +#include <cstdint> +#include <cstring> +#include <map> +#include <memory> +#include <ostream> +#include <set> +#include <string> + +#include <gflags/gflags_declare.h> +#include <glog/logging.h> +#include <gssapi/gssapi.h> +#include <gssapi/gssapi_krb5.h> +#include <sasl/sasl.h> + +#include "kudu/gutil/basictypes.h" +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/stl_util.h" +#include "kudu/gutil/strings/join.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/rpc/blocking_ops.h" +#include "kudu/rpc/constants.h" +#include "kudu/rpc/messenger.h" +#include "kudu/rpc/rpc_header.pb.h" +#include "kudu/rpc/sasl_common.h" +#include "kudu/rpc/sasl_helper.h" +#include "kudu/rpc/serialization.h" +#include "kudu/security/cert.h" +#include "kudu/security/tls_context.h" +#include "kudu/security/tls_handshake.h" +#include "kudu/util/faststring.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/net/socket.h" +#include "kudu/util/slice.h" +#include "kudu/util/trace.h" + +using std::map; +using std::set; +using std::string; +using std::unique_ptr; + +using strings::Substitute; + +DECLARE_bool(rpc_encrypt_loopback_connections); + +namespace kudu { +namespace rpc { + +static int ClientNegotiationGetoptCb(ClientNegotiation* client_negotiation, + const char* plugin_name, + const char* option, + const char** result, + unsigned* len) { + return client_negotiation->GetOptionCb(plugin_name, option, result, len); +} + +static int ClientNegotiationSimpleCb(ClientNegotiation* client_negotiation, + int id, + const char** result, + unsigned* len) { + return client_negotiation->SimpleCb(id, result, len); +} + +static int ClientNegotiationSecretCb(sasl_conn_t* conn, + ClientNegotiation* client_negotiation, + int id, + sasl_secret_t** psecret) { + return client_negotiation->SecretCb(conn, id, psecret); +} + +// Return an appropriately-typed Status object based on an ErrorStatusPB returned +// from an Error RPC. +// In case there is no relevant Status type, return a RuntimeError. +static Status StatusFromRpcError(const ErrorStatusPB& error) { + DCHECK(error.IsInitialized()) << "Error status PB must be initialized"; + if (PREDICT_FALSE(!error.has_code())) { + return Status::RuntimeError(error.message()); + } + const string code_name = ErrorStatusPB::RpcErrorCodePB_Name(error.code()); + switch (error.code()) { + case ErrorStatusPB_RpcErrorCodePB_FATAL_UNAUTHORIZED: // fall-through + case ErrorStatusPB_RpcErrorCodePB_FATAL_INVALID_AUTHENTICATION_TOKEN: + return Status::NotAuthorized(code_name, error.message()); + case ErrorStatusPB_RpcErrorCodePB_ERROR_UNAVAILABLE: + return Status::ServiceUnavailable(code_name, error.message()); + default: + return Status::RuntimeError(code_name, error.message()); + } +} + +ClientNegotiation::ClientNegotiation(unique_ptr<Socket> socket, + const security::TlsContext* tls_context, + boost::optional<security::SignedTokenPB> authn_token, + RpcEncryption encryption, + std::string sasl_proto_name) + : socket_(std::move(socket)), + helper_(SaslHelper::CLIENT), + tls_context_(tls_context), + encryption_(encryption), + tls_negotiated_(false), + authn_token_(std::move(authn_token)), + psecret_(nullptr, std::free), + negotiated_authn_(AuthenticationType::INVALID), + negotiated_mech_(SaslMechanism::INVALID), + sasl_proto_name_(std::move(sasl_proto_name)), + deadline_(MonoTime::Max()) { + callbacks_.push_back(SaslBuildCallback(SASL_CB_GETOPT, + reinterpret_cast<int (*)()>(&ClientNegotiationGetoptCb), this)); + callbacks_.push_back(SaslBuildCallback(SASL_CB_AUTHNAME, + reinterpret_cast<int (*)()>(&ClientNegotiationSimpleCb), this)); + callbacks_.push_back(SaslBuildCallback(SASL_CB_PASS, + reinterpret_cast<int (*)()>(&ClientNegotiationSecretCb), this)); + callbacks_.push_back(SaslBuildCallback(SASL_CB_LIST_END, nullptr, nullptr)); + DCHECK(socket_); + DCHECK(tls_context_); +} + +Status ClientNegotiation::EnablePlain(const string& user, const string& pass) { + RETURN_NOT_OK(helper_.EnablePlain()); + plain_auth_user_ = user; + plain_pass_ = pass; + return Status::OK(); +} + +Status ClientNegotiation::EnableGSSAPI() { + return helper_.EnableGSSAPI(); +} + +SaslMechanism::Type ClientNegotiation::negotiated_mechanism() const { + return negotiated_mech_; +} + +void ClientNegotiation::set_server_fqdn(const string& domain_name) { + helper_.set_server_fqdn(domain_name); +} + +void ClientNegotiation::set_deadline(const MonoTime& deadline) { + deadline_ = deadline; +} + +Status ClientNegotiation::Negotiate(unique_ptr<ErrorStatusPB>* rpc_error) { + TRACE("Beginning negotiation"); + + // Ensure we can use blocking calls on the socket during negotiation. + RETURN_NOT_OK(CheckInBlockingMode(socket_.get())); + + // Step 1: send the connection header. + RETURN_NOT_OK(SendConnectionHeader()); + + faststring recv_buf; + + { // Step 2: send and receive the NEGOTIATE step messages. + RETURN_NOT_OK(SendNegotiate()); + NegotiatePB response; + RETURN_NOT_OK(RecvNegotiatePB(&response, &recv_buf, rpc_error)); + RETURN_NOT_OK(HandleNegotiate(response)); + TRACE("Negotiated authn=$0", AuthenticationTypeToString(negotiated_authn_)); + } + + // Step 3: if both ends support TLS, do a TLS handshake. + // TODO(KUDU-1921): allow the client to require TLS. + if (encryption_ != RpcEncryption::DISABLED && + ContainsKey(server_features_, TLS)) { + RETURN_NOT_OK(tls_context_->InitiateHandshake(security::TlsHandshakeType::CLIENT, + &tls_handshake_)); + + if (negotiated_authn_ == AuthenticationType::SASL) { + // When using SASL authentication, verifying the server's certificate is + // not necessary. This allows the client to still use TLS encryption for + // connections to servers which only have a self-signed certificate. + tls_handshake_.set_verification_mode(security::TlsVerificationMode::VERIFY_NONE); + } + + // To initiate the TLS handshake, we pretend as if the server sent us an + // empty TLS_HANDSHAKE token. + NegotiatePB initial; + initial.set_step(NegotiatePB::TLS_HANDSHAKE); + initial.set_tls_handshake(""); + Status s = HandleTlsHandshake(initial); + + while (s.IsIncomplete()) { + NegotiatePB response; + RETURN_NOT_OK(RecvNegotiatePB(&response, &recv_buf, rpc_error)); + s = HandleTlsHandshake(response); + } + RETURN_NOT_OK(s); + tls_negotiated_ = true; + } + + // Step 4: Authentication + switch (negotiated_authn_) { + case AuthenticationType::SASL: + RETURN_NOT_OK(AuthenticateBySasl(&recv_buf, rpc_error)); + break; + case AuthenticationType::TOKEN: + RETURN_NOT_OK(AuthenticateByToken(&recv_buf, rpc_error)); + break; + case AuthenticationType::CERTIFICATE: + // The TLS handshake has already authenticated the server. + break; + case AuthenticationType::INVALID: LOG(FATAL) << "unreachable"; + } + + // Step 5: Send connection context. + RETURN_NOT_OK(SendConnectionContext()); + + TRACE("Negotiation successful"); + return Status::OK(); +} + +Status ClientNegotiation::SendNegotiatePB(const NegotiatePB& msg) { + RequestHeader header; + header.set_call_id(kNegotiateCallId); + + DCHECK(socket_); + DCHECK(msg.IsInitialized()) << "message must be initialized"; + DCHECK(msg.has_step()) << "message must have a step"; + + TRACE("Sending $0 NegotiatePB request", NegotiatePB::NegotiateStep_Name(msg.step())); + return SendFramedMessageBlocking(socket(), header, msg, deadline_); +} + +Status ClientNegotiation::RecvNegotiatePB(NegotiatePB* msg, + faststring* buffer, + unique_ptr<ErrorStatusPB>* rpc_error) { + ResponseHeader header; + Slice param_buf; + RETURN_NOT_OK(ReceiveFramedMessageBlocking(socket(), buffer, &header, ¶m_buf, deadline_)); + RETURN_NOT_OK(helper_.CheckNegotiateCallId(header.call_id())); + + if (header.is_error()) { + return ParseError(param_buf, rpc_error); + } + + RETURN_NOT_OK(helper_.ParseNegotiatePB(param_buf, msg)); + TRACE("Received $0 NegotiatePB response", NegotiatePB::NegotiateStep_Name(msg->step())); + return Status::OK(); +} + +Status ClientNegotiation::ParseError(const Slice& err_data, + unique_ptr<ErrorStatusPB>* rpc_error) { + unique_ptr<ErrorStatusPB> error(new ErrorStatusPB); + if (!error->ParseFromArray(err_data.data(), err_data.size())) { + return Status::IOError("invalid error response, missing fields", + error->InitializationErrorString()); + } + Status s = StatusFromRpcError(*error); + TRACE("Received error response from server: $0", s.ToString()); + + if (rpc_error) { + rpc_error->swap(error); + } + return s; +} + +Status ClientNegotiation::SendConnectionHeader() { + const uint8_t buflen = kMagicNumberLength + kHeaderFlagsLength; + uint8_t buf[buflen]; + serialization::SerializeConnHeader(buf); + size_t nsent; + return socket()->BlockingWrite(buf, buflen, &nsent, deadline_); +} + +Status ClientNegotiation::InitSaslClient() { + // TODO(KUDU-1922): consider setting SASL_SUCCESS_DATA + unsigned flags = 0; + + sasl_conn_t* sasl_conn = nullptr; + RETURN_NOT_OK_PREPEND(WrapSaslCall(nullptr /* no conn */, [&]() { + return sasl_client_new( + sasl_proto_name_.c_str(), // Registered name of the service using SASL. Required. + helper_.server_fqdn(), // The fully qualified domain name of the remote server. + nullptr, // Local and remote IP address strings. (we don't use + nullptr, // any mechanisms which require this info.) + &callbacks_[0], // Connection-specific callbacks. + flags, + &sasl_conn); + }), Substitute("unable to create new SASL $0 client", sasl_proto_name_)); + sasl_conn_.reset(sasl_conn); + return Status::OK(); +} + +Status ClientNegotiation::SendNegotiate() { + NegotiatePB msg; + msg.set_step(NegotiatePB::NEGOTIATE); + + // Advertise our supported features. + client_features_ = kSupportedClientRpcFeatureFlags; + + if (encryption_ != RpcEncryption::DISABLED) { + client_features_.insert(TLS); + // If the remote peer is local, then we allow using TLS for authentication + // without encryption or integrity. + if (socket_->IsLoopbackConnection() && !FLAGS_rpc_encrypt_loopback_connections) { + client_features_.insert(TLS_AUTHENTICATION_ONLY); + } + } + + for (RpcFeatureFlag feature : client_features_) { + msg.add_supported_features(feature); + } + + if (!helper_.EnabledMechs().empty()) { + msg.add_authn_types()->mutable_sasl(); + } + if (tls_context_->has_signed_cert() && !tls_context_->is_external_cert()) { + // We only provide authenticated TLS if the certificates are generated + // by the internal CA. + msg.add_authn_types()->mutable_certificate(); + } + if (authn_token_ && tls_context_->has_trusted_cert()) { + // TODO(KUDU-1924): check that the authn token is not expired. Can this be done + // reliably on clients? + msg.add_authn_types()->mutable_token(); + } + + if (PREDICT_FALSE(msg.authn_types().empty())) { + return Status::NotAuthorized("client is not configured with an authentication type"); + } + + RETURN_NOT_OK(SendNegotiatePB(msg)); + return Status::OK(); +} + +Status ClientNegotiation::HandleNegotiate(const NegotiatePB& response) { + if (PREDICT_FALSE(response.step() != NegotiatePB::NEGOTIATE)) { + return Status::NotAuthorized("expected NEGOTIATE step", + NegotiatePB::NegotiateStep_Name(response.step())); + } + TRACE("Received NEGOTIATE response from server"); + + // Fill in the set of features supported by the server. + for (int flag : response.supported_features()) { + // We only add the features that our local build knows about. + RpcFeatureFlag feature_flag = RpcFeatureFlag_IsValid(flag) ? + static_cast<RpcFeatureFlag>(flag) : UNKNOWN; + if (feature_flag != UNKNOWN) { + server_features_.insert(feature_flag); + } + } + + if (encryption_ == RpcEncryption::REQUIRED && + !ContainsKey(server_features_, RpcFeatureFlag::TLS)) { + return Status::NotAuthorized("server does not support required TLS encryption"); + } + + // Get the authentication type which the server would like to use. + DCHECK_LE(response.authn_types().size(), 1); + if (response.authn_types().empty()) { + // If the server doesn't send back an authentication type, default to SASL + // in order to maintain backwards compatibility. + negotiated_authn_ = AuthenticationType::SASL; + } else { + const auto& authn_type = response.authn_types(0); + switch (authn_type.type_case()) { + case AuthenticationTypePB::kSasl: + negotiated_authn_ = AuthenticationType::SASL; + break; + case AuthenticationTypePB::kToken: + // TODO(todd): we should also be checking tls_context_->has_trusted_cert() + // here to match the original logic we used to advertise TOKEN support, + // or perhaps just check explicitly whether we advertised TOKEN. + if (!authn_token_) { + return Status::RuntimeError( + "server chose token authentication, but client has no token"); + } + negotiated_authn_ = AuthenticationType::TOKEN; + return Status::OK(); + case AuthenticationTypePB::kCertificate: + if (!tls_context_->has_signed_cert()) { + return Status::RuntimeError( + "server chose certificate authentication, but client has no certificate"); + } + negotiated_authn_ = AuthenticationType::CERTIFICATE; + return Status::OK(); + case AuthenticationTypePB::TYPE_NOT_SET: + return Status::RuntimeError("server chose an unknown authentication type"); + } + } + + DCHECK_EQ(negotiated_authn_, AuthenticationType::SASL); + + // Build a map of the SASL mechanisms offered by the server. + set<SaslMechanism::Type> client_mechs(helper_.EnabledMechs()); + set<SaslMechanism::Type> server_mechs; + for (const NegotiatePB::SaslMechanism& sasl_mech : response.sasl_mechanisms()) { + auto mech = SaslMechanism::value_of(sasl_mech.mechanism()); + if (mech == SaslMechanism::INVALID) { + continue; + } + server_mechs.insert(mech); + } + + // Determine which SASL mechanism to use for authenticating the connection. + // We pick the most preferred mechanism which is supported by both parties. + // The preference list in order of most to least preferred: + // * GSSAPI + // * PLAIN + // + // TODO(KUDU-1921): allow the client to require authentication. + if (ContainsKey(client_mechs, SaslMechanism::GSSAPI) && + ContainsKey(server_mechs, SaslMechanism::GSSAPI)) { + + // Check that the client has local Kerberos credentials, and if not fall + // back to an alternate mechanism. + Status s = CheckGSSAPI(); + if (s.ok()) { + negotiated_mech_ = SaslMechanism::GSSAPI; + return Status::OK(); + } + + TRACE("Kerberos authentication credentials are not available: $0", s.ToString()); + client_mechs.erase(SaslMechanism::GSSAPI); + } + + if (ContainsKey(client_mechs, SaslMechanism::PLAIN) && + ContainsKey(server_mechs, SaslMechanism::PLAIN)) { + negotiated_mech_ = SaslMechanism::PLAIN; + return Status::OK(); + } + + // There are no mechanisms in common. + if (ContainsKey(server_mechs, SaslMechanism::GSSAPI) && + !ContainsKey(client_mechs, SaslMechanism::GSSAPI)) { + return Status::NotAuthorized("server requires authentication, " + "but client does not have Kerberos credentials available"); + } + if (!ContainsKey(server_mechs, SaslMechanism::GSSAPI) && + ContainsKey(client_mechs, SaslMechanism::GSSAPI)) { + return Status::NotAuthorized("client requires authentication, " + "but server does not have Kerberos enabled"); + } + string msg = Substitute("client/server supported SASL mechanism mismatch; " + "client mechanisms: [$0], server mechanisms: [$1]", + JoinMapped(client_mechs, SaslMechanism::name_of, ", "), + JoinMapped(server_mechs, SaslMechanism::name_of, ", ")); + + // For now, there should never be a SASL mechanism mismatch that isn't due + // to one of the sides requiring Kerberos and the other not having it, so + // lets sanity check that. + DCHECK(STLSetIntersection(client_mechs, server_mechs).empty()) << msg; + return Status::NotAuthorized(msg); +} + +Status ClientNegotiation::SendTlsHandshake(string tls_token) { + TRACE("Sending TLS_HANDSHAKE message to server"); + NegotiatePB msg; + msg.set_step(NegotiatePB::TLS_HANDSHAKE); + msg.mutable_tls_handshake()->swap(tls_token); + return SendNegotiatePB(msg); +} + +Status ClientNegotiation::HandleTlsHandshake(const NegotiatePB& response) { + if (PREDICT_FALSE(response.step() != NegotiatePB::TLS_HANDSHAKE)) { + return Status::NotAuthorized("expected TLS_HANDSHAKE step", + NegotiatePB::NegotiateStep_Name(response.step())); + } + TRACE("Received TLS_HANDSHAKE response from server"); + + if (PREDICT_FALSE(!response.has_tls_handshake())) { + return Status::NotAuthorized("No TLS handshake token in TLS_HANDSHAKE response from server"); + } + + string token; + Status s = tls_handshake_.Continue(response.tls_handshake(), &token); + if (s.IsIncomplete()) { + // Another roundtrip is required to complete the handshake. + RETURN_NOT_OK(SendTlsHandshake(std::move(token))); + } + + // Check that the handshake step didn't produce an error. Will also propagate + // an Incomplete status. + RETURN_NOT_OK(s); + + // TLS handshake is finished. + if (ContainsKey(server_features_, TLS_AUTHENTICATION_ONLY) && + ContainsKey(client_features_, TLS_AUTHENTICATION_ONLY)) { + TRACE("Negotiated auth-only $0 with cipher $1", + tls_handshake_.GetProtocol(), tls_handshake_.GetCipherDescription()); + return tls_handshake_.FinishNoWrap(*socket_); + } + + TRACE("Negotiated $0 with cipher $1", + tls_handshake_.GetProtocol(), tls_handshake_.GetCipherDescription()); + return tls_handshake_.Finish(&socket_); +} + +Status ClientNegotiation::AuthenticateBySasl(faststring* recv_buf, + unique_ptr<ErrorStatusPB>* rpc_error) { + RETURN_NOT_OK(InitSaslClient()); + Status s = SendSaslInitiate(); + + // HandleSasl[Initiate, Challenge] return incomplete if an additional + // challenge step is required, or OK if a SASL_SUCCESS message is expected. + while (s.IsIncomplete()) { + NegotiatePB challenge; + RETURN_NOT_OK(RecvNegotiatePB(&challenge, recv_buf, rpc_error)); + s = HandleSaslChallenge(challenge); + } + + // Propagate failure from SendSaslInitiate or HandleSaslChallenge. + RETURN_NOT_OK(s); + + // Server challenges are over; we now expect the success message. + NegotiatePB success; + RETURN_NOT_OK(RecvNegotiatePB(&success, recv_buf, rpc_error)); + return HandleSaslSuccess(success); +} + +Status ClientNegotiation::AuthenticateByToken(faststring* recv_buf, + unique_ptr<ErrorStatusPB>* rpc_error) { + // Sanity check that TLS has been negotiated. Sending the token on an + // unencrypted channel is a big no-no. + CHECK(tls_negotiated_); + + // Send the token to the server. + NegotiatePB pb; + pb.set_step(NegotiatePB::TOKEN_EXCHANGE); + *pb.mutable_authn_token() = std::move(*authn_token_); + RETURN_NOT_OK(SendNegotiatePB(pb)); + pb.Clear(); + + // Check that the server responds with a non-error TOKEN_EXCHANGE message. + RETURN_NOT_OK(RecvNegotiatePB(&pb, recv_buf, rpc_error)); + if (pb.step() != NegotiatePB::TOKEN_EXCHANGE) { + return Status::NotAuthorized("expected TOKEN_EXCHANGE step", + NegotiatePB::NegotiateStep_Name(pb.step())); + } + + return Status::OK(); +} + +Status ClientNegotiation::SendSaslInitiate() { + TRACE("Initiating SASL $0 handshake", SaslMechanism::name_of(negotiated_mech_)); + + // At this point we've already chosen the SASL mechanism to use + // (negotiated_mech_), but we need to let the SASL library know. SASL likes to + // choose the mechanism from among a list of possible options, so we simply + // provide it one option, and then check that it picks that option. + + const char* init_msg = nullptr; + unsigned init_msg_len = 0; + const char* negotiated_mech = nullptr; + + /* select a mechanism for a connection + * mechlist -- mechanisms server has available (punctuation ignored) + * output: + * prompt_need -- on SASL_INTERACT, list of prompts needed to continue + * clientout -- the initial client response to send to the server + * mech -- set to mechanism name + * + * Returns: + * SASL_OK -- success + * SASL_CONTINUE -- negotiation required + * SASL_NOMEM -- not enough memory + * SASL_NOMECH -- no mechanism meets requested properties + * SASL_INTERACT -- user interaction needed to fill in prompt_need list + */ + TRACE("Calling sasl_client_start()"); + const Status s = WrapSaslCall(sasl_conn_.get(), [&]() { + return sasl_client_start( + sasl_conn_.get(), // The SASL connection context created by init() + SaslMechanism::name_of(negotiated_mech_), // The list of mechanisms to negotiate. + nullptr, // Disables INTERACT return if NULL. + &init_msg, // Filled in on success. + &init_msg_len, // Filled in on success. + &negotiated_mech); // Filled in on success. + }); + + if (PREDICT_FALSE(!s.IsIncomplete() && !s.ok())) { + return s; + } + + // Check that the SASL library is using the mechanism that we picked. + DCHECK_EQ(SaslMechanism::value_of(negotiated_mech), negotiated_mech_); + + // If the negotiated mechanism is GSSAPI (Kerberos), configure SASL to use + // integrity protection so that the channel bindings and nonce can be + // verified. + if (negotiated_mech_ == SaslMechanism::GSSAPI) { + RETURN_NOT_OK(EnableProtection(sasl_conn_.get(), SaslProtection::kIntegrity)); + } + + NegotiatePB msg; + msg.set_step(NegotiatePB::SASL_INITIATE); + msg.mutable_token()->assign(init_msg, init_msg_len); + msg.add_sasl_mechanisms()->set_mechanism(negotiated_mech); + RETURN_NOT_OK(SendNegotiatePB(msg)); + return s; +} + +Status ClientNegotiation::SendSaslResponse(const char* resp_msg, unsigned resp_msg_len) { + NegotiatePB reply; + reply.set_step(NegotiatePB::SASL_RESPONSE); + reply.mutable_token()->assign(resp_msg, resp_msg_len); + return SendNegotiatePB(reply); +} + +Status ClientNegotiation::HandleSaslChallenge(const NegotiatePB& response) { + if (PREDICT_FALSE(response.step() != NegotiatePB::SASL_CHALLENGE)) { + return Status::NotAuthorized("expected SASL_CHALLENGE step", + NegotiatePB::NegotiateStep_Name(response.step())); + } + TRACE("Received SASL_CHALLENGE response from server"); + if (PREDICT_FALSE(!response.has_token())) { + return Status::NotAuthorized("no token in SASL_CHALLENGE response from server"); + } + + const char* out = nullptr; + unsigned out_len = 0; + const Status s = DoSaslStep(response.token(), &out, &out_len); + if (PREDICT_FALSE(!s.IsIncomplete() && !s.ok())) { + return s; + } + + RETURN_NOT_OK(SendSaslResponse(out, out_len)); + return s; +} + +Status ClientNegotiation::HandleSaslSuccess(const NegotiatePB& response) { + if (PREDICT_FALSE(response.step() != NegotiatePB::SASL_SUCCESS)) { + return Status::NotAuthorized("expected SASL_SUCCESS step", + NegotiatePB::NegotiateStep_Name(response.step())); + } + TRACE("Received SASL_SUCCESS response from server"); + + if (negotiated_mech_ == SaslMechanism::GSSAPI) { + if (response.has_nonce()) { + // Grab the nonce from the server, if it has sent one. We'll send it back + // later with SASL integrity protection as part of the connection context. + nonce_ = response.nonce(); + } + + if (tls_negotiated_) { + // Check the channel bindings provided by the server against the expected channel bindings. + if (!response.has_channel_bindings()) { + return Status::NotAuthorized("no channel bindings provided by server"); + } + + security::Cert cert; + RETURN_NOT_OK(tls_handshake_.GetRemoteCert(&cert)); + + string expected_channel_bindings; + RETURN_NOT_OK_PREPEND(cert.GetServerEndPointChannelBindings(&expected_channel_bindings), + "failed to generate channel bindings"); + + Slice received_channel_bindings; + RETURN_NOT_OK_PREPEND(SaslDecode(sasl_conn_.get(), + response.channel_bindings(), + &received_channel_bindings), + "failed to decode channel bindings"); + + if (expected_channel_bindings != received_channel_bindings) { + Sockaddr addr; + ignore_result(socket_->GetPeerAddress(&addr)); + + LOG(WARNING) << "Received invalid channel bindings from server " + << addr.ToString() + << ", this could indicate an active network man-in-the-middle"; + return Status::NotAuthorized("channel bindings do not match"); + } + } + } + + return Status::OK(); +} + +Status ClientNegotiation::DoSaslStep(const string& in, const char** out, unsigned* out_len) { + TRACE("Calling sasl_client_step()"); + + return WrapSaslCall(sasl_conn_.get(), [&]() { + return sasl_client_step(sasl_conn_.get(), in.c_str(), in.length(), nullptr, out, out_len); + }); +} + +Status ClientNegotiation::SendConnectionContext() { + TRACE("Sending connection context"); + RequestHeader header; + header.set_call_id(kConnectionContextCallId); + + ConnectionContextPB conn_context; + // This field is deprecated but used by servers <Kudu 1.1. Newer server versions ignore + // this and use the SASL-provided username instead. + conn_context.mutable_deprecated_user_info()->set_real_user( + plain_auth_user_.empty() ? "cpp-client" : plain_auth_user_); + + if (nonce_) { + // Reply with the SASL-protected nonce. We only set the nonce when using SASL GSSAPI. + Slice ciphertext; + RETURN_NOT_OK(SaslEncode(sasl_conn_.get(), *nonce_, &ciphertext)); + *conn_context.mutable_encoded_nonce() = ciphertext.ToString(); + } + + return SendFramedMessageBlocking(socket(), header, conn_context, deadline_); +} + +int ClientNegotiation::GetOptionCb(const char* plugin_name, const char* option, + const char** result, unsigned* len) { + return helper_.GetOptionCb(plugin_name, option, result, len); +} + +// Used for PLAIN. +// SASL callback for SASL_CB_USER, SASL_CB_AUTHNAME, SASL_CB_LANGUAGE +int ClientNegotiation::SimpleCb(int id, const char** result, unsigned* len) { + if (PREDICT_FALSE(!helper_.IsPlainEnabled())) { + LOG(DFATAL) << "Simple callback called, but PLAIN auth is not enabled"; + return SASL_FAIL; + } + if (PREDICT_FALSE(result == nullptr)) { + LOG(DFATAL) << "result outparam is NULL"; + return SASL_BADPARAM; + } + switch (id) { + // TODO(unknown): Support impersonation? + // For impersonation, USER is the impersonated user, AUTHNAME is the "sudoer". + case SASL_CB_USER: + TRACE("callback for SASL_CB_USER"); + *result = plain_auth_user_.c_str(); + if (len != nullptr) *len = plain_auth_user_.length(); + break; + case SASL_CB_AUTHNAME: + TRACE("callback for SASL_CB_AUTHNAME"); + *result = plain_auth_user_.c_str(); + if (len != nullptr) *len = plain_auth_user_.length(); + break; + case SASL_CB_LANGUAGE: + LOG(DFATAL) << "Unable to handle SASL callback type SASL_CB_LANGUAGE" + << "(" << id << ")"; + return SASL_BADPARAM; + default: + LOG(DFATAL) << "Unexpected SASL callback type: " << id; + return SASL_BADPARAM; + } + + return SASL_OK; +} + +// Used for PLAIN. +// SASL callback for SASL_CB_PASS: User password. +int ClientNegotiation::SecretCb(sasl_conn_t* conn, int id, sasl_secret_t** psecret) { + if (PREDICT_FALSE(!helper_.IsPlainEnabled())) { + LOG(DFATAL) << "Plain secret callback called, but PLAIN auth is not enabled"; + return SASL_FAIL; + } + switch (id) { + case SASL_CB_PASS: { + if (!conn || !psecret) return SASL_BADPARAM; + + size_t len = plain_pass_.length(); + *psecret = reinterpret_cast<sasl_secret_t*>(malloc(sizeof(sasl_secret_t) + len)); + if (!*psecret) { + return SASL_NOMEM; + } + psecret_.reset(*psecret); // Ensure that we free() this structure later. + (*psecret)->len = len; + memcpy((*psecret)->data, plain_pass_.c_str(), len + 1); + break; + } + default: + LOG(DFATAL) << "Unexpected SASL callback type: " << id; + return SASL_BADPARAM; + } + + return SASL_OK; +} + +namespace { +// Retrieve the GSSAPI error description for an error code and type. +string gss_error_description(OM_uint32 code, int type) { + string description; + OM_uint32 message_context = 0; + + do { + if (!description.empty()) { + description.append(": "); + } + OM_uint32 minor = 0; + gss_buffer_desc buf; + gss_display_status(&minor, code, type, GSS_C_NULL_OID, &message_context, &buf); + description.append(static_cast<const char*>(buf.value), buf.length); + gss_release_buffer(&minor, &buf); + } while (message_context != 0); + + return description; +} + +// Transforms a GSSAPI major and minor error code into a Kudu Status. +Status check_gss_error(OM_uint32 major, OM_uint32 minor) { + if (GSS_ERROR(major)) { + return Status::NotAuthorized(gss_error_description(major, GSS_C_GSS_CODE), + gss_error_description(minor, GSS_C_MECH_CODE)); + } + return Status::OK(); +} +} // anonymous namespace + +Status ClientNegotiation::CheckGSSAPI() { + OM_uint32 major, minor; + gss_cred_id_t cred = GSS_C_NO_CREDENTIAL; + + // Acquire the Kerberos credential. This will fail if the client does not have + // a Kerberos tgt ticket. In theory it should be sufficient to call + // gss_inquire_cred_by_mech, but that causes a memory leak on RHEL 7. + major = gss_acquire_cred(&minor, + GSS_C_NO_NAME, + GSS_C_INDEFINITE, + const_cast<gss_OID_set>(gss_mech_set_krb5), + GSS_C_INITIATE, + &cred, + nullptr, + nullptr); + Status s = check_gss_error(major, minor); + + // Inspect the Kerberos credential to determine if it is expired. The lifetime + // returned from gss_acquire_cred in the RHEL 6 version of krb5 is always 0, + // so it has to be done with a separate call to gss_inquire_cred. The lifetime + // holds the remaining validity of the tgt in seconds. + OM_uint32 lifetime; + if (s.ok()) { + major = gss_inquire_cred(&minor, cred, nullptr, &lifetime, nullptr, nullptr); + s = check_gss_error(major, minor); + } + + // Release the credential even if gss_inquire_cred fails. + gss_release_cred(&minor, &cred); + RETURN_NOT_OK(s); + + if (lifetime == 0) { + return Status::NotAuthorized("Kerberos ticket expired"); + } + return Status::OK(); +} + +} // namespace rpc +} // namespace kudu