40k packets

Simplifying the server code

I wasn’t too happy with the results from the last post, and there’s a lot of magic going on with uvw/libuv. I’m not a fan of magic. So I ripped it all out and decided to go back to basics (sort of). I wanted to try a design with two threads: one listening for network packets, the other running the main server loop, with some synchronization primitive in between.

What I came up with is a simple std::thread running a lambda that reads from a bound UDP socket by calling recvfrom() in a while loop. Easy peasy. I got some inspiration from this dope writeup: Achieving reliable UDP transmission at 10 Gb/s using BSD socket for data acquisition systems.
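For context, here’s a minimal sketch of that receive loop written against raw BSD sockets. It’s just an illustration of the pattern; the real code further down goes through my small net::Socket wrapper, whose internals aren’t shown in this post.

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdio>

int main()
{
    // UDP socket bound to the same address/port the server uses
    int fd = socket(AF_INET, SOCK_DGRAM, 0);
    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(4242);
    addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    if (bind(fd, reinterpret_cast<sockaddr *>(&addr), sizeof(addr)) != 0)
    {
        std::perror("bind");
        return 1;
    }

    char buffer[4];
    while (true)
    {
        sockaddr_storage remote{};
        socklen_t remoteLen = sizeof(remote);
        // Blocks until a datagram arrives, then copies it into buffer
        ssize_t n = recvfrom(fd, buffer, sizeof(buffer), 0,
                             reinterpret_cast<sockaddr *>(&remote), &remoteLen);
        if (n < 0)
        {
            break;
        }
        // hand the packet off to the main loop here
    }
    close(fd);
}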

Ok so now I have a thread dedicated to grabbing network packets, how do I get them to the main server loop?

Lock Free Queues

I started reading this great book, C++ High Performance; it has a section on coroutines (which I was looking into) and a small blurb on lock-free queues, which I had totally forgotten about. I come from Go, where we use channels to send messages when we can avoid mutexes. In C++ I don’t have my beautiful channels, so I’ll settle for an LFQ instead!

How do they work? Well, let’s look at the code from the book (I slightly changed it to match my coding guidelines, which are similar to Unreal C++):

#include <array>
#include <atomic>
#include <cstddef>
#include <optional>
#include <utility>

template <class T, size_t N>
class LockFreeQueue
{
    std::array<T, N> Buffer{};
    std::atomic<size_t> QueueSize{0};
    size_t ReadPos{0};
    size_t WritePos{0};
    static_assert(std::atomic<size_t>::is_always_lock_free);

    // Forwarding reference so both Push overloads can funnel through here
    bool DoPush(auto&& t)
    {
        if (QueueSize.load() == N)
        {
            return false;
        }
        Buffer[WritePos] = std::forward<decltype(t)>(t);
        WritePos = (WritePos + 1) % N;
        QueueSize.fetch_add(1);
        return true;
    }

public:
    // Writer Thread
    bool Push(T&& t) { return DoPush(std::move(t)); }
    bool Push(const T& t) { return DoPush(t); }

    // Reader Thread
    auto Pop() -> std::optional<T>
    {
        auto val = std::optional<T>{};
        if (QueueSize.load() > 0)
        {
            val = std::move(Buffer[ReadPos]);
            ReadPos = (ReadPos + 1) % N;
            QueueSize.fetch_sub(1);
        }
        return val;
    }

    // Both threads allowed to access (atomic)
    auto Size() const noexcept { return QueueSize.load(); }
};

Basically, the way this works is that you can ONLY have one reader and one writer. NOT TWO, NOT THREE, but 1:1. The whole trick is that the reader only touches ReadPos (its index into the array), while the writer only touches WritePos (the next slot to fill); the only shared state is the atomic QueueSize. That’s pretty much it! Since they never access the same entry at the same time, they can happily read and write with no locking or mutex primitives to slow things down or cause contention. It’s basically a ring buffer (see the ({Read,Write}Pos + 1) % N?). If the queue ever fills up, Push just returns false and that packet gets dropped, so with a large enough queue size we should be fine per server-loop tick.
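Here’s a toy sketch (not from the book or my server, just an illustration) of using the queue from exactly one writer thread and one reader thread:

#include <cstdio>
#include <thread>

int main()
{
    LockFreeQueue<int, 1024> queue;

    // Exactly one writer thread...
    std::thread writer([&queue]() {
        for (int i = 0; i < 100000; ++i)
        {
            while (!queue.Push(i)) {} // spin if the queue is momentarily full
        }
    });

    // ...and exactly one reader thread (here, the main thread)
    long long total = 0;
    int popped = 0;
    while (popped < 100000)
    {
        if (auto val = queue.Pop())
        {
            total += *val;
            ++popped;
        }
    }
    writer.join();
    std::printf("popped %d values, sum %lld\n", popped, total);
}

With one writer and one reader, every value gets popped exactly once. Add a second writer or reader and all bets are off.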

There’s still a ton I don’t understand about this code, like std::forward<decltype(t)>(t): what the hell is that? I have a lot to learn about C++.
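From what I can gather, it boils down to this (a standalone toy, with a made-up Sink variable standing in for Buffer[WritePos]): with a forwarding-reference parameter (auto&&), decltype(t) is an lvalue reference when the caller passed an lvalue and an rvalue reference when they passed an rvalue, and std::forward uses that to either copy or move.

#include <cstdio>
#include <string>
#include <utility>

static std::string Sink; // stands in for Buffer[WritePos]

void DoPush(auto&& t)
{
    // decltype(t) is an lvalue reference for lvalue arguments and an rvalue
    // reference for rvalues, so this copy-assigns or move-assigns accordingly
    Sink = std::forward<decltype(t)>(t);
}

int main()
{
    std::string s = "hello";
    DoPush(s);            // lvalue: copied, s still holds "hello"
    DoPush(std::move(s)); // rvalue: moved, s is left in a valid but unspecified state
    std::printf("Sink = %s\n", Sink.c_str());
}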

Here’s the network thread code. Pretty simple: just bind, loop, read packets, and push them onto our LockFreeQueue:

LockFreeQueue<char *, 1000000> queue;

std::thread networkThread([&queue](){
	net::Socket socket("127.0.0.1", 4242);
	auto ret = socket.Bind();
	if (ret != 0)
	{
		fmt::print("exiting due to Bind failure\n");
		exit(ret);
	}

	static constexpr int BUFFERSIZE{4};
	char buffer[BUFFERSIZE];

	while (1)
	{
		struct sockaddr remoteAddr;
		std::memset(&remoteAddr, 0, sizeof(remoteAddr));
		socket.Receive(remoteAddr, &buffer, BUFFERSIZE);
		// Push the buffer pointer onto the queue; the main loop only counts
		// packets for now, so the payload itself isn't copied yet
		queue.Push(buffer);
	}
});

How about the server loop? Much simpler now!

LockFreeQueue<char *, 1000000> queue;

// 30 Hz tick: serverTick is ~33.33ms, serverTime accumulates across ticks
const auto serverTick = std::chrono::duration<double, std::milli>(1000.0 / 30.0);
auto serverTime = std::chrono::duration<double, std::milli>(0.0);

while (1)
{
	const auto tickStart = std::chrono::steady_clock::now();
	fmt::print("server time: {} tick at: {}\n", serverTime, serverTick);

	// Drain everything the network thread has pushed since the last tick
	auto packet = queue.Pop();
	size_t count = 0;
	for (; packet != std::nullopt; packet = queue.Pop())
	{
		count++;
	}
	fmt::print("processed {} packets!\n", count);

	const auto iterationTime = std::chrono::duration<double, std::milli>(std::chrono::steady_clock::now() - tickStart);
	fmt::print("iteration took: {}\n", iterationTime);

	// Sleep for whatever is left of this tick, clamped to [0, serverTick]
	const auto sleepTime = std::clamp(serverTick - iterationTime, std::chrono::duration<double, std::milli>(0.0), serverTick);
	fmt::print("sleeping for: {}\n", sleepTime);
	std::this_thread::sleep_for(sleepTime);

	serverTime += serverTick;
}

Same basic loop, but now we pop the network messages from our queue and iterate over them just to increment a counter.

Here are the results :gasp:

server time: 966.667ms tick at: 33.3333ms
processed 45569 packets!
iteration took: 1.09584ms
sleeping for: 32.2375ms
server time: 1000ms tick at: 33.3333ms
processed 43478 packets!
iteration took: 1.01342ms
sleeping for: 32.3199ms
server time: 1033.33ms tick at: 33.3333ms
processed 42114 packets!
iteration took: 1.04946ms
sleeping for: 32.2839ms
server time: 1066.67ms tick at: 33.3333ms
processed 44454 packets!
iteration took: 1.1003ms
sleeping for: 32.233ms

Look at that: ~44k packets iterated over in about 1ms. That leaves our server 32ms out of the 33ms tick to do its gameplay calculations, neato!

OK, so I’m only sending 4 bytes. My next trick will be bumping the buffer to 600-1024 bytes, and also implementing ChaCha20-Poly1305 encryption with verification of packets before I enqueue them into the LFQ. So yeah, look forward to that over the next two days or so!

Peace.