# Publish/Subscribe

## What is Pub/Sub?

![Redis Pub/Sub Architecture](https://dingyuliang.me/wp-content/uploads/2018/02/redis-pubsub-768x407.png)

The Redis module provides native messaging capabilities through Redis Publish/Subscribe constructs. This allows your BoxLang applications to implement real-time messaging and event-driven architectures using Redis as the message broker.

{% embed url="<https://redis.io/topics/pubsub>" %}

> Redis Pub/Sub implements the Publish/Subscribe messaging paradigm. This decoupling of publishers and subscribers can allow for greater scalability and a more dynamic network topology.

**How it works:**

1. **Subscribers** express interest in one or more channels (literal channels or pattern channels)
2. **Publishers** send messages into channels
3. **Redis** pushes these messages to all subscribers that have matched the channel

## 📋 Pub/Sub BIFs Overview

The module provides two main functions for implementing pub/sub patterns:

| Function           | Parameters                            | Returns | Description                                                                                                                        |
| ------------------ | ------------------------------------- | ------- | ---------------------------------------------------------------------------------------------------------------------------------- |
| `redisPublish()`   | `channel`, `message`, `cacheName`     | numeric | Publishes a message to a Redis channel. Returns the number of subscribers that received the message.                               |
| `redisSubscribe()` | `subscriber`, `channels`, `cacheName` | struct  | Subscribes to one or more channels using a closure/lambda or listener class. Returns a struct with `future` and `subscriber` keys. |

{% hint style="warning" %}
Pattern-based publishing and subscriptions are not yet implemented in this version.
{% endhint %}

## 📤 Publishing Messages

Use `redisPublish()` to send messages to a Redis channel. The function returns the number of subscribers that received the message.

### Function Signature

```javascript
numeric function redisPublish(
    required string channel,
    required any message,
    string cacheName = "default"
)
```

### Parameters

* **channel** - The Redis channel name to publish to
* **message** - The message content to send (will be serialized)
* **cacheName** - The name of the Redis cache connection to use (default: "default")

### Publishing Example

```javascript
// Publish messages to a channel
println( "<h2>Publishing messages...</h2>" );

// Publish single messages
var subscriberCount = redisPublish( "notifications", "System maintenance starting" );
println( "Message sent to #subscriberCount# subscriber(s)" );

redisPublish( "notifications", "Deployment complete" );
redisPublish( "notifications", "All systems operational" );

// Publish structured data
redisPublish( "user-events", {
    "event": "login",
    "userId": 12345,
    "timestamp": now()
} );

println( "<h2>Finished publishing messages</h2>" );
```

### Use Cases for Publishing

* **System notifications** - Broadcast status updates
* **Real-time updates** - Push data changes to connected clients
* **Event broadcasting** - Notify multiple services of events
* **Cache invalidation** - Signal cache updates across servers
* **Workflow triggers** - Initiate processes across distributed systems

## 📥 Subscribing to Channels

Use `redisSubscribe()` to listen for messages on Redis channels. You can subscribe using either a closure/lambda or a listener class.

### Function Signature

```javascript
struct function redisSubscribe(
    required any subscriber,
    required any channels,
    string cacheName = "default"
)
```

### Parameters

* **subscriber** - A closure/lambda function or listener class instance
* **channels** - A single channel name (string) or array of channel names
* **cacheName** - The name of the Redis cache connection to use (default: "default")

### Return Value

Returns a struct with two keys:

* **future** - A `BoxFuture` that is running your subscriber asynchronously
* **subscriber** - The Java Redis subscriber object for inspection or unsubscribing

### Subscribing with a Closure

```javascript
// Subscribe using a lambda function
var subscription = redisSubscribe(
    ( channel, message ) => {
        println( "Received on channel [#channel#]: #message#" );

        // Process the message
        if( message contains "maintenance" ) {
            // Handle maintenance notification
            enableMaintenanceMode();
        }
    },
    "notifications"
);

println( "<h1>Subscription active! Check the logs.</h1>" );

// Subscribe to multiple channels
var multiSubscription = redisSubscribe(
    ( channel, message ) => {
        println( "Channel: #channel#, Message: #message#" );
    },
    [ "notifications", "alerts", "user-events" ]
);
```

### Subscribing with a Listener Class

For more complex subscription handling, create a listener class:

```javascript
// Create and use a listener class
var listener = new NotificationListener();
var subscription = redisSubscribe( listener, "notifications" );

println( "<h1>Listener subscribed! Monitoring for messages.</h1>" );
```

### Unsubscribing

To stop receiving messages, use the `unsubscribe()` method on the subscriber:

```javascript
// Unsubscribe from all channels
subscription.subscriber.unsubscribe();

// Unsubscribe from specific channels
subscription.subscriber.unsubscribe( "notifications" );

// Cancel the future to stop the subscription thread
subscription.future.cancel();
```

## 🎧 Subscriber Listener Class

When using a class as a subscriber, implement one or more of the following callback methods. Each method is optional and will only be called if defined.

### Complete Listener Class Example

```javascript
class {

    /**
     * Called when a message is received on a subscribed channel
     * @channel The channel that received the message
     * @message The message content
     */
    public void function onMessage( string channel, string message ) {
        println( "Message received on #arguments.channel#: #arguments.message#" );

        // Process message based on channel
        switch( arguments.channel ) {
            case "notifications":
                handleNotification( arguments.message );
                break;
            case "alerts":
                handleAlert( arguments.message );
                break;
            default:
                logMessage( arguments.channel, arguments.message );
        }
    }

    /**
     * Called when a message is received on a pattern-subscribed channel
     * @pattern The pattern that matched
     * @channel The actual channel name
     * @message The message content
     */
    public void function onPMessage( string pattern, string channel, string message ) {
        println( "Pattern message: pattern=#arguments.pattern#, channel=#arguments.channel#, message=#arguments.message#" );
    }

    /**
     * Called when successfully subscribed to a channel
     * @channel The channel subscribed to
     * @subscribedChannels Total number of channels now subscribed to
     */
    public void function onSubscribe( string channel, numeric subscribedChannels ) {
        println( "Subscribed to #arguments.channel# (total channels: #arguments.subscribedChannels#)" );
    }

    /**
     * Called when unsubscribed from a channel
     * @channel The channel unsubscribed from
     * @subscribedChannels Remaining subscribed channels
     */
    public void function onUnsubscribe( string channel, numeric subscribedChannels ) {
        println( "Unsubscribed from #arguments.channel# (remaining: #arguments.subscribedChannels#)" );
    }

    /**
     * Called when successfully subscribed to a pattern
     * @pattern The pattern subscribed to
     * @subscribedChannels Total number of patterns now subscribed to
     */
    public void function onPSubscribe( string pattern, numeric subscribedChannels ) {
        println( "Subscribed to pattern #arguments.pattern# (total patterns: #arguments.subscribedChannels#)" );
    }

    /**
     * Called when unsubscribed from a pattern
     * @pattern The pattern unsubscribed from
     * @subscribedChannels Remaining subscribed patterns
     */
    public void function onPUnsubscribe( string pattern, numeric subscribedChannels ) {
        println( "Unsubscribed from pattern #arguments.pattern# (remaining: #arguments.subscribedChannels#)" );
    }

    // Helper methods
    private void function handleNotification( string message ) {
        // Custom notification handling logic
    }

    private void function handleAlert( string message ) {
        // Custom alert handling logic
    }

    private void function logMessage( string channel, string message ) {
        // Custom logging logic
    }

}
```

### Listener Method Reference

| Method             | Parameters                      | Description                                 | When Called                                              |
| ------------------ | ------------------------------- | ------------------------------------------- | -------------------------------------------------------- |
| `onMessage()`      | `channel`, `message`            | Handles messages from subscribed channels   | When a message arrives on a literal channel subscription |
| `onPMessage()`     | `pattern`, `channel`, `message` | Handles messages from pattern subscriptions | When a message arrives on a pattern-matched channel      |
| `onSubscribe()`    | `channel`, `subscribedChannels` | Confirms channel subscription               | After successfully subscribing to a channel              |
| `onUnsubscribe()`  | `channel`, `subscribedChannels` | Confirms channel unsubscription             | After unsubscribing from a channel                       |
| `onPSubscribe()`   | `pattern`, `subscribedChannels` | Confirms pattern subscription               | After successfully subscribing to a pattern              |
| `onPUnsubscribe()` | `pattern`, `subscribedChannels` | Confirms pattern unsubscription             | After unsubscribing from a pattern                       |

## 💡 Pub/Sub Best Practices

### Message Design

* **Keep messages small** - Pub/sub is optimized for many small messages
* **Use JSON** - Serialize complex data structures as JSON for interoperability
* **Include metadata** - Add timestamps, message IDs, or version info
* **Document message formats** - Maintain a schema for each channel

### Channel Naming

* **Use namespaces** - Prefix channels by domain: `app:notifications`, `system:alerts`
* **Be descriptive** - Channel names should indicate purpose: `user:login`, `order:created`
* **Avoid special characters** - Stick to alphanumeric and basic punctuation
* **Use hierarchy** - Structure channels logically: `analytics:users:signup`

### Subscriber Management

* **Handle errors gracefully** - Wrap message processing in try/catch
* **Avoid blocking operations** - Process messages quickly or queue for async processing
* **Implement reconnection logic** - Handle connection failures
* **Monitor subscription health** - Track message counts and processing times

### Performance Considerations

* **Pub/sub is fire-and-forget** - Messages are not persisted; offline subscribers miss messages
* **No delivery guarantees** - Unlike queues, pub/sub doesn't guarantee delivery
* **Use streams for reliability** - Consider Redis Streams for persistent messaging
* **Limit subscribers** - Too many subscribers can impact Redis performance
* **Consider message size** - Large messages can slow down the pub/sub system

## 🔧 Complete Pub/Sub Example

Here's a complete example demonstrating publishing and subscribing in a real-world scenario:

```javascript
// Application.bx - Configure Redis cache
this.caches["redis"] = {
    "provider": "Redis",
    "properties": {
        "host": "127.0.0.1",
        "port": "6379",
        "password": "",
        "keyprefix": "myapp"
    }
};
```

```javascript
// NotificationService.bx - Publisher service
class {

    function sendNotification( required string type, required struct data ) {
        var message = {
            "type": arguments.type,
            "data": arguments.data,
            "timestamp": now().getTime(),
            "messageId": createUUID()
        };

        var channel = "app:notifications:#arguments.type#";
        var count = redisPublish( channel, serializeJSON( message ), "redis" );

        println( "Notification sent to #count# subscriber(s)" );
        return message;
    }

}
```

```javascript
// NotificationSubscriber.bx - Subscriber listener
class {

    property name="emailService" inject="EmailService";
    property name="logService" inject="LogService";

    public void function onMessage( string channel, string message ) {
        try {
            var data = deserializeJSON( arguments.message );

            println( "Processing notification: #data.type#" );

            // Route based on notification type
            switch( data.type ) {
                case "user:signup":
                    emailService.sendWelcomeEmail( data.data.email );
                    break;
                case "order:completed":
                    emailService.sendOrderConfirmation( data.data.orderId );
                    break;
                case "system:alert":
                    logService.logAlert( data.data );
                    break;
            }

        } catch( any e ) {
            println( "Error processing message: #e.message#" );
            logService.logError( e );
        }
    }

    public void function onSubscribe( string channel, numeric subscribedChannels ) {
        println( "✓ Subscribed to #arguments.channel#" );
    }

    public void function onUnsubscribe( string channel, numeric subscribedChannels ) {
        println( "✗ Unsubscribed from #arguments.channel#" );
    }

}
```

```javascript
// Start subscriber in Application.bx onApplicationStart()
function onApplicationStart() {
    // Create subscriber instance
    var subscriber = new NotificationSubscriber();

    // Subscribe to notification channels
    application.notificationSubscription = redisSubscribe(
        subscriber,
        [
            "app:notifications:user:signup",
            "app:notifications:order:completed",
            "app:notifications:system:alert"
        ],
        "redis"
    );

    println( "✓ Notification subscriber started" );
}

// Clean up on application stop
function onApplicationEnd() {
    if( structKeyExists( application, "notificationSubscription" ) ) {
        application.notificationSubscription.subscriber.unsubscribe();
        application.notificationSubscription.future.cancel();
        println( "✓ Notification subscriber stopped" );
    }
}
```

```javascript
// Usage in your application
var notificationService = new NotificationService();

// Send notifications
notificationService.sendNotification( "user:signup", {
    "userId": 12345,
    "email": "user@example.com",
    "name": "John Doe"
});

notificationService.sendNotification( "order:completed", {
    "orderId": "ORD-2025-001",
    "total": 99.99,
    "customerId": 12345
});
```

## 🎯 Use Cases

### Real-Time Notifications

Broadcast system notifications, user alerts, or status updates across multiple application servers or clients.

### Cache Invalidation

Signal cache updates across a distributed application when data changes:

```javascript
// Publisher - invalidate cache across all servers
function updateUser( required numeric userId, required struct data ) {
    // Update database
    userDAO.update( userId, data );

    // Invalidate local cache
    cacheRemove( "user:#userId#" );

    // Tell other servers to invalidate
    redisPublish( "cache:invalidate:user", userId );
}

// Subscriber - listen for cache invalidation
var subscription = redisSubscribe(
    ( channel, message ) => {
        cacheRemove( "user:#message#" );
    },
    "cache:invalidate:user"
);
```

### Event-Driven Architecture

Implement loosely coupled microservices that react to events:

```javascript
// Order service publishes events
redisPublish( "events:order:created", {
    "orderId": newOrder.id,
    "customerId": newOrder.customerId,
    "total": newOrder.total
});

// Multiple services subscribe
// Email service sends confirmation
// Inventory service updates stock
// Analytics service tracks metrics
```

### Real-Time Monitoring

Push metrics and monitoring data to dashboards:

```javascript
// Publish metrics
redisPublish( "metrics:server:cpu", {
    "server": "web-01",
    "cpu": 45.2,
    "timestamp": now()
});

// Dashboard subscribes and updates in real-time
```

### Chat Applications

Build real-time chat or messaging features:

```javascript
// User sends message
redisPublish( "chat:room:#roomId#", {
    "userId": session.userId,
    "username": session.username,
    "message": form.message,
    "timestamp": now()
});

// All room participants receive message
var chatSubscription = redisSubscribe(
    ( channel, message ) => {
        var data = deserializeJSON( message );
        displayChatMessage( data );
    },
    "chat:room:#roomId#"
);
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://boxlang.ortusbooks.com/boxlang-+-++/modules/bx-redis/publish-subscribe.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
