perf(fourkit): server gc, fast-path event dispatch, tps probe

This commit is contained in:
itsRevela
2026-04-24 21:31:06 -05:00
parent 6db68bda06
commit b64b1f17db
11 changed files with 486 additions and 57 deletions

View File

@@ -22,60 +22,80 @@ internal sealed class EventDispatcher
public int CompareTo(RegisteredHandler other) => Priority.CompareTo(other.Priority);
}
private readonly Dictionary<Type, List<RegisteredHandler>> _handlers = new();
private readonly object _lock = new();
// Snapshot-on-write: writers swap _handlers atomically; Fire reads it lock-free.
private volatile Dictionary<Type, RegisteredHandler[]> _handlers = new();
private readonly object _writeLock = new();
// Fired when an event type gains its first handler.
internal Action<Type>? OnSubscriptionChanged;
public void Register(Listener listener)
{
var methods = listener.GetType().GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance);
lock (_lock)
List<(Type eventType, RegisteredHandler handler)>? pending = null;
foreach (var method in methods)
{
foreach (var method in methods)
var attr = method.GetCustomAttribute<Event.EventHandlerAttribute>();
if (attr == null)
continue;
var parameters = method.GetParameters();
if (parameters.Length != 1)
{
var attr = method.GetCustomAttribute<Event.EventHandlerAttribute>();
if (attr == null)
continue;
var parameters = method.GetParameters();
if (parameters.Length != 1)
{
Console.WriteLine($"[FourKit] Warning: @EventHandler method {method.Name} must have exactly 1 parameter, skipping.");
continue;
}
var eventType = parameters[0].ParameterType;
if (!typeof(Event.Event).IsAssignableFrom(eventType))
{
Console.WriteLine($"[FourKit] Warning: @EventHandler method {method.Name} parameter must extend Event, skipping.");
continue;
}
if (!_handlers.TryGetValue(eventType, out var list))
{
list = new List<RegisteredHandler>();
_handlers[eventType] = list;
}
list.Add(new RegisteredHandler(listener, method, attr.Priority, attr.IgnoreCancelled));
_handlers[eventType] = list.OrderBy(h => h.Priority).ToList();
Console.WriteLine($"[FourKit] Warning: @EventHandler method {method.Name} must have exactly 1 parameter, skipping.");
continue;
}
var eventType = parameters[0].ParameterType;
if (!typeof(Event.Event).IsAssignableFrom(eventType))
{
Console.WriteLine($"[FourKit] Warning: @EventHandler method {method.Name} parameter must extend Event, skipping.");
continue;
}
pending ??= new List<(Type, RegisteredHandler)>();
pending.Add((eventType, new RegisteredHandler(listener, method, attr.Priority, attr.IgnoreCancelled)));
}
if (pending == null) return;
HashSet<Type> newlySubscribed = new();
lock (_writeLock)
{
var newDict = new Dictionary<Type, RegisteredHandler[]>(_handlers);
foreach (var (eventType, handler) in pending)
{
bool hadAny = newDict.TryGetValue(eventType, out var existing);
existing ??= Array.Empty<RegisteredHandler>();
// OrderBy is stable; Array.Sort is not.
var combined = existing.Append(handler).OrderBy(h => h.Priority).ToArray();
newDict[eventType] = combined;
if (!hadAny) newlySubscribed.Add(eventType);
}
_handlers = newDict;
}
if (OnSubscriptionChanged != null)
{
foreach (var t in newlySubscribed)
OnSubscriptionChanged(t);
}
}
public void Fire(Event.Event evt)
{
List<RegisteredHandler>? handlers;
lock (_lock)
{
if (!_handlers.TryGetValue(evt.GetType(), out handlers))
return;
handlers = new List<RegisteredHandler>(handlers);
}
var snapshot = _handlers;
if (!snapshot.TryGetValue(evt.GetType(), out var handlers))
return;
var cancellable = evt as Cancellable;
foreach (var handler in handlers)
for (int i = 0; i < handlers.Length; i++)
{
ref readonly var handler = ref handlers[i];
if (handler.IgnoreCancelled && cancellable != null && cancellable.isCancelled())
continue;
@@ -89,4 +109,6 @@ internal sealed class EventDispatcher
}
}
}
internal bool IsSubscribed(Type eventType) => _handlers.ContainsKey(eventType);
}

