Fix POSIX LAN disconnect handling

This commit is contained in:
Merval
2026-05-07 20:18:07 -07:00
parent aad5c68b7e
commit b0bbb51e80
2 changed files with 83 additions and 40 deletions

View File

@@ -683,6 +683,8 @@ void WinsockNetLayer::HandleDataReceived(uint8_t fromSmallId, uint8_t toSmallId,
pthread_mutex_lock(&s_earlyDataLock);
s_earlyDataBuffers[fromSmallId].insert(
s_earlyDataBuffers[fromSmallId].end(), data, data + dataSize);
app.DebugPrintf("POSIX LAN: Buffered %u early bytes for smallId=%d (total=%d)\n",
dataSize, fromSmallId, (int)s_earlyDataBuffers[fromSmallId].size());
pthread_mutex_unlock(&s_earlyDataLock);
}
return;
@@ -698,6 +700,8 @@ void WinsockNetLayer::HandleDataReceived(uint8_t fromSmallId, uint8_t toSmallId,
pthread_mutex_lock(&s_earlyDataLock);
s_earlyDataBuffers[fromSmallId].insert(
s_earlyDataBuffers[fromSmallId].end(), data, data + dataSize);
app.DebugPrintf("POSIX LAN: Buffered %u bytes waiting for socket smallId=%d (total=%d)\n",
dataSize, fromSmallId, (int)s_earlyDataBuffers[fromSmallId].size());
pthread_mutex_unlock(&s_earlyDataLock);
}
}
@@ -722,6 +726,8 @@ void WinsockNetLayer::FlushPendingData()
::Socket *pSocket = pPlayer->GetSocket();
if (pSocket == NULL) continue;
app.DebugPrintf("POSIX LAN: Flushing %d early bytes for smallId=%d\n",
(int)s_earlyDataBuffers[i].size(), (int)i);
pSocket->pushDataToQueue(s_earlyDataBuffers[i].data(),
(DWORD)s_earlyDataBuffers[i].size(), false);
s_earlyDataBuffers[i].clear();
@@ -796,15 +802,6 @@ void* WinsockNetLayer::AcceptThreadProc(void* /*param*/)
continue;
}
uint8_t assignBuf[1] = { assignedSmallId };
ssize_t sent = send(clientSocket, (const char *)assignBuf, 1, MSG_NOSIGNAL);
if (sent != 1)
{
app.DebugPrintf("Failed to send small ID to client\n");
close(clientSocket);
continue;
}
int flags = fcntl(clientSocket, F_GETFL, 0);
fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK);
@@ -842,7 +839,17 @@ void* WinsockNetLayer::AcceptThreadProc(void* /*param*/)
pthread_mutex_lock(&s_pendingJoinLock);
s_pendingJoinSmallIds.push_back(assignedSmallId);
app.DebugPrintf("POSIX LAN: Queued pending join for smallId=%d\n", assignedSmallId);
pthread_mutex_unlock(&s_pendingJoinLock);
uint8_t assignBuf[1] = { assignedSmallId };
ssize_t sent = send(clientSocket, (const char *)assignBuf, 1, MSG_NOSIGNAL);
if (sent != 1)
{
app.DebugPrintf("POSIX LAN: Failed to send small ID to client smallId=%d\n", assignedSmallId);
MarkConnectionDisconnected(assignedSmallId);
continue;
}
}
return NULL;
}
@@ -863,7 +870,11 @@ bool WinsockNetLayer::ProcessRecvData(Win64RemoteConnection &conn)
((uint32_t)conn.recvBuffer[3]);
if (packetSize <= 0 || (unsigned int)packetSize > WIN64_NET_MAX_PACKET_SIZE)
{
app.DebugPrintf("POSIX LAN: Invalid packet size %d from smallId=%d\n",
packetSize, conn.smallId);
return false;
}
conn.currentPacketSize = packetSize;
conn.readingHeader = false;
@@ -889,6 +900,50 @@ bool WinsockNetLayer::ProcessRecvData(Win64RemoteConnection &conn)
return true;
}
void WinsockNetLayer::MarkConnectionDisconnected(uint8_t smallId)
{
bool shouldNotify = false;
pthread_mutex_lock(&s_connectionsLock);
if ((size_t)smallId < s_connections.size())
{
Win64RemoteConnection &conn = s_connections[smallId];
shouldNotify = conn.active;
conn.active = false;
if (conn.tcpSocket != INVALID_SOCKET)
{
if (s_epollFd >= 0)
epoll_ctl(s_epollFd, EPOLL_CTL_DEL, conn.tcpSocket, NULL);
shutdown(conn.tcpSocket, SHUT_RDWR);
close(conn.tcpSocket);
conn.tcpSocket = INVALID_SOCKET;
}
pthread_mutex_lock(&conn.sendBufLock);
conn.sendBufferUsed = 0;
pthread_mutex_unlock(&conn.sendBufLock);
conn.recvBufferUsed = 0;
conn.currentPacketSize = -1;
conn.readingHeader = true;
}
pthread_mutex_unlock(&s_connectionsLock);
pthread_mutex_lock(&s_earlyDataLock);
if ((size_t)smallId < s_earlyDataBuffers.size())
s_earlyDataBuffers[smallId].clear();
pthread_mutex_unlock(&s_earlyDataLock);
if (shouldNotify)
{
pthread_mutex_lock(&s_disconnectLock);
s_disconnectedSmallIds.push_back(smallId);
pthread_mutex_unlock(&s_disconnectLock);
app.DebugPrintf("POSIX LAN: Queued disconnect for smallId=%d\n", smallId);
}
}
void* WinsockNetLayer::EpollThreadProc(void* /*param*/)
{
struct epoll_event events[WIN64_NET_EPOLL_MAX_EVENTS];
@@ -915,14 +970,7 @@ void* WinsockNetLayer::EpollThreadProc(void* /*param*/)
if (events[i].events & (EPOLLERR | EPOLLHUP))
{
conn.active = false;
epoll_ctl(s_epollFd, EPOLL_CTL_DEL, conn.tcpSocket, NULL);
close(conn.tcpSocket);
conn.tcpSocket = INVALID_SOCKET;
pthread_mutex_lock(&s_disconnectLock);
s_disconnectedSmallIds.push_back(smallId);
pthread_mutex_unlock(&s_disconnectLock);
MarkConnectionDisconnected(smallId);
continue;
}
@@ -940,6 +988,7 @@ void* WinsockNetLayer::EpollThreadProc(void* /*param*/)
uint8_t *newBuf = (uint8_t *)realloc(conn.recvBuffer, newSize);
if (!newBuf)
{
app.DebugPrintf("POSIX LAN: Failed to grow receive buffer for smallId=%d\n", smallId);
disconnected = true;
break;
}
@@ -974,14 +1023,7 @@ void* WinsockNetLayer::EpollThreadProc(void* /*param*/)
if (disconnected)
{
conn.active = false;
epoll_ctl(s_epollFd, EPOLL_CTL_DEL, conn.tcpSocket, NULL);
close(conn.tcpSocket);
conn.tcpSocket = INVALID_SOCKET;
pthread_mutex_lock(&s_disconnectLock);
s_disconnectedSmallIds.push_back(smallId);
pthread_mutex_unlock(&s_disconnectLock);
MarkConnectionDisconnected(smallId);
}
}
}
@@ -998,6 +1040,7 @@ void WinsockNetLayer::FlushSendBuffers()
if (!conn.active || conn.tcpSocket == INVALID_SOCKET)
continue;
bool disconnectAfterSendError = false;
pthread_mutex_lock(&conn.sendBufLock);
if (conn.sendBufferUsed > 0)
{
@@ -1007,7 +1050,11 @@ void WinsockNetLayer::FlushSendBuffers()
ssize_t sent = send(conn.tcpSocket, (const char *)conn.sendBuffer + totalSent,
conn.sendBufferUsed - totalSent, MSG_NOSIGNAL);
if (sent <= 0)
{
app.DebugPrintf("POSIX LAN: Send failed for smallId=%d errno=%d\n", (int)i, errno);
disconnectAfterSendError = true;
break;
}
totalSent += (int)sent;
}
if (totalSent < conn.sendBufferUsed && totalSent > 0)
@@ -1021,6 +1068,9 @@ void WinsockNetLayer::FlushSendBuffers()
}
}
pthread_mutex_unlock(&conn.sendBufLock);
if (disconnectAfterSendError)
MarkConnectionDisconnected((uint8_t)i);
}
pthread_mutex_unlock(&s_connectionsLock);
}
@@ -1036,6 +1086,8 @@ bool WinsockNetLayer::PopDisconnectedSmallId(uint8_t *outSmallId)
found = true;
}
pthread_mutex_unlock(&s_disconnectLock);
if (found)
app.DebugPrintf("POSIX LAN: Popped disconnect for smallId=%d\n", *outSmallId);
return found;
}
@@ -1044,6 +1096,7 @@ void WinsockNetLayer::PushFreeSmallId(uint8_t smallId)
pthread_mutex_lock(&s_freeSmallIdLock);
s_freeSmallIds.push_back(smallId);
pthread_mutex_unlock(&s_freeSmallIdLock);
app.DebugPrintf("POSIX LAN: Returned smallId=%d to free list\n", smallId);
}
bool WinsockNetLayer::PopPendingJoinSmallId(uint8_t *outSmallId)
@@ -1057,6 +1110,8 @@ bool WinsockNetLayer::PopPendingJoinSmallId(uint8_t *outSmallId)
found = true;
}
pthread_mutex_unlock(&s_pendingJoinLock);
if (found)
app.DebugPrintf("POSIX LAN: Popped pending join for smallId=%d\n", *outSmallId);
return found;
}
@@ -1068,21 +1123,8 @@ bool WinsockNetLayer::IsSmallIdConnected(uint8_t smallId)
void WinsockNetLayer::CloseConnectionBySmallId(uint8_t smallId)
{
pthread_mutex_lock(&s_connectionsLock);
if ((size_t)smallId < s_connections.size() && s_connections[smallId].active && s_connections[smallId].tcpSocket != INVALID_SOCKET)
{
epoll_ctl(s_epollFd, EPOLL_CTL_DEL, s_connections[smallId].tcpSocket, NULL);
shutdown(s_connections[smallId].tcpSocket, SHUT_RDWR);
close(s_connections[smallId].tcpSocket);
s_connections[smallId].tcpSocket = INVALID_SOCKET;
app.DebugPrintf("POSIX LAN: Force-closed TCP connection for smallId=%d\n", smallId);
}
pthread_mutex_unlock(&s_connectionsLock);
pthread_mutex_lock(&s_earlyDataLock);
if ((size_t)smallId < s_earlyDataBuffers.size())
s_earlyDataBuffers[smallId].clear();
pthread_mutex_unlock(&s_earlyDataLock);
MarkConnectionDisconnected(smallId);
app.DebugPrintf("POSIX LAN: Force-closed TCP connection for smallId=%d\n", smallId);
}
void* WinsockNetLayer::ClientRecvThreadProc(void* /*param*/)