server: tests: ci ensure the server is stopped before scenario, and do not quit while the server is listening

This commit is contained in:
Pierrick HYMBERT 2024-02-22 23:18:42 +01:00
parent 8b96bdaf08
commit f820e10fa7

View file

@ -1,8 +1,38 @@
import multiprocessing
import os
import socket
import subprocess
import time
from contextlib import closing
from signal import SIGKILL
def before_scenario(context, scenario):
if is_server_listening("localhost", 8080):
assert False, "Server already started"
def after_scenario(context, scenario):
print("stopping server...")
print(f"stopping server pid={context.server_process.pid} ...")
context.server_process.kill()
# Wait few for socket to be free up
time.sleep(0.05)
# Wait few for socket to free up
time.sleep(0.1)
attempts = 0
while is_server_listening(context.server_fqdn, context.server_port):
print(f"stopping server pid={context.server_process.pid} ...")
os.kill(context.server_process.pid, SIGKILL)
time.sleep(0.5)
attempts += 1
if attempts > 1:
print(f"Server dandling exits, killing all {context.server_path} ...")
process = subprocess.run(['killall', '-9', context.server_path],
stderr=subprocess.PIPE,
universal_newlines=True)
print(process)
def is_server_listening(server_fqdn, server_port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
result = sock.connect_ex((server_fqdn, server_port))
return result == 0