View File

@@ -11,11 +11,68 @@ using Minecraft.Server.FourKit.Plugin;
/// </summary>
public static class FourKit
{
private static readonly EventDispatcher _dispatcher = new();
private static readonly EventDispatcher _dispatcher;
private static readonly Dictionary<string, Player> _players = new(StringComparer.OrdinalIgnoreCase);
private static readonly Dictionary<int, Player> _playersByEntityId = new();
private static readonly object _playerLock = new();
// Must match HandlerKind in FourKitNatives.h.
private enum HandlerKind
{
ChunkLoad = 0,
ChunkUnload = 1,
PlayerMove = 2,
}
private static uint _handlerMask;
private static readonly object _handlerMaskLock = new();
static FourKit()
{
_dispatcher = new EventDispatcher();
_dispatcher.OnSubscriptionChanged = OnEventSubscribed;
}
private static HandlerKind? MapEventTypeToHandlerKind(Type eventType)
{
if (eventType == typeof(Event.World.ChunkLoadEvent)) return HandlerKind.ChunkLoad;
if (eventType == typeof(Event.World.ChunkUnloadEvent)) return HandlerKind.ChunkUnload;
if (eventType == typeof(Event.Player.PlayerMoveEvent)) return HandlerKind.PlayerMove;
return null;
}
private static void OnEventSubscribed(Type eventType)
{
var kind = MapEventTypeToHandlerKind(eventType);
if (kind == null) return;
lock (_handlerMaskLock)
{
uint newMask = _handlerMask | (1u << (int)kind.Value);
if (newMask == _handlerMask) return;
_handlerMask = newMask;
NativeBridge.SetHandlerMask?.Invoke(_handlerMask);
}
}
internal static void ResyncHandlerMask()
{
lock (_handlerMaskLock)
{
NativeBridge.SetHandlerMask?.Invoke(_handlerMask);
}
}
/// <summary>
/// Gets the current server tick count. Increments once per server tick
/// (~20 per second under nominal load). Useful for measuring TPS by
/// sampling the delta against wall clock time.
/// </summary>
public static int getServerTick()
{
return NativeBridge.GetServerTickCount?.Invoke() ?? 0;
}
internal const int MAX_CHAT_LENGTH = 123;
private static readonly Dictionary<int, World> _worldsByDimId = new();

View File

@@ -160,4 +160,32 @@ public static partial class FourKitHost
ServerLog.Error("fourkit", $"SetWorldEntityCallbacks error: {ex}");
}
}
[UnmanagedCallersOnly]
public static void SetSubscriptionCallbacks(IntPtr setHandlerMask)
{
try
{
NativeBridge.SetSubscriptionCallbacks(setHandlerMask);
// Flush the mask accumulated during plugin onEnable.
FourKit.ResyncHandlerMask();
}
catch (Exception ex)
{
ServerLog.Error("fourkit", $"SetSubscriptionCallbacks error: {ex}");
}
}
[UnmanagedCallersOnly]
public static void SetServerCallbacks(IntPtr getServerTickCount)
{
try
{
NativeBridge.SetServerCallbacks(getServerTickCount);
}
catch (Exception ex)
{
ServerLog.Error("fourkit", $"SetServerCallbacks error: {ex}");
}
}
}

View File

@@ -7,5 +7,7 @@
<AssemblyName>Minecraft.Server.FourKit</AssemblyName>
<EnableDynamicLoading>true</EnableDynamicLoading>
<BaseOutputPath>bin</BaseOutputPath>
<ServerGarbageCollection>true</ServerGarbageCollection>
<ConcurrentGarbageCollection>true</ConcurrentGarbageCollection>
</PropertyGroup>
</Project>

View File

