Used by synce_port to create, control and destroy RX/TX threads.
Ports in sync mode can use only TX thread (device in external input
mode) or both RX and TX threads (device in internal input mode).

RX thread is responsible for:
- receiving ESMC frames
- verifying received QL
- storing last received valid QL and time it was received
- determining if QL-failed state occurred or port recovered from
  QL-failed state.

TX thread is responsible for:
- building new TX ESMC if requested
- sending ESMC on timer.

synce_port_ctrl interface allows:
- initialize threads
- obtain last received valid QL
- compare QL's between 2 ports
- check if RX QL has changed
- check if RX QL has Do Not Use QL
- check if RX QL-failed state occurred
- invalidate RX QL's.

Co-developed-by: Anatolii Gerasymenko <anatolii.gerasyme...@intel.com>
Signed-off-by: Anatolii Gerasymenko <anatolii.gerasyme...@intel.com>
Co-developed-by: Michal Michalik <michal.micha...@intel.com>
Signed-off-by: Michal Michalik <michal.micha...@intel.com>
Signed-off-by: Arkadiusz Kubalewski <arkadiusz.kubalew...@intel.com>
---
 synce_port_ctrl.c | 1077 +++++++++++++++++++++++++++++++++++++++++++++
 synce_port_ctrl.h |  177 ++++++++
 2 files changed, 1254 insertions(+)
 create mode 100644 synce_port_ctrl.c
 create mode 100644 synce_port_ctrl.h

