This is an automated email from the ASF dual-hosted git repository. shuaijinchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apisix-python-plugin-runner.git
commit 1d353fd03e912c1d70afb52e21612d671818a367 Author: Janko <[email protected]> AuthorDate: Tue Aug 3 10:43:10 2021 +0800 feat: add socket server module for runner. --- src/runner/socket/server.py | 105 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/src/runner/socket/server.py b/src/runner/socket/server.py new file mode 100644 index 0000000..37e66d4 --- /dev/null +++ b/src/runner/socket/server.py @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import os +import socket +from _thread import start_new_thread +from runner.socket.handle import Handle as A6ServerHandle + + +def runner_protocol_decode(buf): + """ + decode for runner protocol + :param buf: + :return: + """ + if not buf: + return None, "runner protocol undefined." + if len(buf) != 4: + return None, "runner protocol invalid." + + buf = bytearray(buf) + # request buf type + buf_type = buf[0] + buf[0] = 0 + # request buf length + buf_len = int.from_bytes(buf, byteorder="big") + return {"type": buf_type, "len": buf_len}, None + + +def runner_protocol_encode(reps_type, reps_data): + """ + encode for runner protocol + :param reps_type: + :param reps_data: + :return: + """ + reps_len = len(reps_data) + reps_header = reps_len.to_bytes(4, byteorder="big") + reps_header = bytearray(reps_header) + reps_header[0] = reps_type + reps_header = bytes(reps_header) + return reps_header + reps_data + + +def threaded(conn): + while True: + header_buf = conn.recv(4) + protocol, err = runner_protocol_decode(header_buf) + if err: + print(err) + break + + # rpc request type + req_type = protocol.get("type") + # rpc request length + req_len = protocol.get("len") + + req_data = conn.recv(req_len) + + rpc_handler = A6ServerHandle(req_type, req_data) + response = rpc_handler.dispatch() + + reps_type = response.get("type") + reps_data = response.get("data") + reps = runner_protocol_encode(reps_type, reps_data) + + err = conn.sendall(reps) + if err: + print(err) + break + conn.close() + + +class Server: + def __init__(self, socket_address): + if os.path.exists(socket_address): + os.remove(socket_address) + self.socket_address = socket_address + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.sock.bind(socket_address) + self.sock.listen(1024) + print("listening on unix:%s" % socket_address) + + def receive(self): + while True: + conn, address = self.sock.accept() + + start_new_thread(threaded, (conn,)) + + def __del__(self): + self.sock.close() + print("Bye")
