[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor

2022-11-04 Thread GitBox


martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1013921335


##
extensions/standard-processors/processors/PutTCP.cpp:
##
@@ -0,0 +1,582 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutTCP.h"
+
+#include 
+#include 
+
+#include "range/v3/range/conversion.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "controllers/SSLContextService.h"
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/high_resolution_timer.hpp"
+
+using asio::ip::tcp;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutTCP::Hostname = 
core::PropertyBuilder::createProperty("Hostname")
+->withDescription("The ip address or hostname of the destination.")
+->withDefaultValue("localhost")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::Port = 
core::PropertyBuilder::createProperty("Port")
+->withDescription("The port or service on the destination.")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::IdleConnectionExpiration = 
core::PropertyBuilder::createProperty("Idle Connection Expiration")
+->withDescription("The amount of time a connection should be held open 
without being used before closing the connection. A value of 0 seconds will 
disable this feature.")
+->withDefaultValue("15 seconds")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::Timeout = 
core::PropertyBuilder::createProperty("Timeout")
+->withDescription("The timeout for connecting to and communicating with 
the destination.")
+->withDefaultValue("15 seconds")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::ConnectionPerFlowFile = 
core::PropertyBuilder::createProperty("Connection Per FlowFile")
+->withDescription("Specifies whether to send each FlowFile's content on an 
individual connection.")
+->withDefaultValue(false)
+->isRequired(true)
+->supportsExpressionLanguage(false)
+->build();
+
+const core::Property PutTCP::OutgoingMessageDelimiter = 
core::PropertyBuilder::createProperty("Outgoing Message Delimiter")
+->withDescription("Specifies the delimiter to use when sending messages 
out over the same TCP stream. "
+  "The delimiter is appended to each FlowFile message that 
is transmitted over the stream so that the receiver can determine when one 
message ends and the next message begins. "
+  "Users should ensure that the FlowFile content does not 
contain the delimiter character to avoid errors.")
+->isRequired(false)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::SSLContextService = 
core::PropertyBuilder::createProperty("SSL Context Service")
+->withDescription("The Controller Service to use in order to obtain an SSL 
Context. If this property is set, messages will be sent over a secure 
connection.")
+->isRequired(false)
+->asType()
+->build();
+
+const core::Property PutTCP::MaxSizeOfSocketSendBuffer = 
core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer")
+->withDescription("The maximum size of the socket send buffer that should 
be used. This is a suggestion to the Operating System to indicate how big the 
socket buffer should be.")
+->isRequired(false)
+->asType()
+->build();
+
+const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent 
to the destination are sent out this relationship."};
+const core::Relationship PutTCP::Failure{"failure", "FlowFiles that 
encountered IO errors are send out this relationship."};
+
+constexpr size_t chunk_size = 1024;
+
+PutTCP::PutTCP(cons

[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor

2022-11-04 Thread GitBox


martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1013917235


##
extensions/standard-processors/processors/PutTCP.cpp:
##
@@ -0,0 +1,582 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutTCP.h"
+
+#include 
+#include 
+
+#include "range/v3/range/conversion.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "controllers/SSLContextService.h"
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/high_resolution_timer.hpp"
+
+using asio::ip::tcp;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutTCP::Hostname = 
core::PropertyBuilder::createProperty("Hostname")
+->withDescription("The ip address or hostname of the destination.")
+->withDefaultValue("localhost")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::Port = 
core::PropertyBuilder::createProperty("Port")
+->withDescription("The port or service on the destination.")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::IdleConnectionExpiration = 
core::PropertyBuilder::createProperty("Idle Connection Expiration")
+->withDescription("The amount of time a connection should be held open 
without being used before closing the connection. A value of 0 seconds will 
disable this feature.")
+->withDefaultValue("15 seconds")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::Timeout = 
core::PropertyBuilder::createProperty("Timeout")
+->withDescription("The timeout for connecting to and communicating with 
the destination.")
+->withDefaultValue("15 seconds")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::ConnectionPerFlowFile = 
core::PropertyBuilder::createProperty("Connection Per FlowFile")
+->withDescription("Specifies whether to send each FlowFile's content on an 
individual connection.")
+->withDefaultValue(false)
+->isRequired(true)
+->supportsExpressionLanguage(false)
+->build();
+
+const core::Property PutTCP::OutgoingMessageDelimiter = 
core::PropertyBuilder::createProperty("Outgoing Message Delimiter")
+->withDescription("Specifies the delimiter to use when sending messages 
out over the same TCP stream. "
+  "The delimiter is appended to each FlowFile message that 
is transmitted over the stream so that the receiver can determine when one 
message ends and the next message begins. "
+  "Users should ensure that the FlowFile content does not 
contain the delimiter character to avoid errors.")
+->isRequired(false)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::SSLContextService = 
core::PropertyBuilder::createProperty("SSL Context Service")
+->withDescription("The Controller Service to use in order to obtain an SSL 
Context. If this property is set, messages will be sent over a secure 
connection.")
+->isRequired(false)
+->asType()
+->build();
+
+const core::Property PutTCP::MaxSizeOfSocketSendBuffer = 
core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer")
+->withDescription("The maximum size of the socket send buffer that should 
be used. This is a suggestion to the Operating System to indicate how big the 
socket buffer should be.")
+->isRequired(false)
+->asType()
+->build();
+
+const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent 
to the destination are sent out this relationship."};
+const core::Relationship PutTCP::Failure{"failure", "FlowFiles that 
encountered IO errors are send out this relationship."};
+
+constexpr size_t chunk_size = 1024;
+
+PutTCP::PutTCP(cons

[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor

2022-11-04 Thread GitBox


martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1013916908


##
extensions/standard-processors/processors/PutTCP.cpp:
##
@@ -0,0 +1,582 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutTCP.h"
+
+#include 
+#include 
+
+#include "range/v3/range/conversion.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "controllers/SSLContextService.h"
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/high_resolution_timer.hpp"
+
+using asio::ip::tcp;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutTCP::Hostname = 
core::PropertyBuilder::createProperty("Hostname")
+->withDescription("The ip address or hostname of the destination.")
+->withDefaultValue("localhost")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::Port = 
core::PropertyBuilder::createProperty("Port")
+->withDescription("The port or service on the destination.")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::IdleConnectionExpiration = 
core::PropertyBuilder::createProperty("Idle Connection Expiration")
+->withDescription("The amount of time a connection should be held open 
without being used before closing the connection. A value of 0 seconds will 
disable this feature.")
+->withDefaultValue("15 seconds")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::Timeout = 
core::PropertyBuilder::createProperty("Timeout")
+->withDescription("The timeout for connecting to and communicating with 
the destination.")
+->withDefaultValue("15 seconds")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::ConnectionPerFlowFile = 
core::PropertyBuilder::createProperty("Connection Per FlowFile")
+->withDescription("Specifies whether to send each FlowFile's content on an 
individual connection.")
+->withDefaultValue(false)
+->isRequired(true)
+->supportsExpressionLanguage(false)
+->build();
+
+const core::Property PutTCP::OutgoingMessageDelimiter = 
core::PropertyBuilder::createProperty("Outgoing Message Delimiter")
+->withDescription("Specifies the delimiter to use when sending messages 
out over the same TCP stream. "
+  "The delimiter is appended to each FlowFile message that 
is transmitted over the stream so that the receiver can determine when one 
message ends and the next message begins. "
+  "Users should ensure that the FlowFile content does not 
contain the delimiter character to avoid errors.")
+->isRequired(false)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::SSLContextService = 
core::PropertyBuilder::createProperty("SSL Context Service")
+->withDescription("The Controller Service to use in order to obtain an SSL 
Context. If this property is set, messages will be sent over a secure 
connection.")
+->isRequired(false)
+->asType()
+->build();
+
+const core::Property PutTCP::MaxSizeOfSocketSendBuffer = 
core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer")
+->withDescription("The maximum size of the socket send buffer that should 
be used. This is a suggestion to the Operating System to indicate how big the 
socket buffer should be.")
+->isRequired(false)
+->asType()
+->build();
+
+const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent 
to the destination are sent out this relationship."};
+const core::Relationship PutTCP::Failure{"failure", "FlowFiles that 
encountered IO errors are send out this relationship."};
+
+constexpr size_t chunk_size = 1024;
+
+PutTCP::PutTCP(cons

[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor

2022-11-04 Thread GitBox


martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1013913817


##
extensions/standard-processors/processors/PutTCP.cpp:
##
@@ -0,0 +1,582 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutTCP.h"
+
+#include 
+#include 
+
+#include "range/v3/range/conversion.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "controllers/SSLContextService.h"
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/high_resolution_timer.hpp"
+
+using asio::ip::tcp;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutTCP::Hostname = 
core::PropertyBuilder::createProperty("Hostname")
+->withDescription("The ip address or hostname of the destination.")
+->withDefaultValue("localhost")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::Port = 
core::PropertyBuilder::createProperty("Port")
+->withDescription("The port or service on the destination.")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::IdleConnectionExpiration = 
core::PropertyBuilder::createProperty("Idle Connection Expiration")
+->withDescription("The amount of time a connection should be held open 
without being used before closing the connection. A value of 0 seconds will 
disable this feature.")
+->withDefaultValue("15 seconds")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::Timeout = 
core::PropertyBuilder::createProperty("Timeout")
+->withDescription("The timeout for connecting to and communicating with 
the destination.")
+->withDefaultValue("15 seconds")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::ConnectionPerFlowFile = 
core::PropertyBuilder::createProperty("Connection Per FlowFile")
+->withDescription("Specifies whether to send each FlowFile's content on an 
individual connection.")
+->withDefaultValue(false)
+->isRequired(true)
+->supportsExpressionLanguage(false)
+->build();
+
+const core::Property PutTCP::OutgoingMessageDelimiter = 
core::PropertyBuilder::createProperty("Outgoing Message Delimiter")
+->withDescription("Specifies the delimiter to use when sending messages 
out over the same TCP stream. "
+  "The delimiter is appended to each FlowFile message that 
is transmitted over the stream so that the receiver can determine when one 
message ends and the next message begins. "
+  "Users should ensure that the FlowFile content does not 
contain the delimiter character to avoid errors.")
+->isRequired(false)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::SSLContextService = 
core::PropertyBuilder::createProperty("SSL Context Service")
+->withDescription("The Controller Service to use in order to obtain an SSL 
Context. If this property is set, messages will be sent over a secure 
connection.")
+->isRequired(false)
+->asType()
+->build();
+
+const core::Property PutTCP::MaxSizeOfSocketSendBuffer = 
core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer")
+->withDescription("The maximum size of the socket send buffer that should 
be used. This is a suggestion to the Operating System to indicate how big the 
socket buffer should be.")
+->isRequired(false)
+->asType()
+->build();
+
+const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent 
to the destination are sent out this relationship."};
+const core::Relationship PutTCP::Failure{"failure", "FlowFiles that 
encountered IO errors are send out this relationship."};
+
+constexpr size_t chunk_size = 1024;
+
+PutTCP::PutTCP(cons

[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor

2022-11-02 Thread GitBox


martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1011835200


##
extensions/standard-processors/processors/PutTCP.cpp:
##
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutTCP.h"
+
+#include 
+#include 
+
+#include "range/v3/range/conversion.hpp"
+#include "range/v3/view/transform.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "controllers/SSLContextService.h"
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/high_resolution_timer.hpp"
+
+using asio::ip::tcp;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutTCP::Hostname = 
core::PropertyBuilder::createProperty("Hostname")
+->withDescription("The ip address or hostname of the destination.")
+->withDefaultValue("localhost")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::Port = 
core::PropertyBuilder::createProperty("Port")
+->withDescription("The port or service on the destination.")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::IdleConnectionExpiration = 
core::PropertyBuilder::createProperty("Idle Connection Expiration")
+->withDescription("The amount of time a connection should be held open 
without being used before closing the connection. A value of 0 seconds will 
disable this feature.")
+->withDefaultValue("15 seconds")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::Timeout = 
core::PropertyBuilder::createProperty("Timeout")
+->withDescription("The timeout for connecting to and communicating with 
the destination.")
+->withDefaultValue("15 seconds")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::ConnectionPerFlowFile = 
core::PropertyBuilder::createProperty("Connection Per FlowFile")
+->withDescription("Specifies whether to send each FlowFile's content on an 
individual connection.")
+->withDefaultValue(false)
+->isRequired(true)
+->supportsExpressionLanguage(false)
+->build();
+
+const core::Property PutTCP::OutgoingMessageDelimiter = 
core::PropertyBuilder::createProperty("Outgoing Message Delimiter")
+->withDescription("Specifies the delimiter to use when sending messages 
out over the same TCP stream. "
+  "The delimiter is appended to each FlowFile message that 
is transmitted over the stream so that the receiver can determine when one 
message ends and the next message begins. "
+  "Users should ensure that the FlowFile content does not 
contain the delimiter character to avoid errors.")
+->isRequired(false)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::SSLContextService = 
core::PropertyBuilder::createProperty("SSL Context Service")
+->withDescription("The Controller Service to use in order to obtain an SSL 
Context. If this property is set, messages will be sent over a secure 
connection.")
+->isRequired(false)
+->asType()
+->build();
+
+const core::Property PutTCP::MaxSizeOfSocketSendBuffer = 
core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer")
+->withDescription("The maximum size of the socket send buffer that should 
be used. This is a suggestion to the Operating System to indicate how big the 
socket buffer should be.")
+->isRequired(false)
+->asType()
+->build();
+
+const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent 
to the destination are sent out this relationship."};
+const core::Relationship PutTCP::Failure{"failure", "FlowFiles that 
encountered IO errors are send out this relationship."};
+
+PutTCP::PutTCP(con

[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor

2022-11-02 Thread GitBox


martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1011816544


##
extensions/standard-processors/processors/PutTCP.cpp:
##
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutTCP.h"
+
+#include 
+#include 
+
+#include "range/v3/range/conversion.hpp"
+#include "range/v3/view/transform.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "controllers/SSLContextService.h"
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/high_resolution_timer.hpp"
+
+using asio::ip::tcp;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutTCP::Hostname = 
core::PropertyBuilder::createProperty("Hostname")
+->withDescription("The ip address or hostname of the destination.")
+->withDefaultValue("localhost")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::Port = 
core::PropertyBuilder::createProperty("Port")
+->withDescription("The port or service on the destination.")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::IdleConnectionExpiration = 
core::PropertyBuilder::createProperty("Idle Connection Expiration")
+->withDescription("The amount of time a connection should be held open 
without being used before closing the connection. A value of 0 seconds will 
disable this feature.")
+->withDefaultValue("15 seconds")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::Timeout = 
core::PropertyBuilder::createProperty("Timeout")
+->withDescription("The timeout for connecting to and communicating with 
the destination.")
+->withDefaultValue("15 seconds")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::ConnectionPerFlowFile = 
core::PropertyBuilder::createProperty("Connection Per FlowFile")
+->withDescription("Specifies whether to send each FlowFile's content on an 
individual connection.")
+->withDefaultValue(false)
+->isRequired(true)
+->supportsExpressionLanguage(false)
+->build();
+
+const core::Property PutTCP::OutgoingMessageDelimiter = 
core::PropertyBuilder::createProperty("Outgoing Message Delimiter")
+->withDescription("Specifies the delimiter to use when sending messages 
out over the same TCP stream. "
+  "The delimiter is appended to each FlowFile message that 
is transmitted over the stream so that the receiver can determine when one 
message ends and the next message begins. "
+  "Users should ensure that the FlowFile content does not 
contain the delimiter character to avoid errors.")
+->isRequired(false)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::SSLContextService = 
core::PropertyBuilder::createProperty("SSL Context Service")
+->withDescription("The Controller Service to use in order to obtain an SSL 
Context. If this property is set, messages will be sent over a secure 
connection.")
+->isRequired(false)
+->asType()
+->build();
+
+const core::Property PutTCP::MaxSizeOfSocketSendBuffer = 
core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer")
+->withDescription("The maximum size of the socket send buffer that should 
be used. This is a suggestion to the Operating System to indicate how big the 
socket buffer should be.")
+->isRequired(false)
+->asType()
+->build();
+
+const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent 
to the destination are sent out this relationship."};
+const core::Relationship PutTCP::Failure{"failure", "FlowFiles that 
encountered IO errors are send out this relationship."};
+
+PutTCP::PutTCP(con

[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor

2022-11-02 Thread GitBox


martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1011666740


##
libminifi/include/utils/StringUtils.h:
##
@@ -37,6 +37,19 @@
 #include "utils/gsl.h"
 #include "utils/meta/detected.h"
 
+// libc++ doesn't define operator<=> on strings, and apparently the operator 
rewrite rules don't automagically make one
+#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION <= 14000
+#include 
+#endif
+
+#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION <= 14000
+template
+constexpr std::strong_ordering operator<=>(const std::basic_string<_CharT, 
_Traits, _Alloc>& __lhs,
+const std::basic_string<_CharT, _Traits, _Alloc>& __rhs) noexcept {
+  return __lhs.compare(__rhs) <=> 0;
+}

Review Comment:
   good idea, I've simplified it in (we only need it for plain old std::string 
anyways) 
https://github.com/apache/nifi-minifi-cpp/pull/1419/commits/a4becde14199ad2b8f1354de64ed7dad5b6503dd#diff-82cfd335c88c846c59b1b3e46405088a59a5e85560dbb81bf5ebd5d7772457bdR44-R45



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor

2022-11-02 Thread GitBox


martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1011665191


##
extensions/standard-processors/processors/PutTCP.cpp:
##
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutTCP.h"
+
+#include 
+#include 
+
+#include "range/v3/range/conversion.hpp"
+#include "range/v3/view/transform.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "controllers/SSLContextService.h"
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/high_resolution_timer.hpp"
+
+using asio::ip::tcp;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutTCP::Hostname = 
core::PropertyBuilder::createProperty("Hostname")
+->withDescription("The ip address or hostname of the destination.")
+->withDefaultValue("localhost")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::Port = 
core::PropertyBuilder::createProperty("Port")
+->withDescription("The port or service on the destination.")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::IdleConnectionExpiration = 
core::PropertyBuilder::createProperty("Idle Connection Expiration")
+->withDescription("The amount of time a connection should be held open 
without being used before closing the connection. A value of 0 seconds will 
disable this feature.")
+->withDefaultValue("15 seconds")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::Timeout = 
core::PropertyBuilder::createProperty("Timeout")
+->withDescription("The timeout for connecting to and communicating with 
the destination.")
+->withDefaultValue("15 seconds")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::ConnectionPerFlowFile = 
core::PropertyBuilder::createProperty("Connection Per FlowFile")
+->withDescription("Specifies whether to send each FlowFile's content on an 
individual connection.")
+->withDefaultValue(false)
+->isRequired(true)
+->supportsExpressionLanguage(false)
+->build();
+
+const core::Property PutTCP::OutgoingMessageDelimiter = 
core::PropertyBuilder::createProperty("Outgoing Message Delimiter")
+->withDescription("Specifies the delimiter to use when sending messages 
out over the same TCP stream. "
+  "The delimiter is appended to each FlowFile message that 
is transmitted over the stream so that the receiver can determine when one 
message ends and the next message begins. "
+  "Users should ensure that the FlowFile content does not 
contain the delimiter character to avoid errors.")
+->isRequired(false)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::SSLContextService = 
core::PropertyBuilder::createProperty("SSL Context Service")
+->withDescription("The Controller Service to use in order to obtain an SSL 
Context. If this property is set, messages will be sent over a secure 
connection.")
+->isRequired(false)
+->asType()
+->build();
+
+const core::Property PutTCP::MaxSizeOfSocketSendBuffer = 
core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer")
+->withDescription("The maximum size of the socket send buffer that should 
be used. This is a suggestion to the Operating System to indicate how big the 
socket buffer should be.")
+->isRequired(false)
+->asType()
+->build();
+
+const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent 
to the destination are sent out this relationship."};
+const core::Relationship PutTCP::Failure{"failure", "FlowFiles that 
encountered IO errors are send out this relationship."};
+
+PutTCP::PutTCP(con

[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor

2022-11-02 Thread GitBox


martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1011662554


##
extensions/standard-processors/processors/PutTCP.h:
##
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "Processor.h"
+#include "utils/Export.h"
+#include "controllers/SSLContextService.h"
+
+#include "utils/expected.h"
+#include "utils/StringUtils.h"  // for string <=> on libc++
+
+namespace org::apache::nifi::minifi::processors {
+
+class ConnectionId {
+ public:
+  ConnectionId(std::string hostname, std::string port) : 
hostname_(std::move(hostname)), port_(std::move(port)) {}
+
+  struct hash {
+std::size_t operator () (const ConnectionId& connection_id) const {
+  return 
utils::hash_combine(std::hash{}(connection_id.hostname_), 
std::hash{}(connection_id.port_));
+}
+  };

Review Comment:
   I can see your point :+1: I've changed it in 
https://github.com/apache/nifi-minifi-cpp/pull/1419/commits/a4becde14199ad2b8f1354de64ed7dad5b6503dd#diff-2259b226acd731ddec22f739e72603e60fdcfaddbc203d6e20f44534f7a6a5eeR51-R59



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor

2022-11-02 Thread GitBox


martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1011661453


##
extensions/standard-processors/processors/PutTCP.cpp:
##
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutTCP.h"
+
+#include 
+#include 
+
+#include "range/v3/range/conversion.hpp"
+#include "range/v3/view/transform.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "controllers/SSLContextService.h"
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/high_resolution_timer.hpp"
+
+using asio::ip::tcp;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutTCP::Hostname = 
core::PropertyBuilder::createProperty("Hostname")
+->withDescription("The ip address or hostname of the destination.")
+->withDefaultValue("localhost")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::Port = 
core::PropertyBuilder::createProperty("Port")
+->withDescription("The port or service on the destination.")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::IdleConnectionExpiration = 
core::PropertyBuilder::createProperty("Idle Connection Expiration")
+->withDescription("The amount of time a connection should be held open 
without being used before closing the connection. A value of 0 seconds will 
disable this feature.")
+->withDefaultValue("15 seconds")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::Timeout = 
core::PropertyBuilder::createProperty("Timeout")
+->withDescription("The timeout for connecting to and communicating with 
the destination.")
+->withDefaultValue("15 seconds")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::ConnectionPerFlowFile = 
core::PropertyBuilder::createProperty("Connection Per FlowFile")
+->withDescription("Specifies whether to send each FlowFile's content on an 
individual connection.")
+->withDefaultValue(false)
+->isRequired(true)
+->supportsExpressionLanguage(false)
+->build();
+
+const core::Property PutTCP::OutgoingMessageDelimiter = 
core::PropertyBuilder::createProperty("Outgoing Message Delimiter")
+->withDescription("Specifies the delimiter to use when sending messages 
out over the same TCP stream. "
+  "The delimiter is appended to each FlowFile message that 
is transmitted over the stream so that the receiver can determine when one 
message ends and the next message begins. "
+  "Users should ensure that the FlowFile content does not 
contain the delimiter character to avoid errors.")
+->isRequired(false)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::SSLContextService = 
core::PropertyBuilder::createProperty("SSL Context Service")
+->withDescription("The Controller Service to use in order to obtain an SSL 
Context. If this property is set, messages will be sent over a secure 
connection.")
+->isRequired(false)
+->asType()
+->build();
+
+const core::Property PutTCP::MaxSizeOfSocketSendBuffer = 
core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer")
+->withDescription("The maximum size of the socket send buffer that should 
be used. This is a suggestion to the Operating System to indicate how big the 
socket buffer should be.")
+->isRequired(false)
+->asType()
+->build();
+
+const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent 
to the destination are sent out this relationship."};
+const core::Relationship PutTCP::Failure{"failure", "FlowFiles that 
encountered IO errors are send out this relationship."};
+
+PutTCP::PutTCP(con

[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor

2022-11-02 Thread GitBox


martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1011660327


##
extensions/standard-processors/tests/unit/PutTCPTests.cpp:
##
@@ -0,0 +1,454 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+#include "PutTCP.h"
+#include "controllers/SSLContextService.h"
+#include "core/ProcessSession.h"
+#include "utils/net/TcpServer.h"
+#include "utils/net/SslServer.h"
+#include "utils/expected.h"
+#include "utils/StringUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+using controllers::SSLContextService;
+
+namespace {
+using utils::net::TcpSession;
+using utils::net::TcpServer;
+
+using utils::net::SslSession;
+using utils::net::SslServer;
+
+class ISessionAwareServer {
+ public:
+  [[nodiscard]] virtual size_t getNumberOfSessions() const = 0;
+  virtual void closeSessions() = 0;
+};
+
+template
+class SessionAwareServer : public ISessionAwareServer {
+ protected:
+  size_t getNumberOfSessions() const override {
+std::lock_guard lock_guard{mutex_};
+return sessions_.size();
+  }
+
+  void closeSessions() override {
+std::lock_guard lock_guard{mutex_};
+for (const auto& session_weak : sessions_) {
+  if (auto session = session_weak.lock()) {
+auto& socket = session->getSocket();
+if (socket.is_open()) {
+  socket.shutdown(asio::ip::tcp::socket::shutdown_both);
+  session->getSocket().close();
+}
+  }
+}
+  }
+
+  mutable std::mutex mutex_;
+  std::vector> sessions_;
+};
+
+class SessionAwareTcpServer : public TcpServer, public 
SessionAwareServer {
+ public:
+  using TcpServer::TcpServer;
+
+ protected:
+  std::shared_ptr createSession() override {
+std::lock_guard lock_guard{mutex_};
+auto session = TcpServer::createSession();
+logger_->log_trace("SessionAwareTcpServer::createSession %p", 
session.get());
+sessions_.emplace_back(session);
+return session;
+  }
+};
+
+class SessionAwareSslServer : public SslServer, public 
SessionAwareServer {
+ public:
+  using SslServer::SslServer;
+
+ protected:
+  std::shared_ptr createSession() override {
+std::lock_guard lock_guard{mutex_};
+auto session = SslServer::createSession();
+logger_->log_trace("SessionAwareSslServer::createSession %p", 
session.get());
+sessions_.emplace_back(session);
+return session;
+  }
+};
+
+utils::net::SslData createSslDataForServer() {
+  const std::filesystem::path executable_dir = 
minifi::utils::file::FileUtils::get_executable_dir();
+  utils::net::SslData ssl_data;
+  ssl_data.ca_loc = (executable_dir / "resources" / "ca_A.crt").string();
+  ssl_data.cert_loc = (executable_dir / "resources" / 
"localhost_by_A.pem").string();
+  ssl_data.key_loc = (executable_dir / "resources" / 
"localhost_by_A.pem").string();
+  return ssl_data;
+}
+}  // namespace
+
+class PutTCPTestFixture {
+ public:
+  PutTCPTestFixture() {
+LogTestController::getInstance().setTrace();
+LogTestController::getInstance().setInfo();
+LogTestController::getInstance().setTrace();
+put_tcp_->setProperty(PutTCP::Hostname, "${literal('localhost')}");
+put_tcp_->setProperty(PutTCP::Port, 
utils::StringUtils::join_pack("${literal('", std::to_string(port_), "')}"));
+put_tcp_->setProperty(PutTCP::Timeout, "200 ms");
+put_tcp_->setProperty(PutTCP::OutgoingMessageDelimiter, "\n");
+  }
+
+  ~PutTCPTestFixture() {
+stopServer();
+  }
+
+  void startTCPServer() {
+gsl_Expects(!listener_ && !server_thread_.joinable());
+listener_ = std::make_unique(std::nullopt, port_, 
core::logging::LoggerFactory::getLogger());
+server_thread_ = std::thread([this]() { listener_->run(); });
+  }
+
+  void startSSLServer() {
+gsl_Expects(!listener_ && !server_thread_.joinable());
+listener_ = std::make_unique(std::nullopt,
+port_,
+core::logging::LoggerFactory::getLogger(),
+createSslDataForServer(),
+utils::net::SslServer::ClientAuthOption::REQUIRED);
+server_thread_ = std::thread([this]() { lis

[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor

2022-11-02 Thread GitBox


martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1011658749


##
extensions/standard-processors/processors/PutTCP.cpp:
##
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutTCP.h"
+
+#include 
+#include 
+
+#include "range/v3/range/conversion.hpp"
+#include "range/v3/view/transform.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "controllers/SSLContextService.h"
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/high_resolution_timer.hpp"
+
+using asio::ip::tcp;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutTCP::Hostname = 
core::PropertyBuilder::createProperty("Hostname")
+->withDescription("The ip address or hostname of the destination.")
+->withDefaultValue("localhost")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::Port = 
core::PropertyBuilder::createProperty("Port")
+->withDescription("The port or service on the destination.")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::IdleConnectionExpiration = 
core::PropertyBuilder::createProperty("Idle Connection Expiration")
+->withDescription("The amount of time a connection should be held open 
without being used before closing the connection. A value of 0 seconds will 
disable this feature.")
+->withDefaultValue("15 seconds")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::Timeout = 
core::PropertyBuilder::createProperty("Timeout")
+->withDescription("The timeout for connecting to and communicating with 
the destination.")
+->withDefaultValue("15 seconds")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::ConnectionPerFlowFile = 
core::PropertyBuilder::createProperty("Connection Per FlowFile")
+->withDescription("Specifies whether to send each FlowFile's content on an 
individual connection.")
+->withDefaultValue(false)
+->isRequired(true)
+->supportsExpressionLanguage(false)
+->build();
+
+const core::Property PutTCP::OutgoingMessageDelimiter = 
core::PropertyBuilder::createProperty("Outgoing Message Delimiter")
+->withDescription("Specifies the delimiter to use when sending messages 
out over the same TCP stream. "
+  "The delimiter is appended to each FlowFile message that 
is transmitted over the stream so that the receiver can determine when one 
message ends and the next message begins. "
+  "Users should ensure that the FlowFile content does not 
contain the delimiter character to avoid errors.")
+->isRequired(false)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::SSLContextService = 
core::PropertyBuilder::createProperty("SSL Context Service")
+->withDescription("The Controller Service to use in order to obtain an SSL 
Context. If this property is set, messages will be sent over a secure 
connection.")
+->isRequired(false)
+->asType()
+->build();
+
+const core::Property PutTCP::MaxSizeOfSocketSendBuffer = 
core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer")
+->withDescription("The maximum size of the socket send buffer that should 
be used. This is a suggestion to the Operating System to indicate how big the 
socket buffer should be.")
+->isRequired(false)
+->asType()
+->build();
+
+const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent 
to the destination are sent out this relationship."};
+const core::Relationship PutTCP::Failure{"failure", "FlowFiles that 
encountered IO errors are send out this relationship."};
+
+PutTCP::PutTCP(con

[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor

2022-10-26 Thread GitBox


martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1005559661


##
extensions/standard-processors/processors/PutTCP.h:
##
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "Processor.h"
+#include "utils/Export.h"
+#include "controllers/SSLContextService.h"
+
+#include "utils/expected.h"
+#include "utils/StringUtils.h"  // for string <=> on libc++
+
+namespace org::apache::nifi::minifi::processors {
+
+class ConnectionId {
+ public:
+  ConnectionId(std::string hostname, std::string port) : 
hostname_(std::move(hostname)), port_(std::move(port)) {}
+
+  struct hash {
+std::size_t operator () (const ConnectionId& connection_id) const {
+  return 
utils::hash_combine(std::hash{}(connection_id.hostname_), 
std::hash{}(connection_id.port_));
+}
+  };
+
+  auto operator<=>(const ConnectionId&) const = default;
+
+  std::string& getHostname() { return hostname_; }
+  std::string& getPort() { return port_; }

Review Comment:
   sure thing :+1:  
https://github.com/apache/nifi-minifi-cpp/pull/1419/commits/9928eaa75f607d1d337d289c7bbf84f23e20ee5a#diff-2259b226acd731ddec22f739e72603e60fdcfaddbc203d6e20f44534f7a6a5eeR48-R49



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor

2022-10-26 Thread GitBox


martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1005559282


##
extensions/standard-processors/tests/unit/resources/ca_B.crt:
##
@@ -0,0 +1,21 @@
+-BEGIN CERTIFICATE-

Review Comment:
   Its not invalid per se, just the server is configured to use ca_A, so a 
client configured with a different CA will fail the handshake.



##
extensions/standard-processors/tests/unit/resources/bob_by_A.pem:
##
@@ -0,0 +1,46 @@
+-BEGIN RSA PRIVATE KEY-

Review Comment:
   sure thing removed them 
https://github.com/apache/nifi-minifi-cpp/pull/1419/commits/9928eaa75f607d1d337d289c7bbf84f23e20ee5a



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor

2022-10-26 Thread GitBox


martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1005559107


##
extensions/standard-processors/tests/unit/PutTCPTests.cpp:
##
@@ -0,0 +1,454 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+#include "PutTCP.h"
+#include "controllers/SSLContextService.h"
+#include "core/ProcessSession.h"
+#include "utils/net/TcpServer.h"
+#include "utils/net/SslServer.h"
+#include "utils/expected.h"
+#include "utils/StringUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+using controllers::SSLContextService;
+
+namespace {
+using utils::net::TcpSession;
+using utils::net::TcpServer;
+
+using utils::net::SslSession;
+using utils::net::SslServer;
+
+class ISessionAwareServer {
+ public:
+  [[nodiscard]] virtual size_t getNumberOfSessions() const = 0;
+  virtual void closeSessions() = 0;
+};
+
+template
+class SessionAwareServer : public ISessionAwareServer {
+ protected:
+  size_t getNumberOfSessions() const override {
+std::lock_guard lock_guard{mutex_};
+return sessions_.size();
+  }
+
+  void closeSessions() override {
+std::lock_guard lock_guard{mutex_};
+for (const auto& session_weak : sessions_) {
+  if (auto session = session_weak.lock()) {
+auto& socket = session->getSocket();
+if (socket.is_open()) {
+  socket.shutdown(asio::ip::tcp::socket::shutdown_both);
+  session->getSocket().close();
+}
+  }
+}
+  }
+
+  mutable std::mutex mutex_;
+  std::vector> sessions_;
+};
+
+class SessionAwareTcpServer : public TcpServer, public 
SessionAwareServer {
+ public:
+  using TcpServer::TcpServer;
+
+ protected:
+  std::shared_ptr createSession() override {
+std::lock_guard lock_guard{mutex_};
+auto session = TcpServer::createSession();
+logger_->log_trace("SessionAwareTcpServer::createSession %p", 
session.get());
+sessions_.emplace_back(session);
+return session;
+  }
+};
+
+class SessionAwareSslServer : public SslServer, public 
SessionAwareServer {
+ public:
+  using SslServer::SslServer;
+
+ protected:
+  std::shared_ptr createSession() override {
+std::lock_guard lock_guard{mutex_};
+auto session = SslServer::createSession();
+logger_->log_trace("SessionAwareTcpServer::createSession %p", 
session.get());
+sessions_.emplace_back(session);
+return session;
+  }
+};
+
+utils::net::SslData createSslDataForServer() {
+  const std::filesystem::path executable_dir = 
minifi::utils::file::FileUtils::get_executable_dir();
+  utils::net::SslData ssl_data;
+  ssl_data.ca_loc = (executable_dir / "resources/ca_A.crt").string();
+  ssl_data.cert_loc = (executable_dir / 
"resources/localhost_by_A.pem").string();
+  ssl_data.key_loc = (executable_dir / 
"resources/localhost_by_A.pem").string();
+  return ssl_data;
+}
+}  // namespace
+
+class PutTCPTestFixture {
+ public:
+  PutTCPTestFixture() {
+LogTestController::getInstance().setTrace();
+LogTestController::getInstance().setInfo();
+LogTestController::getInstance().setTrace();
+put_tcp_->setProperty(PutTCP::Hostname, "${literal('localhost')}");
+put_tcp_->setProperty(PutTCP::Port, 
utils::StringUtils::join_pack("${literal('", std::to_string(port_), "')}"));
+put_tcp_->setProperty(PutTCP::Timeout, "200 ms");
+put_tcp_->setProperty(PutTCP::OutgoingMessageDelimiter, "\n");
+  }
+
+  ~PutTCPTestFixture() {
+stopServer();
+  }
+
+  void startTCPServer() {
+gsl_Expects(!listener_ && !server_thread_.joinable());
+listener_ = std::make_unique(std::nullopt, port_, 
core::logging::LoggerFactory().getLogger());

Review Comment:
   you are right, changed these and the one in PutUDPTests.cpp (where I 
probably copy pasted it from)  
https://github.com/apache/nifi-minifi-cpp/pull/1419/commits/9928eaa75f607d1d337d289c7bbf84f23e20ee5a#diff-f690439939269c49549e72231c05edf2e8ba912ec3fe206031d6465b7d161779R60



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL abov

[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor

2022-10-26 Thread GitBox


martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1005557987


##
extensions/standard-processors/tests/unit/PutTCPTests.cpp:
##
@@ -0,0 +1,454 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+#include "PutTCP.h"
+#include "controllers/SSLContextService.h"
+#include "core/ProcessSession.h"
+#include "utils/net/TcpServer.h"
+#include "utils/net/SslServer.h"
+#include "utils/expected.h"
+#include "utils/StringUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+using controllers::SSLContextService;
+
+namespace {
+using utils::net::TcpSession;
+using utils::net::TcpServer;
+
+using utils::net::SslSession;
+using utils::net::SslServer;
+
+class ISessionAwareServer {
+ public:
+  [[nodiscard]] virtual size_t getNumberOfSessions() const = 0;
+  virtual void closeSessions() = 0;
+};
+
+template
+class SessionAwareServer : public ISessionAwareServer {
+ protected:
+  size_t getNumberOfSessions() const override {
+std::lock_guard lock_guard{mutex_};
+return sessions_.size();
+  }
+
+  void closeSessions() override {
+std::lock_guard lock_guard{mutex_};
+for (const auto& session_weak : sessions_) {
+  if (auto session = session_weak.lock()) {
+auto& socket = session->getSocket();
+if (socket.is_open()) {
+  socket.shutdown(asio::ip::tcp::socket::shutdown_both);
+  session->getSocket().close();
+}
+  }
+}
+  }
+
+  mutable std::mutex mutex_;
+  std::vector> sessions_;
+};
+
+class SessionAwareTcpServer : public TcpServer, public 
SessionAwareServer {
+ public:
+  using TcpServer::TcpServer;
+
+ protected:
+  std::shared_ptr createSession() override {
+std::lock_guard lock_guard{mutex_};
+auto session = TcpServer::createSession();
+logger_->log_trace("SessionAwareTcpServer::createSession %p", 
session.get());
+sessions_.emplace_back(session);
+return session;
+  }
+};
+
+class SessionAwareSslServer : public SslServer, public 
SessionAwareServer {
+ public:
+  using SslServer::SslServer;
+
+ protected:
+  std::shared_ptr createSession() override {
+std::lock_guard lock_guard{mutex_};
+auto session = SslServer::createSession();
+logger_->log_trace("SessionAwareTcpServer::createSession %p", 
session.get());
+sessions_.emplace_back(session);
+return session;
+  }
+};
+
+utils::net::SslData createSslDataForServer() {
+  const std::filesystem::path executable_dir = 
minifi::utils::file::FileUtils::get_executable_dir();
+  utils::net::SslData ssl_data;
+  ssl_data.ca_loc = (executable_dir / "resources/ca_A.crt").string();
+  ssl_data.cert_loc = (executable_dir / 
"resources/localhost_by_A.pem").string();
+  ssl_data.key_loc = (executable_dir / 
"resources/localhost_by_A.pem").string();

Review Comment:
   good idea, I've changed it in 
https://github.com/apache/nifi-minifi-cpp/pull/1419/commits/9928eaa75f607d1d337d289c7bbf84f23e20ee5a#diff-bbb56b112b398a6450c23e71e5dbbfb17bf20dad3d2fb908a1c6de26118f9ee8R108-R110



##
extensions/standard-processors/tests/unit/PutTCPTests.cpp:
##
@@ -0,0 +1,454 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include "SingleProcessorTestController.h"
+

[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor

2022-10-26 Thread GitBox


martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1005557725


##
extensions/standard-processors/tests/unit/PutTCPTests.cpp:
##
@@ -0,0 +1,454 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+#include "PutTCP.h"
+#include "controllers/SSLContextService.h"
+#include "core/ProcessSession.h"
+#include "utils/net/TcpServer.h"
+#include "utils/net/SslServer.h"
+#include "utils/expected.h"
+#include "utils/StringUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+using controllers::SSLContextService;
+
+namespace {
+using utils::net::TcpSession;
+using utils::net::TcpServer;
+
+using utils::net::SslSession;
+using utils::net::SslServer;
+
+class ISessionAwareServer {
+ public:
+  [[nodiscard]] virtual size_t getNumberOfSessions() const = 0;
+  virtual void closeSessions() = 0;
+};
+
+template
+class SessionAwareServer : public ISessionAwareServer {
+ protected:
+  size_t getNumberOfSessions() const override {
+std::lock_guard lock_guard{mutex_};
+return sessions_.size();
+  }
+
+  void closeSessions() override {
+std::lock_guard lock_guard{mutex_};
+for (const auto& session_weak : sessions_) {
+  if (auto session = session_weak.lock()) {
+auto& socket = session->getSocket();
+if (socket.is_open()) {
+  socket.shutdown(asio::ip::tcp::socket::shutdown_both);
+  session->getSocket().close();
+}
+  }
+}
+  }
+
+  mutable std::mutex mutex_;
+  std::vector> sessions_;
+};
+
+class SessionAwareTcpServer : public TcpServer, public 
SessionAwareServer {
+ public:
+  using TcpServer::TcpServer;
+
+ protected:
+  std::shared_ptr createSession() override {
+std::lock_guard lock_guard{mutex_};
+auto session = TcpServer::createSession();
+logger_->log_trace("SessionAwareTcpServer::createSession %p", 
session.get());
+sessions_.emplace_back(session);
+return session;
+  }
+};
+
+class SessionAwareSslServer : public SslServer, public 
SessionAwareServer {
+ public:
+  using SslServer::SslServer;
+
+ protected:
+  std::shared_ptr createSession() override {
+std::lock_guard lock_guard{mutex_};
+auto session = SslServer::createSession();
+logger_->log_trace("SessionAwareTcpServer::createSession %p", 
session.get());

Review Comment:
   :+1:  
https://github.com/apache/nifi-minifi-cpp/pull/1419/commits/9928eaa75f607d1d337d289c7bbf84f23e20ee5a#diff-bbb56b112b398a6450c23e71e5dbbfb17bf20dad3d2fb908a1c6de26118f9ee8R99



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor

2022-10-26 Thread GitBox


martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1005557011


##
extensions/standard-processors/tests/unit/PutTCPTests.cpp:
##
@@ -0,0 +1,454 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+#include "PutTCP.h"
+#include "controllers/SSLContextService.h"
+#include "core/ProcessSession.h"
+#include "utils/net/TcpServer.h"
+#include "utils/net/SslServer.h"
+#include "utils/expected.h"
+#include "utils/StringUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+using controllers::SSLContextService;
+
+namespace {
+using utils::net::TcpSession;
+using utils::net::TcpServer;
+
+using utils::net::SslSession;
+using utils::net::SslServer;
+
+class ISessionAwareServer {
+ public:
+  [[nodiscard]] virtual size_t getNumberOfSessions() const = 0;
+  virtual void closeSessions() = 0;
+};
+
+template

Review Comment:
   makes sense, I've changed it in 
https://github.com/apache/nifi-minifi-cpp/pull/1419/commits/9928eaa75f607d1d337d289c7bbf84f23e20ee5a#diff-bbb56b112b398a6450c23e71e5dbbfb17bf20dad3d2fb908a1c6de26118f9ee8R52



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor

2022-10-26 Thread GitBox


martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1005556676


##
extensions/standard-processors/processors/PutTCP.cpp:
##
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutTCP.h"
+
+#include 
+#include 
+
+#include "range/v3/range/conversion.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "controllers/SSLContextService.h"
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/high_resolution_timer.hpp"
+
+using asio::ip::tcp;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutTCP::Hostname = 
core::PropertyBuilder::createProperty("Hostname")
+->withDescription("The ip address or hostname of the destination.")
+->withDefaultValue("localhost")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::Port = 
core::PropertyBuilder::createProperty("Port")
+->withDescription("The port or service on the destination.")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::IdleConnectionExpiration = 
core::PropertyBuilder::createProperty("Idle Connection Expiration")
+->withDescription("The amount of time a connection should be held open 
without being used before closing the connection. A value of 0 seconds will 
disable this feature.")
+->withDefaultValue("15 seconds")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::Timeout = 
core::PropertyBuilder::createProperty("Timeout")
+->withDescription("The timeout for connecting to and communicating with 
the destination.")
+->withDefaultValue("15 seconds")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::ConnectionPerFlowFile = 
core::PropertyBuilder::createProperty("Connection Per FlowFile")
+->withDescription("Specifies whether to send each FlowFile's content on an 
individual connection.")
+->withDefaultValue(false)
+->isRequired(true)
+->supportsExpressionLanguage(false)
+->build();
+
+const core::Property PutTCP::OutgoingMessageDelimiter = 
core::PropertyBuilder::createProperty("Outgoing Message Delimiter")
+->withDescription("Specifies the delimiter to use when sending messages 
out over the same TCP stream. "
+  "The delimiter is appended to each FlowFile message that 
is transmitted over the stream so that the receiver can determine when one 
message ends and the next message begins. "
+  "Users should ensure that the FlowFile content does not 
contain the delimiter character to avoid errors.")
+->isRequired(false)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::SSLContextService = 
core::PropertyBuilder::createProperty("SSL Context Service")
+->withDescription("The Controller Service to use in order to obtain an SSL 
Context. If this property is set, messages will be sent over a secure 
connection.")
+->isRequired(false)
+->asType()
+->build();
+
+const core::Property PutTCP::MaxSizeOfSocketSendBuffer = 
core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer")
+->withDescription("The maximum size of the socket send buffer that should 
be used. This is a suggestion to the Operating System to indicate how big the 
socket buffer should be.")
+->isRequired(false)
+->asType()
+->build();
+
+const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent 
to the destination are sent out this relationship."};
+const core::Relationship PutTCP::Failure{"failure", "FlowFiles that 
encountered IO errors are send out this relationship."};
+
+PutTCP::PutTCP(const std::string& name, const utils::Ident

[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor

2022-10-26 Thread GitBox


martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r100380


##
extensions/standard-processors/processors/PutTCP.cpp:
##
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutTCP.h"
+
+#include 
+#include 
+
+#include "range/v3/range/conversion.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "controllers/SSLContextService.h"
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/high_resolution_timer.hpp"
+
+using asio::ip::tcp;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutTCP::Hostname = 
core::PropertyBuilder::createProperty("Hostname")
+->withDescription("The ip address or hostname of the destination.")
+->withDefaultValue("localhost")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::Port = 
core::PropertyBuilder::createProperty("Port")
+->withDescription("The port or service on the destination.")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::IdleConnectionExpiration = 
core::PropertyBuilder::createProperty("Idle Connection Expiration")
+->withDescription("The amount of time a connection should be held open 
without being used before closing the connection. A value of 0 seconds will 
disable this feature.")
+->withDefaultValue("15 seconds")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::Timeout = 
core::PropertyBuilder::createProperty("Timeout")
+->withDescription("The timeout for connecting to and communicating with 
the destination.")
+->withDefaultValue("15 seconds")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::ConnectionPerFlowFile = 
core::PropertyBuilder::createProperty("Connection Per FlowFile")
+->withDescription("Specifies whether to send each FlowFile's content on an 
individual connection.")
+->withDefaultValue(false)
+->isRequired(true)
+->supportsExpressionLanguage(false)
+->build();
+
+const core::Property PutTCP::OutgoingMessageDelimiter = 
core::PropertyBuilder::createProperty("Outgoing Message Delimiter")
+->withDescription("Specifies the delimiter to use when sending messages 
out over the same TCP stream. "
+  "The delimiter is appended to each FlowFile message that 
is transmitted over the stream so that the receiver can determine when one 
message ends and the next message begins. "
+  "Users should ensure that the FlowFile content does not 
contain the delimiter character to avoid errors.")
+->isRequired(false)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::SSLContextService = 
core::PropertyBuilder::createProperty("SSL Context Service")
+->withDescription("The Controller Service to use in order to obtain an SSL 
Context. If this property is set, messages will be sent over a secure 
connection.")
+->isRequired(false)
+->asType()
+->build();
+
+const core::Property PutTCP::MaxSizeOfSocketSendBuffer = 
core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer")
+->withDescription("The maximum size of the socket send buffer that should 
be used. This is a suggestion to the Operating System to indicate how big the 
socket buffer should be.")
+->isRequired(false)
+->asType()
+->build();
+
+const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent 
to the destination are sent out this relationship."};
+const core::Relationship PutTCP::Failure{"failure", "FlowFiles that 
encountered IO errors are send out this relationship."};
+
+PutTCP::PutTCP(const std::string& name, const utils::Ident

[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor

2022-10-26 Thread GitBox


martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r100073


##
extensions/standard-processors/processors/PutTCP.cpp:
##
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutTCP.h"
+
+#include 
+#include 
+
+#include "range/v3/range/conversion.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "controllers/SSLContextService.h"
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/high_resolution_timer.hpp"
+
+using asio::ip::tcp;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutTCP::Hostname = 
core::PropertyBuilder::createProperty("Hostname")
+->withDescription("The ip address or hostname of the destination.")
+->withDefaultValue("localhost")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::Port = 
core::PropertyBuilder::createProperty("Port")
+->withDescription("The port or service on the destination.")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::IdleConnectionExpiration = 
core::PropertyBuilder::createProperty("Idle Connection Expiration")
+->withDescription("The amount of time a connection should be held open 
without being used before closing the connection. A value of 0 seconds will 
disable this feature.")
+->withDefaultValue("15 seconds")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::Timeout = 
core::PropertyBuilder::createProperty("Timeout")
+->withDescription("The timeout for connecting to and communicating with 
the destination.")
+->withDefaultValue("15 seconds")
+->isRequired(true)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::ConnectionPerFlowFile = 
core::PropertyBuilder::createProperty("Connection Per FlowFile")
+->withDescription("Specifies whether to send each FlowFile's content on an 
individual connection.")
+->withDefaultValue(false)
+->isRequired(true)
+->supportsExpressionLanguage(false)
+->build();
+
+const core::Property PutTCP::OutgoingMessageDelimiter = 
core::PropertyBuilder::createProperty("Outgoing Message Delimiter")
+->withDescription("Specifies the delimiter to use when sending messages 
out over the same TCP stream. "
+  "The delimiter is appended to each FlowFile message that 
is transmitted over the stream so that the receiver can determine when one 
message ends and the next message begins. "
+  "Users should ensure that the FlowFile content does not 
contain the delimiter character to avoid errors.")
+->isRequired(false)
+->supportsExpressionLanguage(true)
+->build();
+
+const core::Property PutTCP::SSLContextService = 
core::PropertyBuilder::createProperty("SSL Context Service")
+->withDescription("The Controller Service to use in order to obtain an SSL 
Context. If this property is set, messages will be sent over a secure 
connection.")
+->isRequired(false)
+->asType()
+->build();
+
+const core::Property PutTCP::MaxSizeOfSocketSendBuffer = 
core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer")
+->withDescription("The maximum size of the socket send buffer that should 
be used. This is a suggestion to the Operating System to indicate how big the 
socket buffer should be.")
+->isRequired(false)
+->asType()
+->build();
+
+const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent 
to the destination are sent out this relationship."};
+const core::Relationship PutTCP::Failure{"failure", "FlowFiles that 
encountered IO errors are send out this relationship."};
+
+PutTCP::PutTCP(const std::string& name, const utils::Ident

[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor

2022-10-26 Thread GitBox


martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1005553979


##
libminifi/include/utils/StringUtils.h:
##
@@ -37,6 +37,19 @@
 #include "utils/gsl.h"
 #include "utils/meta/detected.h"
 
+// libc++ doesn't define operator<=> on strings, and apparently the operator 
rewrite rules don't automagically make one
+#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION <= 14000
+#include 
+#endif
+
+#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION <= 14000
+template
+constexpr std::strong_ordering operator<=>(const std::basic_string<_CharT, 
_Traits, _Alloc>& __lhs,
+const std::basic_string<_CharT, _Traits, _Alloc>& __rhs) noexcept {
+  return __lhs.compare(__rhs) <=> 0;
+}
+#endif

Review Comment:
   thanks, I changed this and the other manually written <=> in 
https://github.com/apache/nifi-minifi-cpp/pull/1419/commits/3a3b0437cfebdef44a15d54f40222f5e498d



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor

2022-10-26 Thread GitBox


martinzink commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1005553207


##
PROCESSORS.md:
##
@@ -2167,6 +2168,32 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 | success | After a successful SQL update operation, the incoming FlowFile 
sent here |
 
 
+## PutTCP
+
+### Description
+The PutTCP processor receives a FlowFile and transmits the FlowFile content 
over a TCP connection to the configured TCP server. By default, the FlowFiles 
are transmitted over the same TCP connection. To assist the TCP server with 
determining message boundaries, an optional "Outgoing Message Delimiter" string 
can be configured which is appended to the end of each FlowFiles content when 
it is transmitted over the TCP connection. An optional "Connection Per 
FlowFile" parameter can be specified to change the behaviour so that each 
FlowFiles content is transmitted over a single TCP connection which is closed 
after the FlowFile has been sent.
+
+### Properties
+In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
+
+| Name   | Default Value | Allowable Values | 
Description 




   |
+||---|--||
+| **Hostname**   | localhost |  | The ip 
address or hostname of the destination.**Supports Expression Language: 
true**  



|
+| **Port**   |   |  | The port 
or service on the destination.**Supports Expression Language: true**   



  |
+| **Idle Connection Expiration** | 15 seconds|  | The 
amount of time a connection should be held open without being used before 
closing the connection. A value of 0 seconds will disable this 
feature.**Supports Expression Language: true** 


  |
+| **Timeout**| 15 seconds|  | The 
timeout for connecting to and communicating with the 
destination.**Supports Expression Language: true** 



  |
+| **Connection Per FlowFile**| false |  | 
Specifies whether to send each FlowFile's content on an individual connection.  




   |
+| Outgoing Message Delimiter |   |  | 
Specifies the delimiter to use when sending messages out over the same TCP 
stream. The delimiter is appended to each FlowFile message that is transmitted 
over the stream so that the receiver can determine when one message ends and 
the next message begins. Users should ensur