feat: add stress testing tool for dedicated server stability validation

Python-based bot tool that rapidly connects/disconnects to the server
using the full LCE protocol (PreLogin, Login, cipher handshake, identity
tokens, keepalive). Configurable concurrent bots, hold times, burst
joins, movement packets, and duration. Includes batch files for common
test scenarios (basic, aggressive, movement, burst, endurance).
This commit is contained in:
itsRevela
2026-04-10 01:14:11 -05:00
parent 20229fc07b
commit d14e46701c
7 changed files with 741 additions and 0 deletions

View File

@@ -0,0 +1,54 @@
# LCE Server Stress Test
Rapid bot connect/disconnect cycles to stress test server thread safety.
## Requirements
- Python 3.10+
- `pycryptodome` (only if server has cipher enabled)
```
pip install pycryptodome
```
## Usage
```bash
# Basic: 50 cycles, 8 concurrent bots, 2-10s hold time
python stress_test.py 127.0.0.1 19132
# Aggressive: rapid join/leave with short hold times
python stress_test.py 127.0.0.1 19132 --bots 12 --hold 0.5 2 --ramp 0.2
# With movement packets (tests "moved wrongly" code path)
python stress_test.py 127.0.0.1 19132 --move --hold 5 15
# Burst mode: 4 bots join simultaneously
python stress_test.py 127.0.0.1 19132 --burst 4 --hold 1 3
# Run for 5 minutes continuously
python stress_test.py 127.0.0.1 19132 --cycles 0 --duration 300
# Quiet mode (only shows stats, no per-bot messages)
python stress_test.py 127.0.0.1 19132 --quiet
```
## What it tests
- **Player list thread safety**: Rapid add/remove from the players vector
- **Socket write races**: Disconnect while server is mid-write
- **Movement validation**: `--move` flag sends position packets that trigger "moved wrongly" checks
- **Concurrent join/leave**: `--burst` flag joins multiple bots at once
## Options
| Option | Default | Description |
|--------|---------|-------------|
| `--bots N` | 8 | Max concurrent bot connections |
| `--cycles N` | 50 | Total connect/disconnect cycles (0 = infinite) |
| `--hold MIN MAX` | 2 10 | Random hold time range before disconnect (seconds) |
| `--ramp SECS` | 0.5 | Delay between spawning bots |
| `--move` | off | Send movement packets while connected |
| `--burst N` | 1 | Bots to spawn simultaneously per cycle |
| `--duration SECS` | 0 | Time limit (0 = run until cycles complete) |
| `--quiet` | off | Only show periodic stats, not per-bot logs |

View File