@@ -236,6 +236,15 @@ internal static class NativeBridge
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
internal delegate void NativeSetBiomeIdDelegate(int dimId, int x, int z, int biomeId);
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
internal delegate void NativeSetHandlerMaskDelegate(uint mask);
internal static NativeSetHandlerMaskDelegate? SetHandlerMask;
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
internal delegate int NativeGetServerTickCountDelegate();
internal static NativeGetServerTickCountDelegate? GetServerTickCount;
internal static NativeDamageDelegate? DamagePlayer;
internal static NativeSetHealthDelegate? SetPlayerHealth;
internal static NativeTeleportDelegate? TeleportPlayer;
@@ -439,4 +448,14 @@ internal static class NativeBridge
GetBiomeId = Marshal.GetDelegateForFunctionPointer<NativeGetBiomeIdDelegate>(getBiomeId);
SetBiomeId = Marshal.GetDelegateForFunctionPointer<NativeSetBiomeIdDelegate>(setBiomeId);
}
internal static void SetSubscriptionCallbacks(IntPtr setHandlerMask)
{
SetHandlerMask = Marshal.GetDelegateForFunctionPointer<NativeSetHandlerMaskDelegate>(setHandlerMask);
}
internal static void SetServerCallbacks(IntPtr getServerTickCount)
{
GetServerTickCount = Marshal.GetDelegateForFunctionPointer<NativeGetServerTickCountDelegate>(getServerTickCount);
}
}

View File

@@ -106,6 +106,8 @@ typedef int(__stdcall *fn_fire_block_from_to)(int dimId, int fromX, int fromY, i
typedef void(__stdcall *fn_set_chunk_callbacks)(void *isChunkLoaded, void *loadChunk, void *unloadChunk, void *getLoadedChunks, void *isChunkInUse, void *getChunkSnapshot, void *unloadChunkRequest, void *regenerateChunk, void *refreshChunk);
typedef void(__stdcall *fn_set_block_info_callbacks)(void *getSkyLight, void *getBlockLight, void *getBiomeId, void *setBiomeId);
typedef void(__stdcall *fn_set_world_entity_callbacks)(void *getWorldEntities, void *getChunkEntities);
typedef void(__stdcall *fn_set_subscription_callbacks)(void *setHandlerMask);
typedef void(__stdcall *fn_set_server_callbacks)(void *getServerTickCount);
typedef void(__stdcall *fn_fire_chunk_load)(int dimId, int chunkX, int chunkZ, int isNewChunk);
typedef int(__stdcall *fn_fire_chunk_unload)(int dimId, int chunkX, int chunkZ);
@@ -168,6 +170,8 @@ static fn_fire_block_from_to s_managedFireBlockFromTo = nullptr;
static fn_set_chunk_callbacks s_managedSetChunkCallbacks = nullptr;
static fn_set_block_info_callbacks s_managedSetBlockInfoCallbacks = nullptr;
static fn_set_world_entity_callbacks s_managedSetWorldEntityCallbacks = nullptr;
static fn_set_subscription_callbacks s_managedSetSubscriptionCallbacks = nullptr;
static fn_set_server_callbacks s_managedSetServerCallbacks = nullptr;
static fn_fire_chunk_load s_managedFireChunkLoad = nullptr;
static fn_fire_chunk_unload s_managedFireChunkUnload = nullptr;
@@ -255,6 +259,8 @@ void Initialize()
{L"SetChunkCallbacks", (void **)&s_managedSetChunkCallbacks},
{L"SetBlockInfoCallbacks", (void **)&s_managedSetBlockInfoCallbacks},
{L"SetWorldEntityCallbacks", (void **)&s_managedSetWorldEntityCallbacks},
{L"SetSubscriptionCallbacks", (void **)&s_managedSetSubscriptionCallbacks},
{L"SetServerCallbacks", (void **)&s_managedSetServerCallbacks},
{L"FireChunkLoad", (void **)&s_managedFireChunkLoad},
{L"FireChunkUnload", (void **)&s_managedFireChunkUnload},
};
@@ -376,6 +382,12 @@ void Initialize()
(void *)&NativeGetWorldEntities,
(void *)&NativeGetChunkEntities);
s_managedSetSubscriptionCallbacks(
(void *)&NativeSetHandlerMask);
s_managedSetServerCallbacks(
(void *)&NativeGetServerTickCount);
LogInfo("fourkit", "FourKit initialized successfully.");
}
@@ -521,8 +533,12 @@ bool FirePlayerMove(int entityId,
double toX, double toY, double toZ,
double *outToX, double *outToY, double *outToZ)
{
if (!s_initialized || !s_managedFireMove)
// Caller reads outTo* unconditionally; init on every early-return.
if (!s_initialized || !s_managedFireMove || !HasHandlers(kHandlerKind_PlayerMove))
{
*outToX = toX;
*outToY = toY;
*outToZ = toZ;
return false;
}
@@ -1059,6 +1075,8 @@ void FireChunkLoad(int dimId, int chunkX, int chunkZ, bool isNewChunk)
{
if (!s_initialized || !s_managedFireChunkLoad)
return;
if (!HasHandlers(kHandlerKind_ChunkLoad))
return;
s_managedFireChunkLoad(dimId, chunkX, chunkZ, isNewChunk ? 1 : 0);
}
@@ -1066,6 +1084,8 @@ bool FireChunkUnload(int dimId, int chunkX, int chunkZ)
{
if (!s_initialized || !s_managedFireChunkUnload)
return false;
if (!HasHandlers(kHandlerKind_ChunkUnload))
return false;
return s_managedFireChunkUnload(dimId, chunkX, chunkZ) != 0;
}
} // namespace FourKitBridge

