Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]

2024-01-16 Thread via GitHub


szaszm closed pull request #1695: MINIFICPP-2261 Add processor for pushing logs 
to Grafana Loki through REST API
URL: https://github.com/apache/nifi-minifi-cpp/pull/1695





Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]

2024-01-15 Thread via GitHub


lordgamez commented on code in PR #1695:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1452385790


##
extensions/grafana-loki/PushGrafanaLokiREST.h:
##
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include 
+#include 
+#include 
+#include 
+
+#include "controllers/SSLContextService.h"
+#include "core/Processor.h"
+#include "core/PropertyDefinition.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "core/PropertyType.h"
+#include "core/RelationshipDefinition.h"
+#include "client/HTTPClient.h"
+#include "core/StateManager.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki {
+
+class PushGrafanaLokiREST : public core::Processor {
+ public:
+  EXTENSIONAPI static constexpr const char* Description = "A Grafana Loki push processor that uses the Grafana Loki REST API. The processor expects each flow file to contain a single log line to be "
+                                                          "pushed to Grafana Loki, therefore it is usually used together with the TailFile processor.";
+
+  explicit PushGrafanaLokiREST(const std::string& name, const utils::Identifier& uuid = {})
+      : Processor(name, uuid),
+        log_batch_(logger_) {
+  }
+  ~PushGrafanaLokiREST() override = default;
+
+  EXTENSIONAPI static constexpr auto Url = core::PropertyDefinitionBuilder<>::createProperty("Url")
+    .withDescription("Url of the Grafana Loki server. For example http://localhost:3100/.")
+    .isRequired(true)
+    .build();
+  EXTENSIONAPI static constexpr auto StreamLabels = core::PropertyDefinitionBuilder<>::createProperty("Stream Labels")
+    .withDescription("Comma separated list of <key>=<value> labels to be sent as stream labels.")
+    .isRequired(true)
+    .build();
+  EXTENSIONAPI static constexpr auto LogLineMetadataAttributes = core::PropertyDefinitionBuilder<>::createProperty("Log Line Metadata Attributes")
+    .withDescription("Comma separated list of attributes to be sent as log line metadata for a log line.")
+    .build();
+  EXTENSIONAPI static constexpr auto TenantID = core::PropertyDefinitionBuilder<>::createProperty("Tenant ID")
+    .withDescription("The tenant ID used by default to push logs to Grafana Loki. If omitted or empty it assumes Grafana Loki is running in single-tenant mode and no X-Scope-OrgID header is sent.")
+    .build();
+  EXTENSIONAPI static constexpr auto MaxBatchSize = core::PropertyDefinitionBuilder<>::createProperty("Max Batch Size")
+    .withDescription("The maximum number of flow files to process at a time. If not set, or set to 0, all FlowFiles will be processed at once.")
+    .withPropertyType(core::StandardPropertyTypes::UNSIGNED_LONG_TYPE)
+    .withDefaultValue("100")
+    .build();
+  EXTENSIONAPI static constexpr auto LogLineBatchWait = core::PropertyDefinitionBuilder<>::createProperty("Log Line Batch Wait")
+    .withDescription("Time to wait before sending a log line batch to Grafana Loki, full or not. If this property and Log Line Batch Size are both unset, "
+                     "the log batch of the current trigger will be sent immediately.")
+    .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE)
+    .build();
+  EXTENSIONAPI static constexpr auto LogLineBatchSize = core::PropertyDefinitionBuilder<>::createProperty("Log Line Batch Size")
+    .withDescription("Number of log lines to send in a batch to Loki. If this property and Log Line Batch Wait are both unset, "
+                     "the log batch of the current trigger will be sent immediately.")
+    .withPropertyType(core::StandardPropertyTypes::UNSIGNED_INT_TYPE)
+    .build();
+  EXTENSIONAPI static constexpr auto ConnectTimeout = core::PropertyDefinitionBuilder<>::createProperty("Connection Timeout")
+    .withDescription("Max wait time for connection to the Grafana Loki service.")
+    .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE)
+    .withDefaultValue("5 s")
+    .isRequired(true)
+    .build();
+  EXTENSIONAPI static constexpr auto ReadTimeout = core::PropertyDefinitionBuilder<>::createProperty("Read Timeout")
+    .withDescription("Max wait time for response from remote 
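The Log Line Batch Wait and Log Line Batch Size descriptions quoted above imply a simple send rule. A minimal sketch of that rule follows, assuming a hypothetical `BatchLimits` struct and `shouldFlush` function that are not part of the PR; it only restates the documented behaviour and is not the processor's actual implementation.

```cpp
#include <chrono>
#include <cstddef>
#include <optional>

// Hypothetical model of the two optional batching properties (names are illustrative).
struct BatchLimits {
  std::optional<std::size_t> batch_size;                // "Log Line Batch Size"
  std::optional<std::chrono::milliseconds> batch_wait;  // "Log Line Batch Wait"
};

// Decides whether a batch holding `queued` log lines, started `elapsed` ago, should be sent now.
inline bool shouldFlush(const BatchLimits& limits, std::size_t queued, std::chrono::milliseconds elapsed) {
  if (queued == 0) {
    return false;  // nothing to send
  }
  if (!limits.batch_size && !limits.batch_wait) {
    return true;  // both unset: the current trigger's batch is sent immediately
  }
  if (limits.batch_size && queued >= *limits.batch_size) {
    return true;  // the batch is full
  }
  if (limits.batch_wait && elapsed >= *limits.batch_wait) {
    return true;  // waited long enough, full or not
  }
  return false;
}
```

With only a batch size of 10 configured, three queued lines would stay in the batch, while with only a wait of 500 ms configured a single line would be sent once 500 ms have passed.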

Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]

2024-01-15 Thread via GitHub


adamdebreceni commented on code in PR #1695:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1452371311


##
extensions/grafana-loki/PushGrafanaLokiREST.h:
##
@@ -0,0 +1,178 @@

Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]

2024-01-15 Thread via GitHub


lordgamez commented on code in PR #1695:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1452345696


##
PROCESSORS.md:
##
@@ -2196,6 +2197,40 @@ In the list below, the names of required properties appear in bold. Any other pr
 | failure | FlowFiles that failed to be sent to the destination are transferred to this relationship |
 
 
+## PushGrafanaLokiREST
+
+### Description
+
+A Grafana Loki push processor that uses the Grafana Loki REST API. The processor expects each flow file to contain a single log line to be pushed to Grafana Loki, therefore it is usually used together with the TailFile processor.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name                         | Default Value | Allowable Values | Description |
+|------------------------------|---------------|------------------|-------------|
+| **Url**                      |               |                  | Url of the Grafana Loki server. For example http://localhost:3100/. |
+| **Stream Labels**            |               |                  | Comma separated list of <key>=<value> labels to be sent as stream labels. |
+| Log Line Metadata Attributes |               |                  | Comma separated list of attributes to be sent as log line metadata for a log line. |
+| Tenant ID                    |               |                  | The tenant ID used by default to push logs to Grafana Loki. If omitted or empty it assumes Grafana Loki is running in single-tenant mode and no X-Scope-OrgID header is sent. |
+| Max Batch Size               | 100           |                  | The maximum number of flow files to process at a time. If not set, or set to 0, all FlowFiles will be processed at once. |

Review Comment:
   Good catch, fixed in 0f1a614f803abf8ea538df7f95330a953ae2afd7



##
extensions/grafana-loki/PushGrafanaLokiREST.cpp:
##
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PushGrafanaLokiREST.h"
+
+#include 
+#include 
+#include 
+
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/StringUtils.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki {
+
+void PushGrafanaLokiREST::LogBatch::add(const std::shared_ptr<core::FlowFile>& flowfile) {
+  gsl_Expects(state_manager_);
+  if (log_line_batch_wait_ && batched_flowfiles_.empty()) {
+    start_push_time_ = std::chrono::system_clock::now();
+    std::unordered_map<std::string, std::string> state;
+    state["start_push_time"] = std::to_string(std::chrono::duration_cast(start_push_time_.time_since_epoch()).count());
+    logger_->log_debug("Saved start push time to state: {}", state["start_push_time"]);
+    state_manager_->set(state);
+  }
+  batched_flowfiles_.push_back(flowfile);
+}
+
+void PushGrafanaLokiREST::LogBatch::restore(const std::shared_ptr<core::FlowFile>& flowfile) {
+  batched_flowfiles_.push_back(flowfile);
+}
+
+std::vector<std::shared_ptr<core::FlowFile>> PushGrafanaLokiREST::LogBatch::flush() {
+  gsl_Expects(state_manager_);
+  start_push_time_ = {};
+  auto result 

Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]

2024-01-15 Thread via GitHub


szaszm commented on code in PR #1695:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1446396780


##
PROCESSORS.md:
##
@@ -2196,6 +2197,40 @@ In the list below, the names of required properties appear in bold. Any other pr
 | failure | FlowFiles that failed to be sent to the destination are transferred to this relationship |
 
 
+## PushGrafanaLokiREST
+
+### Description
+
+A Grafana Loki push processor that uses the Grafana Loki REST API. The processor expects each flow file to contain a single log line to be pushed to Grafana Loki, therefore it is usually used together with the TailFile processor.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name                         | Default Value | Allowable Values | Description |
+|------------------------------|---------------|------------------|-------------|
+| **Url**                      |               |                  | Url of the Grafana Loki server. For example http://localhost:3100/. |
+| **Stream Labels**            |               |                  | Comma separated list of <key>=<value> labels to be sent as stream labels. |
+| Log Line Metadata Attributes |               |                  | Comma separated list of attributes to be sent as log line metadata for a log line. |
+| Tenant ID                    |               |                  | The tenant ID used by default to push logs to Grafana Loki. If omitted or empty it assumes Grafana Loki is running in single-tenant mode and no X-Scope-OrgID header is sent. |
+| Max Batch Size               | 100           |                  | The maximum number of flow files to process at a time. If not set, or set to 0, all FlowFiles will be processed at once. |

Review Comment:
   If not set, isn't it just using the default 100?
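   To make the question concrete, here is a minimal sketch, assuming a hypothetical `resolveMaxBatchSize` helper that is not part of the PR or of the MiNiFi property API. It models a property that declares a default: an absent value falls back to 100, so the "not set" case can never reach the "process everything" branch, and only an explicit 0 can.

```cpp
#include <cstdint>
#include <optional>

// Hypothetical constant mirroring .withDefaultValue("100") in the quoted diff.
constexpr std::uint64_t kDefaultMaxBatchSize = 100;

// Returns the effective batch size; the caller is assumed to treat 0 as "no limit".
inline std::uint64_t resolveMaxBatchSize(std::optional<std::uint64_t> configured) {
  // An unset property resolves to the default, never to 0.
  return configured.value_or(kDefaultMaxBatchSize);
}
```

   Under this model an unset property yields 100, so the description's "If not set" clause would only hold if the default were removed.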



##
extensions/grafana-loki/PushGrafanaLokiREST.cpp:
##
@@ -0,0 +1,390 @@

Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]

2024-01-15 Thread via GitHub


lordgamez commented on code in PR #1695:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1452221843


##
extensions/grafana-loki/PushGrafanaLokiREST.h:
##
@@ -0,0 +1,178 @@

Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]

2024-01-15 Thread via GitHub


adamdebreceni commented on code in PR #1695:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1452092124


##
extensions/grafana-loki/PushGrafanaLokiREST.h:
##
@@ -0,0 +1,178 @@

Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]

2024-01-02 Thread via GitHub


lordgamez commented on code in PR #1695:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1439542044


##
extensions/grafana-loki/tests/PushGrafanaLokiRESTTest.cpp:
##
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../PushGrafanaLokiREST.h"
+#include "MockGrafanaLoki.h"
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+#include "utils/StringUtils.h"
+#include "utils/TestUtils.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki::test {
+
+TEST_CASE("Url property is required", "[PushGrafanaLokiREST]") {
+  auto push_grafana_loki_rest = std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST");
+  minifi::test::SingleProcessorTestController test_controller(push_grafana_loki_rest);
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Url, ""));
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/"));
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::LogLineBatchSize, "1"));
+  REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception);
+}
+
+TEST_CASE("Valid stream labels need to be set", "[PushGrafanaLokiREST]") {
+  auto push_grafana_loki_rest = std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST");
+  minifi::test::SingleProcessorTestController test_controller(push_grafana_loki_rest);
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Url, "localhost:10990"));
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::LogLineBatchSize, "1"));
+  SECTION("Stream labels cannot be empty") {
+    test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "");
+  }
+  SECTION("Stream labels need to be valid") {
+    test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "invalidlabels,invalidlabels2");
+  }
+  REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception);
+}
+
+TEST_CASE("Log Line Batch Size cannot be 0", "[PushGrafanaLokiREST]") {
+  auto push_grafana_loki_rest = std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST");
+  minifi::test::SingleProcessorTestController test_controller(push_grafana_loki_rest);
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Url, "localhost:10990"));
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/"));
+  test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::LogLineBatchSize, "0");
+  REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception);
+}
+
+class PushGrafanaLokiRESTTestFixture {
+ public:
+  PushGrafanaLokiRESTTestFixture()
+      : mock_loki_("10990"),
+        push_grafana_loki_rest_(std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST")),
+        test_controller_(push_grafana_loki_rest_) {
+    LogTestController::getInstance().setDebug();
+    LogTestController::getInstance().setDebug();
+    LogTestController::getInstance().setTrace();
+    LogTestController::getInstance().setTrace();
+    CHECK(test_controller_.plan->setProperty(push_grafana_loki_rest_, PushGrafanaLokiREST::Url, "localhost:10990"));
+    CHECK(test_controller_.plan->setProperty(push_grafana_loki_rest_, PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/"));
+  }
+
+  void setProperty(const auto& property, const std::string& property_value) {
+    CHECK(test_controller_.plan->setProperty(push_grafana_loki_rest_, property, property_value));
+  }
+
+  void verifyLastRequestIsEmpty() {
+    const auto& request = mock_loki_.getLastRequest();
+    REQUIRE(request.IsNull());
+  }
+
+  void verifyTenantId(const std::string& tenant_id) {
+    REQUIRE(mock_loki_.getLastTenantId() == tenant_id);
+  }
+
+  void verifyBasicAuthorization(const std::string& expected_username_and_password) {
+    auto last_authorization = mock_loki_.getLastAuthorization();
+    std::string expected_authorization = "Basic ";
+    REQUIRE(minifi::utils::StringUtils::startsWith(last_authorization, 

Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]

2024-01-02 Thread via GitHub


lordgamez commented on code in PR #1695:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1439541779


##
extensions/grafana-loki/tests/PushGrafanaLokiRESTTest.cpp:
##
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../PushGrafanaLokiREST.h"
+#include "MockGrafanaLoki.h"
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+#include "utils/StringUtils.h"
+#include "utils/TestUtils.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki::test {
+
+TEST_CASE("Url property is required", "[PushGrafanaLokiREST]") {
+  auto push_grafana_loki_rest = 
std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST");
+  minifi::test::SingleProcessorTestController 
test_controller(push_grafana_loki_rest);
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::Url, ""));
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/"));
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::LogLineBatchSize, "1"));

Review Comment:
   Removed the checks in 083c28ee59f96352e89d5675d635ea3b91bd9942



##
extensions/grafana-loki/tests/PushGrafanaLokiRESTTest.cpp:
##
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../PushGrafanaLokiREST.h"
+#include "MockGrafanaLoki.h"
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+#include "utils/StringUtils.h"
+#include "utils/TestUtils.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki::test {
+
+TEST_CASE("Url property is required", "[PushGrafanaLokiREST]") {
+  auto push_grafana_loki_rest = 
std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST");
+  minifi::test::SingleProcessorTestController 
test_controller(push_grafana_loki_rest);
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::Url, ""));
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/"));
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::LogLineBatchSize, "1"));
+  REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception);
+}
+
+TEST_CASE("Valid stream labels need to be set", "[PushGrafanaLokiREST]") {
+  auto push_grafana_loki_rest = 
std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST");
+  minifi::test::SingleProcessorTestController 
test_controller(push_grafana_loki_rest);
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::Url, "localhost:10990"));
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::LogLineBatchSize, "1"));
+  SECTION("Stream labels cannot be empty") {
+test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::StreamLabels, "");
+  }
+  SECTION("Stream labels need to be valid") {
+test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::StreamLabels, "invalidlabels,invalidlabels2");
+  }
+  REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception);
+}
+
+TEST_CASE("Log Line Batch Size cannot be 0", "[PushGrafanaLokiREST]") {
+  auto push_grafana_loki_rest = 
std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST");
+  minifi::test::SingleProcessorTestController 
test_controller(push_grafana_loki_rest);
+  

Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]

2024-01-02 Thread via GitHub


lordgamez commented on code in PR #1695:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1439541507


##
extensions/grafana-loki/tests/MockGrafanaLoki.h:
##
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include "tests/CivetLibrary.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "rapidjson/document.h"
+#include "rapidjson/writer.h"
+#include "rapidjson/stringbuffer.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki::test {
+
+class GrafanaLokiHandler : public CivetHandler {
+ public:
+  const rapidjson::Document& getLastRequest() const {
+return request_received_;
+  }
+
+  std::string getLastTenantId() const {
+return tenant_id_set_;
+  }
+
+  std::string getLastAuthorization() const {
+return authorization_set_;
+  }
+
+ private:
+  bool handlePost(CivetServer*, struct mg_connection* conn) override {
+tenant_id_set_.clear();
+authorization_set_.clear();
+const char *org_id = mg_get_header(conn, "X-Scope-OrgID");
+if (org_id != nullptr) {
+  tenant_id_set_ = org_id;
+}
+
+const char *authorization = mg_get_header(conn, "Authorization");
+if (authorization != nullptr) {
+  authorization_set_ = authorization;
+}
+
+std::array<char, 2048> request;
+size_t chars_read = mg_read(conn, request.data(), 2048);

Review Comment:
   Updated in 083c28ee59f96352e89d5675d635ea3b91bd9942



##
extensions/grafana-loki/tests/MockGrafanaLoki.h:
##
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include "tests/CivetLibrary.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "rapidjson/document.h"
+#include "rapidjson/writer.h"
+#include "rapidjson/stringbuffer.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki::test {
+
+class GrafanaLokiHandler : public CivetHandler {
+ public:
+  const rapidjson::Document& getLastRequest() const {
+return request_received_;
+  }
+
+  std::string getLastTenantId() const {
+return tenant_id_set_;
+  }
+
+  std::string getLastAuthorization() const {
+return authorization_set_;
+  }
+
+ private:
+  bool handlePost(CivetServer*, struct mg_connection* conn) override {
+tenant_id_set_.clear();
+authorization_set_.clear();
+const char *org_id = mg_get_header(conn, "X-Scope-OrgID");
+if (org_id != nullptr) {
+  tenant_id_set_ = org_id;
+}
+
+const char *authorization = mg_get_header(conn, "Authorization");
+if (authorization != nullptr) {
+  authorization_set_ = authorization;
+}
+
+std::array<char, 2048> request;
+size_t chars_read = mg_read(conn, request.data(), 2048);
+std::string json_str(request.data(), chars_read);
+request_received_.Parse(json_str.c_str());
+
+mg_printf(conn, "HTTP/1.1 204 OK\r\n");
+mg_printf(conn, "Content-length: 0");
+mg_printf(conn, "\r\n\r\n");
+return true;
+  }
+
+  rapidjson::Document request_received_;
+  std::string tenant_id_set_;
+  std::string authorization_set_;
+};
+
+class MockGrafanaLoki {
+ public:
+  explicit MockGrafanaLoki(std::string port) : port_(std::move(port)) {
+std::vector<std::string> options;
+options.emplace_back("listening_ports");
+

Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]

2024-01-02 Thread via GitHub


lordgamez commented on code in PR #1695:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1439541257


##
extensions/grafana-loki/PushGrafanaLokiREST.cpp:
##
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PushGrafanaLokiREST.h"
+
+#include 
+#include 
+#include 
+
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/StringUtils.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki {
+
+void PushGrafanaLokiREST::LogBatch::add(const std::shared_ptr<core::FlowFile>& 
flowfile) {
+  gsl_Expects(state_manager_);
+  if (log_line_batch_wait_ && batched_flowfiles_.empty()) {
+start_push_time_ = std::chrono::steady_clock::now();
+std::unordered_map<std::string, std::string> state;
+state["start_push_time"] = 
std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(start_push_time_.time_since_epoch()).count());
+logger_->log_debug("Saved start push time to state: {}", 
state["start_push_time"]);
+state_manager_->set(state);
+  }
+  batched_flowfiles_.push_back(flowfile);
+}
+
+void PushGrafanaLokiREST::LogBatch::restore(const 
std::shared_ptr<core::FlowFile>& flowfile) {
+  batched_flowfiles_.push_back(flowfile);
+}
+
+std::vector<std::shared_ptr<core::FlowFile>> 
PushGrafanaLokiREST::LogBatch::flush() {
+  gsl_Expects(state_manager_);
+  start_push_time_ = {};
+  auto result = batched_flowfiles_;
+  batched_flowfiles_.clear();
+  if (log_line_batch_wait_) {
+start_push_time_ = {};
+std::unordered_map<std::string, std::string> state;
+logger_->log_debug("Reset start push time state");
+state["start_push_time"] = "0";
+state_manager_->set(state);
+  }
+  return result;
+}
+
+bool PushGrafanaLokiREST::LogBatch::isReady() const {
+  return (log_line_batch_size_ && batched_flowfiles_.size() >= 
*log_line_batch_size_) || (log_line_batch_wait_ && 
std::chrono::steady_clock::now() - start_push_time_ >= *log_line_batch_wait_);
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setLogLineBatchSize(std::optional 
log_line_batch_size) {
+  log_line_batch_size_ = log_line_batch_size;
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setLogLineBatchWait(std::optional
 log_line_batch_wait) {
+  log_line_batch_wait_ = log_line_batch_wait;
+}
+
+void PushGrafanaLokiREST::LogBatch::setStateManager(core::StateManager* 
state_manager) {
+  state_manager_ = state_manager;
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setStartPushTime(std::chrono::steady_clock::time_point
 start_push_time) {
+  start_push_time_ = start_push_time;
+}
+
+const core::Relationship PushGrafanaLokiREST::Self("__self__", "Marks the 
FlowFile to be owned by this processor");
+
+void PushGrafanaLokiREST::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+namespace {
+auto getSSLContextService(core::ProcessContext& context) {
+  if (auto ssl_context = 
context.getProperty(PushGrafanaLokiREST::SSLContextService)) {
+return 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*ssl_context));
+  }
+  return std::shared_ptr<minifi::controllers::SSLContextService>{};
+}
+
+std::string readLogLineFromFlowFile(const std::shared_ptr<core::FlowFile>& 
flow_file, core::ProcessSession& session) {
+  auto read_buffer_result = session.readBuffer(flow_file);
+  return {reinterpret_cast<const char*>(read_buffer_result.buffer.data()), 
read_buffer_result.buffer.size()};
+}
+}  // namespace
+
+void PushGrafanaLokiREST::setUpStateManager(core::ProcessContext& context) {
+  auto state_manager = context.getStateManager();
+  if (state_manager == nullptr) {
+throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+  log_batch_.setStateManager(state_manager);
+
+  std::unordered_map<std::string, std::string> state_map;
+  if (state_manager->get(state_map)) {
+auto it = state_map.find("start_push_time");
+if (it != state_map.end()) {
+  logger_->log_info("Restored start push time from processor state: {}", 
it->second);
+  std::chrono::steady_clock::time_point 
start_push_time{std::chrono::milliseconds{std::stoll(it->second)}};
+  log_batch_.setStartPushTime(start_push_time);
+}
+  }
+}
+
+void 

Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]

2024-01-02 Thread via GitHub


lordgamez commented on code in PR #1695:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1439540650


##
extensions/grafana-loki/PushGrafanaLokiREST.cpp:
##
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PushGrafanaLokiREST.h"
+
+#include 
+#include 
+#include 
+
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/StringUtils.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki {
+
+void PushGrafanaLokiREST::LogBatch::add(const std::shared_ptr<core::FlowFile>& 
flowfile) {
+  gsl_Expects(state_manager_);
+  if (log_line_batch_wait_ && batched_flowfiles_.empty()) {
+start_push_time_ = std::chrono::steady_clock::now();
+std::unordered_map<std::string, std::string> state;
+state["start_push_time"] = 
std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(start_push_time_.time_since_epoch()).count());
+logger_->log_debug("Saved start push time to state: {}", 
state["start_push_time"]);
+state_manager_->set(state);
+  }
+  batched_flowfiles_.push_back(flowfile);
+}
+
+void PushGrafanaLokiREST::LogBatch::restore(const 
std::shared_ptr<core::FlowFile>& flowfile) {
+  batched_flowfiles_.push_back(flowfile);
+}
+
+std::vector<std::shared_ptr<core::FlowFile>> 
PushGrafanaLokiREST::LogBatch::flush() {
+  gsl_Expects(state_manager_);
+  start_push_time_ = {};
+  auto result = batched_flowfiles_;
+  batched_flowfiles_.clear();
+  if (log_line_batch_wait_) {
+start_push_time_ = {};
+std::unordered_map<std::string, std::string> state;
+logger_->log_debug("Reset start push time state");
+state["start_push_time"] = "0";
+state_manager_->set(state);
+  }
+  return result;
+}
+
+bool PushGrafanaLokiREST::LogBatch::isReady() const {
+  return (log_line_batch_size_ && batched_flowfiles_.size() >= 
*log_line_batch_size_) || (log_line_batch_wait_ && 
std::chrono::steady_clock::now() - start_push_time_ >= *log_line_batch_wait_);
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setLogLineBatchSize(std::optional 
log_line_batch_size) {
+  log_line_batch_size_ = log_line_batch_size;
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setLogLineBatchWait(std::optional
 log_line_batch_wait) {
+  log_line_batch_wait_ = log_line_batch_wait;
+}
+
+void PushGrafanaLokiREST::LogBatch::setStateManager(core::StateManager* 
state_manager) {
+  state_manager_ = state_manager;
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setStartPushTime(std::chrono::steady_clock::time_point
 start_push_time) {
+  start_push_time_ = start_push_time;
+}
+
+const core::Relationship PushGrafanaLokiREST::Self("__self__", "Marks the 
FlowFile to be owned by this processor");
+
+void PushGrafanaLokiREST::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+namespace {
+auto getSSLContextService(core::ProcessContext& context) {
+  if (auto ssl_context = 
context.getProperty(PushGrafanaLokiREST::SSLContextService)) {
+return 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*ssl_context));
+  }
+  return std::shared_ptr<minifi::controllers::SSLContextService>{};
+}
+
+std::string readLogLineFromFlowFile(const std::shared_ptr<core::FlowFile>& 
flow_file, core::ProcessSession& session) {
+  auto read_buffer_result = session.readBuffer(flow_file);
+  return {reinterpret_cast<const char*>(read_buffer_result.buffer.data()), 
read_buffer_result.buffer.size()};
+}
+}  // namespace
+
+void PushGrafanaLokiREST::setUpStateManager(core::ProcessContext& context) {
+  auto state_manager = context.getStateManager();
+  if (state_manager == nullptr) {
+throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+  log_batch_.setStateManager(state_manager);
+
+  std::unordered_map<std::string, std::string> state_map;
+  if (state_manager->get(state_map)) {
+auto it = state_map.find("start_push_time");
+if (it != state_map.end()) {
+  logger_->log_info("Restored start push time from processor state: {}", 
it->second);
+  std::chrono::steady_clock::time_point 
start_push_time{std::chrono::milliseconds{std::stoll(it->second)}};
+  log_batch_.setStartPushTime(start_push_time);

Review Comment:
   Good catch, I would like to keep the state after restart, 
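
On the restart case mentioned here: `std::chrono::steady_clock` has an unspecified epoch, so a `time_since_epoch()` value saved by one process is not guaranteed to be meaningful to the next one. A minimal sketch of one possible alternative, assuming wall-clock time is acceptable for the batch timer; `WallClock`, `serializeStartTime` and `restoreStartTime` are hypothetical names for illustration and are not taken from the PR:

```cpp
#include <chrono>
#include <cstddef>
#include <optional>
#include <stdexcept>
#include <string>

// Assumption: system_clock is used so that stored values survive a restart.
using WallClock = std::chrono::system_clock;

// Hypothetical helper: encode a time point as milliseconds since the clock's epoch.
std::string serializeStartTime(WallClock::time_point tp) {
  const auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(tp.time_since_epoch());
  return std::to_string(ms.count());
}

// Hypothetical helper: decode a stored value, returning nullopt for malformed input
// instead of letting std::stoll throw.
std::optional<WallClock::time_point> restoreStartTime(const std::string& stored) {
  try {
    std::size_t consumed = 0;
    const long long ms = std::stoll(stored, &consumed);
    if (consumed != stored.size()) {
      return std::nullopt;  // trailing characters after the number
    }
    return WallClock::time_point{std::chrono::milliseconds{ms}};
  } catch (const std::exception&) {
    return std::nullopt;  // empty, non-numeric, or out-of-range value
  }
}
```

The trade-off is that wall-clock time can jump when the system time is adjusted, so this is only a sketch of the idea, not a recommendation for the processor.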

Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]

2024-01-02 Thread via GitHub


lordgamez commented on code in PR #1695:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1439539966


##
extensions/grafana-loki/PushGrafanaLokiREST.cpp:
##
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PushGrafanaLokiREST.h"
+
+#include 
+#include 
+#include 
+
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/StringUtils.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki {
+
+void PushGrafanaLokiREST::LogBatch::add(const std::shared_ptr<core::FlowFile>& 
flowfile) {
+  gsl_Expects(state_manager_);
+  if (log_line_batch_wait_ && batched_flowfiles_.empty()) {
+start_push_time_ = std::chrono::steady_clock::now();
+std::unordered_map<std::string, std::string> state;
+state["start_push_time"] = 
std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(start_push_time_.time_since_epoch()).count());
+logger_->log_debug("Saved start push time to state: {}", 
state["start_push_time"]);
+state_manager_->set(state);
+  }
+  batched_flowfiles_.push_back(flowfile);
+}
+
+void PushGrafanaLokiREST::LogBatch::restore(const 
std::shared_ptr<core::FlowFile>& flowfile) {
+  batched_flowfiles_.push_back(flowfile);
+}
+
+std::vector<std::shared_ptr<core::FlowFile>> 
PushGrafanaLokiREST::LogBatch::flush() {
+  gsl_Expects(state_manager_);
+  start_push_time_ = {};
+  auto result = batched_flowfiles_;
+  batched_flowfiles_.clear();
+  if (log_line_batch_wait_) {
+start_push_time_ = {};
+std::unordered_map<std::string, std::string> state;
+logger_->log_debug("Reset start push time state");
+state["start_push_time"] = "0";
+state_manager_->set(state);
+  }
+  return result;
+}
+
+bool PushGrafanaLokiREST::LogBatch::isReady() const {
+  return (log_line_batch_size_ && batched_flowfiles_.size() >= 
*log_line_batch_size_) || (log_line_batch_wait_ && 
std::chrono::steady_clock::now() - start_push_time_ >= *log_line_batch_wait_);
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setLogLineBatchSize(std::optional 
log_line_batch_size) {
+  log_line_batch_size_ = log_line_batch_size;
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setLogLineBatchWait(std::optional
 log_line_batch_wait) {
+  log_line_batch_wait_ = log_line_batch_wait;
+}
+
+void PushGrafanaLokiREST::LogBatch::setStateManager(core::StateManager* 
state_manager) {
+  state_manager_ = state_manager;
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setStartPushTime(std::chrono::steady_clock::time_point
 start_push_time) {
+  start_push_time_ = start_push_time;
+}
+
+const core::Relationship PushGrafanaLokiREST::Self("__self__", "Marks the 
FlowFile to be owned by this processor");
+
+void PushGrafanaLokiREST::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+namespace {
+auto getSSLContextService(core::ProcessContext& context) {
+  if (auto ssl_context = 
context.getProperty(PushGrafanaLokiREST::SSLContextService)) {
+return 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*ssl_context));
+  }
+  return std::shared_ptr<minifi::controllers::SSLContextService>{};
+}
+
+std::string readLogLineFromFlowFile(const std::shared_ptr<core::FlowFile>& 
flow_file, core::ProcessSession& session) {
+  auto read_buffer_result = session.readBuffer(flow_file);
+  return {reinterpret_cast<const char*>(read_buffer_result.buffer.data()), 
read_buffer_result.buffer.size()};

Review Comment:
   Updated with the latter in 083c28ee59f96352e89d5675d635ea3b91bd9942
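
For reference, a minimal sketch of turning a byte buffer into a `std::string` without `reinterpret_cast`, assuming the buffer is a `std::vector<std::byte>`. `bytesToString` is a hypothetical name, and this is not necessarily the form the PR ended up with:

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper: copies each byte of the buffer into a std::string.
std::string bytesToString(const std::vector<std::byte>& buffer) {
  std::string result;
  result.reserve(buffer.size());
  for (const std::byte b : buffer) {
    result.push_back(static_cast<char>(b));
  }
  return result;
}
```

This copies element by element, which avoids the pointer cast at the cost of a less compact expression.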






Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]

2024-01-02 Thread via GitHub


lordgamez commented on code in PR #1695:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1439539574


##
extensions/grafana-loki/PushGrafanaLokiREST.h:
##
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include 
+#include 
+#include 
+#include 
+
+#include "controllers/SSLContextService.h"
+#include "core/Processor.h"
+#include "core/PropertyDefinition.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "core/PropertyType.h"
+#include "core/RelationshipDefinition.h"
+#include "client/HTTPClient.h"
+#include "core/StateManager.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki {
+
+class PushGrafanaLokiREST : public core::Processor {
+ public:
+  EXTENSIONAPI static constexpr const char* Description = "A Grafana Loki push 
processor that uses the Grafana Loki REST API. The processor expects each flow 
file to contain a single log line to be "
+  "pushed to Grafana 
Loki, therefore it is usually used together with the TailFile processor.";
+
+  explicit PushGrafanaLokiREST(const std::string& name, const 
utils::Identifier& uuid = {})
+  : Processor(name, uuid),
+log_batch_(logger_) {
+  }
+  ~PushGrafanaLokiREST() override = default;
+
+  EXTENSIONAPI static constexpr auto Url = 
core::PropertyDefinitionBuilder<>::createProperty("Url")
+.withDescription("Url of the Grafana Loki server. For example 
http://localhost:3100/.")
+.isRequired(true)
+.build();
+  EXTENSIONAPI static constexpr auto StreamLabels = 
core::PropertyDefinitionBuilder<>::createProperty("Stream Labels")
+.withDescription("Comma separated list of <key>=<value> labels to be sent 
as stream labels.")
+.isRequired(true)
+.build();
+  EXTENSIONAPI static constexpr auto LogLineMetadataAttributes = 
core::PropertyDefinitionBuilder<>::createProperty("Log Line Metadata 
Attributes")
+.withDescription("Comma separated list of attributes to be sent as log 
line metadata for a log line.")
+.build();
+  EXTENSIONAPI static constexpr auto TenantID = 
core::PropertyDefinitionBuilder<>::createProperty("Tenant ID")
+.withDescription("The tenant ID used by default to push logs to Grafana 
Loki. If omitted or empty it assumes Grafana Loki is running in single-tenant 
mode and no X-Scope-OrgID header is sent.")
+.build();
+  EXTENSIONAPI static constexpr auto MaxBatchSize = 
core::PropertyDefinitionBuilder<>::createProperty("Max Batch Size")
+.withDescription("The maximum number of flow files to process at a time. 
If not set, or set to 0, all FlowFiles will be processed at once.")
+.withPropertyType(core::StandardPropertyTypes::UNSIGNED_LONG_TYPE)
+.withDefaultValue("100")
+.build();
+  EXTENSIONAPI static constexpr auto LogLineBatchWait = 
core::PropertyDefinitionBuilder<>::createProperty("Log Line Batch Wait")
+.withDescription("Time to wait before sending a log line batch to Grafana 
Loki, full or not. If this property and Log Line Batch Size are both unset, "
+ "the log batch of the current trigger will be sent 
immediately.")
+.withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE)
+.build();
+  EXTENSIONAPI static constexpr auto LogLineBatchSize = 
core::PropertyDefinitionBuilder<>::createProperty("Log Line Batch Size")
+.withDescription("Number of log lines to send in a batch to Loki. If this 
property and Log Line Batch Wait are both unset, "
+ "the log batch of the current trigger will be sent 
immediately.")
+.withPropertyType(core::StandardPropertyTypes::UNSIGNED_INT_TYPE)
+.build();
+  EXTENSIONAPI static constexpr auto ConnectTimeout = 
core::PropertyDefinitionBuilder<>::createProperty("Connection Timeout")
+.withDescription("Max wait time for connection to the Grafana Loki 
service.")
+.withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE)
+.withDefaultValue("5 s")
+.isRequired(true)
+.build();
+  EXTENSIONAPI static constexpr auto ReadTimeout = 
core::PropertyDefinitionBuilder<>::createProperty("Read Timeout")
+.withDescription("Max wait time for response from remote 

Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]

2024-01-02 Thread via GitHub


lordgamez commented on code in PR #1695:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1439539348


##
bootstrap.sh:
##
@@ -339,6 +339,9 @@ add_option PROMETHEUS_ENABLED ${TRUE} "ENABLE_PROMETHEUS"
 add_option OPENSSL_ENABLED ${TRUE} "OPENSSL_OFF"
 add_dependency OPENSSL_ENABLED "opensslbuild"
 
+add_option GRAFANA_LOKI_ENABLED ${TRUE} "ENABLE_GRAFANA_LOKI"
+set_dependency GRAFANA_LOKI_ENABLED HTTP_CURL_ENABLED

Review Comment:
   It should be false, updated in 083c28ee59f96352e89d5675d635ea3b91bd9942



##
docker/test/integration/cluster/checkers/GrafanaLokiChecker.py:
##
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import requests
+from typing import List
+from utils import wait_for
+
+
+class GrafanaLokiChecker:
+    def __init__(self):
+        self.url = "localhost:3100/loki/api/v1/query"
+
+    def veify_log_lines_on_grafana_loki(self, lines: List[str], ssl: bool, tenant_id: str):
+        labels = '{job="minifi"}'
+        prefix = "http://"
+        if ssl:
+            prefix = "https://"
+
+        query_url = f"{prefix}{self.url}?query={labels}"
+
+        headers = None
+        if tenant_id:
+            headers = {'X-Scope-OrgID': tenant_id}
+
+        response = requests.get(query_url, verify=False, timeout=30, headers=headers)
+        if response.status_code >= 200 and response.status_code < 300:

Review Comment:
   Updated in 083c28ee59f96352e89d5675d635ea3b91bd9942






Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]

2024-01-02 Thread via GitHub


lordgamez commented on code in PR #1695:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1439438776


##
extensions/grafana-loki/PushGrafanaLokiREST.h:
##
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include 
+#include 
+#include 
+#include 
+
+#include "controllers/SSLContextService.h"
+#include "core/Processor.h"
+#include "core/PropertyDefinition.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "core/PropertyType.h"
+#include "core/RelationshipDefinition.h"
+#include "client/HTTPClient.h"
+#include "core/StateManager.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki {
+
+class PushGrafanaLokiREST : public core::Processor {
+ public:
+  EXTENSIONAPI static constexpr const char* Description = "A Grafana Loki push 
processor that uses the Grafana Loki REST API. The processor expects each flow 
file to contain a single log line to be "
+  "pushed to Grafana 
Loki, therefore it is usually used together with the TailFile processor.";
+
+  explicit PushGrafanaLokiREST(const std::string& name, const 
utils::Identifier& uuid = {})
+  : Processor(name, uuid),
+log_batch_(logger_) {
+  }
+  ~PushGrafanaLokiREST() override = default;
+
+  EXTENSIONAPI static constexpr auto Url = 
core::PropertyDefinitionBuilder<>::createProperty("Url")
+.withDescription("Url of the Grafana Loki server. For example http://localhost:3100/.")
+.isRequired(true)
+.build();
+  EXTENSIONAPI static constexpr auto StreamLabels = 
core::PropertyDefinitionBuilder<>::createProperty("Stream Labels")
+.withDescription("Comma separated list of <key>=<value> labels to be sent as stream labels.")
+.isRequired(true)
+.build();
+  EXTENSIONAPI static constexpr auto LogLineMetadataAttributes = 
core::PropertyDefinitionBuilder<>::createProperty("Log Line Metadata 
Attributes")
+.withDescription("Comma separated list of attributes to be sent as log 
line metadata for a log line.")
+.build();
+  EXTENSIONAPI static constexpr auto TenantID = 
core::PropertyDefinitionBuilder<>::createProperty("Tenant ID")
+.withDescription("The tenant ID used by default to push logs to Grafana 
Loki. If omitted or empty it assumes Grafana Loki is running in single-tenant 
mode and no X-Scope-OrgID header is sent.")
+.build();
+  EXTENSIONAPI static constexpr auto MaxBatchSize = 
core::PropertyDefinitionBuilder<>::createProperty("Max Batch Size")
+.withDescription("The maximum number of flow files to process at a time. 
If not set, or set to 0, all FlowFiles will be processed at once.")
+.withPropertyType(core::StandardPropertyTypes::UNSIGNED_LONG_TYPE)
+.withDefaultValue("100")
+.build();
+  EXTENSIONAPI static constexpr auto LogLineBatchWait = 
core::PropertyDefinitionBuilder<>::createProperty("Log Line Batch Wait")
+.withDescription("Time to wait before sending a log line batch to Grafana 
Loki, full or not. If this property and Log Line Batch Size are both unset, "
+ "the log batch of the current trigger will be sent 
immediately.")
+.withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE)
+.build();
+  EXTENSIONAPI static constexpr auto LogLineBatchSize = 
core::PropertyDefinitionBuilder<>::createProperty("Log Line Batch Size")
+.withDescription("Number of log lines to send in a batch to Loki. If this 
property and Log Line Batch Wait are both unset, "
+ "the log batch of the current trigger will be sent 
immediately.")
+.withPropertyType(core::StandardPropertyTypes::UNSIGNED_INT_TYPE)
+.build();
+  EXTENSIONAPI static constexpr auto ConnectTimeout = 
core::PropertyDefinitionBuilder<>::createProperty("Connection Timeout")
+.withDescription("Max wait time for connection to the Grafana Loki 
service.")
+.withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE)
+.withDefaultValue("5 s")
+.isRequired(true)
+.build();
+  EXTENSIONAPI static constexpr auto ReadTimeout = 
core::PropertyDefinitionBuilder<>::createProperty("Read Timeout")
+.withDescription("Max wait time for response from remote 

Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]

2023-12-19 Thread via GitHub


fgerlits commented on code in PR #1695:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1425597679


##
docker/test/integration/cluster/checkers/GrafanaLokiChecker.py:
##
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import requests
+from typing import List
+from utils import wait_for
+
+
+class GrafanaLokiChecker:
+    def __init__(self):
+        self.url = "localhost:3100/loki/api/v1/query"
+
+    def veify_log_lines_on_grafana_loki(self, lines: List[str], ssl: bool, tenant_id: str):
+        labels = '{job="minifi"}'
+        prefix = "http://"
+        if ssl:
+            prefix = "https://"
+
+        query_url = f"{prefix}{self.url}?query={labels}"
+
+        headers = None
+        if tenant_id:
+            headers = {'X-Scope-OrgID': tenant_id}
+
+        response = requests.get(query_url, verify=False, timeout=30, headers=headers)
+        if response.status_code >= 200 and response.status_code < 300:

Review Comment:
   This conditional could be reversed so that the `return True` happens at the
   end of the function.
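
A minimal sketch of the reversed conditional, assuming the remainder of the function (not shown in the quoted hunk) checks that every expected line appears in the response text. The helper name `all_lines_present` and its parameters are hypothetical, chosen so the example runs without a Loki instance:

```python
from typing import List


def all_lines_present(status_code: int, body: str, lines: List[str]) -> bool:
    """Hypothetical stand-in for the tail of the checker method."""
    # Guard clause: reject any non-2xx response before looking at the body.
    if status_code < 200 or status_code >= 300:
        return False

    # Assumed check: each expected log line must occur in the response text.
    for line in lines:
        if line not in body:
            return False

    # The success case is now the last statement of the function.
    return True
```

With the failure cases handled first, the success path reads top to bottom and the nesting level of the remaining checks drops by one.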



##
extensions/grafana-loki/PushGrafanaLokiREST.cpp:
##
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PushGrafanaLokiREST.h"
+
+#include 
+#include 
+#include 
+
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/StringUtils.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki {
+
+void PushGrafanaLokiREST::LogBatch::add(const std::shared_ptr<core::FlowFile>& flowfile) {
+  gsl_Expects(state_manager_);
+  if (log_line_batch_wait_ && batched_flowfiles_.empty()) {
+    start_push_time_ = std::chrono::steady_clock::now();
+    std::unordered_map<std::string, std::string> state;
+    state["start_push_time"] = std::to_string(std::chrono::duration_cast(start_push_time_.time_since_epoch()).count());
+    logger_->log_debug("Saved start push time to state: {}", state["start_push_time"]);
+    state_manager_->set(state);
+  }
+  batched_flowfiles_.push_back(flowfile);
+}
+
+void PushGrafanaLokiREST::LogBatch::restore(const std::shared_ptr<core::FlowFile>& flowfile) {
+  batched_flowfiles_.push_back(flowfile);
+}
+
+std::vector<std::shared_ptr<core::FlowFile>> PushGrafanaLokiREST::LogBatch::flush() {
+  gsl_Expects(state_manager_);
+  start_push_time_ = {};
+  auto result = batched_flowfiles_;
+  batched_flowfiles_.clear();
+  if (log_line_batch_wait_) {
+    start_push_time_ = {};
+    std::unordered_map<std::string, std::string> state;
+    logger_->log_debug("Reset start push time state");
+    state["start_push_time"] = "0";
+    state_manager_->set(state);
+  }
+  return result;
+}
+
+bool PushGrafanaLokiREST::LogBatch::isReady() const {
+  return (log_line_batch_size_ && batched_flowfiles_.size() >= 
*log_line_batch_size_) || (log_line_batch_wait_ && 
std::chrono::steady_clock::now() - start_push_time_ >= *log_line_batch_wait_);
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setLogLineBatchSize(std::optional 
log_line_batch_size) {
+  log_line_batch_size_ = log_line_batch_size;
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setLogLineBatchWait(std::optional
 log_line_batch_wait) {
+  log_line_batch_wait_ = log_line_batch_wait;
+}
+
+void PushGrafanaLokiREST::LogBatch::setStateManager(core::StateManager* 
state_manager) {
+  state_manager_ = state_manager;
+}
+
+void 

[PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]

2023-11-10 Thread via GitHub


lordgamez opened a new pull request, #1695:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1695

   https://issues.apache.org/jira/browse/MINIFICPP-2261
   
   -
   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.
   
   - [ ] Has your PR been rebased against the latest commit within the target 
branch (typically main)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which 
it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI 
results for build issues and submit an update to your PR as soon as possible.
   

