This project implements a realtime messaging service decoupled via message queues (MQ), with:
- Broadcast: channel-wide messages; clients may publish over WebSocket or external services over HTTP.
- Presence: per-channel online state with join, update, leave events and optional full list sync.
The stack uses Redis for persistence primitives and a pluggable MQ for broadcast:
- Broadcast MQ (pluggable): default Redis Pub/Sub; Kafka, RabbitMQ, etc. can plug in via `MessageBroker`.
- Presence store (Redis):
  - Redis Hash + TTL holds online users per channel.

Implemented in Go; WebSocket uses gorilla/websocket; HTTP API uses gin.
- WebSocket Gateway (`internal/gateway`)
  - Manages connections, Ping/Pong heartbeats, and channel subscriptions.
  - Handles client `broadcast` and `presence` messages.
  - Publishes to MQ (`internal/broker`) so all instances receive events; each instance delivers to locally subscribed clients.
- Presence store (`internal/presence`)
  - Redis Hash per channel for online users:
    - Key: `presence:<channel>`
    - Field: `user_id`
    - Value: JSON metadata (`meta`, `updated_at`).
  - TTL evicts stale users without heartbeats.
- HTTP Event API (`POST /api/event`)
  - Lets backend services publish a channel event (broadcast payload) over HTTP.
  - Auth/authorization is left as an extension point; the handler validates the request, then publishes to MQ.
- MQ abstraction (`internal/broker`)
  - `MessageBroker` hides Redis / Kafka / RabbitMQ differences.
  - Current implementation: `RedisPubSubBroker` (Redis Pub/Sub).

Project layout:

- `cmd/server`
  - Entrypoint: HTTP + WebSocket server.
- `internal/broker`
  - `broker.go`: `MessageBroker` interface.
  - `factory.go`: composes Presence + broadcast plugins.
  - `redis_pubsub.go`: Redis Pub/Sub implementation; default broadcast plugin (`BROADCAST_MQ_TYPE=redis`).
- `internal/presence`
  - `store.go`: Redis Hash + TTL presence store.
- `internal/gateway`
  - `client.go`: per-connection read/write loops and heartbeat.
  - `jwt.go`: optional HS256 JWT validation for WebSocket; user id from claims.
  - `cors.go`: CORS middleware for cross-origin calls to `/api/event`.
  - `hub.go`: local clients and subscriptions.
  - `message.go`: client and internal bus message types.
  - `server.go`: routes `/ws`, `POST /api/event`, and core logic.

Clients send JSON over WebSocket:

- Subscribe

      {
        "type": "subscribe",
        "channel": "room1",
        "listen": ["broadcast", "presence"]
      }

- Broadcast

      {
        "type": "broadcast",
        "channel": "room1",
        "event": "user-message",
        "payload": { "text": "hello" }
      }

- Presence join / update / leave

      {
        "type": "presence_join",
        "channel": "room1",
        "payload": { "name": "Alice" }
      }

      {
        "type": "presence_update",
        "channel": "room1",
        "payload": { "name": "Alice", "status": "typing" }
      }

      {
        "type": "presence_leave",
        "channel": "room1"
      }

Internal bus messages are defined in `internal/gateway/message.go`:

- `BusMessage`:
  - `Type`: `broadcast` / `presence_join` / `presence_update` / `presence_leave`
  - `ChannelID`: channel id
  - `UserID`: user who triggered the event
  - `Event`: broadcast event name
  - `Payload`: broadcast body
  - `Meta`: presence payload (`PresenceMeta`)
  - `Timestamp`: Unix ms
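To make the mapping from client frames to bus messages concrete, here is a minimal sketch. The field names on `BusMessage` come from the list above; the JSON tags, the Go types (including `json.RawMessage` in place of `PresenceMeta`), and the helper `toBusMessage` are assumptions for illustration and may not match `internal/gateway/message.go`.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// ClientMessage mirrors the JSON frames a client sends over WebSocket.
type ClientMessage struct {
	Type    string          `json:"type"`
	Channel string          `json:"channel"`
	Listen  []string        `json:"listen,omitempty"`
	Event   string          `json:"event,omitempty"`
	Payload json.RawMessage `json:"payload,omitempty"`
}

// BusMessage uses the documented field names; tags and types are assumed.
type BusMessage struct {
	Type      string          `json:"type"`
	ChannelID string          `json:"channel_id"`
	UserID    string          `json:"user_id"`
	Event     string          `json:"event,omitempty"`
	Payload   json.RawMessage `json:"payload,omitempty"`
	Meta      json.RawMessage `json:"meta,omitempty"`
	Timestamp int64           `json:"timestamp"` // Unix ms
}

// toBusMessage (hypothetical helper) converts one client frame into a bus
// message. Subscribe frames are rejected here because this sketch treats
// them as local-only and never forwards them to the MQ.
func toBusMessage(raw []byte, userID string, nowMs int64) (BusMessage, error) {
	var cm ClientMessage
	if err := json.Unmarshal(raw, &cm); err != nil {
		return BusMessage{}, fmt.Errorf("decode client message: %w", err)
	}
	if cm.Channel == "" {
		return BusMessage{}, fmt.Errorf("missing channel")
	}
	bm := BusMessage{Type: cm.Type, ChannelID: cm.Channel, UserID: userID, Timestamp: nowMs}
	switch cm.Type {
	case "broadcast":
		bm.Event, bm.Payload = cm.Event, cm.Payload
	case "presence_join", "presence_update":
		bm.Meta = cm.Payload
	case "presence_leave":
		// A leave carries no body.
	default:
		return BusMessage{}, fmt.Errorf("unsupported type %q", cm.Type)
	}
	return bm, nil
}

func main() {
	raw := []byte(`{"type":"broadcast","channel":"room1","event":"user-message","payload":{"text":"hello"}}`)
	bm, err := toBusMessage(raw, "user123", time.Now().UnixMilli())
	if err != nil {
		panic(err)
	}
	out, _ := json.Marshal(bm)
	fmt.Println(string(out))
}
```

Passing the timestamp in as a parameter keeps the conversion deterministic, which makes it easy to unit test.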

Presence data is stored in Redis as:

- Key: `presence:<channel>`
- Field: `<user_id>`
- Example value:

      {
        "user_id": "user123",
        "meta": { "name": "Alice" },
        "updated_at": 1620000000
      }

TTL is controlled by `PRESENCE_TTL_SECONDS` (default `60`).
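The key, field, and value layout above could be produced with something like the following sketch. `PresenceEntry`, `presenceKey`, and `encodeEntry` are hypothetical names; only the key pattern and the JSON shape are taken from the example value.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// PresenceEntry matches the example value stored in the Redis Hash.
type PresenceEntry struct {
	UserID    string         `json:"user_id"`
	Meta      map[string]any `json:"meta"`
	UpdatedAt int64          `json:"updated_at"`
}

// presenceKey builds the per-channel hash key, presence:<channel>.
func presenceKey(channel string) string {
	return "presence:" + channel
}

// encodeEntry returns the hash field (the user id) and the JSON value.
func encodeEntry(userID string, meta map[string]any, updatedAt int64) (field, value string, err error) {
	b, err := json.Marshal(PresenceEntry{UserID: userID, Meta: meta, UpdatedAt: updatedAt})
	if err != nil {
		return "", "", err
	}
	return userID, string(b), nil
}

func main() {
	field, value, err := encodeEntry("user123", map[string]any{"name": "Alice"}, 1620000000)
	if err != nil {
		panic(err)
	}
	// Prints the Redis command a store would issue for this entry.
	fmt.Printf("HSET %s %s %s\n", presenceKey("room1"), field, value)
}
```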

Broadcast flow:

- Client A on gateway G1 sends `type=broadcast`.
- G1 wraps the message in a `BusMessage` and calls `MessageBroker.PublishBroadcast("room1", data)`.
- Redis Pub/Sub delivers it to all gateways subscribed to `broadcast.*`.
- Each gateway looks up local subscribers for that channel whose `listen` contains `broadcast`.
- The gateway pushes the JSON to those clients.
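The last two steps, the local lookup and push, can be sketched as below. `Hub`, `Client`, and `Deliver` are illustrative stand-ins rather than the types in `internal/gateway/hub.go`, and the sketch omits locking, which a real hub shared across goroutines would need.

```go
package main

import "fmt"

// Client is a hypothetical stand-in for one WebSocket connection.
type Client struct {
	ID     string
	Outbox []string // frames that would be written to the socket
}

// Hub tracks local subscriptions: channel -> client -> set of listen kinds.
type Hub struct {
	subs map[string]map[*Client]map[string]bool
}

func NewHub() *Hub {
	return &Hub{subs: make(map[string]map[*Client]map[string]bool)}
}

// Subscribe records which kinds ("broadcast", "presence") a client listens to.
func (h *Hub) Subscribe(c *Client, channel string, listen []string) {
	if h.subs[channel] == nil {
		h.subs[channel] = make(map[*Client]map[string]bool)
	}
	kinds := make(map[string]bool, len(listen))
	for _, k := range listen {
		kinds[k] = true
	}
	h.subs[channel][c] = kinds
}

// Deliver queues frame for every local client on channel that listens for
// kind, and returns how many clients received it.
func (h *Hub) Deliver(channel, kind, frame string) int {
	n := 0
	for c, kinds := range h.subs[channel] {
		if kinds[kind] {
			c.Outbox = append(c.Outbox, frame)
			n++
		}
	}
	return n
}

func main() {
	hub := NewHub()
	a := &Client{ID: "a"}
	b := &Client{ID: "b"}
	hub.Subscribe(a, "room1", []string{"broadcast", "presence"})
	hub.Subscribe(b, "room1", []string{"presence"})

	n := hub.Deliver("room1", "broadcast", `{"event":"user-message"}`)
	fmt.Println("delivered to", n, "client(s)") // delivered to 1 client(s)
}
```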

Presence flow:

- Client B on G2 sends `presence_join` or `presence_update`.
- G2 writes the Redis Hash (`HSET presence:room1 user123 <json>`) and refreshes the TTL.
- G2 publishes a `BusMessage` via `PublishPresenceEvent("room1", data)`.
- All gateways forward presence events to clients subscribed to presence on that channel.
- Clients update their local presence UI.
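The write-and-refresh step can be illustrated with an in-memory stand-in for the Redis store. This is a sketch under assumptions: `memPresence` and its methods are hypothetical, and expiry is tracked per user here, whereas the text above does not say whether the real store expires individual users or the whole hash key.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// entry holds one user's JSON value and the time it goes stale.
type entry struct {
	value     string
	expiresAt int64 // Unix seconds
}

// memPresence is an in-memory stand-in for the Redis-backed presence store.
type memPresence struct {
	ttlSec int64
	data   map[string]map[string]entry // channel -> user id -> entry
}

func newMemPresence(ttlSec int64) *memPresence {
	return &memPresence{ttlSec: ttlSec, data: make(map[string]map[string]entry)}
}

// Upsert handles both join and update: it stores the value and refreshes the TTL.
func (p *memPresence) Upsert(channel, userID, value string, nowSec int64) {
	if p.data[channel] == nil {
		p.data[channel] = make(map[string]entry)
	}
	p.data[channel][userID] = entry{value: value, expiresAt: nowSec + p.ttlSec}
}

// Leave removes a user immediately.
func (p *memPresence) Leave(channel, userID string) {
	delete(p.data[channel], userID)
}

// List returns the ids of users still online, sorted, evicting stale ones.
func (p *memPresence) List(channel string, nowSec int64) []string {
	var ids []string
	for id, e := range p.data[channel] {
		if e.expiresAt <= nowSec {
			delete(p.data[channel], id)
			continue
		}
		ids = append(ids, id)
	}
	sort.Strings(ids)
	return ids
}

func main() {
	store := newMemPresence(60)
	now := time.Now().Unix()
	store.Upsert("room1", "user123", `{"name":"Alice"}`, now)
	store.Upsert("room1", "user456", `{"name":"Bob"}`, now)
	// user123 sends an update 45s later, which refreshes its TTL.
	store.Upsert("room1", "user123", `{"name":"Alice","status":"typing"}`, now+45)
	fmt.Println(store.List("room1", now+70)) // [user123]
}
```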

Presence sync on subscribe:

- Client subscribes with `listen` containing `"presence"`.
- Gateway calls `List(channel)` on the presence store.
- Gateway sends a `presence_sync` payload to that client only.

HTTP event flow:

- Caller:

      POST /api/event
      Content-Type: application/json

      {
        "channel": "room1",
        "event": "user-message",
        "payload": { "text": "hello from http" }
      }

- HTTP layer may enforce auth (extension point).
- The handler converts the request to a `BusMessage` and calls `PublishBroadcast`.
- Delivery then follows the same path as a client broadcast.
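As a rough illustration of the validate-then-publish step, the sketch below uses the standard library's `net/http` rather than gin so that it runs without extra dependencies. `EventRequest`, `parseEvent`, `eventHandler`, the `publish` hook, and the chosen status codes are all assumptions; the sketch also forwards the raw body instead of building a `BusMessage`.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// EventRequest mirrors the JSON body of POST /api/event.
type EventRequest struct {
	Channel string          `json:"channel"`
	Event   string          `json:"event"`
	Payload json.RawMessage `json:"payload"`
}

// parseEvent decodes and validates a request body.
func parseEvent(body []byte) (EventRequest, error) {
	var req EventRequest
	if err := json.Unmarshal(body, &req); err != nil {
		return req, fmt.Errorf("invalid JSON: %w", err)
	}
	if req.Channel == "" || req.Event == "" {
		return req, errors.New("channel and event are required")
	}
	return req, nil
}

// eventHandler validates the request and hands it to publish, a hypothetical
// hook standing in for MessageBroker.PublishBroadcast.
func eventHandler(publish func(channel string, data []byte) error) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) // cap body at 1 MiB
		if err != nil {
			http.Error(w, "read error", http.StatusBadRequest)
			return
		}
		req, err := parseEvent(body)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		if err := publish(req.Channel, body); err != nil {
			http.Error(w, "publish failed", http.StatusBadGateway)
			return
		}
		w.WriteHeader(http.StatusAccepted)
	}
}

func main() {
	h := eventHandler(func(channel string, data []byte) error {
		fmt.Printf("publish to %s: %s\n", channel, data)
		return nil
	})
	body := `{"channel":"room1","event":"user-message","payload":{"text":"hello from http"}}`
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/api/event", strings.NewReader(body)))
	fmt.Println("status:", rec.Code)
}
```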

Requirements:

- Go 1.22+
- Redis 6+ (standalone, Sentinel, or Cluster)

On startup, cmd/server loads a .env file in the current working directory (via godotenv). Variables already set in the shell take precedence. If you run go run ./cmd/server from the repo root, place .env there; running from another directory will not find it unless you cd or set env vars manually.
- `SERVER_ADDR`: HTTP listen address, default `:8080`.
- `REDIS_ADDR`: Redis address, default `127.0.0.1:6379`.
- `REDIS_PASSWORD`: optional.
- `REDIS_DB`: Redis DB index, default `0`.
- `PRESENCE_TTL_SECONDS`: presence TTL in seconds, default `60`.
- `BROADCAST_MQ_TYPE`: broadcast backend name, default `redis` (extend with `kafka`, `rabbitmq`, etc.).
- `BROADCAST_MQ_ADDR` / `BROADCAST_MQ_PASSWORD` / `BROADCAST_MQ_DB`: connection settings for the broadcast plugin (Redis plugin; others may reuse or add keys).
- `CORS_ALLOWED_ORIGINS`: allowed `Origin` values for `POST /api/event`. Default `*` for dev; in production use a comma-separated list, e.g. `https://app.example.com`.
- `JWT_SECRET`: if set, WebSocket connections must present a valid HS256 JWT; the user id is read from JWT claims (not from the `user_id` query parameter). Send the token via `Authorization: Bearer <token>` or the query parameter `token` / `access_token` (browsers often use `?token=` on the WebSocket URL). If unset, the legacy `user_id` query parameter is accepted (dev only).
- `JWT_USER_CLAIM`: claim name for the user id when using JWT (default `sub`). Falls back to `user_id` or `sub` in the payload if needed.
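A loader for these variables might look like the sketch below. `Config` and `loadConfig` are hypothetical names, only a subset of the variables is covered, and `.env` loading is left out because godotenv is an external dependency. The lookup function is injected so the defaults can be exercised without touching the process environment.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
)

// Config holds a subset of the documented settings.
type Config struct {
	ServerAddr     string
	RedisAddr      string
	RedisDB        int
	PresenceTTLSec int
	BroadcastType  string
	CORSOrigins    []string
	JWTSecret      string
	JWTUserClaim   string
}

// loadConfig reads settings through lookup (for example os.LookupEnv) and
// applies the documented defaults when a variable is unset or empty.
func loadConfig(lookup func(string) (string, bool)) (Config, error) {
	str := func(key, def string) string {
		if v, ok := lookup(key); ok && v != "" {
			return v
		}
		return def
	}
	num := func(key string, def int) (int, error) {
		v, ok := lookup(key)
		if !ok || v == "" {
			return def, nil
		}
		n, err := strconv.Atoi(v)
		if err != nil {
			return 0, fmt.Errorf("%s: %w", key, err)
		}
		return n, nil
	}

	cfg := Config{
		ServerAddr:    str("SERVER_ADDR", ":8080"),
		RedisAddr:     str("REDIS_ADDR", "127.0.0.1:6379"),
		BroadcastType: str("BROADCAST_MQ_TYPE", "redis"),
		JWTSecret:     str("JWT_SECRET", ""),
		JWTUserClaim:  str("JWT_USER_CLAIM", "sub"),
	}
	var err error
	if cfg.RedisDB, err = num("REDIS_DB", 0); err != nil {
		return cfg, err
	}
	if cfg.PresenceTTLSec, err = num("PRESENCE_TTL_SECONDS", 60); err != nil {
		return cfg, err
	}
	// CORS_ALLOWED_ORIGINS is a comma-separated list; "*" is the dev default.
	for _, o := range strings.Split(str("CORS_ALLOWED_ORIGINS", "*"), ",") {
		if o = strings.TrimSpace(o); o != "" {
			cfg.CORSOrigins = append(cfg.CORSOrigins, o)
		}
	}
	return cfg, nil
}

func main() {
	cfg, err := loadConfig(os.LookupEnv)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg)
}
```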

Run locally:

    go run ./cmd/server

The repo includes a `Dockerfile` and a `docker-compose.yml` that start Redis and the realtime service:

    docker compose up --build

- HTTP / WebSocket: `http://localhost:8080` (`POST /api/event`, `/ws`)
- Redis exposed on host port `6379` for debugging

The `realtime` service may set `CORS_ALLOWED_ORIGINS=*` for local use; tighten origins in production.
Inside containers: `REDIS_ADDR=redis:6379`, `BROADCAST_MQ_ADDR=redis:6379`. Override via `realtime.environment` in `docker-compose.yml` or `docker compose --env-file .env up`.

WebSocket endpoint:

- URL: `ws://localhost:8080/ws?user_id=user123`
- Use a browser or CLI WebSocket client for testing.

`internal/broker.MessageBroker` abstracts:

- `PublishBroadcast` / `SubscribeBroadcast`
- `PublishPresenceEvent` / `SubscribePresenceEvent`

To add Kafka or RabbitMQ:

- Add `kafka.go` / `rabbitmq.go` under `internal/broker`.
- Map `channel` to topics/exchanges/routing keys as needed.
- Wire the chosen implementation in `cmd/server/main.go` into `gateway.Server`.
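To show what a new backend has to satisfy, here is a sketch of the interface with a toy in-memory implementation. The four method names come from the list above; the signatures, the `Handler` type, and `memBroker` are assumptions, so check `internal/broker/broker.go` for the real contract before writing a plugin.

```go
package main

import (
	"fmt"
	"sync"
)

// Handler receives one message for a channel (assumed callback shape).
type Handler func(channel string, data []byte)

// MessageBroker lists the four documented operations with assumed signatures.
type MessageBroker interface {
	PublishBroadcast(channel string, data []byte) error
	SubscribeBroadcast(h Handler) error
	PublishPresenceEvent(channel string, data []byte) error
	SubscribePresenceEvent(h Handler) error
}

// memBroker is an in-process stand-in, useful only for tests and demos.
type memBroker struct {
	mu   sync.RWMutex
	subs map[string][]Handler // "broadcast" or "presence" -> handlers
}

func newMemBroker() *memBroker {
	return &memBroker{subs: make(map[string][]Handler)}
}

func (b *memBroker) subscribe(kind string, h Handler) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs[kind] = append(b.subs[kind], h)
	return nil
}

func (b *memBroker) publish(kind, channel string, data []byte) error {
	// Copy the handler list so callbacks run without holding the lock.
	b.mu.RLock()
	hs := append([]Handler(nil), b.subs[kind]...)
	b.mu.RUnlock()
	for _, h := range hs {
		h(channel, data)
	}
	return nil
}

func (b *memBroker) PublishBroadcast(ch string, d []byte) error { return b.publish("broadcast", ch, d) }
func (b *memBroker) SubscribeBroadcast(h Handler) error         { return b.subscribe("broadcast", h) }
func (b *memBroker) PublishPresenceEvent(ch string, d []byte) error {
	return b.publish("presence", ch, d)
}
func (b *memBroker) SubscribePresenceEvent(h Handler) error { return b.subscribe("presence", h) }

// Compile-time check that memBroker satisfies the interface.
var _ MessageBroker = (*memBroker)(nil)

func main() {
	var broker MessageBroker = newMemBroker()
	// Two handlers stand in for two gateway instances.
	for _, name := range []string{"G1", "G2"} {
		name := name
		_ = broker.SubscribeBroadcast(func(channel string, data []byte) {
			fmt.Printf("%s got %s on %s\n", name, data, channel)
		})
	}
	_ = broker.PublishBroadcast("room1", []byte(`{"text":"hello"}`))
}
```

A Kafka or RabbitMQ plugin would keep the same method set and replace the in-memory fan-out with its client library's producer and consumer calls.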

Authentication:

- WebSocket: If `JWT_SECRET` is set, the server validates an HS256 JWT on connect and sets `Client.UserID` from claims (`JWT_USER_CLAIM`, default `sub`). Pass the token as `Authorization: Bearer …` or `?token=` / `?access_token=` on the upgrade request. If `JWT_SECRET` is not set, `user_id` may be passed as a query parameter (dev only).
- `POST /api/event`: Still unauthenticated in this sample; add API keys, JWT, or mTLS in front as needed.
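For readers unfamiliar with HS256, the sketch below shows the signature check and the claim lookup using only the standard library. It is not the code in `internal/gateway/jwt.go`: `signHS256` and `userIDFromToken` are hypothetical helpers, and the sketch skips `exp`/`nbf` checks, which a production setup would normally get from a maintained JWT library.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// JWT segments use unpadded base64url encoding.
var b64 = base64.RawURLEncoding

// signHS256 builds a token from raw claims JSON (used here for the demo).
func signHS256(claimsJSON string, secret []byte) string {
	head := b64.EncodeToString([]byte(`{"alg":"HS256","typ":"JWT"}`))
	body := b64.EncodeToString([]byte(claimsJSON))
	mac := hmac.New(sha256.New, secret)
	mac.Write([]byte(head + "." + body))
	return head + "." + body + "." + b64.EncodeToString(mac.Sum(nil))
}

// userIDFromToken verifies the HS256 signature and returns the user id from
// the given claim, falling back to "user_id" and then "sub".
func userIDFromToken(token string, secret []byte, claim string) (string, error) {
	parts := strings.Split(token, ".")
	if len(parts) != 3 {
		return "", errors.New("malformed token")
	}
	headJSON, err := b64.DecodeString(parts[0])
	if err != nil {
		return "", errors.New("bad header encoding")
	}
	var head struct {
		Alg string `json:"alg"`
	}
	if err := json.Unmarshal(headJSON, &head); err != nil || head.Alg != "HS256" {
		return "", errors.New("unsupported alg")
	}
	sig, err := b64.DecodeString(parts[2])
	if err != nil {
		return "", errors.New("bad signature encoding")
	}
	mac := hmac.New(sha256.New, secret)
	mac.Write([]byte(parts[0] + "." + parts[1]))
	if !hmac.Equal(sig, mac.Sum(nil)) { // constant-time comparison
		return "", errors.New("signature mismatch")
	}
	claimsJSON, err := b64.DecodeString(parts[1])
	if err != nil {
		return "", errors.New("bad payload encoding")
	}
	var claims map[string]any
	if err := json.Unmarshal(claimsJSON, &claims); err != nil {
		return "", errors.New("bad payload JSON")
	}
	for _, name := range []string{claim, "user_id", "sub"} {
		if s, ok := claims[name].(string); ok && s != "" {
			return s, nil
		}
	}
	return "", errors.New("no user id claim")
}

func main() {
	secret := []byte("dev-secret")
	token := signHS256(`{"sub":"user123"}`, secret)
	uid, err := userIDFromToken(token, secret, "sub")
	fmt.Println(uid, err) // user123 <nil>
}
```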
- Metrics: connections/sec, messages, Redis latency, MQ lag; integrate Prometheus/Grafana.
- Scale out: gateway is stateless; put a load balancer in front (Envoy, ALB, etc.).
- Redis HA: Sentinel or Cluster.
This service provides an MQ-abstracted realtime layer with:
- Horizontal scaling
- Channel broadcast
- Presence sync
- External HTTP broadcast API
Swap MQ implementations behind MessageBroker as your throughput and reliability needs grow.