Networked Equipment Events

Players looking at their sweet swords

While I’ve been slowly dealing with the fallout of fixing my remapping of server IDs, I’ve also been trying to get networked equipment events to work. This lets players see other players in the area equipping new items such as weapons.

It all starts in UE5. A user opens up their inventory, opens up their equipment window and drags a weapon onto the weapon slot. This kicks off a series of processes, which I’ve detailed in my Simple RPC event system post. But how do we get that equip event to display when a networked player does it?

Frame cache

We start from the server’s point of view. As a refresher, each tick our server has a Delta snapshot process that collects all our entities of interest and does a differential of what has changed on a per-user basis. We then send these user-specific deltas to each client. This process is terribly inefficient right now; I just built it to get something in. What I was doing was iterating over every player and independently collecting the details of every player they could see. This means if 5 players can all see each other in a sensor, I was looking up the same characters multiple times and storing their details multiple times.

So I took the time to enhance at least part of the process by adding a frame cache. Now instead of looping over visible players and extracting their details every time, I first check whether we have already iterated over this character this frame and, if so, pull it from a frame-specific cache.

const CachedEntityData* FrameEntityCache::GetOrCache(flecs::entity Neighbor, logging::Logger* Log)
    {
        uint64_t EntityId = Neighbor.id();

        auto It = Cache.find(EntityId);
        if (It != Cache.end() && It->second.FrameNumber == CurrentFrame)
        {
            Log->Debug("FrameCache: Hit for entity {}", EntityId);
            return &It->second;
        }
        
        if (!Neighbor.is_valid() || !Neighbor.is_alive())
        {
            Log->Warn("FrameCache: Invalid entity {}", EntityId);
            return nullptr;
        }

        CachedEntityData Data;
        Data.EntityId = EntityId;
        Data.FrameNumber = CurrentFrame;

        auto Pos = Neighbor.get_ref<units::Vector3>();
        auto Vel = Neighbor.get_ref<units::Velocity>();
        auto Rot = Neighbor.get_ref<units::Quat>();
        auto Attributes = Neighbor.get_ref<units::Attributes>();
        auto Traits = Neighbor.get_ref<units::TraitAttributes>();

        Data.Position = *Pos.get();
        Data.Velocity = *Vel.get();
        Data.Rotation = *Rot.get();
        Data.Attributes = *Attributes.get();
        Data.Traits = *Traits.get();

        // Serialize equipment once
        items::SerializeEquipment(Log, Data.Equipment, Neighbor);
        if (Neighbor.has<character::ActionType>())
        {
            Data.Action = Neighbor.get_constant<character::ActionType>();
        }

        // Extract damage events
        auto NeighborVisibility = Neighbor.get<character::Visibility>();
        Data.OutgoingAttacks.clear();
        Data.OutgoingAttacks.reserve(NeighborVisibility.DamageEvents.size());

        Log->Debug("FrameCache: ({}) NeighborVisibility.DamageEvents: {}", EntityId, NeighborVisibility.DamageEvents.size());

        for (auto EventEntity : NeighborVisibility.DamageEvents)
        {
            if (!EventEntity.is_valid() || !EventEntity.is_alive())
            {
                Log->Debug("FrameCache: DamageEvent EventEntity was not valid!!");
                continue;
            }

            auto DamageEvent = EventEntity.get<combat::ApplyDamage>();
            if (DamageEvent.Instigator.id() == EntityId)
            {
                Data.OutgoingAttacks.emplace_back(DamageEvent);
                Log->Debug("FrameCache: ({}) OutgoingAttacks: {} emplaced", EntityId, DamageEvent.Instigator.id());
            }
        }

        // Store in cache and return pointer
        auto [InsertIt, Inserted] = Cache.insert_or_assign(EntityId, std::move(Data));
        return &InsertIt->second;
    }
}

Now we only have to do the lookup/caching once per character per frame. A further enhancement would be a ‘dirty bit’: instead of rebuilding the entry every frame, we would only update the cache when something actually changed. For now, when applying visibility for players we either read from the cache or fill it in, which ensures we only do this process once per character.
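The dirty-bit idea could look roughly like the sketch below. It is library-free and none of its names (`DirtyCache`, `MarkDirty`, `Entry`) exist in the project; a single int stands in for `CachedEntityData`, and it assumes whatever system mutates an entity is also responsible for flagging it.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <unordered_map>

// Hypothetical stand-in for CachedEntityData (the real struct holds
// position, equipment, damage events and so on).
struct Entry
{
    int Payload = 0;
    bool bDirty = true; // new entries always need a first build
};

class DirtyCache
{
public:
    // Assumed to be called by whatever mutates the entity (movement, equip...).
    void MarkDirty(uint64_t EntityId)
    {
        auto It = Cache.find(EntityId);
        if (It != Cache.end())
        {
            It->second.bDirty = true;
        }
    }

    // Rebuilds the entry only if it is missing or flagged dirty.
    const Entry& GetOrBuild(uint64_t EntityId, const std::function<int()>& Build)
    {
        assert(Build); // a build callback is required
        Entry& Slot = Cache[EntityId]; // default-constructed entries start dirty
        if (Slot.bDirty)
        {
            Slot.Payload = Build();
            Slot.bDirty = false;
            ++Rebuilds;
        }
        return Slot;
    }

    int Rebuilds = 0; // only here to make the behaviour observable

private:
    std::unordered_map<uint64_t, Entry> Cache;
};
```

The trade-off is that every write path has to remember to call `MarkDirty`, which is easy to miss; the per-frame check in `GetOrCache` has no such requirement.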

Serializing player equipment

OK, with the frame cache in place, we still need to know whether the players around the current player equipped anything. For that we need to look into our items::SerializeEquipment function.

void SerializeEquipment(logging::Logger *Logger, std::array<state::ItemEntity, items::EquipSlotMax> &EquipmentEntities, flecs::entity &Player)
{
	if (!Player.has(flecs::IsA, Player.world().id<items::Body>()))
	{
		return;
	}

	for (const auto Slot : SlotTypes)
	{
		SerializeSlot(Logger, Player, EquipmentEntities, Slot);
	}
}

// ...
void SerializeSlot(logging::Logger *Logger, flecs::entity &Player, std::array<state::ItemEntity, items::EquipSlotMax> &EquipmentEntities, const EquipmentSlot Slot)
{
	flecs::world GameWorld = Player.world();
	flecs::entity TargetSlot;
	flecs::entity EquippedItem;
	
	switch (Slot)
	{
		// Armor
		case EquipmentSlot::HEAD:
	// ...
	}
	// ...
	EquipmentEntities.at(static_cast<uint8_t>(Slot)) = state::ItemEntity{
		.EntityId=EquippedItem.id(), 
		.ItemId=ItemData.Id, 
		.ItemType=ItemData.Type
	};
}

This is a very basic serialization routine. We simply store an ItemEntity (server entity id, item id and item type) in an array, where the Slot is just the index into the array. So HEAD is element 0 in the array, NECK is 1, SHOULDERS is 2, and so forth.
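As a minimal, self-contained illustration of the slot-as-index layout: the types below are shortened, hypothetical versions of the project's (the real enum has more slots, and `StoreSlot` and `Max` are made-up names).

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical, shortened version of the slot enum; only the first three
// values are taken from the post.
enum class EquipmentSlot : uint8_t { HEAD = 0, NECK = 1, SHOULDERS = 2, Max };

struct ItemEntity
{
    uint64_t EntityId = 0; // server-side entity id
    uint16_t ItemId = 0;   // 0 means "nothing equipped"
    uint8_t ItemType = 0;
};

using EquipmentArray = std::array<ItemEntity, static_cast<size_t>(EquipmentSlot::Max)>;

// The slot's numeric value is the array index, so no lookup table is needed.
inline void StoreSlot(EquipmentArray& Equipment, EquipmentSlot Slot, const ItemEntity& Item)
{
    const auto Index = static_cast<size_t>(Slot);
    assert(Index < Equipment.size());
    Equipment[Index] = Item;
}
```

Because the array is fixed-size and value-initialized, an untouched slot simply reads back with `ItemId == 0`.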

Now with the equipment serialized in a nice tiny array and stored in the per-user state cache, we are ready to send it over the network using our Delta snapshot. Note that we only send what’s changed, in an even smaller vector:

// NewDelta, as opposed to an UpdatedDelta. The equipment logic is the same; we just set bSkipComparison, since here we have no snapshot to compare against and must send all equipment.
void PlayerWorldStates::NewDelta(logging::Logger *Log, flatbuffers::FlatBufferBuilder &Builder, uint64_t ServerTime, uint64_t CurrentSequenceId)
{

	// handle equipment for the first time
	std::vector<flatbuffers::Offset<Game::Message::Equipment>> OutEquipment;
	flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Game::Message::Equipment>>> Equip;
	bool bSkipComparison = true;
	
	bool bEvenHasEquipment = UpdateEquipment(Builder, Player.Equipment, Player.Equipment, OutEquipment, bSkipComparison);
	if (bEvenHasEquipment)
	{
		Equip = Builder.CreateVectorOfSortedTables(&OutEquipment);
	}

	Game::Message::PlayerEntityBuilder PlayEnt(Builder);
	if (bEvenHasEquipment)
	{
		PlayEnt.add_equipment(Equip);
	}

// Compares against the previous equipment (or skips the comparison if bSkipComparison is true) and only captures the changed slots. This way we don't send over 16 slots of nothing every time, only the slots that are equipped/changed.
bool PlayerWorldStates::UpdateEquipment(flatbuffers::FlatBufferBuilder &Builder, const std::array<state::ItemEntity, items::EquipSlotMax> &CurrentEquipment, const std::array<state::ItemEntity, items::EquipSlotMax> &PreviousEquipment, std::vector<flatbuffers::Offset<Game::Message::Equipment>> &OutEquipment, bool bSkipComparison)
{
	for (uint8_t i = 0; i < CurrentEquipment.size(); ++i) 
	{
		// if we are skipping the comparison, check if we even have a valid item equipped
		if (bSkipComparison && CurrentEquipment[i].ItemId == 0)
		{
			continue;
		}

		if (bSkipComparison || CurrentEquipment[i] != PreviousEquipment[i]) 
		{
			Game::Message::ServerItemEntityBuilder ItemBuilder(Builder);
			ItemBuilder.add_id(CurrentEquipment[i].EntityId);
			ItemBuilder.add_item_id(CurrentEquipment[i].ItemId);
			ItemBuilder.add_type(static_cast<uint8_t>(CurrentEquipment[i].ItemType));
			auto ItemFinished = ItemBuilder.Finish();
			Game::Message::EquipmentBuilder EquipmentBuilder(Builder);
			EquipmentBuilder.add_entity(ItemFinished);
			EquipmentBuilder.add_slot(static_cast<Game::Message::EquipmentSlot>(i));
			OutEquipment.push_back(EquipmentBuilder.Finish());
		}
	}
	return OutEquipment.size() > 0;
}
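The comparison step can be looked at without FlatBuffers. The sketch below is an assumption-laden stand-in, not the project's code: `SlotChange`, `DiffEquipment` and `SlotCount` are hypothetical names, and the removal handling is a guess. The client code later in this post reads a remove flag and skips any update whose item id is 0, so this version sends the previously equipped item along with a removal.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

struct ItemEntity
{
    uint64_t EntityId = 0;
    uint16_t ItemId = 0; // 0 means the slot is empty
    uint8_t ItemType = 0;

    bool operator==(const ItemEntity& Other) const
    {
        return EntityId == Other.EntityId && ItemId == Other.ItemId && ItemType == Other.ItemType;
    }
    bool operator!=(const ItemEntity& Other) const { return !(*this == Other); }
};

// Hypothetical wire-independent description of one changed slot.
struct SlotChange
{
    uint8_t Slot = 0;
    ItemEntity Item;       // for a removal: the item that used to be there
    bool bRemoved = false;
};

constexpr size_t SlotCount = 16; // assumed from the "16 slots" comment
using EquipmentArray = std::array<ItemEntity, SlotCount>;

std::vector<SlotChange> DiffEquipment(const EquipmentArray& Current,
                                      const EquipmentArray& Previous,
                                      bool bSkipComparison)
{
    std::vector<SlotChange> Changes;
    for (size_t i = 0; i < Current.size(); ++i)
    {
        const bool bHasItem = Current[i].ItemId != 0;
        if (bSkipComparison)
        {
            // First snapshot: send every occupied slot, nothing else.
            if (bHasItem)
            {
                Changes.push_back({static_cast<uint8_t>(i), Current[i], false});
            }
            continue;
        }
        if (Current[i] == Previous[i])
        {
            continue; // unchanged slot, nothing to send
        }
        if (bHasItem)
        {
            Changes.push_back({static_cast<uint8_t>(i), Current[i], false});
        }
        else
        {
            // Slot went from occupied to empty: report what was removed.
            Changes.push_back({static_cast<uint8_t>(i), Previous[i], true});
        }
    }
    return Changes;
}
```

Note that `CurrentEquipment[i] != PreviousEquipment[i]` in the original only compiles if `state::ItemEntity` defines the comparison, which is why the sketch spells the operators out.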

We now have our delta snapshots to send to clients, which is done in a flecs system:

// Sending out all client state should be the last system we run
GameWorld.system<network::User, network::UserTime, state::PlayerWorldStates, network::Connected>("SendClientStateUpdate")
	.kind(flecs::PostFrame)
	.each([&](flecs::iter& It, size_t Index, network::User &Network, network::UserTime &Time, state::PlayerWorldStates &States, network::Connected &Connected) 
	{
		flatbuffers::FlatBufferBuilder Builder(1024);
		// Get this clients latest state
		auto Now = std::chrono::steady_clock::now();
		auto UserTime = Now - Time.UserTime;
		const auto ServerTime = std::chrono::duration_cast<std::chrono::milliseconds>(UserTime).count();
		States.Delta(Logger, Builder, ServerTime);
		auto OutputLen = Builder.GetSize();
		auto ServerCommands = std::make_unique<std::vector<uint8_t>>(OutputLen);
		
		std::memcpy(ServerCommands->data(), Builder.GetBufferPointer(), OutputLen);
		Logger->Debug("PostFrame: Sending Client {} SeqId {}-1 Time: {} State size: {}, Last Seen Id: {}", Network.UserId, States.SequenceId, ServerTime, static_cast<uint64_t>(OutputLen), States.ClientSequenceId);
		
		auto CommandMessage = std::make_unique<Game::Message::GameMessage>(Network.UserId, std::move(ServerCommands), MessageData_EncServerCommand);
		OutQueue->Push(std::move(CommandMessage));
		Builder.Release();
		Thread->NotifyWrite();
	});

This is our primary ‘delta’ snapshot system, but it has a couple of rather big flaws.

  1. Our max packet size is actually 1024 bytes. What happens if we have 100 players…?
    1. Packet fragmentation only works for reliable channels, not the unreliable channel which this delta snapshot uses.
  2. Equipping things should be an ‘event’ that we can’t afford to lose or ignore, so we probably shouldn’t be sending it over the unreliable channel.
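To make the first flaw concrete, here is one possible way to keep an unreliable snapshot under the packet limit: greedily split the visible entities across several packets. This is only a sketch of a generic technique, not the planned rework; `SplitIntoPackets` and the per-entity sizes are hypothetical, and it ignores the fixed overhead FlatBuffers adds to each message.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

constexpr size_t MaxPacketSize = 1024; // the limit mentioned in the post

// Greedily groups entity indices into packets so that each packet's payload
// stays within Budget. EncodedSizes[i] is the serialized size of entity i in
// bytes (hypothetical input; real sizes would come from the builder).
std::vector<std::vector<size_t>> SplitIntoPackets(const std::vector<size_t>& EncodedSizes,
                                                  size_t Budget = MaxPacketSize)
{
    std::vector<std::vector<size_t>> Packets;
    size_t Used = 0;
    for (size_t i = 0; i < EncodedSizes.size(); ++i)
    {
        assert(EncodedSizes[i] <= Budget); // a single entity must fit in one packet
        if (Packets.empty() || Used + EncodedSizes[i] > Budget)
        {
            Packets.emplace_back(); // start a new packet
            Used = 0;
        }
        Packets.back().push_back(i);
        Used += EncodedSizes[i];
    }
    return Packets;
}
```

Each resulting packet would have to be usable on its own, since any of them can be dropped on an unreliable channel.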

I’m going to be re-working my delta snapshot system to do better distance calculations and move a lot of this other stuff to reliable channels, but that’s a post I’m working on writing up after the next post. Right now we only have 2 players on screen, so this will work for the short term. I don’t see this being a major re-work: I just need to split out my delta snapshots and move some of the data to the reliable channel. But I digress!

From the client’s point of view

Player updates get processed in our PMOWorld ProcessServerUpdate method. When we update/add new players we are going to extract this equipment information:

void PMOWorld::UpdateNetworkPlayerEquipment(flecs::entity NetworkPlayer, const flatbuffers::Vector<flatbuffers::Offset<Game::Message::Equipment>> *EquipmentUpdates, flecs::entity NetworkPlayerInventory)
{
	items::NetworkPlayerEquipEvent Equipped = {.NetworkPlayer = NetworkPlayer};
	bool bHadEquipEvent = false;

	items::NetworkPlayerRemoveEquipEvent Unequipped = {.NetworkPlayer = NetworkPlayer};
	bool bHadUnequipEvent = false;

	World.defer_suspend();
	for (auto It = EquipmentUpdates->begin(); It != EquipmentUpdates->end(); ++It)
	{
		const auto Slot = It->slot();
		const auto ItemType = It->entity()->type();
		const auto ItemId = It->entity()->item_id(); // we only care about item id for prefab creation, server id is not important
		const bool bWasRemoved = It->remove();

		Log->Info("PreFrame: Slot {}, ItemType: {}, ItemId: {}, WasRemoved: {}", static_cast<uint8_t>(Slot), ItemType, ItemId, bWasRemoved);

		// make sure it's valid
		if (Slot == Game::Message::EquipmentSlot_INVALID || ItemType == 0 || ItemId == 0)
		{
			continue;
		}
		
		if (bWasRemoved)
		{
			Unequipped.Items.at(Slot) = items::GameItem{.Id=ItemId, .Type=static_cast<items::ItemType>(ItemType)};
			bHadUnequipEvent = true;
			continue;
		}

		const auto ClientWorldId = World.id();
		auto InventoryItem = EquipmentService->SpawnFromPrefab(World, ClientWorldId, ItemId, static_cast<items::ItemType>(ItemType));
		if (!InventoryItem.is_valid())
		{
			Log->Warn("PreFrame: Failed to spawn item for network client");
			continue;
		}
		Log->Warn("PreFrame INVENTORY: {}", InventoryItem.get_mut<items::GameItem>().ServerId);
		bool bEquipped = items::EquipItem(NetworkPlayer, InventoryItem, static_cast<items::EquipmentSlot>(Slot), NetworkPlayerInventory);
		if (bEquipped)
		{
			Equipped.Items.at(Slot) = items::GameItem{.Id=ItemId, .Type=static_cast<items::ItemType>(ItemType)};
			bHadEquipEvent = true;
		}
		Log->Info("PreFrame: Slot {}, ItemType: {}, ItemId: {}, WasEquipped: {}", static_cast<uint8_t>(Slot), ItemType, ItemId, bEquipped);
	}
	World.defer_resume();

	if (bHadEquipEvent)
	{
		NetworkPlayer.set<items::NetworkPlayerEquipEvent>(Equipped);
		Log->Info("PreFrame: NetworkPlayerEquipEvent set for networked player");
	}

	if (bHadUnequipEvent)
	{
		NetworkPlayer.set<items::NetworkPlayerRemoveEquipEvent>(Unequipped);
		Log->Debug("PreFrame: NetworkPlayerRemoveEquipEvent set for networked player");
	}

	NetworkPlayer.add(animations::AnimSet::SWORDSH); // TODO: get from equipped item
}

This method works in the following manner:

  1. We have two component types, an equip type and an unequip type. Part of the flatbuffer message includes a ‘was this equipment removed’ flag.
  2. If it was removed, we add it to the unequipped component array.
  3. If it was added, we add it to the equipped component array.
    1. Call EquipmentService to spawn the item type from our prefab list
    2. Call EquipItem to actually equip it
  4. Then depending on the type(s) we set a new component on the player to signal that an event occurred.

There may be one thing you didn’t notice: the calls to World.defer_suspend() and World.defer_resume(). This is a very important point that stumped me for a long time.

Inside of EquipmentService->SpawnFromPrefab we need to modify a component and see its changes inside that same function. This is a big no-no in flecs system land because of the way writes to the data model are executed. All writes are staged, but not actually written until it is safe to do so. (Think of iterating over a container while deleting its items; it’s the same concept.)
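A toy model may help picture what staging means. The class below is not flecs and does not reflect how flecs is implemented; `ToyWorld` and its methods are invented purely to mimic the visible behaviour: a deferred write cannot be read back until the queue is flushed, unless deferring is suspended first.

```cpp
#include <cassert>
#include <optional>
#include <unordered_map>
#include <utility>
#include <vector>

// Toy model of staged writes (hypothetical, NOT the flecs implementation).
class ToyWorld
{
public:
    void DeferBegin() { bDeferred = true; }
    void DeferEnd() { bDeferred = false; Flush(); }

    // Temporarily switch staging off and on again without flushing the queue.
    void DeferSuspend() { assert(bDeferred); bSuspended = true; }
    void DeferResume() { bSuspended = false; }

    void Set(int Entity, int Value)
    {
        if (bDeferred && !bSuspended)
        {
            Queue.push_back({Entity, Value}); // staged, not yet visible
            return;
        }
        Store[Entity] = Value; // immediate write
    }

    // Only sees values that have actually been written to the store.
    std::optional<int> Get(int Entity) const
    {
        auto It = Store.find(Entity);
        if (It == Store.end())
        {
            return std::nullopt;
        }
        return It->second;
    }

private:
    void Flush()
    {
        for (const auto& [Entity, Value] : Queue)
        {
            Store[Entity] = Value;
        }
        Queue.clear();
    }

    bool bDeferred = false;
    bool bSuspended = false;
    std::vector<std::pair<int, int>> Queue;
    std::unordered_map<int, int> Store;
};
```

In this model, a `Set` followed by a `Get` between `DeferBegin` and `DeferEnd` comes back empty, while the same pair wrapped in `DeferSuspend` / `DeferResume` sees the value straight away.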

In our case we need to look up an item, then instantiate it, then modify it, all within a flecs system, which again isn’t really allowed unless you do two things: set your flecs system to use immediate, and wrap the work in World.defer_suspend() / World.defer_resume():

flecs::entity Equipment::SpawnFromPrefab(flecs::world &GameWorld, const uint64_t ServerId, const uint16_t EquipmentId, const items::ItemType TypeOfItem)
    {
        flecs::entity FoundItem;
        
        switch (TypeOfItem)
        {
		    // ...
            case ItemType::WEAPON:
            {
                FoundItem = WeaponQuery.find([&](items::GameItem &Item, items::Weapon)
                {
                    return Item.Id == EquipmentId && Item.Type == TypeOfItem;
                });
                break;
            }
            // ...
        }

        if (FoundItem.is_valid())
        {
            flecs::entity Instance = GameWorld.entity().is_a(FoundItem);
            Instance.get_mut<items::GameItem>().ServerId = ServerId;
            return Instance;
        }
        
        // return the invalid object instead so the caller can test
        return FoundItem;
    }
}

If we didn’t use an immediate system and the defers, creating the Instance and then trying to write the ServerId wouldn’t work: get_mut<items::GameItem> would fail with an assertion. This really threw me off and I couldn’t figure out why I was getting asserts until I asked on the flecs Discord. I had a similar problem when creating a new networked player and trying to instantiate their inventory, where I needed to get a reference to a relationship, but you can’t do that unless the system is immediate and you suspend deferring.

Visualizing it

So our server handles sending the data, and our client now sets an items::NetworkPlayerEquipEvent component which we can read from. How do we get this information to our UE5 frontend?

With a flecs system defined in our GameMode:

FlecsWorld.system<NetworkCharacterRef, items::NetworkPlayerEquipEvent>("UpdateNetCharacterEquip")
	.kind(flecs::PreStore)
	//.term_at(0).second(flecs::Wildcard)
	.each([&](flecs::iter& It, size_t Index, NetworkCharacterRef& NetActor, items::NetworkPlayerEquipEvent &Equip)
	{
		UE_LOG(LogTemp, Warning, TEXT("PreStore: UpdateNetCharacterEquip Updating Equipment..."));
		NetActor.Actor->UpdateEquipment(Equip);
		It.entity(Index).remove<items::NetworkPlayerEquipEvent>();
	});

We get a reference to our NetworkActor and call its UpdateEquipment method, which calls into its InventoryComponent to handle adding the item to the skeletal mesh.

void UPMOInventoryComponent::SpawnAndAttachEquipment(const FWeaponData& WeaponData, const EEquipmentSlot Slot)
{
	UE_LOG(LogTemp, Warning, TEXT("Equipping Weapon to slot: %d"), static_cast<int32>(Slot)); // cast: an enum class does not match %d
	Unequip(Slot);
	FName SocketName;
	if (Slot == EEquipmentSlot::LEFT_HAND)
	{
		SocketName = FName(WeaponData.Name.ToLower().Replace(TEXT(" "), TEXT("")) + TEXT("_l"));
	}
	else if (Slot == EEquipmentSlot::RIGHT_HAND)
	{
		SocketName = FName(WeaponData.Name.ToLower().Replace(TEXT(" "), TEXT("")) + TEXT("_r"));
	}

	UE_LOG(LogTemp, Warning, TEXT("Equipping Weapon to Socket: %s"), *SocketName.ToString());

	auto CharacterMesh = GetOwner()->GetComponentByClass<USkeletalMeshComponent>();

	FTransform Transform; // Just use empty transform
	auto Weapon = GetWorld()->SpawnActorDeferred<AWeapon>(AWeapon::StaticClass(), Transform, GetOwner());
	FAttachmentTransformRules Rules(EAttachmentRule::SnapToTarget, true);

	Weapon->WeaponData = WeaponData;
	UGameplayStatics::FinishSpawningActor(Weapon, Transform);

	Weapon->AttachToComponent(CharacterMesh, Rules, SocketName);
	EquippedItems.Emplace(Slot, Weapon);
}
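The socket lookup depends on a naming convention: the weapon's name lowercased, spaces removed, plus `_l` or `_r`. Here is the same rule in plain standard C++ so it can be checked outside the engine. `MakeSocketName`, `Hand` and the weapon name "Long Sword" are made up for illustration.

```cpp
#include <cassert>
#include <cctype>
#include <string>

// Hypothetical mirror of the two hand slots.
enum class Hand { Left, Right };

// Lowercases the name, drops spaces and appends the hand suffix,
// e.g. "Long Sword" + Right -> "longsword_r".
std::string MakeSocketName(const std::string& WeaponName, Hand Side)
{
    assert(!WeaponName.empty()); // an empty name would yield a suffix-only socket
    std::string Out;
    Out.reserve(WeaponName.size() + 2);
    for (unsigned char Ch : WeaponName)
    {
        if (Ch == ' ')
        {
            continue; // sockets have no spaces
        }
        Out.push_back(static_cast<char>(std::tolower(Ch)));
    }
    Out += (Side == Hand::Left) ? "_l" : "_r";
    return Out;
}
```

In the UE5 function above, any slot other than LEFT_HAND or RIGHT_HAND leaves SocketName unset, so the convention only covers the two hand slots.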

And that is how we handle spawning equipment over the network!