How to monitor replication queues in AEM

A few years back I worked on a use case where monitoring the replication queues in AEM was crucial. When a replication queue becomes blocked, it can be hard to pinpoint the reason, and the blockage can lead to content delivery issues. To address this, I designed and developed a custom service that monitors the replication queues and notifies admin users whenever a potential issue arises. In this post, I’ll walk you through how to implement this solution, including setting up a scheduler to automate the monitoring process.

Prerequisites

  • Basic understanding of AEM and Java.
  • Access to an AEM instance.
  • A development environment set up with AEM dependencies.

Step 1: Create the OSGi Configuration Class with Scheduler

Let's start by creating an OSGi configuration class that holds the configurable parameters for monitoring the replication queue. It also holds the cron expression that controls how often the monitoring service runs.

package com.tutorials.khalilganiga.aem.core.config;

import org.osgi.service.metatype.annotations.AttributeDefinition;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;

// @Designate belongs on the component that consumes this configuration, not on the configuration itself.
@ObjectClassDefinition(name = "Replication Queue Monitor Configuration")
public @interface ReplicationQueueMonitorConfig {

    @AttributeDefinition(
        name = "Maximum Queue Size",
        description = "The maximum number of items allowed in the replication queue before triggering an alert."
    )
    int maxQueueSize() default 50;

    @AttributeDefinition(
        name = "Maximum Wait Time",
        description = "The maximum amount of time (in minutes) an item is allowed to stay in the replication queue before triggering an alert."
    )
    int maxWaitTime() default 30;

    @AttributeDefinition(
        name = "Notification Email",
        description = "Email address where alerts should be sent when thresholds are exceeded."
    )
    String notificationEmail() default "admin@example.com";

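    @AttributeDefinition(
        name = "Scheduler Expression",
        description = "Quartz cron expression to schedule the replication queue monitoring service (default: every 15 minutes)."
    )
    // The underscore maps this method to the "scheduler.expression" property,
    // which the Sling scheduler reads from registered Runnable services.
    String scheduler_expression() default "0 0/15 * * * ?";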
}

Step 2: Implement the Replication Queue Monitor Service

With our configuration class ready, we can now implement the service that will monitor the replication queues. This service will run according to the schedule defined in the OSGi configuration.

package com.tutorials.khalilganiga.aem.core.services;

import com.day.cq.replication.Agent;
import com.day.cq.replication.AgentManager;
import com.day.cq.replication.ReplicationAction;
import com.day.cq.replication.ReplicationQueue;
import com.tutorials.khalilganiga.aem.core.config.ReplicationQueueMonitorConfig;
import org.apache.sling.api.resource.ResourceResolver;
import org.json.JSONArray;
import org.json.JSONObject;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.propertytypes.ServiceDescription;
import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.TimeUnit;

@Component(service = Runnable.class, immediate = true)
@Designate(ocd = ReplicationQueueMonitorConfig.class)
@ServiceDescription("Scheduled service to monitor replication queues")
public class ReplicationQueueMonitorService implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(ReplicationQueueMonitorService.class);

    @Reference
    private AgentManager agentManager;

    private int maxQueueSize;
    private int maxWaitTime;
    private String notificationEmail;

    @Activate
    @Modified
    protected void activate(ReplicationQueueMonitorConfig config) {
        this.maxQueueSize = config.maxQueueSize();
        this.maxWaitTime = config.maxWaitTime();
        this.notificationEmail = config.notificationEmail();
    }

    @Override
    public void run() {
        monitorReplicationQueue();
    }

    private void monitorReplicationQueue() {
        logger.info("Running Replication Queue Monitor");
        // try-with-resources closes the resolver; a null resolver is simply skipped.
        try (ResourceResolver resourceResolver = getResourceResolver()) {
            if (null != agentManager) {
                logger.info("Retrieving Replication Agents.");
                Map<String, Agent> agents = agentManager.getAgents();
                logger.info("Agents found: {}", agents.keySet());
                JSONObject agentsObj = new JSONObject();
                long maxWaitMillis = TimeUnit.MINUTES.toMillis(maxWaitTime);
                for (Map.Entry<String, Agent> entry : agents.entrySet()) {
                    JSONArray agentsDataArray = new JSONArray();
                    Agent agent = entry.getValue();
                    String agentId = entry.getKey();
                    if (agent.isValid() && agent.isEnabled()) {
                        logger.info("Agent ID: {}", agentId);
                        ReplicationQueue replicationQueue = agent.getQueue();
                        if (replicationQueue != null && !replicationQueue.entries().isEmpty()) {
                            logger.info("Replication Queue is not empty for Agent ID: {}", agentId);
                            // The size check applies to the whole queue, so evaluate it once per agent.
                            boolean queueTooLong = replicationQueue.entries().size() > maxQueueSize;
                            long currentTime = System.currentTimeMillis();
                            for (ReplicationQueue.Entry rqEntry : replicationQueue.entries()) {
                                ReplicationAction action = rqEntry.getAction();
                                long waitTime = currentTime - action.getTime();
                                if (queueTooLong || waitTime > maxWaitMillis) {
                                    JSONObject actionItems = new JSONObject();
                                    // Minutes the entry has been waiting in the queue.
                                    actionItems.put("waitTimeMinutes", TimeUnit.MILLISECONDS.toMinutes(waitTime));
                                    actionItems.put("path", action.getPath());
                                    agentsDataArray.put(actionItems);
                                }
                            }
                            // Report only agents that have at least one flagged entry.
                            if (agentsDataArray.length() > 0) {
                                agentsObj.put(agentId, agentsDataArray);
                            }
                        }
                    }
                }
                if (agentsObj.length() > 0) {
                    sendNotification(agentsObj, resourceResolver);
                } else {
                    logger.info("No replication queue entries exceeded the configured thresholds");
                }
            }
        } catch (Exception e) {
            logger.error("Error occurred while monitoring replication queue: ", e);
        }
    }

    private void sendNotification(JSONObject agentsObj, ResourceResolver resourceResolver) {
        // Implementation to send email notification
        logger.info("Sending notification to: {}", notificationEmail);
        // Example of email sending logic here...
    }

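    private ResourceResolver getResourceResolver() {
        // Placeholder: obtain a service ResourceResolver here, for example through
        // ResourceResolverFactory with a configured service user.
        // Returning null means no resolver is opened and sendNotification receives null.
        return null;
    }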
}
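
The sendNotification method above is left as a stub. As a minimal sketch of one part of it, the helper below builds a plain-text message body from the flagged entries. The class name QueueAlertFormatter and the map shape (agent ID to replicated path to minutes waited) are assumptions made for illustration and are not part of AEM or of the service above; the actual email delivery is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not an AEM API): formats flagged queue entries as a plain-text alert body.
final class QueueAlertFormatter {

    private QueueAlertFormatter() {
        // Static utility, not meant to be instantiated.
    }

    /**
     * @param waitMinutesByAgent agent ID mapped to (replicated path mapped to minutes waited)
     * @return one header line, then one line per agent followed by its flagged paths
     */
    static String format(Map<String, Map<String, Long>> waitMinutesByAgent) {
        StringBuilder body = new StringBuilder("Replication queue alert\n");
        for (Map.Entry<String, Map<String, Long>> agent : waitMinutesByAgent.entrySet()) {
            body.append("Agent: ").append(agent.getKey()).append('\n');
            for (Map.Entry<String, Long> entry : agent.getValue().entrySet()) {
                body.append("  ").append(entry.getKey())
                    .append(" (waiting ").append(entry.getValue()).append(" min)\n");
            }
        }
        return body.toString();
    }

    public static void main(String[] args) {
        // Sample data only: one agent with one entry that has waited 45 minutes.
        Map<String, Long> entries = new LinkedHashMap<>();
        entries.put("/content/site/en/home", 45L);
        Map<String, Map<String, Long>> flagged = new LinkedHashMap<>();
        flagged.put("publish", entries);
        System.out.print(format(flagged));
    }
}
```

Keeping the formatting separate from the delivery step makes it possible to check the message content without a mail server or an AEM instance.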

Step 3: Deploy and Test the Service

  1. Build the project: Use Maven to build the project and deploy the bundle to your AEM instance.
     mvn clean install -PautoInstallPackage
  2. Test the service: The service will automatically run based on the scheduler configuration. It monitors the replication queues and sends notifications if the queue size or wait time exceeds the configured thresholds.
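
To check the alert condition without waiting for a real queue to back up, the comparison can be pulled into a small pure function and exercised on its own. This is only a sketch: QueueThresholds and shouldAlert are hypothetical names, and the logic is assumed to mirror the condition used in monitorReplicationQueue (queue size above the limit, or an entry waiting longer than the limit).

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not an AEM API): the alert condition as a pure function.
final class QueueThresholds {

    private QueueThresholds() {
        // Static utility, not meant to be instantiated.
    }

    /**
     * Returns true when the queue holds more than maxQueueSize items,
     * or when the entry has waited longer than maxWaitMinutes.
     */
    static boolean shouldAlert(int queueSize, long entryTimeMillis, long nowMillis,
                               int maxQueueSize, int maxWaitMinutes) {
        long waitMillis = nowMillis - entryTimeMillis;
        return queueSize > maxQueueSize
                || waitMillis > TimeUnit.MINUTES.toMillis(maxWaitMinutes);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long fortyMinutesAgo = now - TimeUnit.MINUTES.toMillis(40);
        // 3 items queued, entry waiting 40 minutes, limits of 50 items and 30 minutes.
        System.out.println(shouldAlert(3, fortyMinutesAgo, now, 50, 30)); // prints "true"
    }
}
```

Both comparisons are strict, so a queue sitting exactly at a threshold does not raise an alert.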

Conclusion

In this post, we explored how to create a custom service in AEM to monitor replication queues, including setting up a scheduler for automatic monitoring. This solution helps ensure that content replication runs smoothly and that any issues are promptly flagged to admin users. You can further customize this service to meet specific needs, such as integrating with external monitoring tools or enhancing notification capabilities.

Khalil Ganiga

Just another programmer. This blog expresses my views on various technologies and scenarios I have come across in real life.

Keep watching this space for more updates.