@@ -0,0 +1,647 @@
"""
LCE Server Stress Testing Tool
Rapid bot connect/disconnect cycles to stress test thread safety
of the server's player list, socket handling, and movement processing.
Usage:
python stress_test.py <host> <port> [options]
Options:
--bots N Max concurrent bots (default: 8)
--cycles N Number of connect/disconnect cycles (default: 50, 0=infinite)
--hold MIN MAX Hold time range in seconds before disconnect (default: 2 10)
--ramp SECS Delay between spawning each bot (default: 0.5)
--move Bots send movement packets while connected
--burst N Spawn N bots simultaneously (default: 1)
--duration SECS Run for N seconds then stop (default: 0, unlimited)
--quiet Suppress per-bot log messages
"""
import argparse
import logging
import os
import random
import secrets
import socket
import struct
import sys
import threading
import time
from dataclasses import dataclass, field
# Import protocol code from the server-monitor tool.
# Search sibling tools directory first, then the itsRevela tools directory.
_TOOLS = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
_SEARCH_PATHS = [
os.path.join(_TOOLS, "server-monitor"),
os.path.join(os.path.dirname(_TOOLS), "server-monitor"),
os.path.normpath(os.path.join(_TOOLS, "..", "..", "..", "itsRevela", "tools", "server-monitor")),
]
for _p in _SEARCH_PATHS:
if os.path.isdir(_p) and _p not in sys.path:
sys.path.append(_p)
break
from protocol import frame_packet, DataInputStream, DataOutputStream
from packets import (
PRE_LOGIN, LOGIN, KEEP_ALIVE, CUSTOM_PAYLOAD, DISCONNECT,
build_prelogin, build_login, build_keep_alive,
build_custom_payload, parse_prelogin_response,
parse_login_response_stream, DISCONNECT_REASONS,
)
logger = logging.getLogger("stress")
SMALLID_REJECT = 0xFF
CIPHER_KEY_PREFIX = (
b"\xFA\x00\x07"
b"\x00\x4D\x00\x43\x00\x7C\x00\x43\x00\x4B\x00\x65\x00\x79"
b"\x00\x20"
)
CIPHER_KEY_TOTAL = len(CIPHER_KEY_PREFIX) + 32
CIPHER_ON_PATTERN = (
b"\xFA\x00\x06"
b"\x00\x4D\x00\x43\x00\x7C\x00\x43\x00\x4F\x00\x6E"
b"\x00\x00"
)
CIPHER_ACK_CHANNEL = "MC|CAck"
IDENTITY_TOKEN_ISSUE = "MC|CTIssue"
IDENTITY_TOKEN_CHALLENGE = "MC|CTChallenge"
IDENTITY_TOKEN_RESPONSE = "MC|CTResponse"
def _build_channel_pattern(channel: str) -> bytes:
result = b"\xFA"
result += struct.pack(">h", len(channel))
for ch in channel:
result += struct.pack(">H", ord(ch))
return result
# ---------------------------------------------------------------------------
# CipherState (AES-CTR, optional dependency)
# ---------------------------------------------------------------------------
class CipherState:
def __init__(self, key: bytes, iv: bytes):
from Crypto.Cipher import AES
from Crypto.Util import Counter
ctr = Counter.new(128, initial_value=int.from_bytes(iv, "big"))
self._cipher = AES.new(key, AES.MODE_CTR, counter=ctr)
def process(self, data: bytes) -> bytes:
return self._cipher.encrypt(data)
# ---------------------------------------------------------------------------
# Statistics tracker
# ---------------------------------------------------------------------------
@dataclass
class Stats:
lock: threading.Lock = field(default_factory=threading.Lock)
connects: int = 0
disconnects: int = 0
rejections: int = 0
errors: int = 0
moves_sent: int = 0
keepalives_sent: int = 0
start_time: float = 0.0
def summary(self) -> str:
elapsed = time.time() - self.start_time if self.start_time else 0
with self.lock:
return (
f"[{elapsed:.0f}s] "
f"connects={self.connects} "
f"disconnects={self.disconnects} "
f"rejections={self.rejections} "
f"errors={self.errors} "
f"moves={self.moves_sent} "
f"keepalives={self.keepalives_sent}"
)
# ---------------------------------------------------------------------------
# Movement packet builder
# ---------------------------------------------------------------------------
MOVE_PLAYER = 0x0D # MovePlayerPacket ID
def build_move_player(x: float, y: float, z: float,
yaw: float, pitch: float, on_ground: bool) -> bytes:
dos = DataOutputStream()
dos.write_double(x)
dos.write_double(y + 1.62) # stance
dos.write_double(y)
dos.write_double(z)
dos.write_float(yaw)
dos.write_float(pitch)
dos.write_bool(on_ground)
return dos.getvalue()
# ---------------------------------------------------------------------------
# Single bot stress connection
# ---------------------------------------------------------------------------
class StressBot:
"""A bot that connects, optionally moves around, then disconnects."""
def __init__(self, name: str, host: str, port: int, xuid: int,
hold_min: float, hold_max: float, send_moves: bool,
stats: Stats, quiet: bool):
self.name = name
self.host = host
self.port = port
self.xuid = xuid
self.hold_min = hold_min
self.hold_max = hold_max
self.send_moves = send_moves
self.stats = stats
self.quiet = quiet
self._sock: socket.socket | None = None
self._recv_buf = bytearray()
self._scan_buf = bytearray()
self._recv_cipher: CipherState | None = None
self._send_cipher: CipherState | None = None
self._cipher_key = b""
self._cipher_iv = b""
self._identity_token = b""
self._entity_id = 0
self._running = True
def log(self, msg: str) -> None:
if not self.quiet:
logger.info(msg)
def run(self) -> bool:
"""Run one connect-hold-disconnect cycle. Returns True on success."""
try:
return self._do_cycle()
except Exception as e:
self.log(f"[{self.name}] error: {e}")
with self.stats.lock:
self.stats.errors += 1
return False
finally:
if self._sock:
try:
self._sock.close()
except OSError:
pass
self._sock = None
def _do_cycle(self) -> bool:
# Connect
self._sock = socket.create_connection((self.host, self.port), timeout=10)
self._sock.settimeout(5.0)
# SmallID
first_byte = self._recv_exact(1)
if first_byte[0] == SMALLID_REJECT:
reject_data = self._recv_exact(5)
reason_id = struct.unpack(">i", reject_data[1:5])[0]
reason = DISCONNECT_REASONS.get(reason_id, f"Unknown({reason_id})")
self.log(f"[{self.name}] rejected: {reason}")
with self.stats.lock:
self.stats.rejections += 1
return False
small_id = first_byte[0]
self.log(f"[{self.name}] assigned smallId={small_id}")
# PreLogin
self._send_packet(PRE_LOGIN, build_prelogin(self.name))
# Read PreLogin response
self._sock.settimeout(5.0)
if not self._read_until_packet(PRE_LOGIN, timeout=5.0):
self.log(f"[{self.name}] no PreLogin response")
with self.stats.lock:
self.stats.errors += 1
return False
# Login
self._send_packet(LOGIN, build_login(self.name, self.xuid))
# Read Login response
if not self._read_until_packet(LOGIN, timeout=5.0):
self.log(f"[{self.name}] no Login response")
with self.stats.lock:
self.stats.errors += 1
return False
with self.stats.lock:
self.stats.connects += 1
self.log(f"[{self.name}] connected (entityId={self._entity_id})")
# Cipher scan (3 second window)
self._sock.settimeout(1.0)
self._do_cipher_scan()
# Hold phase: stay connected, send keepalives and optional movement
hold_time = random.uniform(self.hold_min, self.hold_max)
hold_end = time.time() + hold_time
last_keepalive = time.time()
keepalive_counter = 0
move_x, move_z = random.uniform(-50, 50), random.uniform(-50, 50)
move_y = 65.0
while time.time() < hold_end and self._running:
# Drain incoming data
try:
chunk = self._sock.recv(65536)
if not chunk:
break
self._recv_buf.extend(chunk)
self._drain_frames()
except socket.timeout:
pass
except OSError:
break
now = time.time()
# Keepalive every 10s
if now - last_keepalive >= 10.0:
keepalive_counter += 1
self._send_packet(KEEP_ALIVE, build_keep_alive(keepalive_counter))
with self.stats.lock:
self.stats.keepalives_sent += 1
last_keepalive = now
# Movement packets every 50ms
if self.send_moves:
move_x += random.uniform(-0.5, 0.5)
move_z += random.uniform(-0.5, 0.5)
yaw = random.uniform(0, 360)
self._send_packet(MOVE_PLAYER,
build_move_player(move_x, move_y, move_z, yaw, 0.0, True))
with self.stats.lock:
self.stats.moves_sent += 1
time.sleep(0.05)
# Disconnect (just close the socket)
self.log(f"[{self.name}] disconnecting after {hold_time:.1f}s hold")
with self.stats.lock:
self.stats.disconnects += 1
return True
def _do_cipher_scan(self) -> None:
"""Scan for cipher handshake for up to 3 seconds."""
scan_start = time.time()
scan_buf = bytearray()
while time.time() - scan_start < 0.5 and self._running:
try:
chunk = self._sock.recv(65536)
if not chunk:
return
self._recv_buf.extend(chunk)
except socket.timeout:
pass
except OSError:
return
# Also drain framed packets
self._drain_frames()
# Accumulate raw data for cipher pattern scanning
scan_buf.extend(self._recv_buf)
# Look for MC|CKey
if not self._cipher_key:
idx = scan_buf.find(CIPHER_KEY_PREFIX)
if idx >= 0 and idx + CIPHER_KEY_TOTAL <= len(scan_buf):
key_start = idx + len(CIPHER_KEY_PREFIX)
key_data = bytes(scan_buf[key_start:key_start + 32])
self._cipher_key = key_data[:16]
self._cipher_iv = key_data[16:32]
self.log(f"[{self.name}] got cipher key")
self._send_packet(CUSTOM_PAYLOAD,
build_custom_payload(CIPHER_ACK_CHANNEL))
iv_send = bytearray(self._cipher_iv)
iv_send[0] ^= 0x80
self._send_cipher = CipherState(self._cipher_key, bytes(iv_send))
del scan_buf[:idx + CIPHER_KEY_TOTAL]
# Look for MC|COn
if self._cipher_key and not self._recv_cipher:
idx = scan_buf.find(CIPHER_ON_PATTERN)
if idx >= 0:
self._recv_cipher = CipherState(self._cipher_key, self._cipher_iv)
self.log(f"[{self.name}] cipher active")
return
if not self._cipher_key:
self.log(f"[{self.name}] no cipher (server has it disabled)")
def _read_until_packet(self, expected_id: int, timeout: float) -> bool:
"""Read frames until we get the expected packet ID or timeout."""
deadline = time.time() + timeout
while time.time() < deadline:
try:
chunk = self._sock.recv(65536)
if not chunk:
return False
self._recv_buf.extend(chunk)
except socket.timeout:
continue
except OSError:
return False
while len(self._recv_buf) >= 4:
payload_len = struct.unpack(">I", self._recv_buf[:4])[0]
total = 4 + payload_len
if len(self._recv_buf) < total:
break
raw_payload = bytes(self._recv_buf[4:total])
del self._recv_buf[:total]
if self._recv_cipher:
raw_payload = self._recv_cipher.process(raw_payload)
if not raw_payload:
continue
packet_id = raw_payload[0]
data = raw_payload[1:]
if packet_id == expected_id:
if packet_id == PRE_LOGIN:
try:
parsed = parse_prelogin_response(data)
self.log(f"[{self.name}] PreLogin: {parsed['player_count']} online")
except Exception:
pass
return True
elif packet_id == LOGIN:
try:
dis = DataInputStream(data)
parsed = parse_login_response_stream(dis)
self._entity_id = parsed["entity_id"]
except Exception:
pass
return True
elif packet_id == DISCONNECT:
try:
dis = DataInputStream(data)
reason_id = dis.read_int()
reason = DISCONNECT_REASONS.get(reason_id, f"Unknown({reason_id})")
self.log(f"[{self.name}] kicked during login: {reason}")
except Exception:
pass
return False
return False
def _drain_frames(self) -> None:
"""Drain and discard all complete frames from recv buffer."""
while len(self._recv_buf) >= 4:
payload_len = struct.unpack(">I", self._recv_buf[:4])[0]
total = 4 + payload_len
if len(self._recv_buf) < total:
break
raw_payload = bytes(self._recv_buf[4:total])
del self._recv_buf[:total]
if self._recv_cipher:
raw_payload = self._recv_cipher.process(raw_payload)
if not raw_payload:
continue
packet_id = raw_payload[0]
data = raw_payload[1:]
# Handle identity tokens
if packet_id == CUSTOM_PAYLOAD:
self._handle_custom_payload(data)
def _handle_custom_payload(self, data: bytes) -> None:
"""Handle identity token packets."""
try:
dis = DataInputStream(data)
channel = dis.read_utf()
length = dis.read_short()
payload = dis.read_raw(length) if length > 0 else b""
if channel == IDENTITY_TOKEN_ISSUE and len(payload) == 32:
self._identity_token = payload
self.log(f"[{self.name}] got identity token")
elif channel == IDENTITY_TOKEN_CHALLENGE:
if self._identity_token and len(self._identity_token) == 32:
self._send_packet(CUSTOM_PAYLOAD,
build_custom_payload(IDENTITY_TOKEN_RESPONSE, self._identity_token))
else:
self._send_packet(CUSTOM_PAYLOAD,
build_custom_payload(IDENTITY_TOKEN_RESPONSE))
self.log(f"[{self.name}] answered identity challenge")
except Exception:
pass
def _send_packet(self, packet_id: int, payload: bytes) -> None:
raw = frame_packet(packet_id, payload)
if self._send_cipher:
header = raw[:4]
encrypted = self._send_cipher.process(raw[4:])
raw = header + encrypted
try:
if self._sock:
self._sock.sendall(raw)
except OSError:
pass
def _recv_exact(self, n: int) -> bytes:
data = b""
while len(data) < n:
chunk = self._sock.recv(n - len(data))
if not chunk:
raise ConnectionError("Connection closed during recv")
data += chunk
return data
# ---------------------------------------------------------------------------
# Orchestrator
# ---------------------------------------------------------------------------
class StressTestRunner:
def __init__(self, host: str, port: int, max_bots: int, cycles: int,
hold_min: float, hold_max: float, ramp: float,
send_moves: bool, burst: int, duration: float,
quiet: bool):
self.host = host
self.port = port
self.max_bots = max_bots
self.cycles = cycles
self.hold_min = hold_min
self.hold_max = hold_max
self.ramp = ramp
self.send_moves = send_moves
self.burst = burst
self.duration = duration
self.quiet = quiet
self.stats = Stats()
self._active = threading.Semaphore(max_bots)
self._stop = threading.Event()
self._bot_counter = 0
self._counter_lock = threading.Lock()
def run(self) -> None:
self.stats.start_time = time.time()
logger.info(f"Stress test: {self.host}:{self.port}")
logger.info(f" max_bots={self.max_bots} cycles={self.cycles or 'infinite'} "
f"hold={self.hold_min}-{self.hold_max}s ramp={self.ramp}s "
f"burst={self.burst} moves={self.send_moves}")
if self.duration:
logger.info(f" duration={self.duration}s")
# Status printer
status_thread = threading.Thread(target=self._print_status, daemon=True)
status_thread.start()
# Duration timer
if self.duration > 0:
timer = threading.Timer(self.duration, self._stop.set)
timer.daemon = True
timer.start()
cycle = 0
try:
while not self._stop.is_set():
if self.cycles > 0 and cycle >= self.cycles:
break
# Spawn a burst of bots
threads = []
for _ in range(self.burst):
if self._stop.is_set():
break
if self.cycles > 0 and cycle >= self.cycles:
break
self._active.acquire()
if self._stop.is_set():
self._active.release()
break
with self._counter_lock:
self._bot_counter += 1
bot_name = f"StressBot{self._bot_counter}"
xuid = secrets.randbits(62) | (1 << 32)
bot = StressBot(
name=bot_name,
host=self.host,
port=self.port,
xuid=xuid,
hold_min=self.hold_min,
hold_max=self.hold_max,
send_moves=self.send_moves,
stats=self.stats,
quiet=self.quiet,
)
t = threading.Thread(
target=self._run_bot, args=(bot,), daemon=True)
t.start()
threads.append(t)
cycle += 1
# Ramp delay between bursts
if self.ramp > 0 and not self._stop.is_set():
self._stop.wait(self.ramp)
except KeyboardInterrupt:
logger.info("\nInterrupted by user")
self._stop.set()
# Wait for active bots to finish
logger.info("Waiting for active bots to finish...")
for _ in range(self.max_bots):
self._active.acquire(timeout=30)
logger.info(f"\nFinal: {self.stats.summary()}")
def _run_bot(self, bot: StressBot) -> None:
try:
bot.run()
finally:
self._active.release()
def _print_status(self) -> None:
while not self._stop.is_set():
self._stop.wait(5.0)
if not self._stop.is_set():
logger.info(self.stats.summary())
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(
description="LCE Server Stress Testing Tool",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("host", nargs="?", default="127.0.0.1",
help="Server hostname or IP (default: 127.0.0.1)")
parser.add_argument("port", nargs="?", type=int, default=19132,
help="Server port (default: 19132)")
parser.add_argument("--bots", type=int, default=8,
help="Max concurrent bots (default: 8)")
parser.add_argument("--cycles", type=int, default=50,
help="Connect/disconnect cycles (default: 50, 0=infinite)")
parser.add_argument("--hold", type=float, nargs=2, default=[2.0, 10.0],
metavar=("MIN", "MAX"),
help="Hold time range in seconds (default: 2 10)")
parser.add_argument("--ramp", type=float, default=0.5,
help="Delay between spawning bots (default: 0.5)")
parser.add_argument("--move", action="store_true",
help="Bots send movement packets while connected")
parser.add_argument("--burst", type=int, default=1,
help="Bots to spawn simultaneously (default: 1)")
parser.add_argument("--duration", type=float, default=0,
help="Run for N seconds then stop (default: unlimited)")
parser.add_argument("--quiet", action="store_true",
help="Suppress per-bot log messages")
args = parser.parse_args()
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(message)s",
datefmt="%H:%M:%S",
)
runner = StressTestRunner(
host=args.host,
port=args.port,
max_bots=args.bots,
cycles=args.cycles,
hold_min=args.hold[0],
hold_max=args.hold[1],
ramp=args.ramp,
send_moves=args.move,
burst=args.burst,
duration=args.duration,
quiet=args.quiet,
)
runner.run()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,8 @@
@echo off
REM Aggressive: short holds, fast ramp - hammers the players.erase() path
set /p HOST="Server IP [127.0.0.1]: " || set HOST=127.0.0.1
set /p PORT="Server Port [19132]: " || set PORT=19132
if "%HOST%"=="" set HOST=127.0.0.1
if "%PORT%"=="" set PORT=19132
python "%~dp0stress_test.py" %HOST% %PORT% --bots 12 --hold 0.5 2 --ramp 0.2
pause

