Event-Driven Architecture in C#: From Tight Coupling to MassTransit

April 20, 2026

Event-Driven Architecture in C#

The Problem

In OfficeOS, every service that wanted to log had to explicitly import the AgentLogService. The cron scheduler manually called what to log. The AgentTurnService even had its own TurnLogger for "structured turn logging" — half the code was logging instead of business logic.

if (agent is null) { log.Error($"Agent {agentId} not found"); return; } if (string.IsNullOrEmpty(agent.PodName)) { log.Error($"Agent {agentId} has no pod"); return; } log.TurnStart(userMessage);

Before — Tight Coupling

public class AgentTurnService : IAgentTurnService { private readonly IAgentRepository _repo; private readonly IAgentLogService _log; private readonly ITurnLogger _turnLogger; private readonly IMetricsService _metrics; public async Task ProcessTurn(Guid agentId, string userMessage) { var agent = await _repo.GetById(agentId); if (agent is null) { _log.Error($"Agent {agentId} not found"); _metrics.IncrementErrorCounter("not_found"); return; } if (string.IsNullOrEmpty(agent.PodName)) { _log.Error($"Agent {agentId} has no pod"); _metrics.IncrementErrorCounter("missing_pod"); return; } _turnLogger.TurnStart(userMessage); // Business logic buried under logging... } }

Every concern is manually wired in. Adding metrics means injecting another service. Adding audit logging means another dependency. The service grows sideways.

After — Events with MassTransit

Services publish events. They do not care who consumes them. Logging, metrics, billing — all separate consumers reacting to the same events.

// Domain events public record AgentTurnStarted(Guid AgentId, string Message, DateTime Timestamp); public record AgentErrorOccurred(Guid AgentId, string ErrorCode, string Detail); // Service — only publishes events public class AgentTurnService : IAgentTurnService { private readonly IAgentRepository _repo; private readonly IPublishEndpoint _bus; public async Task ProcessTurn(Guid agentId, string userMessage) { var agent = await _repo.GetById(agentId); if (agent is null) { await _bus.Publish(new AgentErrorOccurred(agentId, "NOT_FOUND", "Agent not found")); return; } if (string.IsNullOrEmpty(agent.PodName)) { await _bus.Publish(new AgentErrorOccurred(agentId, "NO_POD", "Agent has no pod")); return; } await _bus.Publish(new AgentTurnStarted(agentId, userMessage, DateTime.UtcNow)); // Pure business logic from here } } // Consumer 1 — Logging public class LoggingConsumer : IConsumer<AgentTurnStarted> { private readonly IAgentLogService _logService; public async Task Consume(ConsumeContext<AgentTurnStarted> context) { _logService.TurnStart(context.Message.Message); } } // Consumer 2 — Usage Metrics public class UsageMetricsConsumer : IConsumer<AgentTurnStarted> { public async Task Consume(ConsumeContext<AgentTurnStarted> context) { // Token metered billing logic } }

Adding a new concern means adding a new consumer. The service never changes. No imports, no constructor bloat.

The Full Architecture

The event bus handles the entire agent workflow:

  • User creates agent → agent has integrations and channels
  • Messages arrive bidirectionally (chat, channel integrations)
  • Agent processes messages and publishes events
  • Token consumption is tracked via events
  • Sessions restart with skills injected per DB query

The mental model simplifies to: events and consumers. Nothing explicitly imports anything else.

Guard Pattern — Synchronous Checks in an Async System

Sometimes you need a synchronous answer before proceeding — like checking if a billing quota is exceeded before executing a tool call. The pattern: a consumer updates a Redis cache asynchronously, a Guard reads it synchronously.

// Guard interface public interface IBillingGuard { Task<bool> IsQuotaExceeded(Guid agentId); Task ThrowIfQuotaExceeded(Guid agentId); } // Guard reads from Redis public class BillingGuard : IBillingGuard { private readonly IDistributedCache _cache; public async Task<bool> IsQuotaExceeded(Guid agentId) { var status = await _cache.GetStringAsync($"billing_status:{agentId}"); return status == "limit_reached"; } public async Task ThrowIfQuotaExceeded(Guid agentId) { if (await IsQuotaExceeded(agentId)) throw new QuotaExceededException($"Agent {agentId} has reached the token limit."); } } // Usage — check before each tool execution public async Task RunAgentTurn(Guid agentId) { foreach (var tool in _plannedTools) { await _billingGuard.ThrowIfQuotaExceeded(agentId); var result = await tool.Execute(); await _bus.Publish(new TokenConsumed(agentId, result.Tokens)); } } // Consumer updates cache when tokens are consumed public class BillingConsumer : IConsumer<TokenConsumed> { private readonly IBillingRepository _repo; private readonly IDistributedCache _cache; public async Task Consume(ConsumeContext<TokenConsumed> context) { var usage = await _repo.UpdateUsage(context.Message.AgentId, context.Message.Amount); if (usage >= limit) { await _cache.SetStringAsync( $"billing_status:{context.Message.AgentId}", "limit_reached"); await context.Publish( new AgentSuspended(context.Message.AgentId, "Quota Exceeded")); } } }

The consumer keeps the cache warm. The guard reads it in constant time. The service stays clean. This pattern extends to any case where an async system needs to gate a synchronous decision — rate limiting, feature flags, permission checks.

GitHub
LinkedIn
youtube