[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor
martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1013921335

## extensions/standard-processors/processors/PutTCP.cpp: ##
@@ -0,0 +1,582 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutTCP.h"
+
+#include
+#include
+
+#include "range/v3/range/conversion.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "controllers/SSLContextService.h"
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/high_resolution_timer.hpp"
+
+using asio::ip::tcp;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutTCP::Hostname = core::PropertyBuilder::createProperty("Hostname")
+    ->withDescription("The ip address or hostname of the destination.")
+    ->withDefaultValue("localhost")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::Port = core::PropertyBuilder::createProperty("Port")
+    ->withDescription("The port or service on the destination.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::IdleConnectionExpiration = core::PropertyBuilder::createProperty("Idle Connection Expiration")
+    ->withDescription("The amount of time a connection should be held open without being used before closing the connection. A value of 0 seconds will disable this feature.")
+    ->withDefaultValue("15 seconds")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue("15 seconds")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::ConnectionPerFlowFile = core::PropertyBuilder::createProperty("Connection Per FlowFile")
+    ->withDescription("Specifies whether to send each FlowFile's content on an individual connection.")
+    ->withDefaultValue(false)
+    ->isRequired(true)
+    ->supportsExpressionLanguage(false)
+    ->build();
+
+const core::Property PutTCP::OutgoingMessageDelimiter = core::PropertyBuilder::createProperty("Outgoing Message Delimiter")
+    ->withDescription("Specifies the delimiter to use when sending messages out over the same TCP stream. "
+        "The delimiter is appended to each FlowFile message that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. "
+        "Users should ensure that the FlowFile content does not contain the delimiter character to avoid errors.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::SSLContextService = core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be sent over a secure connection.")
+    ->isRequired(false)
+    ->asType()
+    ->build();
+
+const core::Property PutTCP::MaxSizeOfSocketSendBuffer = core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer")
+    ->withDescription("The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should be.")
+    ->isRequired(false)
+    ->asType()
+    ->build();
+
+const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent to the destination are sent out this relationship."};
+const core::Relationship PutTCP::Failure{"failure", "FlowFiles that encountered IO errors are send out this relationship."};
+
+constexpr size_t chunk_size = 1024;
+
+PutTCP::PutTCP(cons
[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor
martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1013917235

## extensions/standard-processors/processors/PutTCP.cpp: ##
@@ -0,0 +1,582 @@
[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor
martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1013916908

## extensions/standard-processors/processors/PutTCP.cpp: ##
@@ -0,0 +1,582 @@
[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor
martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1013913817

## extensions/standard-processors/processors/PutTCP.cpp: ##
@@ -0,0 +1,582 @@
[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor
martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1011835200

## extensions/standard-processors/processors/PutTCP.cpp: ##
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutTCP.h"
+
+#include
+#include
+
+#include "range/v3/range/conversion.hpp"
+#include "range/v3/view/transform.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "controllers/SSLContextService.h"
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/high_resolution_timer.hpp"
+
+using asio::ip::tcp;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutTCP::Hostname = core::PropertyBuilder::createProperty("Hostname")
+    ->withDescription("The ip address or hostname of the destination.")
+    ->withDefaultValue("localhost")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::Port = core::PropertyBuilder::createProperty("Port")
+    ->withDescription("The port or service on the destination.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::IdleConnectionExpiration = core::PropertyBuilder::createProperty("Idle Connection Expiration")
+    ->withDescription("The amount of time a connection should be held open without being used before closing the connection. A value of 0 seconds will disable this feature.")
+    ->withDefaultValue("15 seconds")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue("15 seconds")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::ConnectionPerFlowFile = core::PropertyBuilder::createProperty("Connection Per FlowFile")
+    ->withDescription("Specifies whether to send each FlowFile's content on an individual connection.")
+    ->withDefaultValue(false)
+    ->isRequired(true)
+    ->supportsExpressionLanguage(false)
+    ->build();
+
+const core::Property PutTCP::OutgoingMessageDelimiter = core::PropertyBuilder::createProperty("Outgoing Message Delimiter")
+    ->withDescription("Specifies the delimiter to use when sending messages out over the same TCP stream. "
+        "The delimiter is appended to each FlowFile message that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. "
+        "Users should ensure that the FlowFile content does not contain the delimiter character to avoid errors.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::SSLContextService = core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be sent over a secure connection.")
+    ->isRequired(false)
+    ->asType()
+    ->build();
+
+const core::Property PutTCP::MaxSizeOfSocketSendBuffer = core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer")
+    ->withDescription("The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should be.")
+    ->isRequired(false)
+    ->asType()
+    ->build();
+
+const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent to the destination are sent out this relationship."};
+const core::Relationship PutTCP::Failure{"failure", "FlowFiles that encountered IO errors are send out this relationship."};
+
+PutTCP::PutTCP(con
[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor
martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1011816544

## extensions/standard-processors/processors/PutTCP.cpp: ##
@@ -0,0 +1,551 @@
[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor
martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1011666740

## libminifi/include/utils/StringUtils.h: ##
@@ -37,6 +37,19 @@
 #include "utils/gsl.h"
 #include "utils/meta/detected.h"
+// libc++ doesn't define operator<=> on strings, and apparently the operator rewrite rules don't automagically make one
+#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION <= 14000
+#include
+#endif
+
+#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION <= 14000
+template
+constexpr std::strong_ordering operator<=>(const std::basic_string<_CharT, _Traits, _Alloc>& __lhs,
+const std::basic_string<_CharT, _Traits, _Alloc>& __rhs) noexcept {
+  return __lhs.compare(__rhs) <=> 0;
+}

Review Comment:
Good idea. I've simplified it (we only need it for plain old std::string anyway) in
https://github.com/apache/nifi-minifi-cpp/pull/1419/commits/a4becde14199ad2b8f1354de64ed7dad5b6503dd#diff-82cfd335c88c846c59b1b3e46405088a59a5e85560dbb81bf5ebd5d7772457bdR44-R45

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor
martinzink commented on code in PR #1419: URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1011665191 ## extensions/standard-processors/processors/PutTCP.cpp: ## @@ -0,0 +1,551 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "PutTCP.h" + +#include +#include + +#include "range/v3/range/conversion.hpp" +#include "range/v3/view/transform.hpp" + +#include "utils/gsl.h" +#include "utils/expected.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/PropertyBuilder.h" +#include "core/Resource.h" +#include "core/logging/Logger.h" +#include "controllers/SSLContextService.h" + +#include "asio/ssl.hpp" +#include "asio/ip/tcp.hpp" +#include "asio/write.hpp" +#include "asio/high_resolution_timer.hpp" + +using asio::ip::tcp; +using TcpSocket = asio::ip::tcp::socket; +using SslSocket = asio::ssl::stream; + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::processors { + +const core::Property PutTCP::Hostname = core::PropertyBuilder::createProperty("Hostname") +->withDescription("The ip address or hostname of the destination.") +->withDefaultValue("localhost") +->isRequired(true) +->supportsExpressionLanguage(true) +->build(); + +const core::Property PutTCP::Port = core::PropertyBuilder::createProperty("Port") +->withDescription("The port or service on the destination.") +->isRequired(true) +->supportsExpressionLanguage(true) +->build(); + +const core::Property PutTCP::IdleConnectionExpiration = core::PropertyBuilder::createProperty("Idle Connection Expiration") +->withDescription("The amount of time a connection should be held open without being used before closing the connection. 
A value of 0 seconds will disable this feature.") +->withDefaultValue("15 seconds") +->isRequired(true) +->supportsExpressionLanguage(true) +->build(); + +const core::Property PutTCP::Timeout = core::PropertyBuilder::createProperty("Timeout") +->withDescription("The timeout for connecting to and communicating with the destination.") +->withDefaultValue("15 seconds") +->isRequired(true) +->supportsExpressionLanguage(true) +->build(); + +const core::Property PutTCP::ConnectionPerFlowFile = core::PropertyBuilder::createProperty("Connection Per FlowFile") +->withDescription("Specifies whether to send each FlowFile's content on an individual connection.") +->withDefaultValue(false) +->isRequired(true) +->supportsExpressionLanguage(false) +->build(); + +const core::Property PutTCP::OutgoingMessageDelimiter = core::PropertyBuilder::createProperty("Outgoing Message Delimiter") +->withDescription("Specifies the delimiter to use when sending messages out over the same TCP stream. " + "The delimiter is appended to each FlowFile message that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. " + "Users should ensure that the FlowFile content does not contain the delimiter character to avoid errors.") +->isRequired(false) +->supportsExpressionLanguage(true) +->build(); + +const core::Property PutTCP::SSLContextService = core::PropertyBuilder::createProperty("SSL Context Service") +->withDescription("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be sent over a secure connection.") +->isRequired(false) +->asType() +->build(); + +const core::Property PutTCP::MaxSizeOfSocketSendBuffer = core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer") +->withDescription("The maximum size of the socket send buffer that should be used. 
This is a suggestion to the Operating System to indicate how big the socket buffer should be.") +->isRequired(false) +->asType() +->build(); + +const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent to the destination are sent out this relationship."}; +const core::Relationship PutTCP::Failure{"failure", "FlowFiles that encountered IO errors are sent out this relationship."}; + +PutTCP::PutTCP(con
[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor
martinzink commented on code in PR #1419: URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1011662554 ## extensions/standard-processors/processors/PutTCP.h: ## @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "Processor.h" +#include "utils/Export.h" +#include "controllers/SSLContextService.h" + +#include "utils/expected.h" +#include "utils/StringUtils.h" // for string <=> on libc++ + +namespace org::apache::nifi::minifi::processors { + +class ConnectionId { + public: + ConnectionId(std::string hostname, std::string port) : hostname_(std::move(hostname)), port_(std::move(port)) {} + + struct hash { +std::size_t operator () (const ConnectionId& connection_id) const { + return utils::hash_combine(std::hash{}(connection_id.hostname_), std::hash{}(connection_id.port_)); +} + }; Review Comment: I can see your point :+1: I've changed it in https://github.com/apache/nifi-minifi-cpp/pull/1419/commits/a4becde14199ad2b8f1354de64ed7dad5b6503dd#diff-2259b226acd731ddec22f739e72603e60fdcfaddbc203d6e20f44534f7a6a5eeR51-R59 -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
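For readers following the hash discussion above, here is a minimal sketch of one common way to make a host/port pair usable as a key in an unordered container: specialize std::hash instead of nesting a hash struct in the class. This is an illustration only and is not necessarily what the linked commit does. ConnectionIdSketch and hashCombineSketch are hypothetical names; the real utils::hash_combine signature is not shown in this thread.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for utils::hash_combine (assumed, not the MiNiFi helper).
inline std::size_t hashCombineSketch(std::size_t seed, std::size_t value) {
  return seed ^ (value + 0x9e3779b9 + (seed << 6) + (seed >> 2));
}

// Hypothetical simplified version of the ConnectionId class quoted above.
class ConnectionIdSketch {
 public:
  ConnectionIdSketch(std::string hostname, std::string port)
      : hostname_(std::move(hostname)), port_(std::move(port)) {}

  // Equality is required by unordered containers in addition to the hash.
  bool operator==(const ConnectionIdSketch& other) const {
    return hostname_ == other.hostname_ && port_ == other.port_;
  }

  const std::string& getHostname() const { return hostname_; }
  const std::string& getPort() const { return port_; }

 private:
  std::string hostname_;
  std::string port_;
};

// Specializing std::hash lets std::unordered_map pick the hash up by default.
template <>
struct std::hash<ConnectionIdSketch> {
  std::size_t operator()(const ConnectionIdSketch& id) const noexcept {
    return hashCombineSketch(std::hash<std::string>{}(id.getHostname()),
                             std::hash<std::string>{}(id.getPort()));
  }
};
```

With the specialization in place, std::unordered_map<ConnectionIdSketch, T> can be declared without naming a hasher explicitly; the nested-struct form quoted above would instead be passed as the third template argument.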
[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor
martinzink commented on code in PR #1419: URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1011661453 ## extensions/standard-processors/processors/PutTCP.cpp: ## @@ -0,0 +1,551 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "PutTCP.h" + +#include +#include + +#include "range/v3/range/conversion.hpp" +#include "range/v3/view/transform.hpp" + +#include "utils/gsl.h" +#include "utils/expected.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/PropertyBuilder.h" +#include "core/Resource.h" +#include "core/logging/Logger.h" +#include "controllers/SSLContextService.h" + +#include "asio/ssl.hpp" +#include "asio/ip/tcp.hpp" +#include "asio/write.hpp" +#include "asio/high_resolution_timer.hpp" + +using asio::ip::tcp; +using TcpSocket = asio::ip::tcp::socket; +using SslSocket = asio::ssl::stream; + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::processors { + +const core::Property PutTCP::Hostname = core::PropertyBuilder::createProperty("Hostname") +->withDescription("The ip address or hostname of the destination.") +->withDefaultValue("localhost") +->isRequired(true) +->supportsExpressionLanguage(true) +->build(); + +const core::Property PutTCP::Port = core::PropertyBuilder::createProperty("Port") +->withDescription("The port or service on the destination.") +->isRequired(true) +->supportsExpressionLanguage(true) +->build(); + +const core::Property PutTCP::IdleConnectionExpiration = core::PropertyBuilder::createProperty("Idle Connection Expiration") +->withDescription("The amount of time a connection should be held open without being used before closing the connection. 
A value of 0 seconds will disable this feature.") +->withDefaultValue("15 seconds") +->isRequired(true) +->supportsExpressionLanguage(true) +->build(); + +const core::Property PutTCP::Timeout = core::PropertyBuilder::createProperty("Timeout") +->withDescription("The timeout for connecting to and communicating with the destination.") +->withDefaultValue("15 seconds") +->isRequired(true) +->supportsExpressionLanguage(true) +->build(); + +const core::Property PutTCP::ConnectionPerFlowFile = core::PropertyBuilder::createProperty("Connection Per FlowFile") +->withDescription("Specifies whether to send each FlowFile's content on an individual connection.") +->withDefaultValue(false) +->isRequired(true) +->supportsExpressionLanguage(false) +->build(); + +const core::Property PutTCP::OutgoingMessageDelimiter = core::PropertyBuilder::createProperty("Outgoing Message Delimiter") +->withDescription("Specifies the delimiter to use when sending messages out over the same TCP stream. " + "The delimiter is appended to each FlowFile message that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. " + "Users should ensure that the FlowFile content does not contain the delimiter character to avoid errors.") +->isRequired(false) +->supportsExpressionLanguage(true) +->build(); + +const core::Property PutTCP::SSLContextService = core::PropertyBuilder::createProperty("SSL Context Service") +->withDescription("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be sent over a secure connection.") +->isRequired(false) +->asType() +->build(); + +const core::Property PutTCP::MaxSizeOfSocketSendBuffer = core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer") +->withDescription("The maximum size of the socket send buffer that should be used. 
This is a suggestion to the Operating System to indicate how big the socket buffer should be.") +->isRequired(false) +->asType() +->build(); + +const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent to the destination are sent out this relationship."}; +const core::Relationship PutTCP::Failure{"failure", "FlowFiles that encountered IO errors are sent out this relationship."}; + +PutTCP::PutTCP(con
[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor
martinzink commented on code in PR #1419: URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1011660327 ## extensions/standard-processors/tests/unit/PutTCPTests.cpp: ## @@ -0,0 +1,454 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include +#include +#include "SingleProcessorTestController.h" +#include "Catch.h" +#include "PutTCP.h" +#include "controllers/SSLContextService.h" +#include "core/ProcessSession.h" +#include "utils/net/TcpServer.h" +#include "utils/net/SslServer.h" +#include "utils/expected.h" +#include "utils/StringUtils.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::processors { + +using controllers::SSLContextService; + +namespace { +using utils::net::TcpSession; +using utils::net::TcpServer; + +using utils::net::SslSession; +using utils::net::SslServer; + +class ISessionAwareServer { + public: + [[nodiscard]] virtual size_t getNumberOfSessions() const = 0; + virtual void closeSessions() = 0; +}; + +template +class SessionAwareServer : public ISessionAwareServer { + protected: + size_t getNumberOfSessions() const override { +std::lock_guard lock_guard{mutex_}; +return sessions_.size(); + } + + void closeSessions() override { +std::lock_guard lock_guard{mutex_}; +for (const auto& session_weak : sessions_) { + if (auto session = session_weak.lock()) { +auto& socket = session->getSocket(); +if (socket.is_open()) { + socket.shutdown(asio::ip::tcp::socket::shutdown_both); + session->getSocket().close(); +} + } +} + } + + mutable std::mutex mutex_; + std::vector> sessions_; +}; + +class SessionAwareTcpServer : public TcpServer, public SessionAwareServer { + public: + using TcpServer::TcpServer; + + protected: + std::shared_ptr createSession() override { +std::lock_guard lock_guard{mutex_}; +auto session = TcpServer::createSession(); +logger_->log_trace("SessionAwareTcpServer::createSession %p", session.get()); +sessions_.emplace_back(session); +return session; + } +}; + +class SessionAwareSslServer : public SslServer, public SessionAwareServer { + public: + using SslServer::SslServer; + + protected: + std::shared_ptr createSession() override { +std::lock_guard lock_guard{mutex_}; +auto session = 
SslServer::createSession(); +logger_->log_trace("SessionAwareSslServer::createSession %p", session.get()); +sessions_.emplace_back(session); +return session; + } +}; + +utils::net::SslData createSslDataForServer() { + const std::filesystem::path executable_dir = minifi::utils::file::FileUtils::get_executable_dir(); + utils::net::SslData ssl_data; + ssl_data.ca_loc = (executable_dir / "resources" / "ca_A.crt").string(); + ssl_data.cert_loc = (executable_dir / "resources" / "localhost_by_A.pem").string(); + ssl_data.key_loc = (executable_dir / "resources" / "localhost_by_A.pem").string(); + return ssl_data; +} +} // namespace + +class PutTCPTestFixture { + public: + PutTCPTestFixture() { +LogTestController::getInstance().setTrace(); +LogTestController::getInstance().setInfo(); +LogTestController::getInstance().setTrace(); +put_tcp_->setProperty(PutTCP::Hostname, "${literal('localhost')}"); +put_tcp_->setProperty(PutTCP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port_), "')}")); +put_tcp_->setProperty(PutTCP::Timeout, "200 ms"); +put_tcp_->setProperty(PutTCP::OutgoingMessageDelimiter, "\n"); + } + + ~PutTCPTestFixture() { +stopServer(); + } + + void startTCPServer() { +gsl_Expects(!listener_ && !server_thread_.joinable()); +listener_ = std::make_unique(std::nullopt, port_, core::logging::LoggerFactory::getLogger()); +server_thread_ = std::thread([this]() { listener_->run(); }); + } + + void startSSLServer() { +gsl_Expects(!listener_ && !server_thread_.joinable()); +listener_ = std::make_unique(std::nullopt, +port_, +core::logging::LoggerFactory::getLogger(), +createSslDataForServer(), +utils::net::SslServer::ClientAuthOption::REQUIRED); +server_thread_ = std::thread([this]() { lis
[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor
martinzink commented on code in PR #1419: URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1011658749 ## extensions/standard-processors/processors/PutTCP.cpp: ## @@ -0,0 +1,551 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "PutTCP.h" + +#include +#include + +#include "range/v3/range/conversion.hpp" +#include "range/v3/view/transform.hpp" + +#include "utils/gsl.h" +#include "utils/expected.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/PropertyBuilder.h" +#include "core/Resource.h" +#include "core/logging/Logger.h" +#include "controllers/SSLContextService.h" + +#include "asio/ssl.hpp" +#include "asio/ip/tcp.hpp" +#include "asio/write.hpp" +#include "asio/high_resolution_timer.hpp" + +using asio::ip::tcp; +using TcpSocket = asio::ip::tcp::socket; +using SslSocket = asio::ssl::stream; + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::processors { + +const core::Property PutTCP::Hostname = core::PropertyBuilder::createProperty("Hostname") +->withDescription("The ip address or hostname of the destination.") +->withDefaultValue("localhost") +->isRequired(true) +->supportsExpressionLanguage(true) +->build(); + +const core::Property PutTCP::Port = core::PropertyBuilder::createProperty("Port") +->withDescription("The port or service on the destination.") +->isRequired(true) +->supportsExpressionLanguage(true) +->build(); + +const core::Property PutTCP::IdleConnectionExpiration = core::PropertyBuilder::createProperty("Idle Connection Expiration") +->withDescription("The amount of time a connection should be held open without being used before closing the connection. 
A value of 0 seconds will disable this feature.") +->withDefaultValue("15 seconds") +->isRequired(true) +->supportsExpressionLanguage(true) +->build(); + +const core::Property PutTCP::Timeout = core::PropertyBuilder::createProperty("Timeout") +->withDescription("The timeout for connecting to and communicating with the destination.") +->withDefaultValue("15 seconds") +->isRequired(true) +->supportsExpressionLanguage(true) +->build(); + +const core::Property PutTCP::ConnectionPerFlowFile = core::PropertyBuilder::createProperty("Connection Per FlowFile") +->withDescription("Specifies whether to send each FlowFile's content on an individual connection.") +->withDefaultValue(false) +->isRequired(true) +->supportsExpressionLanguage(false) +->build(); + +const core::Property PutTCP::OutgoingMessageDelimiter = core::PropertyBuilder::createProperty("Outgoing Message Delimiter") +->withDescription("Specifies the delimiter to use when sending messages out over the same TCP stream. " + "The delimiter is appended to each FlowFile message that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. " + "Users should ensure that the FlowFile content does not contain the delimiter character to avoid errors.") +->isRequired(false) +->supportsExpressionLanguage(true) +->build(); + +const core::Property PutTCP::SSLContextService = core::PropertyBuilder::createProperty("SSL Context Service") +->withDescription("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be sent over a secure connection.") +->isRequired(false) +->asType() +->build(); + +const core::Property PutTCP::MaxSizeOfSocketSendBuffer = core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer") +->withDescription("The maximum size of the socket send buffer that should be used. 
This is a suggestion to the Operating System to indicate how big the socket buffer should be.") +->isRequired(false) +->asType() +->build(); + +const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent to the destination are sent out this relationship."}; +const core::Relationship PutTCP::Failure{"failure", "FlowFiles that encountered IO errors are sent out this relationship."}; + +PutTCP::PutTCP(con
[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor
martinzink commented on code in PR #1419: URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1005559661 ## extensions/standard-processors/processors/PutTCP.h: ## @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "Processor.h" +#include "utils/Export.h" +#include "controllers/SSLContextService.h" + +#include "utils/expected.h" +#include "utils/StringUtils.h" // for string <=> on libc++ + +namespace org::apache::nifi::minifi::processors { + +class ConnectionId { + public: + ConnectionId(std::string hostname, std::string port) : hostname_(std::move(hostname)), port_(std::move(port)) {} + + struct hash { +std::size_t operator () (const ConnectionId& connection_id) const { + return utils::hash_combine(std::hash{}(connection_id.hostname_), std::hash{}(connection_id.port_)); +} + }; + + auto operator<=>(const ConnectionId&) const = default; + + std::string& getHostname() { return hostname_; } + std::string& getPort() { return port_; } Review Comment: sure thing :+1: https://github.com/apache/nifi-minifi-cpp/pull/1419/commits/9928eaa75f607d1d337d289c7bbf84f23e20ee5a#diff-2259b226acd731ddec22f739e72603e60fdcfaddbc203d6e20f44534f7a6a5eeR48-R49
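The reviewer's request on the two getters is not quoted in this message, so the following is only a guess at the kind of change involved: accessors that return a mutable reference from a non-const member function cannot be called on a const object and let callers modify the fields. A minimal sketch of const-correct accessors, using the hypothetical names EndpointSketch and describeEndpoint, might look like this:

```cpp
#include <string>
#include <utility>

// Hypothetical class for illustration; not the ConnectionId from the PR.
class EndpointSketch {
 public:
  EndpointSketch(std::string hostname, std::string port)
      : hostname_(std::move(hostname)), port_(std::move(port)) {}

  // const member functions returning const references: callable on const
  // objects, and callers cannot modify the stored strings through them.
  [[nodiscard]] const std::string& getHostname() const { return hostname_; }
  [[nodiscard]] const std::string& getPort() const { return port_; }

 private:
  std::string hostname_;
  std::string port_;
};

// Takes the endpoint by const reference, which only compiles because the
// getters above are const-qualified.
inline std::string describeEndpoint(const EndpointSketch& endpoint) {
  return endpoint.getHostname() + ":" + endpoint.getPort();
}
```

This matters in particular for a type used as a map key, where mutating the hostname or port after insertion would invalidate the stored hash.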
[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor
martinzink commented on code in PR #1419: URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1005559282 ## extensions/standard-processors/tests/unit/resources/ca_B.crt: ## @@ -0,0 +1,21 @@ +-BEGIN CERTIFICATE- Review Comment: It's not invalid per se; the server is configured to use ca_A, so a client configured with a different CA will fail the handshake. ## extensions/standard-processors/tests/unit/resources/bob_by_A.pem: ## @@ -0,0 +1,46 @@ +-BEGIN RSA PRIVATE KEY- Review Comment: Sure thing, removed them: https://github.com/apache/nifi-minifi-cpp/pull/1419/commits/9928eaa75f607d1d337d289c7bbf84f23e20ee5a
[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor
martinzink commented on code in PR #1419: URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1005559107 ## extensions/standard-processors/tests/unit/PutTCPTests.cpp: ## @@ -0,0 +1,454 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include +#include +#include "SingleProcessorTestController.h" +#include "Catch.h" +#include "PutTCP.h" +#include "controllers/SSLContextService.h" +#include "core/ProcessSession.h" +#include "utils/net/TcpServer.h" +#include "utils/net/SslServer.h" +#include "utils/expected.h" +#include "utils/StringUtils.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::processors { + +using controllers::SSLContextService; + +namespace { +using utils::net::TcpSession; +using utils::net::TcpServer; + +using utils::net::SslSession; +using utils::net::SslServer; + +class ISessionAwareServer { + public: + [[nodiscard]] virtual size_t getNumberOfSessions() const = 0; + virtual void closeSessions() = 0; +}; + +template +class SessionAwareServer : public ISessionAwareServer { + protected: + size_t getNumberOfSessions() const override { +std::lock_guard lock_guard{mutex_}; +return sessions_.size(); + } + + void closeSessions() override { +std::lock_guard lock_guard{mutex_}; +for (const auto& session_weak : sessions_) { + if (auto session = session_weak.lock()) { +auto& socket = session->getSocket(); +if (socket.is_open()) { + socket.shutdown(asio::ip::tcp::socket::shutdown_both); + session->getSocket().close(); +} + } +} + } + + mutable std::mutex mutex_; + std::vector> sessions_; +}; + +class SessionAwareTcpServer : public TcpServer, public SessionAwareServer { + public: + using TcpServer::TcpServer; + + protected: + std::shared_ptr createSession() override { +std::lock_guard lock_guard{mutex_}; +auto session = TcpServer::createSession(); +logger_->log_trace("SessionAwareTcpServer::createSession %p", session.get()); +sessions_.emplace_back(session); +return session; + } +}; + +class SessionAwareSslServer : public SslServer, public SessionAwareServer { + public: + using SslServer::SslServer; + + protected: + std::shared_ptr createSession() override { +std::lock_guard lock_guard{mutex_}; +auto session = 
SslServer::createSession(); +logger_->log_trace("SessionAwareTcpServer::createSession %p", session.get()); +sessions_.emplace_back(session); +return session; + } +}; + +utils::net::SslData createSslDataForServer() { + const std::filesystem::path executable_dir = minifi::utils::file::FileUtils::get_executable_dir(); + utils::net::SslData ssl_data; + ssl_data.ca_loc = (executable_dir / "resources/ca_A.crt").string(); + ssl_data.cert_loc = (executable_dir / "resources/localhost_by_A.pem").string(); + ssl_data.key_loc = (executable_dir / "resources/localhost_by_A.pem").string(); + return ssl_data; +} +} // namespace + +class PutTCPTestFixture { + public: + PutTCPTestFixture() { +LogTestController::getInstance().setTrace(); +LogTestController::getInstance().setInfo(); +LogTestController::getInstance().setTrace(); +put_tcp_->setProperty(PutTCP::Hostname, "${literal('localhost')}"); +put_tcp_->setProperty(PutTCP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port_), "')}")); +put_tcp_->setProperty(PutTCP::Timeout, "200 ms"); +put_tcp_->setProperty(PutTCP::OutgoingMessageDelimiter, "\n"); + } + + ~PutTCPTestFixture() { +stopServer(); + } + + void startTCPServer() { +gsl_Expects(!listener_ && !server_thread_.joinable()); +listener_ = std::make_unique(std::nullopt, port_, core::logging::LoggerFactory().getLogger()); Review Comment: you are right, changed these and the one in PutUDPTests.cpp (where I probably copy pasted it from) https://github.com/apache/nifi-minifi-cpp/pull/1419/commits/9928eaa75f607d1d337d289c7bbf84f23e20ee5a#diff-f690439939269c49549e72231c05edf2e8ba912ec3fe206031d6465b7d161779R60 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL abov
[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor
martinzink commented on code in PR #1419: URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1005557987 ## extensions/standard-processors/tests/unit/PutTCPTests.cpp: ## @@ -0,0 +1,454 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include +#include +#include "SingleProcessorTestController.h" +#include "Catch.h" +#include "PutTCP.h" +#include "controllers/SSLContextService.h" +#include "core/ProcessSession.h" +#include "utils/net/TcpServer.h" +#include "utils/net/SslServer.h" +#include "utils/expected.h" +#include "utils/StringUtils.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::processors { + +using controllers::SSLContextService; + +namespace { +using utils::net::TcpSession; +using utils::net::TcpServer; + +using utils::net::SslSession; +using utils::net::SslServer; + +class ISessionAwareServer { + public: + [[nodiscard]] virtual size_t getNumberOfSessions() const = 0; + virtual void closeSessions() = 0; +}; + +template +class SessionAwareServer : public ISessionAwareServer { + protected: + size_t getNumberOfSessions() const override { +std::lock_guard lock_guard{mutex_}; +return sessions_.size(); + } + + void closeSessions() override { +std::lock_guard lock_guard{mutex_}; +for (const auto& session_weak : sessions_) { + if (auto session = session_weak.lock()) { +auto& socket = session->getSocket(); +if (socket.is_open()) { + socket.shutdown(asio::ip::tcp::socket::shutdown_both); + session->getSocket().close(); +} + } +} + } + + mutable std::mutex mutex_; + std::vector> sessions_; +}; + +class SessionAwareTcpServer : public TcpServer, public SessionAwareServer { + public: + using TcpServer::TcpServer; + + protected: + std::shared_ptr createSession() override { +std::lock_guard lock_guard{mutex_}; +auto session = TcpServer::createSession(); +logger_->log_trace("SessionAwareTcpServer::createSession %p", session.get()); +sessions_.emplace_back(session); +return session; + } +}; + +class SessionAwareSslServer : public SslServer, public SessionAwareServer { + public: + using SslServer::SslServer; + + protected: + std::shared_ptr createSession() override { +std::lock_guard lock_guard{mutex_}; +auto session = 
SslServer::createSession(); +logger_->log_trace("SessionAwareTcpServer::createSession %p", session.get()); +sessions_.emplace_back(session); +return session; + } +}; + +utils::net::SslData createSslDataForServer() { + const std::filesystem::path executable_dir = minifi::utils::file::FileUtils::get_executable_dir(); + utils::net::SslData ssl_data; + ssl_data.ca_loc = (executable_dir / "resources/ca_A.crt").string(); + ssl_data.cert_loc = (executable_dir / "resources/localhost_by_A.pem").string(); + ssl_data.key_loc = (executable_dir / "resources/localhost_by_A.pem").string(); Review Comment: good idea, I've changed it in https://github.com/apache/nifi-minifi-cpp/pull/1419/commits/9928eaa75f607d1d337d289c7bbf84f23e20ee5a#diff-bbb56b112b398a6450c23e71e5dbbfb17bf20dad3d2fb908a1c6de26118f9ee8R108-R110 ## extensions/standard-processors/tests/unit/PutTCPTests.cpp: ## @@ -0,0 +1,454 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include "SingleProcessorTestController.h" +
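The path change acknowledged in the message above can be sketched as follows, assuming the suggestion was to join each component with std::filesystem::path::operator/ rather than embedding a "/" inside a string literal (the later revision quoted elsewhere in this thread uses that form). SslPathsSketch and makeSslPathsSketch are hypothetical names standing in for utils::net::SslData and createSslDataForServer.

```cpp
#include <filesystem>
#include <string>

// Hypothetical stand-in for utils::net::SslData, holding only the three paths.
struct SslPathsSketch {
  std::string ca_loc;
  std::string cert_loc;
  std::string key_loc;
};

// Builds the resource paths component by component, so the platform's
// preferred separator is used between "resources" and the file name.
inline SslPathsSketch makeSslPathsSketch(const std::filesystem::path& executable_dir) {
  const std::filesystem::path resources = executable_dir / "resources";
  SslPathsSketch paths;
  paths.ca_loc = (resources / "ca_A.crt").string();
  // The certificate and the private key live in the same PEM file.
  paths.cert_loc = (resources / "localhost_by_A.pem").string();
  paths.key_loc = (resources / "localhost_by_A.pem").string();
  return paths;
}
```

Both spellings usually resolve to the same file, since Windows also accepts forward slashes; the component-wise form mainly keeps the separators consistent when the resulting string is logged or compared.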
[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor
martinzink commented on code in PR #1419: URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1005557725 ## extensions/standard-processors/tests/unit/PutTCPTests.cpp: ## @@ -0,0 +1,454 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include +#include +#include "SingleProcessorTestController.h" +#include "Catch.h" +#include "PutTCP.h" +#include "controllers/SSLContextService.h" +#include "core/ProcessSession.h" +#include "utils/net/TcpServer.h" +#include "utils/net/SslServer.h" +#include "utils/expected.h" +#include "utils/StringUtils.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::processors { + +using controllers::SSLContextService; + +namespace { +using utils::net::TcpSession; +using utils::net::TcpServer; + +using utils::net::SslSession; +using utils::net::SslServer; + +class ISessionAwareServer { + public: + [[nodiscard]] virtual size_t getNumberOfSessions() const = 0; + virtual void closeSessions() = 0; +}; + +template +class SessionAwareServer : public ISessionAwareServer { + protected: + size_t getNumberOfSessions() const override { +std::lock_guard lock_guard{mutex_}; +return sessions_.size(); + } + + void closeSessions() override { +std::lock_guard lock_guard{mutex_}; +for (const auto& session_weak : sessions_) { + if (auto session = session_weak.lock()) { +auto& socket = session->getSocket(); +if (socket.is_open()) { + socket.shutdown(asio::ip::tcp::socket::shutdown_both); + session->getSocket().close(); +} + } +} + } + + mutable std::mutex mutex_; + std::vector> sessions_; +}; + +class SessionAwareTcpServer : public TcpServer, public SessionAwareServer { + public: + using TcpServer::TcpServer; + + protected: + std::shared_ptr createSession() override { +std::lock_guard lock_guard{mutex_}; +auto session = TcpServer::createSession(); +logger_->log_trace("SessionAwareTcpServer::createSession %p", session.get()); +sessions_.emplace_back(session); +return session; + } +}; + +class SessionAwareSslServer : public SslServer, public SessionAwareServer { + public: + using SslServer::SslServer; + + protected: + std::shared_ptr createSession() override { +std::lock_guard lock_guard{mutex_}; +auto session = 
SslServer::createSession(); +logger_->log_trace("SessionAwareTcpServer::createSession %p", session.get()); Review Comment: :+1: https://github.com/apache/nifi-minifi-cpp/pull/1419/commits/9928eaa75f607d1d337d289c7bbf84f23e20ee5a#diff-bbb56b112b398a6450c23e71e5dbbfb17bf20dad3d2fb908a1c6de26118f9ee8R99 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
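The session-tracking helper quoted above keeps weak references to every session the server creates, so the test can count them and force them closed. A minimal standalone sketch of that pattern follows. It assumes a hypothetical `FakeSession` type and a hypothetical `SessionRegistry` template in place of the MiNiFi `TcpSession`/`SslSession` and server classes, and it does no real socket I/O. Unlike the quoted `getNumberOfSessions()`, which returns the size of the vector, this sketch counts only sessions that are still alive.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <vector>

// Hypothetical stand-in for a network session; not the MiNiFi TcpSession.
class FakeSession {
 public:
  bool isOpen() const { return open_; }
  void close() { open_ = false; }

 private:
  bool open_ = true;
};

// Hypothetical registry that observes sessions without owning them.
template <typename SessionType>
class SessionRegistry {
 public:
  // Remember a session through a weak_ptr so the registry never extends its lifetime.
  void track(const std::shared_ptr<SessionType>& session) {
    std::lock_guard<std::mutex> lock{mutex_};
    sessions_.emplace_back(session);
  }

  // Count the tracked sessions that have not been destroyed yet.
  std::size_t liveSessions() const {
    std::lock_guard<std::mutex> lock{mutex_};
    std::size_t alive = 0;
    for (const auto& weak : sessions_) {
      if (!weak.expired()) {
        ++alive;
      }
    }
    return alive;
  }

  // Close every session that is still alive and still open.
  void closeAll() {
    std::lock_guard<std::mutex> lock{mutex_};
    for (const auto& weak : sessions_) {
      if (auto session = weak.lock()) {
        if (session->isOpen()) {
          session->close();
        }
      }
    }
  }

 private:
  mutable std::mutex mutex_;
  std::vector<std::weak_ptr<SessionType>> sessions_;
};
```

Holding `std::weak_ptr` rather than `std::shared_ptr` means a session that the server has already dropped is skipped by `lock()` instead of being kept alive by the test helper.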
[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor
martinzink commented on code in PR #1419: URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1005557011 ## extensions/standard-processors/tests/unit/PutTCPTests.cpp: ## @@ -0,0 +1,454 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include +#include +#include "SingleProcessorTestController.h" +#include "Catch.h" +#include "PutTCP.h" +#include "controllers/SSLContextService.h" +#include "core/ProcessSession.h" +#include "utils/net/TcpServer.h" +#include "utils/net/SslServer.h" +#include "utils/expected.h" +#include "utils/StringUtils.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::processors { + +using controllers::SSLContextService; + +namespace { +using utils::net::TcpSession; +using utils::net::TcpServer; + +using utils::net::SslSession; +using utils::net::SslServer; + +class ISessionAwareServer { + public: + [[nodiscard]] virtual size_t getNumberOfSessions() const = 0; + virtual void closeSessions() = 0; +}; + +template Review Comment: makes sense, I've changed it in https://github.com/apache/nifi-minifi-cpp/pull/1419/commits/9928eaa75f607d1d337d289c7bbf84f23e20ee5a#diff-bbb56b112b398a6450c23e71e5dbbfb17bf20dad3d2fb908a1c6de26118f9ee8R52
[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor
martinzink commented on code in PR #1419: URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1005556676 ## extensions/standard-processors/processors/PutTCP.cpp: ## @@ -0,0 +1,551 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "PutTCP.h" + +#include +#include + +#include "range/v3/range/conversion.hpp" + +#include "utils/gsl.h" +#include "utils/expected.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/PropertyBuilder.h" +#include "core/Resource.h" +#include "core/logging/Logger.h" +#include "controllers/SSLContextService.h" + +#include "asio/ssl.hpp" +#include "asio/ip/tcp.hpp" +#include "asio/write.hpp" +#include "asio/high_resolution_timer.hpp" + +using asio::ip::tcp; +using TcpSocket = asio::ip::tcp::socket; +using SslSocket = asio::ssl::stream; + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::processors { + +const core::Property PutTCP::Hostname = core::PropertyBuilder::createProperty("Hostname") +->withDescription("The ip address or hostname of the destination.") +->withDefaultValue("localhost") +->isRequired(true) +->supportsExpressionLanguage(true) +->build(); + +const core::Property PutTCP::Port = core::PropertyBuilder::createProperty("Port") +->withDescription("The port or service on the destination.") +->isRequired(true) +->supportsExpressionLanguage(true) +->build(); + +const core::Property PutTCP::IdleConnectionExpiration = core::PropertyBuilder::createProperty("Idle Connection Expiration") +->withDescription("The amount of time a connection should be held open without being used before closing the connection. 
A value of 0 seconds will disable this feature.") +->withDefaultValue("15 seconds") +->isRequired(true) +->supportsExpressionLanguage(true) +->build(); + +const core::Property PutTCP::Timeout = core::PropertyBuilder::createProperty("Timeout") +->withDescription("The timeout for connecting to and communicating with the destination.") +->withDefaultValue("15 seconds") +->isRequired(true) +->supportsExpressionLanguage(true) +->build(); + +const core::Property PutTCP::ConnectionPerFlowFile = core::PropertyBuilder::createProperty("Connection Per FlowFile") +->withDescription("Specifies whether to send each FlowFile's content on an individual connection.") +->withDefaultValue(false) +->isRequired(true) +->supportsExpressionLanguage(false) +->build(); + +const core::Property PutTCP::OutgoingMessageDelimiter = core::PropertyBuilder::createProperty("Outgoing Message Delimiter") +->withDescription("Specifies the delimiter to use when sending messages out over the same TCP stream. " + "The delimiter is appended to each FlowFile message that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. " + "Users should ensure that the FlowFile content does not contain the delimiter character to avoid errors.") +->isRequired(false) +->supportsExpressionLanguage(true) +->build(); + +const core::Property PutTCP::SSLContextService = core::PropertyBuilder::createProperty("SSL Context Service") +->withDescription("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be sent over a secure connection.") +->isRequired(false) +->asType() +->build(); + +const core::Property PutTCP::MaxSizeOfSocketSendBuffer = core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer") +->withDescription("The maximum size of the socket send buffer that should be used. 
This is a suggestion to the Operating System to indicate how big the socket buffer should be.") +->isRequired(false) +->asType() +->build(); + +const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent to the destination are sent out this relationship."}; +const core::Relationship PutTCP::Failure{"failure", "FlowFiles that encountered IO errors are send out this relationship."}; + +PutTCP::PutTCP(const std::string& name, const utils::Ident
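The "Outgoing Message Delimiter" description quoted above says the delimiter is appended to each message so the receiver can tell where one message ends and the next begins. A rough sketch of that framing idea follows, assuming two hypothetical helpers, `frameMessages` and `splitStream`. Neither is part of PutTCP, and the sketch works on in-memory strings rather than a socket.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>
#include <vector>

// Sender side (hypothetical helper): append the delimiter after every message,
// producing the bytes that would travel over one shared TCP stream.
std::string frameMessages(const std::vector<std::string>& messages, std::string_view delimiter) {
  std::string stream;
  for (const auto& message : messages) {
    stream += message;
    stream.append(delimiter.data(), delimiter.size());
  }
  return stream;
}

// Receiver side (hypothetical helper): cut the stream at each delimiter.
// Bytes after the last delimiter belong to an incomplete message and are not returned.
std::vector<std::string> splitStream(std::string_view stream, std::string_view delimiter) {
  assert(!delimiter.empty());  // an empty delimiter cannot mark a boundary
  std::vector<std::string> messages;
  std::size_t start = 0;
  std::size_t end = stream.find(delimiter, start);
  while (end != std::string_view::npos) {
    messages.emplace_back(stream.substr(start, end - start));
    start = end + delimiter.size();
    end = stream.find(delimiter, start);
  }
  return messages;
}
```

The sketch also shows why the description warns about content that contains the delimiter: a single message such as `"x\ny"` framed with `"\n"` comes back from `splitStream` as two messages.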
[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor
martinzink commented on code in PR #1419: URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r100380 ## extensions/standard-processors/processors/PutTCP.cpp: ## @@ -0,0 +1,551 @@
[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor
martinzink commented on code in PR #1419: URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r100073 ## extensions/standard-processors/processors/PutTCP.cpp: ## @@ -0,0 +1,551 @@
[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor
martinzink commented on code in PR #1419: URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1005553979 ## libminifi/include/utils/StringUtils.h: ## @@ -37,6 +37,19 @@ #include "utils/gsl.h" #include "utils/meta/detected.h" +// libc++ doesn't define operator<=> on strings, and apparently the operator rewrite rules don't automagically make one +#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION <= 14000 +#include <compare> +#endif + +#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION <= 14000 +template<typename _CharT, typename _Traits, typename _Alloc> +constexpr std::strong_ordering operator<=>(const std::basic_string<_CharT, _Traits, _Alloc>& __lhs, +const std::basic_string<_CharT, _Traits, _Alloc>& __rhs) noexcept { + return __lhs.compare(__rhs) <=> 0; +} +#endif Review Comment: thanks, I changed this and the other manually written <=> in https://github.com/apache/nifi-minifi-cpp/pull/1419/commits/3a3b0437cfebdef44a15d54f40222f5e498d
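The workaround quoted above builds a three-way comparison for strings from `std::basic_string::compare`, since `compare` already returns a negative, zero, or positive `int`. A minimal standalone sketch of that idea follows, assuming two hypothetical helper names, `compareThreeWay` and `compareSign`. Neither is part of MiNiFi or the standard library. The first helper is compiled only when the compiler reports three-way comparison support; the second is a pre-C++20 equivalent that reduces the result to -1, 0, or 1.

```cpp
#include <cassert>
#include <string>

#if defined(__cpp_impl_three_way_comparison)
#include <compare>

// Hypothetical helper: comparing the int returned by compare() against 0 with <=>
// yields std::strong_ordering, the same trick as in the quoted workaround.
std::strong_ordering compareThreeWay(const std::string& lhs, const std::string& rhs) noexcept {
  return lhs.compare(rhs) <=> 0;
}
#endif

// Hypothetical portable helper: the same ordering expressed as -1, 0, or 1.
int compareSign(const std::string& lhs, const std::string& rhs) noexcept {
  const int result = lhs.compare(rhs);
  return (result > 0) - (result < 0);
}
```

For example, `compareSign("apple", "banana")` is negative because `'a'` sorts before `'b'`, and under C++20 `compareThreeWay("apple", "banana")` compares less than zero for the same reason.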
[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor
martinzink commented on code in PR #1419: URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1005553207 ## PROCESSORS.md: ## @@ -2167,6 +2168,32 @@ In the list below, the names of required properties appear in bold. Any other pr | success | After a successful SQL update operation, the incoming FlowFile sent here | +## PutTCP + +### Description +The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. By default, the FlowFiles are transmitted over the same TCP connection. To assist the TCP server with determining message boundaries, an optional "Outgoing Message Delimiter" string can be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. An optional "Connection Per FlowFile" parameter can be specified to change the behaviour so that each FlowFiles content is transmitted over a single TCP connection which is closed after the FlowFile has been sent. + +### Properties +In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. + +| Name | Default Value | Allowable Values | Description | +||---|--|| +| **Hostname** | localhost | | The ip address or hostname of the destination.**Supports Expression Language: true** | +| **Port** | | | The port or service on the destination.**Supports Expression Language: true** | +| **Idle Connection Expiration** | 15 seconds| | The amount of time a connection should be held open without being used before closing the connection. 
A value of 0 seconds will disable this feature.**Supports Expression Language: true** | +| **Timeout**| 15 seconds| | The timeout for connecting to and communicating with the destination.**Supports Expression Language: true** | +| **Connection Per FlowFile**| false | | Specifies whether to send each FlowFile's content on an individual connection. | +| Outgoing Message Delimiter | | | Specifies the delimiter to use when sending messages out over the same TCP stream. The delimiter is appended to each FlowFile message that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. Users should ensur