Simplifying the server code
I wasn’t too happy with the results from the last post, and there’s a lot of magic going on with uvw/libuv. I’m not a fan of magic. So I ripped it all out and decided to go back to basics (sort of). I wanted to try a design with two threads, one listening for network packets and the other running the main server loop, with some synchronization primitive in between.
What I came up with is a simple std::thread running a lambda that reads from a bound UDP socket with recvfrom() in a while loop. Easy peasy. I got some inspiration from this dope writeup: Achieving reliable UDP transmission at 10 Gb/s using BSD socket for data acquisition systems.
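For anyone who wants to see the bones of it, here’s a minimal sketch of that receive loop with raw POSIX/BSD sockets. My actual code goes through a small net::Socket wrapper (shown further down), so the names and error handling here are just illustrative:

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdio>

int main()
{
    // create a UDP socket and bind it to 127.0.0.1:4242
    int fd = socket(AF_INET, SOCK_DGRAM, 0);
    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(4242);
    addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    if (bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) != 0)
    {
        std::perror("bind");
        return 1;
    }

    char buffer[4];
    while (true)
    {
        sockaddr_storage remote{};
        socklen_t remoteLen = sizeof(remote);
        // blocks until a datagram arrives, then copies up to sizeof(buffer) bytes into buffer
        ssize_t n = recvfrom(fd, buffer, sizeof(buffer), 0,
                             reinterpret_cast<sockaddr*>(&remote), &remoteLen);
        if (n < 0)
        {
            break;
        }
        // ... hand the packet off to the main server loop ...
    }
    close(fd);
    return 0;
}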
Ok so now I have a thread dedicated to grabbing network packets, how do I get them to the main server loop?
Lock Free Queues
I started reading this great book, C++ High Performance, and it has a section on coroutines (which I was looking at) plus a small blurb on Lock Free Queues, which I had totally forgotten about. I come from Go, where we use channels to send messages if we can avoid mutexes. However, in C++ I don’t have my beautiful channels, so I’ll settle for an LFQ instead!
How do they work? Well let’s look at the code from the book (I slightly changed it to meet my coding guidelines which are similar to Unreal C++):
#include <array>
#include <atomic>
#include <cstddef>
#include <optional>
#include <utility>

template <class T, size_t N>
class LockFreeQueue
{
    std::array<T, N> Buffer{};
    std::atomic<size_t> QueueSize{0};
    size_t ReadPos{0};  // only ever touched by the reader thread
    size_t WritePos{0}; // only ever touched by the writer thread
    static_assert(std::atomic<size_t>::is_always_lock_free);

    // Forwarding reference so both Push() overloads below can funnel through here.
    bool DoPush(auto&& t)
    {
        if (QueueSize.load() == N)
        {
            return false; // queue is full, the new entry is dropped
        }
        Buffer[WritePos] = std::forward<decltype(t)>(t);
        WritePos = (WritePos + 1) % N;
        QueueSize.fetch_add(1);
        return true;
    }

public:
    // Writer Thread
    bool Push(T&& t) { return DoPush(std::move(t)); }
    bool Push(const T& t) { return DoPush(t); }

    // Reader Thread
    auto Pop() -> std::optional<T>
    {
        auto val = std::optional<T>{};
        if (QueueSize.load() > 0)
        {
            val = std::move(Buffer[ReadPos]);
            ReadPos = (ReadPos + 1) % N;
            QueueSize.fetch_sub(1);
        }
        return val;
    }

    // Both allowed to access (atomic)
    auto Size() const noexcept { return QueueSize.load(); }
};
Basically, the way this works is that you can ONLY have one reader and one writer. NOT TWO, NOT THREE, but 1:1. The whole trick is that the reader only touches ReadPos, its own index into the array/queue, whereas the writer only touches WritePos, its own index into the array/queue. That’s pretty much it! This way they are never accessing the same exact entry in the queue, so they can happily read/write to it, all with no locking / mutex primitives to slow things down or cause contention. It’s basically a ring buffer (see the ({Read,Write}Pos + 1) % N?), and if it ever fills up, Push() just returns false and the new entry gets dropped instead of overwriting an unread one, so with a large enough queue size we should be fine per server loop tick.
There’s still a ton I don’t understand about this code. Like std::forward<decltype(t)>(t): what the hell is that? I have a lot to learn about C++.
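Anyway, before wiring it into the server, here’s a tiny toy usage sketch of the 1:1 contract using the LockFreeQueue from above (a hypothetical example, not actual server code): one thread owns Push(), one thread owns Pop(), and nobody else ever touches the queue.

#include <cstdio>
#include <thread>

int main()
{
    // one producer thread pushing, one consumer (this thread) popping; never more of either
    LockFreeQueue<int, 1024> numbers;

    std::thread producer([&numbers] {
        for (int i = 0; i < 100; ++i)
        {
            // Push() returns false (and the value is dropped) if the queue is full
            numbers.Push(i);
        }
    });

    // consumer side: drain whatever has arrived so far, just like the server loop does each tick
    int drained = 0;
    for (auto value = numbers.Pop(); value != std::nullopt; value = numbers.Pop())
    {
        ++drained;
    }
    producer.join();
    std::printf("drained %d values before the producer finished\n", drained);
    return 0;
}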
Here’s the network thread code, pretty simple, just bind, loop, read packets and push them onto our LockFreeQueue:
LockFreeQueue<char *, 1000000> queue;
std::thread networkThread([&queue]() {
    net::Socket socket("127.0.0.1", 4242);
    auto ret = socket.Bind();
    if (ret != 0)
    {
        fmt::print("exiting due to Bind failure\n");
        exit(ret);
    }
    static constexpr int BUFFERSIZE{4};
    char buffer[BUFFERSIZE];
    while (1)
    {
        struct sockaddr remoteAddr;
        std::memset(&remoteAddr, 0, sizeof(remoteAddr));
        socket.Receive(remoteAddr, &buffer, BUFFERSIZE);
        // Note: this pushes the pointer to the single reusable buffer, not a copy of
        // the payload. Fine for now, since the server loop only counts packets.
        queue.Push(buffer);
    }
});
How about the server loop? Much simpler now!
// Same queue instance the network thread above is pushing into.
// Tick setup: 30 ticks per second, i.e. the 33.3333ms tick shown in the output below.
auto serverTick = std::chrono::duration<double, std::milli>{1000.0 / 30.0};
auto serverTime = std::chrono::duration<double, std::milli>{0.0};
while (1)
{
    const auto tickStart = std::chrono::steady_clock::now();
    fmt::print("server time: {} tick at: {}\n", serverTime, serverTick);
    auto packet = queue.Pop();
    size_t count = 0;
    for (; packet != std::nullopt; packet = queue.Pop())
    {
        count++;
    }
    fmt::print("processed {} packets!\n", count);
    const auto iterationTime = std::chrono::duration<double, std::milli>(std::chrono::steady_clock::now() - tickStart);
    fmt::print("iteration took: {}\n", iterationTime);
    const auto sleepTime = std::clamp(serverTick - iterationTime, std::chrono::duration<double, std::milli>(0.0), serverTick);
    fmt::print("sleeping for: {}\n", sleepTime);
    std::this_thread::sleep_for(sleepTime);
    serverTime += serverTick;
}
Same basic loop, but we are popping the network messages from our queue, iterating over them just to increment a counter.
Here are the results :gasp:
server time: 966.667ms tick at: 33.3333ms
processed 45569 packets!
iteration took: 1.09584ms
sleeping for: 32.2375ms
server time: 1000ms tick at: 33.3333ms
processed 43478 packets!
iteration took: 1.01342ms
sleeping for: 32.3199ms
server time: 1033.33ms tick at: 33.3333ms
processed 42114 packets!
iteration took: 1.04946ms
sleeping for: 32.2839ms
server time: 1066.67ms tick at: 33.3333ms
processed 44454 packets!
iteration took: 1.1003ms
sleeping for: 32.233ms
Look at that, 44k packets iterated over in about 1ms. That leaves our server 32ms out of 33ms to do its gameplay calculations, neato!
OK, so I am only sending 4 bytes right now, so my next trick will be bumping the buffer to 600-1024 bytes and implementing ChaCha20-Poly1305 encryption, with verification of packets before I enqueue them into the LFQ. So yeah, I’m looking forward to doing that over the next two days or so!
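I haven’t picked a crypto library yet, but just to sketch the shape of that verify-before-enqueue step, here’s roughly what it could look like with libsodium’s ChaCha20-Poly1305 (IETF) AEAD. The packet layout, key handling, and function name here are all assumptions on my part, not settled design:

#include <sodium.h>
#include <cstddef>

// Rough sketch: authenticate + decrypt one received datagram before it goes into the LFQ.
// Assumes a layout of [12-byte nonce][ciphertext + 16-byte Poly1305 tag] and a pre-shared
// 32-byte key; real key exchange and nonce management are still TODO.
// (sodium_init() must have been called once at startup.)
bool VerifyAndDecrypt(const unsigned char* datagram, size_t datagramLen,
                      const unsigned char* key, // crypto_aead_chacha20poly1305_ietf_KEYBYTES long
                      unsigned char* plaintext, unsigned long long* plaintextLen)
{
    constexpr size_t NonceBytes = crypto_aead_chacha20poly1305_ietf_NPUBBYTES; // 12
    constexpr size_t TagBytes = crypto_aead_chacha20poly1305_ietf_ABYTES;      // 16
    if (datagramLen < NonceBytes + TagBytes)
    {
        return false; // too short to even hold a nonce and a tag
    }
    const unsigned char* nonce = datagram;
    const unsigned char* ciphertext = datagram + NonceBytes;
    const auto ciphertextLen = static_cast<unsigned long long>(datagramLen - NonceBytes);
    // returns 0 only if the Poly1305 tag verifies; anything else means the packet is rejected
    return crypto_aead_chacha20poly1305_ietf_decrypt(
               plaintext, plaintextLen,
               nullptr,              // nsec, unused by this construction
               ciphertext, ciphertextLen,
               nullptr, 0,           // no additional authenticated data for now
               nonce, key) == 0;
}

The idea being that the network thread would call something like this right after Receive(), and only packets that pass authentication ever get pushed onto the queue; garbage and forged packets get dropped before the server loop sees them.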
Peace.