UDP Reliability Part 1 (Connections)

I’ve started working on the reliability layer of PMO. A good place to start is with the concept of “connections”. Since we are dealing with UDP, connections aren’t really a thing at the protocol level: each packet is independent. I plan on building the entire reliability layer inside flecs systems, queries and observers.

All of this is done inside of the network::Network module code. A “connection” can be in one of four states:

  1. Pre-Auth
  2. Connected
  3. Heartbeat timed out
  4. Disconnected

Pre-Auth

This is where the client is in the process of negotiating a symmetric key with the game server. A Player entity has been created and introduced into the game world with its network::Network components. TODO: The server needs to handle the case where we send the generated key to the client but the client never responds!

Connected

This is where the client has begun sending ClientCommand messages, which are decrypted by the server thread and then pushed onto our lock-free queue to be read by the main game loop.

Heartbeat timed out

This occurs when a client hasn’t sent a ClientCommand within a configurable amount of time (or the server hasn’t seen one due to network connectivity issues), but the gap is still within the “not disconnected” threshold. We want this state so we can potentially increase input buffer sizes for replays or figure out how to handle gaps in movement/other messages if the client comes back before the deadline.

Disconnected

This occurs when a client exceeds both the heartbeat timeout and the connection timeout and is considered dead. We will delete the entity from the game world.

Game loop

Since we are doing the connection management logic inside of our game loop, it helps to understand what this process will look like.

while (GameWorld.progress())
{
    // process incoming packets
    for (auto Packet = InQueue->Pop(); Packet != std::nullopt; Packet = InQueue->Pop())
    {
        auto PacketUserId = Packet->get()->UserId;
        switch (Packet->get()->MessageType)
        {
            case Game::Message::MessageType_ClientCommand:
            {
                auto Command = Game::Message::GetClientCommand(Packet->get()->MessageData->data());

                auto NetEntity = GameWorld.get_mut<network::Network>();
                NetEntity->ExecServerQueries(GameWorld, PacketUserId, Command);
                break;
            }
            default:
            {
                break;
            }
        }
    }

    // process outgoing packets (its own loop, so it runs even when nothing came in)
    for (auto Packet = OutQueue->Pop(); Packet != std::nullopt; Packet = OutQueue->Pop())
    {
        switch (Packet->get()->MessageType)
        {
            // GenKeyResponse means we have a new player
            case Game::Message::MessageType_GenKeyResponse:
            {
                auto NewPlayer = GameWorld.entity()
                    .set<network::NetworkComponent>({Packet->get()->UserId, Packet->get()->ClientAddr})
                    .set<network::NetworkServerComponent>({&NetworkThread})
                    .set<network::AuthComponent>({std::move(Packet->get()->MessageData)});
                break;
            }
            default:
            {
                break;
            }
        }
    }
}

Some error handling and logging has been omitted, but the gist is this: once we prepare to send a key to the client, we add them to the world. Once we start getting client commands from them, we grab a reference to our network module and execute some queries.
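
To make the Packet->get()->UserId pattern in the loop less mysterious, here is a minimal sketch of a queue with the same Pop() shape. It assumes Pop() returns a std::optional wrapping a std::unique_ptr, which is what the loop above implies. PacketQueue, Packet and DrainUserIds are hypothetical names for illustration, and this stand-in uses a mutex rather than being lock-free like the real queue.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical packet type; the real one carries more fields.
struct Packet
{
    uint32_t UserId;
};

// Stand-in for the real queue: mutex-guarded, NOT lock-free.
class PacketQueue
{
public:
    void Push(std::unique_ptr<Packet> Item)
    {
        std::lock_guard<std::mutex> Lock(Mutex);
        Items.push(std::move(Item));
    }

    // Returns std::nullopt when the queue is empty.
    std::optional<std::unique_ptr<Packet>> Pop()
    {
        std::lock_guard<std::mutex> Lock(Mutex);
        if (Items.empty())
        {
            return std::nullopt;
        }
        auto Item = std::move(Items.front());
        Items.pop();
        return std::make_optional(std::move(Item));
    }

private:
    std::mutex Mutex;
    std::queue<std::unique_ptr<Packet>> Items;
};

// Drains the queue with the same loop shape as the game loop
// and returns the user ids in the order they were popped.
std::vector<uint32_t> DrainUserIds(PacketQueue &Queue)
{
    std::vector<uint32_t> Ids;
    for (auto Item = Queue.Pop(); Item != std::nullopt; Item = Queue.Pop())
    {
        // Item is an optional wrapping a unique_ptr, hence Item->get()->UserId.
        Ids.push_back(Item->get()->UserId);
    }
    return Ids;
}
```

The loop ends as soon as Pop() returns std::nullopt, so each frame only handles whatever was queued at that moment and never blocks waiting for more.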

Right now only two queries are defined:

void Network::ExecServerQueries(flecs::world &GameWorld, const uint32_t UserId, const Game::Message::ClientCommand *Command)
{
    GameWorld.defer_begin();
    ExecServerHeartbeatQuery(GameWorld, UserId, Command);
    ExecAuthQuery(GameWorld, UserId, Command);
    GameWorld.defer_end();
}

Note the defer_begin() and defer_end() calls. They are necessary because we modify entity state inside the queries.
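
As a toy illustration of why deferring matters (this is not flecs code, and flecs’ real command queue is far more involved), the sketch below queues removals while a container is being iterated and only applies them once iteration is done. ToyWorld, DeferBegin, DeferEnd and RemoveEvens are all made-up names.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Toy stand-in for a world; none of this is flecs API.
class ToyWorld
{
public:
    std::vector<int> Entities;

    void DeferBegin() { Deferring = true; }

    // Leaves deferred mode, then applies every queued operation in order.
    void DeferEnd()
    {
        Deferring = false;
        for (auto &Op : Pending)
        {
            Op();
        }
        Pending.clear();
    }

    // Removes the entity now, or queues the removal while deferring.
    void Remove(int Id)
    {
        if (Deferring)
        {
            Pending.push_back([this, Id] { Remove(Id); });
            return;
        }
        Entities.erase(std::remove(Entities.begin(), Entities.end(), Id), Entities.end());
    }

private:
    bool Deferring = false;
    std::vector<std::function<void()>> Pending;
};

// Removes every even id while iterating. This is only safe because the
// removals are queued and applied after the loop has finished.
std::size_t RemoveEvens(ToyWorld &World)
{
    World.DeferBegin();
    for (int Id : World.Entities)
    {
        if (Id % 2 == 0)
        {
            World.Remove(Id); // queued, so the vector is untouched mid-loop
        }
    }
    World.DeferEnd();
    return World.Entities.size();
}
```

Without the DeferBegin()/DeferEnd() pair, Remove() would erase from the vector in the middle of the range-for loop and invalidate its iterators.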

The heartbeat query is pretty straightforward: we query for all entities that have a NetworkComponent, a NetworkServerComponent, and a Connected component. We then check whether the packet we just saw belongs to that user and, if so, update their last-seen time as well as the last sequence id sent by the client.

ServerHeartbeatQuery.each([&](flecs::entity Player, network::NetworkComponent &Network, network::NetworkServerComponent &Server, network::Connected &Connected) 
    {
        if (Network.UserId != UserId)
        {
            return;
        }
        Player.set<network::Connected>({std::chrono::system_clock::now(), Command->sequence_id()});
    }
);

The auth query is somewhat similar:

AuthQuery.each([&](flecs::entity Player, network::NetworkComponent &Network, network::NetworkServerComponent &, network::AuthComponent &) 
    {
        if (Network.UserId != UserId)
        {
            return;
        }

        Player.remove<network::AuthComponent>()
              .add<network::Connected>()
              .is_a<prefabs::Character>();
    }
);

Here we remove the AuthComponent because the client is now authenticated, add the Connected component, and make the entity an instance of our Character prefab.

So now we have a method of updating each user with a last seen timestamp. We can now use this data in our flecs “HeartbeatCheck” system:

GameWorld.system<network::NetworkComponent, network::NetworkServerComponent, network::Connected>("HeartbeatCheck")
            .each([&](flecs::iter& It, size_t Index, network::NetworkComponent &Network, network::NetworkServerComponent &Client, network::Connected &Connected) 
{
    auto Now = std::chrono::system_clock::now();
    // Time elapsed since the last ClientCommand we saw from this player
    auto Updated = std::chrono::duration_cast<std::chrono::milliseconds>(Now-Connected.LastUpdate);
    auto Player = It.entity(Index);
    
    Logger->Warn("Client entity {} Heartbeat now-last = {} heart beat timeout = {}", Player.id(), (Updated).count(), HeartbeatTimeout.count());
    if (Updated < HeartbeatTimeout)
    {
        // Healthy again: clear any earlier heartbeat timeout
        Player.remove<HeartbeatTimedOut>();
        return;
    }
    // >= so a value exactly on a threshold still lands in one of the branches
    else if (Updated >= HeartbeatTimeout && Updated < ConnectionTimeout && !Player.has<HeartbeatTimedOut>())
    {
        Logger->Warn("Client entity {} has not responded in {} milliseconds", Player.id(), HeartbeatTimeout.count());
        Player.set<HeartbeatTimedOut>({Now});
    }
    else if (Updated >= ConnectionTimeout)
    {
        Logger->Warn("Client entity {} has not responded in {} milliseconds, removing", Player.id(), ConnectionTimeout.count());
        Player.destruct();
    }
});

A player can be in a few states depending on when we last saw an update from them in the ServerHeartbeatQuery.

  1. If the time since their last message is less than our HeartbeatTimeout, we remove the HeartbeatTimedOut component if they have one.
  2. If the time since the last update is past the HeartbeatTimeout but still below the ConnectionTimeout, and they don’t already have the HeartbeatTimedOut component, we put them in the HeartbeatTimedOut state.
  3. Finally, if they are past the ConnectionTimeout, we destroy the entity.
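
The three cases above boil down to a pure function of the elapsed time and the two thresholds. The sketch below pulls that decision out of the system so it can be checked without a world. ConnectionState and Classify are hypothetical names, and treating a value exactly on a threshold as already past it is an assumption of this sketch.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical enum mirroring the states a connected player can be in.
enum class ConnectionState
{
    Connected,
    HeartbeatTimedOut,
    Disconnected
};

// Maps the time since the last ClientCommand onto a connection state.
// Assumes HeartbeatTimeout <= ConnectionTimeout.
ConnectionState Classify(std::chrono::milliseconds SinceLastSeen,
                         std::chrono::milliseconds HeartbeatTimeout,
                         std::chrono::milliseconds ConnectionTimeout)
{
    if (SinceLastSeen < HeartbeatTimeout)
    {
        return ConnectionState::Connected;
    }
    if (SinceLastSeen < ConnectionTimeout)
    {
        return ConnectionState::HeartbeatTimedOut;
    }
    return ConnectionState::Disconnected;
}
```

Because the function takes a duration rather than reading a clock, it doesn’t care which clock produced the timestamps, and each boundary can be tested directly.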

The third step will probably have an observer to save state to the game world/database on disconnect, but I barely have a world at this point let alone a database with the game world state.

So that’s it! The best part of flecs is that it’s super easy to test. I just create the entity, run it through the various states, and then check the results:

// Get a mutable reference so we can call some setters.
auto Network = GameWorld.get_mut<network::Network>();
auto Heartbeat = std::chrono::milliseconds(100);
auto Timeout = std::chrono::milliseconds(400);

// make them short so we don't wait for 1s ~ 4s
Network->SetHeartbeatTimeout(Heartbeat);
Network->SetDisconnectedTimeout(Timeout);
        
flecs::entity Player = GameWorld.entity()
        .set<network::NetworkComponent>({1, Address, 0})
        .set<network::NetworkServerComponent>({nullptr})
        .set<network::Connected>({Start, 1});
...
auto TimedOut = false;
GameWorld.observer<network::HeartbeatTimedOut>()
    .event(flecs::OnAdd)
    .each([&](flecs::iter& It, size_t Index, network::HeartbeatTimedOut) {
        TimedOut = true;
    }
);

// ... sleep / spin for a bit to test if the above observer was called ...

// We should have timed out, but not disconnected yet
REQUIRE( TimedOut == true );
REQUIRE( Player.is_alive() == true );

// Update our connected with a new 'packet' to remove our HeartbeatTimedOut component
Player.set<network::Connected>({std::chrono::system_clock::now(), 2});
Logger.Info("Updated network::Connected");
GameWorld.progress();
// HeartbeatTimedOut should no longer be applied to the entity
REQUIRE( !Player.has<network::HeartbeatTimedOut>() );

// ... sleep / spin until the connection timeout should have triggered ...
REQUIRE( Player.is_alive() == false );
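
The “sleep / spin” steps elided above could be wrapped in a small polling helper along these lines. This is only a sketch of one way to do it, not the code the test actually uses. SpinUntil and its parameters are hypothetical, and it uses std::chrono::steady_clock so the wait can’t be thrown off by wall-clock adjustments.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical test helper: calls Tick repeatedly until Done returns true
// or Deadline has elapsed. Returns whether Done was satisfied.
bool SpinUntil(const std::function<void()> &Tick,
               const std::function<bool()> &Done,
               std::chrono::milliseconds Deadline,
               std::chrono::milliseconds Step = std::chrono::milliseconds(5))
{
    const auto Start = std::chrono::steady_clock::now();
    while (std::chrono::steady_clock::now() - Start < Deadline)
    {
        Tick();
        if (Done())
        {
            return true;
        }
        // Sleep briefly so the test doesn't burn a whole core while waiting.
        std::this_thread::sleep_for(Step);
    }
    return Done();
}
```

In the test above, Tick would be something like [&] { GameWorld.progress(); } and Done would be [&] { return TimedOut; }, with a deadline comfortably larger than the timeout being exercised.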

This of course is the easy part. I still am not entirely sure how I’m going to build the reliability layer for ensuring clients/server are resending packets when necessary, or interpolating when we get gaps in packets.

Once I figure out more, I’ll document it here, of course.