View File

@@ -0,0 +1,8 @@
@echo off
REM Basic connect/disconnect churn - 50 cycles, 8 concurrent bots
set /p HOST="Server IP [127.0.0.1]: " || set HOST=127.0.0.1
set /p PORT="Server Port [19132]: " || set PORT=19132
if "%HOST%"=="" set HOST=127.0.0.1
if "%PORT%"=="" set PORT=19132
python "%~dp0stress_test.py" %HOST% %PORT%
pause

View File

@@ -0,0 +1,8 @@
@echo off
REM Burst joins - 4 bots connect simultaneously
set /p HOST="Server IP [127.0.0.1]: " || set HOST=127.0.0.1
set /p PORT="Server Port [19132]: " || set PORT=19132
if "%HOST%"=="" set HOST=127.0.0.1
if "%PORT%"=="" set PORT=19132
python "%~dp0stress_test.py" %HOST% %PORT% --burst 4 --hold 1 3
pause

View File

@@ -0,0 +1,8 @@
@echo off
REM Endurance: run all patterns for 5 minutes continuously
set /p HOST="Server IP [127.0.0.1]: " || set HOST=127.0.0.1
set /p PORT="Server Port [25565]: " || set PORT=25565
if "%HOST%"=="" set HOST=127.0.0.1
if "%PORT%"=="" set PORT=19132
python "%~dp0stress_test.py" %HOST% %PORT% --cycles 0 --duration 300 --bots 512 --burst 32 --move --hold 10 30 --ramp 0.1 --quiet
pause

View File

@@ -0,0 +1,8 @@
@echo off
REM Movement packets - triggers the "moved wrongly" code path
set /p HOST="Server IP [127.0.0.1]: " || set HOST=127.0.0.1
set /p PORT="Server Port [19132]: " || set PORT=19132
if "%HOST%"=="" set HOST=127.0.0.1
if "%PORT%"=="" set PORT=19132
python "%~dp0stress_test.py" %HOST% %PORT% --move --hold 5 15
pause