diff --git a/Linux/PosixNetLayer.cpp b/Linux/PosixNetLayer.cpp index cb9c74b..d7e685c 100644 --- a/Linux/PosixNetLayer.cpp +++ b/Linux/PosixNetLayer.cpp @@ -683,6 +683,8 @@ void WinsockNetLayer::HandleDataReceived(uint8_t fromSmallId, uint8_t toSmallId, pthread_mutex_lock(&s_earlyDataLock); s_earlyDataBuffers[fromSmallId].insert( s_earlyDataBuffers[fromSmallId].end(), data, data + dataSize); + app.DebugPrintf("POSIX LAN: Buffered %u early bytes for smallId=%d (total=%d)\n", + dataSize, fromSmallId, (int)s_earlyDataBuffers[fromSmallId].size()); pthread_mutex_unlock(&s_earlyDataLock); } return; @@ -698,6 +700,8 @@ void WinsockNetLayer::HandleDataReceived(uint8_t fromSmallId, uint8_t toSmallId, pthread_mutex_lock(&s_earlyDataLock); s_earlyDataBuffers[fromSmallId].insert( s_earlyDataBuffers[fromSmallId].end(), data, data + dataSize); + app.DebugPrintf("POSIX LAN: Buffered %u bytes waiting for socket smallId=%d (total=%d)\n", + dataSize, fromSmallId, (int)s_earlyDataBuffers[fromSmallId].size()); pthread_mutex_unlock(&s_earlyDataLock); } } @@ -722,6 +726,8 @@ void WinsockNetLayer::FlushPendingData() ::Socket *pSocket = pPlayer->GetSocket(); if (pSocket == NULL) continue; + app.DebugPrintf("POSIX LAN: Flushing %d early bytes for smallId=%d\n", + (int)s_earlyDataBuffers[i].size(), (int)i); pSocket->pushDataToQueue(s_earlyDataBuffers[i].data(), (DWORD)s_earlyDataBuffers[i].size(), false); s_earlyDataBuffers[i].clear(); @@ -796,15 +802,6 @@ void* WinsockNetLayer::AcceptThreadProc(void* /*param*/) continue; } - uint8_t assignBuf[1] = { assignedSmallId }; - ssize_t sent = send(clientSocket, (const char *)assignBuf, 1, MSG_NOSIGNAL); - if (sent != 1) - { - app.DebugPrintf("Failed to send small ID to client\n"); - close(clientSocket); - continue; - } - int flags = fcntl(clientSocket, F_GETFL, 0); fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK); @@ -842,7 +839,17 @@ void* WinsockNetLayer::AcceptThreadProc(void* /*param*/) pthread_mutex_lock(&s_pendingJoinLock); s_pendingJoinSmallIds.push_back(assignedSmallId); + app.DebugPrintf("POSIX LAN: Queued pending join for smallId=%d\n", assignedSmallId); pthread_mutex_unlock(&s_pendingJoinLock); + + uint8_t assignBuf[1] = { assignedSmallId }; + ssize_t sent = send(clientSocket, (const char *)assignBuf, 1, MSG_NOSIGNAL); + if (sent != 1) + { + app.DebugPrintf("POSIX LAN: Failed to send small ID to client smallId=%d\n", assignedSmallId); + MarkConnectionDisconnected(assignedSmallId); + continue; + } } return NULL; } @@ -863,7 +870,11 @@ bool WinsockNetLayer::ProcessRecvData(Win64RemoteConnection &conn) ((uint32_t)conn.recvBuffer[3]); if (packetSize <= 0 || (unsigned int)packetSize > WIN64_NET_MAX_PACKET_SIZE) + { + app.DebugPrintf("POSIX LAN: Invalid packet size %d from smallId=%d\n", + packetSize, conn.smallId); return false; + } conn.currentPacketSize = packetSize; conn.readingHeader = false; @@ -889,6 +900,50 @@ bool WinsockNetLayer::ProcessRecvData(Win64RemoteConnection &conn) return true; } +void WinsockNetLayer::MarkConnectionDisconnected(uint8_t smallId) +{ + bool shouldNotify = false; + + pthread_mutex_lock(&s_connectionsLock); + if ((size_t)smallId < s_connections.size()) + { + Win64RemoteConnection &conn = s_connections[smallId]; + shouldNotify = conn.active; + conn.active = false; + + if (conn.tcpSocket != INVALID_SOCKET) + { + if (s_epollFd >= 0) + epoll_ctl(s_epollFd, EPOLL_CTL_DEL, conn.tcpSocket, NULL); + shutdown(conn.tcpSocket, SHUT_RDWR); + close(conn.tcpSocket); + conn.tcpSocket = INVALID_SOCKET; + } + + pthread_mutex_lock(&conn.sendBufLock); + conn.sendBufferUsed = 0; + pthread_mutex_unlock(&conn.sendBufLock); + + conn.recvBufferUsed = 0; + conn.currentPacketSize = -1; + conn.readingHeader = true; + } + pthread_mutex_unlock(&s_connectionsLock); + + pthread_mutex_lock(&s_earlyDataLock); + if ((size_t)smallId < s_earlyDataBuffers.size()) + s_earlyDataBuffers[smallId].clear(); + pthread_mutex_unlock(&s_earlyDataLock); + + if (shouldNotify) + { + pthread_mutex_lock(&s_disconnectLock); + s_disconnectedSmallIds.push_back(smallId); + pthread_mutex_unlock(&s_disconnectLock); + app.DebugPrintf("POSIX LAN: Queued disconnect for smallId=%d\n", smallId); + } +} + void* WinsockNetLayer::EpollThreadProc(void* /*param*/) { struct epoll_event events[WIN64_NET_EPOLL_MAX_EVENTS]; @@ -915,14 +970,7 @@ void* WinsockNetLayer::EpollThreadProc(void* /*param*/) if (events[i].events & (EPOLLERR | EPOLLHUP)) { - conn.active = false; - epoll_ctl(s_epollFd, EPOLL_CTL_DEL, conn.tcpSocket, NULL); - close(conn.tcpSocket); - conn.tcpSocket = INVALID_SOCKET; - - pthread_mutex_lock(&s_disconnectLock); - s_disconnectedSmallIds.push_back(smallId); - pthread_mutex_unlock(&s_disconnectLock); + MarkConnectionDisconnected(smallId); continue; } @@ -940,6 +988,7 @@ void* WinsockNetLayer::EpollThreadProc(void* /*param*/) uint8_t *newBuf = (uint8_t *)realloc(conn.recvBuffer, newSize); if (!newBuf) { + app.DebugPrintf("POSIX LAN: Failed to grow receive buffer for smallId=%d\n", smallId); disconnected = true; break; } @@ -974,14 +1023,7 @@ void* WinsockNetLayer::EpollThreadProc(void* /*param*/) if (disconnected) { - conn.active = false; - epoll_ctl(s_epollFd, EPOLL_CTL_DEL, conn.tcpSocket, NULL); - close(conn.tcpSocket); - conn.tcpSocket = INVALID_SOCKET; - - pthread_mutex_lock(&s_disconnectLock); - s_disconnectedSmallIds.push_back(smallId); - pthread_mutex_unlock(&s_disconnectLock); + MarkConnectionDisconnected(smallId); } } } @@ -998,6 +1040,7 @@ void WinsockNetLayer::FlushSendBuffers() if (!conn.active || conn.tcpSocket == INVALID_SOCKET) continue; + bool disconnectAfterSendError = false; pthread_mutex_lock(&conn.sendBufLock); if (conn.sendBufferUsed > 0) { @@ -1007,7 +1050,11 @@ void WinsockNetLayer::FlushSendBuffers() ssize_t sent = send(conn.tcpSocket, (const char *)conn.sendBuffer + totalSent, conn.sendBufferUsed - totalSent, MSG_NOSIGNAL); if (sent <= 0) + { + app.DebugPrintf("POSIX LAN: Send failed for smallId=%d errno=%d\n", (int)i, errno); + disconnectAfterSendError = true; break; + } totalSent += (int)sent; } if (totalSent < conn.sendBufferUsed && totalSent > 0) @@ -1021,6 +1068,9 @@ void WinsockNetLayer::FlushSendBuffers() } } pthread_mutex_unlock(&conn.sendBufLock); + + if (disconnectAfterSendError) + MarkConnectionDisconnected((uint8_t)i); } pthread_mutex_unlock(&s_connectionsLock); } @@ -1036,6 +1086,8 @@ bool WinsockNetLayer::PopDisconnectedSmallId(uint8_t *outSmallId) found = true; } pthread_mutex_unlock(&s_disconnectLock); + if (found) + app.DebugPrintf("POSIX LAN: Popped disconnect for smallId=%d\n", *outSmallId); return found; } @@ -1044,6 +1096,7 @@ void WinsockNetLayer::PushFreeSmallId(uint8_t smallId) pthread_mutex_lock(&s_freeSmallIdLock); s_freeSmallIds.push_back(smallId); pthread_mutex_unlock(&s_freeSmallIdLock); + app.DebugPrintf("POSIX LAN: Returned smallId=%d to free list\n", smallId); } bool WinsockNetLayer::PopPendingJoinSmallId(uint8_t *outSmallId) @@ -1057,6 +1110,8 @@ bool WinsockNetLayer::PopPendingJoinSmallId(uint8_t *outSmallId) found = true; } pthread_mutex_unlock(&s_pendingJoinLock); + if (found) + app.DebugPrintf("POSIX LAN: Popped pending join for smallId=%d\n", *outSmallId); return found; } @@ -1068,21 +1123,8 @@ bool WinsockNetLayer::IsSmallIdConnected(uint8_t smallId) void WinsockNetLayer::CloseConnectionBySmallId(uint8_t smallId) { - pthread_mutex_lock(&s_connectionsLock); - if ((size_t)smallId < s_connections.size() && s_connections[smallId].active && s_connections[smallId].tcpSocket != INVALID_SOCKET) - { - epoll_ctl(s_epollFd, EPOLL_CTL_DEL, s_connections[smallId].tcpSocket, NULL); - shutdown(s_connections[smallId].tcpSocket, SHUT_RDWR); - close(s_connections[smallId].tcpSocket); - s_connections[smallId].tcpSocket = INVALID_SOCKET; - app.DebugPrintf("POSIX LAN: Force-closed TCP connection for smallId=%d\n", smallId); - } - pthread_mutex_unlock(&s_connectionsLock); - - pthread_mutex_lock(&s_earlyDataLock); - if ((size_t)smallId < s_earlyDataBuffers.size()) - s_earlyDataBuffers[smallId].clear(); - pthread_mutex_unlock(&s_earlyDataLock); + MarkConnectionDisconnected(smallId); + app.DebugPrintf("POSIX LAN: Force-closed TCP connection for smallId=%d\n", smallId); } void* WinsockNetLayer::ClientRecvThreadProc(void* /*param*/) diff --git a/Linux/PosixNetLayer.h b/Linux/PosixNetLayer.h index 0073d42..1dcfbdb 100644 --- a/Linux/PosixNetLayer.h +++ b/Linux/PosixNetLayer.h @@ -154,6 +154,7 @@ private: static void* DiscoveryThreadProc(void* param); static void* AsyncJoinThreadProc(void* param); static bool ProcessRecvData(Win64RemoteConnection &conn); + static void MarkConnectionDisconnected(uint8_t smallId); static SOCKET s_listenSocket; static SOCKET s_hostConnectionSocket;