# Copyright (c) 2023, Bjarki Arge Andreasen # SPDX-License-Identifier: Apache-2.0 import socket import threading import select class TEUDPEcho(): def __init__(self): self.running = True self.thread = threading.Thread(target=self._target_) def start(self): self.thread.start() def stop(self): self.running = False self.thread.join(1) def _target_(self): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setblocking(False) sock.bind(('0.0.0.0', 7780)) while self.running: try: ready_to_read, _, _ = select.select([sock], [sock], [], 0.5) if not ready_to_read: continue data, address = sock.recvfrom(4096) print(f'udp echo {len(data)} bytes to {address[0]}:{address[1]}') sock.sendto(data, address) except Exception as e: print(e) break sock.close()