diff --git a/synce_port_ctrl.c b/synce_port_ctrl.c
new file mode 100644
index 000000000000..b0364c4827ef
--- /dev/null
+++ b/synce_port_ctrl.c
@@ -0,0 +1,1077 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+/**
+ * @file synce_port_ctrl.c
+ * @brief Interface between synce port and socket handling theads, used
+ * for controling data on the wire. Allows acquire incoming data and
+ * submit new outgoing data.
+ * tx thread is always present, rx only if required (internal_input mode).
+ * @note Copyright (C) 2022 Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2, as published
+ * by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, see <http://www.gnu.org/licenses/>.
+ */
+#include <stdlib.h>
+#include <limits.h>
+#include <pthread.h>
+#include <unistd.h>
+#include <net/if.h>
+#include <errno.h>
+#include "print.h"
+#include "ddt.h"
+#include "config.h"
+#include "synce_port_ctrl.h"
+#include "synce_transport.h"
+#include "synce_msg.h"
+
+#define MSEC_TO_USEC(X)                (X * 1000)
+#define THREAD_STOP_SLEEP_USEC MSEC_TO_USEC(50)
+#define THREAD_START_SLEEP_USEC        MSEC_TO_USEC(20)
+#define SYNCE_THREAD_STACK_SIZE 0xffff
+#define RX_THREAD              0
+#define TX_THREAD              1
+#define TASK_COMM_LEN          16
+
+#define ENHANCED_SSM_SHIFT             8
+#define QL_PRIORITY(r, e)              ((e << ENHANCED_SSM_SHIFT) | r)
+
+#define O1N_PRIORITY_COUNT     5
+static const uint16_t O1N_priority[O1N_PRIORITY_COUNT] = {
+       QL_PRIORITY(O1N_QL_EPRTC_SSM, O1N_QL_EPRTC_ENHSSM),
+       QL_PRIORITY(O1N_QL_PRTC_SSM, O1N_QL_PRTC_ENHSSM),
+       QL_PRIORITY(O1N_QL_PRC_SSM, O1N_QL_PRC_ENHSSM),
+       QL_PRIORITY(O1N_QL_SSU_A_SSM, O1N_QL_SSU_A_ENHSSM),
+       QL_PRIORITY(O1N_QL_SSU_B_SSM, O1N_QL_SSU_B_ENHSSM)
+};
+
+#define O2N_PRIORITY_COUNT     8
+static const uint16_t O2N_priority[O2N_PRIORITY_COUNT] = {
+       QL_PRIORITY(O2N_QL_EPRTC_SSM, O2N_QL_EPRTC_ENHSSM),
+       QL_PRIORITY(O2N_QL_PRTC_SSM, O2N_QL_PRTC_ENHSSM),
+       QL_PRIORITY(O2N_QL_PRS_SSM, O2N_QL_PRS_ENHSSM),
+       QL_PRIORITY(O2N_QL_STU_SSM, O2N_QL_STU_ENHSSM),
+       QL_PRIORITY(O2N_QL_ST2_SSM, O2N_QL_ST2_ENHSSM),
+       QL_PRIORITY(O2N_QL_TNC_SSM, O2N_QL_TNC_ENHSSM),
+       QL_PRIORITY(O2N_QL_ST3E_SSM, O2N_QL_ST3E_ENHSSM),
+       QL_PRIORITY(O2N_QL_PROV_SSM, O2N_QL_PROV_ENHSSM)
+};
+
+struct ql {
+       STAILQ_ENTRY(ql) list;
+       uint8_t value;
+};
+
+struct thread_common_data {
+       int heartbeat_usec;
+       int state;
+       uint8_t ql;
+       int extended;
+       struct synce_msg_ext_ql ext_ql;
+       struct synce_transport *transport;
+       struct synce_pdu *pdu;
+       char *name;
+       int enabled;
+};
+
+struct synce_port_tx {
+       struct thread_common_data cd;
+       int rebuild_tlv;
+};
+
+struct synce_port_rx {
+       struct thread_common_data cd;
+       uint8_t last_ql;
+       struct synce_msg_ext_ql last_ext_ql;
+       int ql_failed;
+       struct timespec last_recv_ts;
+       struct timespec first_valid_ts;
+       uint64_t n_recv;
+       int recover_time;
+       int ext_tlv_recvd;
+       uint8_t ql_dnu_val;
+
+       STAILQ_HEAD(allowed_qls_head, ql) allowed_qls;
+       STAILQ_HEAD(allowed_ext_qls_head, ql) allowed_ext_qls;
+};
+
+struct synce_port_ctrl {
+       char name[IF_NAMESIZE];
+       struct synce_port_tx tx;
+       struct synce_port_rx rx;
+       pthread_t tx_thread_id;
+       pthread_t rx_thread_id;
+       struct synce_transport *transport;
+       const uint16_t *priority_list;
+       int priority_list_count;
+};
+
+enum thread_state {
+       THREAD_NOT_STARTED = 0,
+       THREAD_STARTED,
+       THREAD_STOPPING,
+       THREAD_STOPPED,
+       THREAD_FAILED,
+};
+
+static int tx_rebuild_tlv(struct synce_port_tx *tx)
+{
+       struct thread_common_data *cd;
+       int ret = -ENXIO;
+
+       if (!tx) {
+               pr_err("synce_port_tx is NULL");
+               return ret;
+       }
+
+       cd = &tx->cd;
+       if (!cd) {
+               pr_err("%s cd is NULL", __func__);
+               return ret;
+       }
+
+       if (!cd->pdu) {
+               pr_err("tx pdu is NULL");
+               return ret;
+       }
+
+       synce_msg_reset_tlvs(cd->pdu);
+
+       ret = synce_msg_attach_ql_tlv(cd->pdu, cd->ql);
+       if (ret) {
+               pr_err("attach QL=%u TLV failed on %s", cd->ql, cd->name);
+               goto err;
+       } else {
+               pr_info("%s: attached new TLV, QL=%d on %s",
+                       __func__, cd->ql, cd->name);
+       }
+
+       if (cd->extended) {
+               ret = synce_msg_attach_extended_ql_tlv(cd->pdu,
+                                                      &cd->ext_ql);
+               if (ret) {
+                       pr_err("attach EXT_QL TLV failed on %s", cd->name);
+                       goto err;
+               } else {
+                       pr_info("%s: attached new extended TLV, EXT_QL=%d on 
%s",
+                               __func__, cd->ext_ql.enhancedSsmCode,
+                               cd->name);
+               }
+       }
+
+       tx->rebuild_tlv = 0;
+
+       return ret;
+err:
+       cd->state = THREAD_FAILED;
+       return ret;
+}
+
+static void *tx_thread(void *data)
+{
+       struct synce_port_tx *tx = (struct synce_port_tx *) data;
+       struct thread_common_data *cd;
+       volatile int *state;
+
+       if (!tx) {
+               pr_err("%s tx data is NULL", __func__);
+               pthread_exit(NULL);
+       }
+
+       cd = &tx->cd;
+       state = &cd->state;
+       if (*state != THREAD_NOT_STARTED) {
+               pr_err("tx wrong state");
+               goto out;
+       }
+
+       pr_debug("tx thread started on port %s", cd->name);
+       *state = THREAD_STARTED;
+       while (*state == THREAD_STARTED) {
+               if (tx->rebuild_tlv) {
+                       if (tx_rebuild_tlv(tx)) {
+                               pr_err("tx rebuild failed");
+                               goto out;
+                       }
+               }
+
+               /* any errors are traced inside */
+               if (cd->enabled) {
+                       synce_transport_send_pdu(cd->transport, cd->pdu);
+               }
+               usleep(cd->heartbeat_usec);
+       };
+
+out:
+       *state = (*state == THREAD_STOPPING) ? THREAD_STOPPED : *state;
+       pr_debug("tx thread exit state %d=%s port %s", *state,
+                *state == THREAD_STOPPED ? "OK" : "failed", cd->name);
+       pthread_exit(NULL);
+}
+
+static int diff_sec(struct timespec now, struct timespec before)
+{
+       return (now.tv_sec - before.tv_sec);
+}
+
+static void update_ql(struct thread_common_data *cd, int ext_tlv_recvd,
+                     uint8_t ql, const struct synce_msg_ext_ql *ext_ql)
+{
+       cd->ql = ql;
+
+       if (ext_tlv_recvd == 1) {
+               memcpy(&cd->ext_ql, ext_ql, sizeof(cd->ext_ql));
+       }
+}
+
+static int is_ql_allowed(struct allowed_qls_head *qls_stailq_head, uint8_t ql)
+{
+       struct ql *checked_ql;
+
+       if (STAILQ_EMPTY(qls_stailq_head)) {
+               /* no filter list - accept all */
+               return 1;
+       }
+
+       STAILQ_FOREACH(checked_ql, qls_stailq_head, list) {
+               if (checked_ql->value == ql) {
+                       return 1;
+               }
+       }
+
+       return 0;
+}
+
+static int get_rx_qls(struct synce_port_rx *rx, uint8_t *ql,
+                     struct synce_msg_ext_ql *ext_ql)
+{
+       struct thread_common_data *cd = &rx->cd;
+
+       if (synce_msg_get_ql_tlv(cd->pdu, ql)) {
+               return -EAGAIN;
+       }
+
+       pr_debug("QL=%d found on %s", *ql, cd->name);
+
+       if (!is_ql_allowed(&rx->allowed_qls, *ql)) {
+               pr_debug("Received not allowed QL: %i, discarding", *ql);
+               return -EBADMSG;
+       }
+
+       if (cd->extended) {
+               if (synce_msg_get_extended_ql_tlv(cd->pdu, ext_ql)) {
+                       rx->ext_tlv_recvd = 0;
+
+                       /* only extended missing - not an error */
+                       return 0;
+               }
+
+               pr_debug("extended QL=%d found on %s",
+                        ext_ql->enhancedSsmCode, cd->name);
+
+               if (!is_ql_allowed((struct allowed_qls_head *)
+                                  &rx->allowed_ext_qls,
+                                  ext_ql->enhancedSsmCode)) {
+                       pr_debug("Received not allowed ext_QL: %i, discarding",
+                                ext_ql->enhancedSsmCode);
+                       rx->ext_tlv_recvd = 0;
+                       return -EBADMSG;
+               }
+
+               rx->ext_tlv_recvd = 1;
+       }
+
+       return 0;
+}
+
+static int rx_act(struct synce_port_rx *rx)
+{
+       struct thread_common_data *cd = &rx->cd;
+       struct synce_msg_ext_ql ext_ql;
+       struct timespec now;
+       uint8_t ql;
+       int err;
+
+       /* read socket for ESMC and fill pdu */
+       err = synce_transport_recv_pdu(cd->transport, cd->pdu);
+       if (!err) {
+               rx->n_recv++;
+       }
+
+       /* wait for first frame received before starting any logic */
+       if (!rx->n_recv) {
+               return -EAGAIN;
+       }
+
+       err = get_rx_qls(rx, &ql, &ext_ql);
+       if (err) {
+               /* go to ql_failed state if continue missing frames */
+               if (rx->ql_failed == 0) {
+                       clock_gettime(CLOCK_REALTIME, &now);
+                       if (diff_sec(now, rx->last_recv_ts) >=
+                           QL_FAILED_PERIOD_SEC) {
+                               pr_info("QL not received on %s within %d s",
+                                       cd->name, QL_FAILED_PERIOD_SEC);
+                               rx->ql_failed = 1;
+                               /* clear first_valid_ts so we can recover from
+                                * ql_failed state
+                                */
+                               memset(&rx->first_valid_ts, 0,
+                                      sizeof(rx->first_valid_ts));
+                       }
+               }
+       } else {
+               clock_gettime(CLOCK_REALTIME, &rx->last_recv_ts);
+               now.tv_sec = rx->last_recv_ts.tv_sec;
+               if (rx->ql_failed == 1) {
+                       if (rx->first_valid_ts.tv_sec == 0) {
+                               clock_gettime(CLOCK_REALTIME,
+                                             &rx->first_valid_ts);
+                       } else {
+                       /* May be required to add counter for number
+                        * of received frames before exit ql_failed
+                        */
+                               if (diff_sec(now, rx->first_valid_ts) >=
+                                            rx->recover_time) {
+                                       update_ql(cd, rx->ext_tlv_recvd,
+                                                 ql, &ext_ql);
+                                       rx->ql_failed = 0;
+                                       pr_info("QL-failed recovered on %s",
+                                               cd->name);
+                               }
+                       }
+               } else {
+                       update_ql(cd, rx->ext_tlv_recvd, ql, &ext_ql);
+                       pr_debug("QL=%u received on %s", cd->ql, cd->name);
+               }
+       }
+
+       return 0;
+}
+
+static void *rx_thread(void *data)
+{
+       struct synce_port_rx *rx = (struct synce_port_rx *) data;
+       struct thread_common_data *cd;
+       volatile int *state;
+
+       if (!rx) {
+               pr_err("%s rx data NULL", __func__);
+               pthread_exit(NULL);
+       }
+
+       cd = &rx->cd;
+       state = &cd->state;
+       if (*state != THREAD_NOT_STARTED) {
+               pr_err("rx wrong state on %s", cd->name);
+               goto out;
+       }
+
+       pr_debug("rx thread started on port %s", cd->name);
+       *state = THREAD_STARTED;
+       while (*state == THREAD_STARTED) {
+               rx_act(rx);
+               usleep(cd->heartbeat_usec);
+       };
+
+out:
+       *state = (*state == THREAD_STOPPING) ? THREAD_STOPPED : *state;
+       pr_debug("rx thread exit state %d=%s port %s", *state,
+                *state == THREAD_STOPPED ? "OK" : "failed", cd->name);
+       pthread_exit(NULL);
+}
+
+static int tx_init(struct synce_port_tx *tx, int heartbeat_msec,
+                  int extended_tlv, struct synce_transport *transport,
+                  char *name)
+{
+       struct thread_common_data *cd;
+
+       if (!tx) {
+               pr_err("%s rx NULL", __func__);
+               return -EFAULT;
+       }
+
+       if (!transport) {
+               pr_err("%s transport NULL", __func__);
+               return -EFAULT;
+       }
+
+       if (!name) {
+               pr_err("%s name NULL", __func__);
+               return -EFAULT;
+       }
+
+       cd = &tx->cd;
+       memset(tx, 0, sizeof(*tx));
+
+       if (extended_tlv) {
+               memset(&cd->ext_ql, 0, sizeof(cd->ext_ql));
+               cd->extended = extended_tlv;
+       }
+       cd->heartbeat_usec = MSEC_TO_USEC(heartbeat_msec);
+       cd->name = name;
+       cd->pdu = synce_msg_create(cd->name);
+       cd->transport = transport;
+       cd->state = THREAD_NOT_STARTED;
+       tx->rebuild_tlv = 0;
+       cd->enabled = 0;
+
+       return 0;
+}
+
+static void free_allowed_qls(struct allowed_qls_head *head)
+{
+       struct ql *q;
+
+       while ((q = STAILQ_FIRST(head))) {
+               STAILQ_REMOVE_HEAD(head, list);
+               free(q);
+       }
+}
+
+#define QL_STR_MAX_LEN 256
+#define QL_STR_BASE    10
+static int init_ql_str(struct allowed_qls_head *qls_stailq_head,
+                      const char *allowed_qls)
+{
+       unsigned int allowed_qls_len;
+       char *endptr = NULL;
+       const char *ptr;
+
+       if (allowed_qls == NULL) {
+               return 0;
+       }
+
+       allowed_qls_len = strnlen(allowed_qls, QL_STR_MAX_LEN);
+       if (allowed_qls_len == QL_STR_MAX_LEN) {
+               pr_err("QLs list string too long (max %i)", QL_STR_MAX_LEN);
+               return -E2BIG;
+       }
+
+       ptr = allowed_qls;
+       while (*ptr) {
+               unsigned long value;
+               struct ql *newql;
+
+               if (ptr > allowed_qls + allowed_qls_len) {
+                       break;
+               }
+
+               value = strtoul(ptr, &endptr, QL_STR_BASE);
+               if (endptr == ptr) {
+                       pr_err("QL list item read failed - please verify");
+                       free_allowed_qls(qls_stailq_head);
+                       return -EINVAL;
+               }
+               if (value > UCHAR_MAX) {
+                       pr_err("QL list item outside of range - please verify");
+                       free_allowed_qls(qls_stailq_head);
+                       return -EINVAL;
+               }
+
+               newql = malloc(sizeof(struct ql));
+               if (!newql) {
+                       pr_err("could not alloc ql");
+                       return -EINVAL;
+               }
+
+               newql->value = value;
+               STAILQ_INSERT_HEAD(qls_stailq_head, newql, list);
+
+               ptr = endptr + 1;
+       }
+
+       if (endptr != allowed_qls + allowed_qls_len) {
+               pr_err("QL list malformed - not all read - please verify");
+               free_allowed_qls(qls_stailq_head);
+               return -EINVAL;
+       }
+
+       return 0;
+}
+
+static int init_allowed_qls(struct synce_port_rx *rx, struct config *cfg,
+                           const char *name)
+{
+       const char *allowed_qls;
+
+       STAILQ_INIT(&rx->allowed_qls);
+
+       allowed_qls = config_get_string(cfg, name, "allowed_qls");
+       if (allowed_qls == NULL) {
+               pr_warning("No allowed QLs list found - filtering disabled");
+               return 0;
+       }
+
+       return init_ql_str(&rx->allowed_qls, allowed_qls);
+}
+
+static int init_allowed_ext_qls(struct synce_port_rx *rx, struct config *cfg,
+                               const char *name)
+{
+       const char *allowed_qls;
+
+       STAILQ_INIT((struct allowed_qls_head *)&rx->allowed_ext_qls);
+
+       allowed_qls = config_get_string(cfg, name, "allowed_ext_qls");
+
+       if (allowed_qls == NULL) {
+               pr_warning("No allowed ext_QLs list found - filtering 
disabled");
+               return 0;
+       }
+
+       return init_ql_str((struct allowed_qls_head *)&rx->allowed_ext_qls,
+                           allowed_qls);
+}
+
+static int rx_init(struct synce_port_rx *rx, int heartbeat_msec,
+                  int extended_tlv, int recover_time,
+                  struct synce_transport *transport, char *name,
+                  struct config *cfg, int network_option)
+{
+       struct thread_common_data *cd;
+
+       if (!rx) {
+               pr_err("%s rx NULL", __func__);
+               return -EFAULT;
+       }
+
+       if (!transport) {
+               pr_err("%s transport NULL", __func__);
+               return -EFAULT;
+       }
+
+       if (!name) {
+               pr_err("%s name NULL", __func__);
+               return -EFAULT;
+       }
+
+       cd = &rx->cd;
+       memset(rx, 0, sizeof(*rx));
+
+       if (extended_tlv) {
+               memset(&cd->ext_ql, 0, sizeof(cd->ext_ql));
+               memcpy(&rx->last_ext_ql, &cd->ext_ql, sizeof(rx->last_ext_ql));
+               cd->extended = extended_tlv;
+       }
+       cd->heartbeat_usec = MSEC_TO_USEC(heartbeat_msec);
+       cd->name = name;
+       cd->pdu = synce_msg_create(cd->name);
+       cd->transport = transport;
+       cd->state = THREAD_NOT_STARTED;
+       rx->ql_dnu_val = synce_get_dnu_value(network_option, false);
+       rx->last_ql = rx->ql_dnu_val;
+       memset(&rx->last_recv_ts, 0, sizeof(rx->last_recv_ts));
+       memset(&rx->first_valid_ts, 0, sizeof(rx->first_valid_ts));
+       rx->n_recv = 0;
+       rx->recover_time = recover_time;
+       cd->enabled = 1;
+
+       init_allowed_qls(rx, cfg, name);
+       init_allowed_ext_qls(rx, cfg, name);
+
+       return 0;
+}
+
+static int thread_stop_wait(struct thread_common_data *cd)
+{
+       int cnt = (cd->heartbeat_usec / THREAD_STOP_SLEEP_USEC) + 1;
+
+       if (cd->state == THREAD_STARTED) {
+               cd->state = THREAD_STOPPING;
+       } else {
+               return -ESRCH;
+       }
+
+       while (cnt-- && cd->state != THREAD_STOPPED) {
+               usleep(THREAD_STOP_SLEEP_USEC);
+       };
+
+       return (cd->state == THREAD_STOPPED ? 0 : -ENXIO);
+}
+
+static int thread_start_wait(struct thread_common_data *cd)
+{
+       int cnt = (cd->heartbeat_usec / THREAD_STOP_SLEEP_USEC) + 1;
+
+       if (cd->state == THREAD_STARTED) {
+               pr_debug("THREAD_STARTED");
+
+               return 0;
+       }
+
+       while (cnt-- && cd->state != THREAD_STARTED) {
+               usleep(THREAD_STOP_SLEEP_USEC);
+       }
+
+       if (cd->state != THREAD_STARTED) {
+               pr_err("THREAD_FAILED");
+
+               return -ESRCH;
+       }
+
+       pr_debug("THREAD_STARTED");
+
+       return 0;
+}
+
+static int synce_port_ctrl_thread_create(char *name, void *data, int tx,
+                                        pthread_t *thread_id)
+{
+       char thread_name[TASK_COMM_LEN];
+       pthread_attr_t attr;
+       int err;
+
+       err = pthread_attr_init(&attr);
+       if (err) {
+               pr_err("init %s thread attr failed for %s",
+                      tx ? "tx" : "rx", name);
+               goto err_attr;
+       }
+
+       err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+       if (err) {
+               pr_err("set %s thread detached failed for %s",
+                      tx ? "tx" : "rx", name);
+               goto err_attr;
+       }
+
+       err = pthread_attr_setstacksize(&attr, SYNCE_THREAD_STACK_SIZE);
+       if (err) {
+               pr_err("set %s thread stack failed for %s",
+                      tx ? "tx" : "rx", name);
+               goto err_attr;
+       }
+
+       if (tx) {
+               err = pthread_create(thread_id, &attr, tx_thread, data);
+       } else {
+               err = pthread_create(thread_id, &attr, rx_thread, data);
+       }
+       if (err) {
+               pr_err("create %s thread failed for %s",
+                      tx ? "tx" : "rx", name);
+               goto err_attr;
+       }
+
+       snprintf(thread_name, TASK_COMM_LEN, "%s-%s",
+                tx ? "tx" : "rx", name);
+       err = pthread_setname_np(*thread_id, thread_name);
+       if (err) {
+               pr_info("failed to set %s thread's name for %s",
+                       tx ? "tx" : "rx", name);
+       }
+
+       pthread_attr_destroy(&attr);
+       return 0;
+
+err_attr:
+       pthread_attr_destroy(&attr);
+       return -ECHILD;
+}
+
+static uint16_t get_ql_priority(struct synce_port_ctrl *pc)
+{
+       if (pc->rx.cd.extended) {
+               return QL_PRIORITY(pc->rx.cd.ql,
+                                  pc->rx.cd.ext_ql.enhancedSsmCode);
+       } else {
+               return QL_PRIORITY(pc->rx.cd.ql,
+                                  QL_OTHER_CLOCK_TYPES_ENHSSM);
+       }
+}
+
+static struct synce_port_ctrl *is_valid_source(struct synce_port_ctrl *pc)
+{
+       uint16_t ql_priority;
+       int i;
+
+       if (!pc) {
+               pr_debug("pc is NULL");
+               return NULL;
+       }
+
+       ql_priority = get_ql_priority(pc);
+
+       if (pc->rx.n_recv > 0 && !pc->rx.ql_failed) {
+               for (i = 0; i < pc->priority_list_count; i++) {
+                       if (ql_priority == pc->priority_list[i]) {
+                               return pc;
+                       }
+               }
+       }
+       pr_info("not valid source: %s", pc->name);
+
+       return NULL;
+}
+
+int synce_port_ctrl_running(struct synce_port_ctrl *pc)
+{
+       int ret;
+
+       if (!pc) {
+               pr_err("%s pc is NULL", __func__);
+               return -EFAULT;
+       }
+
+       if (!pc->rx.cd.enabled) {
+               ret = (pc->tx.cd.state == THREAD_STARTED);
+       } else {
+               ret = (pc->tx.cd.state == THREAD_STARTED) &&
+                     (pc->rx.cd.state == THREAD_STARTED);
+       }
+
+       return ret;
+}
+
+int synce_port_ctrl_destroy(struct synce_port_ctrl *pc)
+{
+       if (!pc) {
+               pr_err("%s pc is NULL", __func__);
+               return -EFAULT;
+       }
+       pr_debug("%s on %s", __func__, pc->name);
+
+       thread_stop_wait(&pc->tx.cd);
+       if (pc->tx.cd.pdu) {
+               synce_msg_delete(pc->tx.cd.pdu);
+       }
+
+       if (pc->rx.cd.enabled) {
+               thread_stop_wait(&pc->rx.cd);
+               if (pc->tx.cd.pdu) {
+                       synce_msg_delete(pc->rx.cd.pdu);
+               }
+
+               free_allowed_qls(&pc->rx.allowed_qls);
+               free_allowed_qls((struct allowed_qls_head 
*)&pc->rx.allowed_ext_qls);
+       }
+
+       if (pc->transport) {
+               synce_transport_delete(pc->transport);
+       }
+       memset(pc, 0, sizeof(*pc));
+
+       return 0;
+}
+
+int synce_port_ctrl_rx_ql_failed(struct synce_port_ctrl *pc)
+{
+       if (!pc) {
+               pr_err("%s pc is NULL", __func__);
+               return -EFAULT;
+       }
+
+       return (pc->rx.ql_failed != 0);
+}
+
+int synce_port_ctrl_rx_dnu(struct synce_port_ctrl *pc, uint8_t dnu_val)
+{
+       if (!pc) {
+               pr_err("%s pc is NULL", __func__);
+               return -EFAULT;
+       }
+
+       return pc->rx.n_recv && !pc->rx.ql_failed ?
+               pc->rx.cd.ql == dnu_val : -EAGAIN;
+}
+
+int synce_port_ctrl_rx_ql_changed(struct synce_port_ctrl *pc)
+{
+       struct thread_common_data *cd;
+       int ret;
+
+       if (!pc) {
+               pr_err("%s pc is NULL", __func__);
+               return -EFAULT;
+       }
+
+       cd = &pc->rx.cd;
+       if (!pc->rx.ext_tlv_recvd) {
+               ret = (cd->ql != pc->rx.last_ql);
+               pc->rx.last_ql = cd->ql;
+       } else {
+               ret = (cd->ql != pc->rx.last_ql) ||
+                     (memcmp(&cd->ext_ql,
+                             &pc->rx.last_ext_ql,
+                             sizeof(cd->ext_ql)) != 0);
+               pc->rx.last_ql = cd->ql;
+               memcpy(&pc->rx.last_ext_ql, &cd->ext_ql, 
sizeof(pc->rx.last_ext_ql));
+       }
+
+       if (ret) {
+               pr_debug("%s on %s", __func__, pc->name);
+       }
+
+       return ret;
+}
+
+int synce_port_ctrl_rx_ext_tlv(struct synce_port_ctrl *pc)
+{
+       if (!pc) {
+               pr_err("%s pc is NULL", __func__);
+               return -EFAULT;
+       }
+
+       return pc->rx.ext_tlv_recvd;
+}
+
+int synce_port_ctrl_get_rx_ql(struct synce_port_ctrl *pc, uint8_t *ql)
+{
+       if (!pc) {
+               pr_err("%s pc is NULL", __func__);
+               return -EFAULT;
+       }
+
+       if (!ql) {
+               pr_err("%s ql is NULL", __func__);
+               return -EFAULT;
+       }
+
+       *ql = pc->rx.cd.ql;
+
+       return 0;
+}
+
+int synce_port_ctrl_get_rx_ext_ql(struct synce_port_ctrl *pc,
+                                 struct synce_msg_ext_ql *ext_ql)
+{
+       if (!pc) {
+               pr_err("%s pc is NULL", __func__);
+               return -EFAULT;
+       }
+
+       if (!ext_ql) {
+               pr_err("%s ext_ql is NULL", __func__);
+               return -EFAULT;
+       }
+
+       if (!pc->rx.cd.extended) {
+               pr_err("ext_ql was not enabled for %s", pc->name);
+               return -EFAULT;
+       }
+
+       memcpy(ext_ql, &pc->rx.cd.ext_ql, sizeof(*ext_ql));
+
+       return 0;
+}
+
+int synce_port_ctrl_set_tx_ql(struct synce_port_ctrl *pc, uint8_t ql)
+{
+       if (!pc) {
+               pr_err("%s pc is NULL", __func__);
+               return -EFAULT;
+       }
+
+       pc->tx.cd.ql = ql;
+
+       return 0;
+}
+
+int synce_port_ctrl_set_tx_ext_ql(struct synce_port_ctrl *pc,
+                                 struct synce_msg_ext_ql *ext_ql)
+{
+       if (!pc) {
+               pr_err("%s pc is NULL", __func__);
+               return -EFAULT;
+       }
+
+       if (!ext_ql) {
+               pr_err("%s ext_ql is NULL", __func__);
+               return -EFAULT;
+       }
+
+       memcpy(&pc->tx.cd.ext_ql, ext_ql, sizeof(pc->tx.cd.ext_ql));
+
+       return 0;
+}
+
+int synce_port_ctrl_rebuild_tx(struct synce_port_ctrl *pc)
+{
+       if (!pc) {
+               pr_err("%s pc is NULL", __func__);
+               return -EFAULT;
+       }
+
+       pc->tx.rebuild_tlv = 1;
+
+       return 0;
+}
+
+int synce_port_ctrl_enable_tx(struct synce_port_ctrl *pc)
+{
+       if (!pc) {
+               pr_err("%s pc is NULL", __func__);
+               return -EFAULT;
+       }
+
+       pc->tx.cd.enabled = 1;
+       return 0;
+}
+
+struct synce_port_ctrl *
+synce_port_ctrl_compare_ql(struct synce_port_ctrl *left,
+                          struct synce_port_ctrl *right)
+{
+       uint16_t left_ql_priority, right_ql_priority;
+       struct synce_port_ctrl *best = NULL;
+       int i;
+
+       if (!left && !right) {
+               pr_err("%s both left and right are NULL", __func__);
+               goto out;
+       }
+
+       left = is_valid_source(left);
+       right = is_valid_source(right);
+
+       if (!left && !right) {
+               pr_err("both left and right are invalid");
+               goto out;
+       } else if (!left != !right) {
+               best = left ? left : right;
+               pr_debug("only one valid source %s QL=%u",
+                        best->name, best->rx.cd.ql);
+               goto out;
+       }
+
+       left_ql_priority = get_ql_priority(left);
+       right_ql_priority = get_ql_priority(right);
+
+       /* the left and right lists should be the same */
+       if (left->priority_list != right->priority_list ||
+           left->priority_list_count != right->priority_list_count) {
+               pr_err("priority lists on compared ports are different");
+               return NULL;
+       }
+       /* we can use either left or right priority list */
+       for (i = 0; i < left->priority_list_count; i++) {
+               if (left->priority_list[i] == left_ql_priority) {
+                       best = left;
+                       goto out;
+               }
+               if (left->priority_list[i] == right_ql_priority) {
+                       best = right;
+                       goto out;
+               }
+       }
+
+       pr_debug("didn't found neither of QLs on priorities list");
+out:
+       if (!best) {
+               pr_debug("no valid source found");
+       }
+       return best;
+}
+
+int synce_port_ctrl_init(struct synce_port_ctrl *pc, struct config *cfg,
+                        int rx_enabled, int extended_tlv, int recover_time,
+                        int network_option)
+{
+       if (!pc) {
+               pr_err("%s pc is NULL", __func__);
+               return -ENODEV;
+       }
+
+       if (!cfg) {
+               pr_err("%s cfg is NULL", __func__);
+               return -ENXIO;
+       }
+
+       pc->transport = synce_transport_create(pc->name);
+       if (!pc->transport) {
+               pr_err("init synce_transport failed for port %s", pc->name);
+               return -ENXIO;
+       }
+
+       tx_init(&pc->tx,
+               config_get_int(cfg, pc->name, "tx_heartbeat_msec"),
+               extended_tlv, pc->transport, pc->name);
+       if (synce_port_ctrl_thread_create(pc->tx.cd.name, &pc->tx, TX_THREAD,
+                                         &pc->tx_thread_id)) {
+               pr_err("tx thread create failed on port %s", pc->name);
+               goto transport_err;
+       }
+
+       if (thread_start_wait(&pc->tx.cd)) {
+               pr_err("tx thread start wait failed for %s", pc->name);
+               goto transport_err;
+       }
+
+       switch (network_option) {
+       case SYNCE_NETWORK_OPT_1:
+               pc->priority_list = O1N_priority;
+               pc->priority_list_count = O1N_PRIORITY_COUNT;
+               break;
+       case SYNCE_NETWORK_OPT_2:
+               pc->priority_list = O2N_priority;
+               pc->priority_list_count = O2N_PRIORITY_COUNT;
+               break;
+       default:
+               pr_err("wrong network option - only 1 and 2 supported");
+               goto transport_err;
+       }
+
+       if (rx_enabled) {
+               rx_init(&pc->rx,
+                       config_get_int(cfg, pc->name, "rx_heartbeat_msec"),
+                       extended_tlv, recover_time, pc->transport, pc->name,
+                       cfg, network_option);
+
+               if (synce_port_ctrl_thread_create(pc->rx.cd.name,
+                                                 &pc->rx, RX_THREAD,
+                                                 &pc->rx_thread_id)) {
+                       pr_err("rx thread create failed on port %s", pc->name);
+                       goto rx_err;
+               }
+               if (thread_start_wait(&pc->rx.cd)) {
+                       pr_err("tx thread start wait failed for %s", pc->name);
+                       goto rx_err;
+               }
+       } else {
+               pc->rx.cd.enabled = 0;
+               pr_debug("rx thread not needed on port %s", pc->name);
+       }
+
+       return 0;
+rx_err:
+       thread_stop_wait(&pc->tx.cd);
+transport_err:
+       synce_transport_delete(pc->transport);
+
+       return -ECHILD;
+}
+
+struct synce_port_ctrl *synce_port_ctrl_create(const char *name)
+{
+       struct synce_port_ctrl *p = NULL;
+
+       if (!name) {
+               pr_err("name not profided in %s", __func__);
+               return NULL;
+       }
+
+       p = malloc(sizeof(struct synce_port_ctrl));
+       if (!p) {
+               pr_err("could not alloc synce_port_ctrl for %s", name);
+               return NULL;
+       }
+
+       memcpy(p->name, name, sizeof(p->name));
+
+       return p;
+}
+
+void synce_port_ctrl_invalidate_rx_ql(struct synce_port_ctrl *pc)
+{
+       pc->rx.last_ql = pc->rx.ql_dnu_val;
+       pc->rx.cd.ql = pc->rx.ql_dnu_val;
+}
diff --git a/synce_port_ctrl.h b/synce_port_ctrl.h
new file mode 100644
index 000000000000..ae022564880f
--- /dev/null
+++ b/synce_port_ctrl.h
@@ -0,0 +1,177 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+/**
+ * @file synce_port_ctrl.h
+ * @brief Interface between synce port and socket handling theads, used
+ * for controling data on the wire. Allows acquire incoming data and
+ * submit new outgoing data.
+ * TX thread is always present, RX only if required (internal_input mode).
+ * @note Copyright (C) 2022 Intel Corporation
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2, as published
+ * by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, see <http://www.gnu.org/licenses/>.
+ */
+#ifndef HAVE_SYNC_PORT_CTRL_H
+#define HAVE_SYNC_PORT_CTRL_H
+#include <stdint.h>
+
+/* Opaque types */
+struct synce_port_ctrl;
+struct config;
+struct synce_msg_ext_ql;
+
+/**
+ * Check if created threads are running.
+ *
+ * @param pc           Questioned instance
+ * @return             1 if true, 0 if false, negative on failure
+ */
+int synce_port_ctrl_running(struct synce_port_ctrl *pc);
+
+/**
+ * Stop threads, deinit given instance.
+ *
+ * @param pc           Managed instance
+ * @return             0 on success, otherwise fault
+ */
+int synce_port_ctrl_destroy(struct synce_port_ctrl *pc);
+
+/**
+ * Check if QL-failed condition is present.
+ *
+ * @param pc           Questioned instance
+ * @return             1 if true, 0 if false, negative on failure
+ */
+int synce_port_ctrl_rx_ql_failed(struct synce_port_ctrl *pc);
+
+/**
+ * Check if Do Not Use QL is present on a port.
+ *
+ * @param pc           Questioned instance
+ * @param dnu          Value to compare against
+ * @return             1 if true, 0 if false, negative on failure
+ */
+int synce_port_ctrl_rx_dnu(struct synce_port_ctrl *pc, uint8_t dnu);
+
+/**
+ * Check if QL has changed on RX.
+ *
+ * @param pc           Questioned instance
+ * @return             1 if true, 0 if false, negative on failure
+ */
+int synce_port_ctrl_rx_ql_changed(struct synce_port_ctrl *pc);
+
+/**
+ * Check if extended TLV was acquired on RX wire.
+ *
+ * @param pc           Questioned instance
+ * @return             1 if true, 0 if false, negative on failure
+ */
+int synce_port_ctrl_rx_ext_tlv(struct synce_port_ctrl *pc);
+
+/**
+ * Acquire last QL on the RX wire.
+ *
+ * @param pc           Questioned instance
+ * @param ql           Returned QL
+ * @return             0 on success, negative on failure
+ */
+int synce_port_ctrl_get_rx_ql(struct synce_port_ctrl *pc, uint8_t *ql);
+
+/**
+ * Acquire last extended QL on the RX wire.
+ *
+ * @param pc           Questioned instance
+ * @param ext_ql       Returned extended QL struct
+ * @return             0 on success, negative on failure
+ */
+int synce_port_ctrl_get_rx_ext_ql(struct synce_port_ctrl *pc,
+                                 struct synce_msg_ext_ql *ext_ql);
+
+/**
+ * Set QL for TX thread.
+ *
+ * @param pc           Managed instance
+ * @param ql           QL to be sent
+ * @return             0 on success, negative on failure
+ */
+int synce_port_ctrl_set_tx_ql(struct synce_port_ctrl *pc, uint8_t ql);
+
+/**
+ * Set extended QL for TX thread.
+ *
+ * @param pc           Managed instance
+ * @param ext_ql       Extended QL to be sent
+ * @return             0 on success, negative on failure
+ */
+int synce_port_ctrl_set_tx_ext_ql(struct synce_port_ctrl *pc,
+                                 struct synce_msg_ext_ql *ext_ql);
+/**
+ * Whenever new QL was set for TX thread, rebuild must be invoked explicitly.
+ *
+ * @param pc           Managed instance
+ * @return             0 on success, negative on failure
+ */
+int synce_port_ctrl_rebuild_tx(struct synce_port_ctrl *pc);
+
+/**
+ * Explicit start sending QL that was set for TX thread, used once init and set
+ * QL are finished.
+ *
+ * @param pc           Managed instance
+ * @return             0 on success, negative on failure
+ */
+int synce_port_ctrl_enable_tx(struct synce_port_ctrl *pc);
+
+/**
+ * Check if sources given port sources are valid, than compare them,
+ * choose the one with higher priority in terms of its received QL.
+ *
+ * @param left                 Port instance for comparison
+ * @param right                        Port instance for comparison
+ * @return                     Pointer to a higher quality input port instance,
+ *                             NULL on failure or equal
+ */
+struct synce_port_ctrl
+*synce_port_ctrl_compare_ql(struct synce_port_ctrl *left,
+                           struct synce_port_ctrl *right);
+
+/**
+ * Initialize given instance with the given config.
+ *
+ * @param pc                   Instance to be initialized
+ * @param cfg                  Configuration of SYNCE type
+ * @param rx_enabled           If RX thread shall also start
+ * @param extended_tlv         If extended tlv was enabled
+ * @param recover_time         What time was set for recovery [s]
+ * @param network_option       Network option, either 0 or 1
+ * @return                     0 on success, otherwise fail
+ */
+int synce_port_ctrl_init(struct synce_port_ctrl *pc, struct config *cfg,
+                        int rx_enabled, int extended_tlv, int recover_time,
+                        int network_option);
+
+/**
+ * Create instance and set name of its port.
+ *
+ * @param name         Port name
+ * @return             Pointer to allocated instance
+ */
+struct synce_port_ctrl *synce_port_ctrl_create(const char *name);
+
+/**
+ * Invalidate QL received in the past.
+ *
+ * @param pc           Port control instance
+ */
+void synce_port_ctrl_invalidate_rx_ql(struct synce_port_ctrl *pc);
+
+#endif /* HAVE_SYNC_PORT_CTRL_H */
-- 
2.34.1



_______________________________________________
Linuxptp-devel mailing list
Linuxptp-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/linuxptp-devel

Reply via email to