View File

@@ -3,6 +3,7 @@
#include "Common/StringUtils.h"
#include "stdafx.h"
#include <atomic>
#include <string>
#include <vector>
@@ -51,6 +52,8 @@
namespace
{
std::atomic<uint32_t> g_handlerMask{0};
static shared_ptr<ServerPlayer> FindPlayer(int entityId)
{
PlayerList *list = MinecraftServer::getPlayerList();
@@ -109,6 +112,24 @@ class VirtualContainer : public SimpleContainer
namespace FourKitBridge
{
void __cdecl NativeSetHandlerMask(uint32_t mask)
{
g_handlerMask.store(mask, std::memory_order_release);
}
bool HasHandlers(int kind)
{
if (kind < 0 || kind >= 32) return false;
return (g_handlerMask.load(std::memory_order_acquire) & (1u << kind)) != 0;
}
int __cdecl NativeGetServerTickCount()
{
MinecraftServer *srv = MinecraftServer::getInstance();
return srv ? srv->tickCount : 0;
}
void __cdecl NativeDamagePlayer(int entityId, float amount)
{
auto player = FindPlayer(entityId);

View File

@@ -1,8 +1,21 @@
#pragma once
#include <cstdint>
namespace FourKitBridge
{
// Must match HandlerKind in FourKit.cs.
enum HandlerKind : int {
kHandlerKind_ChunkLoad = 0,
kHandlerKind_ChunkUnload = 1,
kHandlerKind_PlayerMove = 2,
};
void __cdecl NativeSetHandlerMask(uint32_t mask);
bool HasHandlers(int kind);
int __cdecl NativeGetServerTickCount();
// core
void __cdecl NativeDamagePlayer(int entityId, float amount);
void __cdecl NativeSetPlayerHealth(int entityId, float health);

View File

@@ -36,16 +36,21 @@ public class FourKitTestPlugin : ServerPlugin
_logPath = ResolveLogPath(serverDirectory, dataDirectory);
Log("FourKitTestPlugin enabled.");
Log($"Plugin log file: {_logPath}");
FourKit.addListener(new ChunkEventLogger());
// ChunkEventLogger is intentionally NOT registered here. Subscribing
// to chunk events flips the C++ HasHandlers mask bit, which disables
// the no-listener fast-path. Use /fktest hookchunks to register it
// when you want to measure dispatch overhead specifically.
TpsProbe.Start();
var cmd = FourKit.getCommand("fktest");
cmd.setDescription("FourKit API smoke tests.");
cmd.setUsage("/fktest <help|world|chunks|snapshot|entities|loadchunk|enderchest|disenchant|events|setblock|chatcolor>");
cmd.setUsage("/fktest <help|world|chunks|snapshot|entities|loadchunk|enderchest|disenchant|events|setblock|chatcolor|tps|scatter|hookchunks|watchchunks>");
cmd.setExecutor(new TestExecutor());
}
public override void onDisable()
{
TpsProbe.Stop();
Log("FourKitTestPlugin disabled.");
}
@@ -54,6 +59,9 @@ public class FourKitTestPlugin : ServerPlugin
internal static void IncChunkLoad() => Interlocked.Increment(ref _chunkLoadCount);
internal static void IncChunkUnload() => Interlocked.Increment(ref _chunkUnloadCount);
internal static volatile bool WatchChunks = false;
internal static volatile bool ChunkListenerHooked = false;
/// <summary>
/// Writes a line both to the live server console and to a persistent log
/// file so test results are recoverable after the server window closes.
@@ -105,12 +113,93 @@ public class FourKitTestPlugin : ServerPlugin
}
}
internal static class TpsProbe
{
private static readonly object _lock = new();
private static readonly LinkedList<(double elapsed, int tick)> _samples = new();
private static System.Threading.Timer? _timer;
private static System.Diagnostics.Stopwatch? _sw;
public static void Start()
{
_sw = System.Diagnostics.Stopwatch.StartNew();
// Seed an initial sample so a /fktest tps in the first second is meaningful.
Sample();
_timer = new System.Threading.Timer(_ => Sample(), null, 1000, 1000);
}
public static void Stop()
{
_timer?.Dispose();
_timer = null;
_sw?.Stop();
_sw = null;
lock (_lock) _samples.Clear();
}
private static void Sample()
{
var sw = _sw;
if (sw == null) return;
try
{
int tick = FourKit.getServerTick();
double elapsed = sw.Elapsed.TotalSeconds;
lock (_lock)
{
_samples.AddLast((elapsed, tick));
// Keep a 65s window so the 60s average has full data.
while (_samples.Count > 66) _samples.RemoveFirst();
}
}
catch (Exception ex)
{
Console.WriteLine($"[fkplugin] TpsProbe sample error: {ex.Message}");
}
}
public static (int samples, double tps1, double tps5, double tps30, double tps60) Read()
{
(double elapsed, int tick)[] arr;
lock (_lock)
{
if (_samples.Count < 2) return (_samples.Count, 0, 0, 0, 0);
arr = _samples.ToArray();
}
var last = arr[^1];
double Window(double seconds)
{
for (int j = arr.Length - 2; j >= 0; j--)
{
if (last.elapsed - arr[j].elapsed >= seconds)
{
var first = arr[j];
double dt = last.elapsed - first.elapsed;
return dt > 0 ? (last.tick - first.tick) / dt : 0;
}
}
// Not enough history yet: report what we have.
var oldest = arr[0];
double dt0 = last.elapsed - oldest.elapsed;
return dt0 > 0 ? (last.tick - oldest.tick) / dt0 : 0;
}
return (arr.Length, Window(1), Window(5), Window(30), Window(60));
}
}
// Chunk events fire at high frequency under load (16 chunks/player/tick on
// the dedicated server). Doing disk I/O per event tanks server TPS. Default
// to counter-only; /fktest watchchunks toggles verbose disk logging on.
internal sealed class ChunkEventLogger : Listener
{
[EventHandler(Priority = EventPriority.Monitor)]
public void onChunkLoad(ChunkLoadEvent e)
{
FourKitTestPlugin.IncChunkLoad();
if (!FourKitTestPlugin.WatchChunks) return;
var chunk = e.getChunk();
FourKitTestPlugin.Log($"ChunkLoadEvent dim={chunk.getWorld().getDimensionId()} ({chunk.getX()},{chunk.getZ()}) new={e.isNewChunk()}");
}
@@ -119,6 +208,7 @@ internal sealed class ChunkEventLogger : Listener
public void onChunkUnload(ChunkUnloadEvent e)
{
FourKitTestPlugin.IncChunkUnload();
if (!FourKitTestPlugin.WatchChunks) return;
var chunk = e.getChunk();
FourKitTestPlugin.Log($"ChunkUnloadEvent dim={chunk.getWorld().getDimensionId()} ({chunk.getX()},{chunk.getZ()})");
}
@@ -136,7 +226,7 @@ internal sealed class TestExecutor : CommandExecutor
{
if (args.Length == 0)
{
Reply(sender,"Usage: /fktest <help|world|chunks|snapshot|entities|loadchunk|enderchest|disenchant|events|setblock|chatcolor>");
Reply(sender,"Usage: /fktest <help|world|chunks|snapshot|entities|loadchunk|enderchest|disenchant|events|setblock|chatcolor|tps|scatter|hookchunks|watchchunks>");
return true;
}
@@ -154,6 +244,22 @@ internal sealed class TestExecutor : CommandExecutor
case "disenchant": return TestDisenchant(sender);
case "setblock": return TestSetBlock(sender);
case "chatcolor": return TestChatColor(sender);
case "tps": return TestTps(sender);
case "scatter": return TestScatter(sender, args);
case "hookchunks":
if (FourKitTestPlugin.ChunkListenerHooked)
{
Reply(sender, "Chunk listener already hooked. EventDispatcher has no unregister, so this is one-way for the session.");
return true;
}
FourKit.addListener(new ChunkEventLogger());
FourKitTestPlugin.ChunkListenerHooked = true;
Reply(sender, "Chunk listener registered. HasHandlers fast-path now off; chunk events will dispatch.");
return true;
case "watchchunks":
FourKitTestPlugin.WatchChunks = !FourKitTestPlugin.WatchChunks;
Reply(sender, $"Verbose chunk-event disk logging {(FourKitTestPlugin.WatchChunks ? "ON" : "OFF")} (only effective once /fktest hookchunks is run)");
return true;
case "events":
Reply(sender,$"Chunk loads observed: {FourKitTestPlugin.ChunkLoadCount}");
Reply(sender,$"Chunk unloads observed: {FourKitTestPlugin.ChunkUnloadCount}");
@@ -184,6 +290,10 @@ internal sealed class TestExecutor : CommandExecutor
Reply(sender,"/fktest events - Show observed chunk-event counters");
Reply(sender,"/fktest setblock - Place wool 3 above head via setTypeIdAndData, read back");
Reply(sender,"/fktest chatcolor - Verify ChatColor parsing/strip/translate");
Reply(sender,"/fktest tps - Show server TPS over 1s/5s/30s/60s windows");
Reply(sender,"/fktest scatter [N] - Teleport every online player to a random point within +-N blocks (default 1500)");
Reply(sender,"/fktest hookchunks - Register the chunk listener (turns OFF the no-listener fast-path)");
Reply(sender,"/fktest watchchunks - Toggle per-event disk logging (off by default; expensive)");
}
private static Player? RequirePlayer(CommandSender sender)
@@ -193,6 +303,54 @@ internal sealed class TestExecutor : CommandExecutor
return null;
}
private static bool TestScatter(CommandSender sender, string[] args)
{
int range = 1500;
if (args.Length > 1 && int.TryParse(args[1], out int parsed) && parsed > 0)
range = parsed;
var world = FourKit.getWorld(0);
if (world == null)
{
Reply(sender, "Could not resolve overworld.");
return true;
}
var rng = new Random();
int count = 0;
foreach (var p in FourKit.getOnlinePlayers())
{
int x = rng.Next(-range, range + 1);
int z = rng.Next(-range, range + 1);
int y = world.getHighestBlockYAt(x, z) + 1;
try
{
p.teleport(new Location(world, x, y, z));
count++;
}
catch (Exception ex)
{
FourKitTestPlugin.Log($"scatter: failed to teleport {p.getName()}: {ex.Message}");
}
}
Reply(sender, $"Scattered {count} player(s) within ±{range} blocks.");
return true;
}
private static bool TestTps(CommandSender sender)
{
var (samples, t1, t5, t30, t60) = TpsProbe.Read();
if (samples < 2)
{
Reply(sender, $"TPS probe warming up ({samples}/2 samples). Try again in a few seconds.");
return true;
}
int tick = FourKit.getServerTick();
Reply(sender, $"TPS 1s={t1:F2} 5s={t5:F2} 30s={t30:F2} 60s={t60:F2}");
Reply(sender, $"server tick={tick} samples={samples}");
return true;
}
private static bool TestWorld(CommandSender sender)
{
var player = RequirePlayer(sender);

View File

@@ -20,6 +20,7 @@ Options:
import argparse
import logging
import math
import os
import random
import secrets
@@ -68,7 +69,9 @@ CIPHER_ON_PATTERN = (
b"\x00\x00"
)
CIPHER_KEY_CHANNEL = "MC|CKey"
CIPHER_ACK_CHANNEL = "MC|CAck"
CIPHER_ON_CHANNEL = "MC|COn"
IDENTITY_TOKEN_ISSUE = "MC|CTIssue"
IDENTITY_TOKEN_CHALLENGE = "MC|CTChallenge"
IDENTITY_TOKEN_RESPONSE = "MC|CTResponse"
@@ -130,14 +133,17 @@ class Stats:
# Movement packet builder
# ---------------------------------------------------------------------------
MOVE_PLAYER = 0x0D # MovePlayerPacket ID
MOVE_PLAYER = 0x0D # MovePlayerPacket::PosRot — what we send AND what server sends for teleports.
def build_move_player(x: float, y: float, z: float,
yaw: float, pitch: float, on_ground: bool) -> bytes:
# Wire order matches MovePlayerPacket::PosRot::write: x, y (feet),
# yView (eye), z, yaw, pitch, flags. Server kicks for IllegalStance
# if (yView - y) is outside [0.1, 1.65], so feet must come first.
dos = DataOutputStream()
dos.write_double(x)
dos.write_double(y + 1.62) # stance
dos.write_double(y)
dos.write_double(y + 1.62)
dos.write_double(z)
dos.write_float(yaw)
dos.write_float(pitch)
@@ -174,6 +180,13 @@ class StressBot:
self._identity_token = b""
self._entity_id = 0
self._running = True
# Server-tracked position. Initialized when server sends its first
# MovePlayer::PosRot teleport after login, and updated whenever the
# server teleports us (eg. plugin scatter, anti-cheat correction).
self._pos_x = 0.0
self._pos_y = 64.0
self._pos_z = 0.0
self._pos_initialized = False
def log(self, msg: str) -> None:
if not self.quiet:
@@ -250,8 +263,6 @@ class StressBot:
hold_end = time.time() + hold_time
last_keepalive = time.time()
keepalive_counter = 0
move_x, move_z = random.uniform(-50, 50), random.uniform(-50, 50)
move_y = 65.0
while time.time() < hold_end and self._running:
# Drain incoming data
@@ -276,13 +287,22 @@ class StressBot:
self.stats.keepalives_sent += 1
last_keepalive = now
# Movement packets every 50ms
if self.send_moves:
move_x += random.uniform(-0.5, 0.5)
move_z += random.uniform(-0.5, 0.5)
# Movement packets every 50ms. We can't do real travel because
# the server's anti-cheat compares our claimed position against
# what its own physics computes, and we don't simulate collision
# or gravity. Instead we drift ±0.3 blocks from whatever
# position the server most recently teleported us to. To spread
# bots out, use the test plugin's /fktest scatter from in-game.
if self.send_moves and self._pos_initialized:
new_x = self._pos_x + random.uniform(-0.3, 0.3)
new_z = self._pos_z + random.uniform(-0.3, 0.3)
yaw = random.uniform(0, 360)
self._send_packet(MOVE_PLAYER,
build_move_player(move_x, move_y, move_z, yaw, 0.0, True))
build_move_player(new_x, self._pos_y, new_z, yaw, 0.0, True))
# Optimistically update; server will correct us via PosRot
# if it disagreed (eg. we drifted into a block).
self._pos_x = new_x
self._pos_z = new_z
with self.stats.lock:
self.stats.moves_sent += 1
time.sleep(0.05)
@@ -294,11 +314,23 @@ class StressBot:
return True
def _do_cipher_scan(self) -> None:
"""Scan for cipher handshake for up to 3 seconds."""
"""Wait for the cipher handshake to finish or up to ~4s.
Returns early once both keys are exchanged. The upper bound has to
cover the worst case where a stack of plaintext setup packets
(level info, scoreboard, initial chunks) sits in front of MC|CKey
in the recv buffer. The server's own cipher-handshake grace is
100 ticks (~5s).
"""
scan_start = time.time()
scan_buf = bytearray()
while time.time() - scan_start < 0.5 and self._running:
while time.time() - scan_start < 4.0 and self._running:
# _handle_custom_payload may have already activated cipher via
# the drain path inside _read_until_packet.
if self._cipher_key and self._recv_cipher:
return
try:
chunk = self._sock.recv(65536)
if not chunk:
@@ -393,6 +425,14 @@ class StressBot:
pass
return True
# Handle cipher handshake during the login wait. With
# require-secure-client, the server holds the Login response
# behind the security gate until cipher activates, so the
# gate-bypass MC|CKey/MC|COn frames arrive before LOGIN.
# Dropping them here would deadlock both sides.
elif packet_id == CUSTOM_PAYLOAD:
self._handle_custom_payload(data)
elif packet_id == DISCONNECT:
try:
dis = DataInputStream(data)
@@ -428,16 +468,51 @@ class StressBot:
# Handle identity tokens
if packet_id == CUSTOM_PAYLOAD:
self._handle_custom_payload(data)
elif packet_id == MOVE_PLAYER:
self._handle_server_move(data)
def _handle_server_move(self, data: bytes) -> None:
"""Track server's view of our position. PosRot format:
double x, double y, double yView, double z, float yRot, float xRot, byte flags."""
try:
dis = DataInputStream(data)
x = dis.read_double()
y = dis.read_double()
_yView = dis.read_double()
z = dis.read_double()
self._pos_x = x
self._pos_y = y
self._pos_z = z
self._pos_initialized = True
except Exception:
pass
def _handle_custom_payload(self, data: bytes) -> None:
"""Handle identity token packets."""
"""Handle cipher handshake and identity token channels."""
try:
dis = DataInputStream(data)
channel = dis.read_utf()
length = dis.read_short()
payload = dis.read_raw(length) if length > 0 else b""
if channel == IDENTITY_TOKEN_ISSUE and len(payload) == 32:
# Cipher channels arrive in plaintext before encryption is active.
# Handle them here so the bot survives bursts where the whole
# handshake frame lands in a single recv(), bypassing the leftover
# byte-pattern scan.
if channel == CIPHER_KEY_CHANNEL and len(payload) == 32 and not self._cipher_key:
self._cipher_key = payload[:16]
self._cipher_iv = payload[16:32]
self.log(f"[{self.name}] got cipher key")
self._send_packet(CUSTOM_PAYLOAD,
build_custom_payload(CIPHER_ACK_CHANNEL))
iv_send = bytearray(self._cipher_iv)
iv_send[0] ^= 0x80
self._send_cipher = CipherState(self._cipher_key, bytes(iv_send))
elif channel == CIPHER_ON_CHANNEL:
if self._cipher_key and not self._recv_cipher:
self._recv_cipher = CipherState(self._cipher_key, self._cipher_iv)
self.log(f"[{self.name}] cipher active")
elif channel == IDENTITY_TOKEN_ISSUE and len(payload) == 32:
self._identity_token = payload
self.log(f"[{self.name}] got identity token")
elif channel == IDENTITY_TOKEN_CHALLENGE:

View File

@@ -0,0 +1,14 @@
@echo off
REM FourKit chunk + move event stress: 50 concurrent moving bots held for 1-2min
REM each, exercising FireChunkLoad / FireChunkUnload / FirePlayerMove. Validates
REM the HasHandlers fast-path and Server GC at the 50-player target.
REM
REM Set require-secure-client=false in server.properties before running. The
REM 100-tick cipher handshake grace cannot keep up with 50 simultaneous bot
REM joins, which is unrelated to what this test is measuring.
set /p HOST="Server IP [127.0.0.1]: " || set HOST=127.0.0.1
set /p PORT="Server Port [25565]: " || set PORT=25565
if "%HOST%"=="" set HOST=127.0.0.1
if "%PORT%"=="" set PORT=25565
python "%~dp0stress_test.py" %HOST% %PORT% --bots 50 --burst 10 --move --hold 60 120 --ramp 0.5 --duration 600 --cycles 0 --quiet
pause