Building a Robust Service Scheduler in Node.js – Part 2: Core Service Infrastructure

Building a Robust Service Scheduler in Node.js – Part 2: Core Service Infrastructure

This entry is part 2 of 2 in the series Building a Robust Service Scheduler in Node.js

Welcome back to our service scheduler series! In Part 1, we outlined the architecture and requirements for our robust background service system. Today, we’ll roll up our sleeves and start building the core infrastructure, beginning with the BaseService class and ServiceRegistry.

Setting Up the Project

Let’s start by initializing our Node.js project with TypeScript support:

npm init -y
npm install --save-dev typescript @types/node ts-node nodemon
npm install winston uuid eventemitter3
npm install --save-dev @types/uuid jest @types/jest ts-jest

Create a tsconfig.json file:

{
  "compilerOptions": {
    "target": "ES2022",
    "module": "commonjs",
    "lib": ["ES2022"],
    "outDir": "./dist",
    "rootDir": "./src",
    "strict": true,
    "esModuleInterop": true,
    "skipLibCheck": true,
    "forceConsistentCasingInFileNames": true,
    "resolveJsonModule": true,
    "declaration": true,
    "declarationMap": true,
    "sourceMap": true
  },
  "include": ["src/**/*"],
  "exclude": ["node_modules", "dist", "tests"]
}

Creating the Logger Utility

Before we dive into services, let’s set up a robust logging system:

// src/utils/logger.ts
import winston from 'winston';

const logger = winston.createLogger({
  level: process.env.LOG_LEVEL || 'info',
  format: winston.format.combine(
    winston.format.timestamp(),
    winston.format.errors({ stack: true }),
    winston.format.splat(),
    winston.format.json()
  ),
  defaultMeta: { service: 'service-scheduler' },
  transports: [
    new winston.transports.Console({
      format: winston.format.combine(
        winston.format.colorize(),
        winston.format.simple()
      )
    })
  ]
});

// Add file transport in production
if (process.env.NODE_ENV === 'production') {
  logger.add(new winston.transports.File({ 
    filename: 'error.log', 
    level: 'error' 
  }));
  logger.add(new winston.transports.File({ 
    filename: 'combined.log' 
  }));
}

export default logger;

Building the BaseService Class

The BaseService class is the foundation of our system. Every service will extend this class to inherit common functionality:

// src/core/BaseService.ts
import { EventEmitter } from 'events';
import { v4 as uuidv4 } from 'uuid';
import logger from '../utils/logger';

export enum ServiceStatus {
  IDLE = 'idle',
  RUNNING = 'running',
  STOPPED = 'stopped',
  ERROR = 'error',
  PAUSED = 'paused'
}

export interface ServiceConfig {
  name: string;
  interval: number; // in milliseconds
  enabled?: boolean;
  retryOnError?: boolean;
  maxRetries?: number;
  metadata?: Record<string, any>;
}

export interface ServiceStats {
  totalRuns: number;
  successfulRuns: number;
  failedRuns: number;
  lastRunTime?: Date;
  lastError?: Error;
  averageExecutionTime: number;
}

export abstract class BaseService extends EventEmitter {
  protected id: string;
  protected config: ServiceConfig;
  protected status: ServiceStatus;
  protected stats: ServiceStats;
  protected currentExecution: Promise<void> | null = null;
  protected abortController: AbortController | null = null;
  private executionTimes: number[] = [];

  constructor(config: ServiceConfig) {
    super();
    this.id = uuidv4();
    this.config = {
      enabled: true,
      retryOnError: true,
      maxRetries: 3,
      ...config
    };
    this.status = ServiceStatus.IDLE;
    this.stats = {
      totalRuns: 0,
      successfulRuns: 0,
      failedRuns: 0,
      averageExecutionTime: 0
    };
    
    logger.info(`Service ${this.config.name} (${this.id}) initialized`);
  }

  // Abstract method that child services must implement
  protected abstract execute(signal: AbortSignal): Promise<void>;

  // Optional lifecycle hooks
  protected async onStart(): Promise<void> {}
  protected async onStop(): Promise<void> {}
  protected async onError(error: Error): Promise<void> {}

  public async run(): Promise<void> {
    if (this.status === ServiceStatus.RUNNING) {
      logger.warn(`Service ${this.config.name} is already running`);
      return;
    }

    this.status = ServiceStatus.RUNNING;
    this.emit('statusChanged', this.status);
    
    const startTime = Date.now();
    this.stats.totalRuns++;
    
    try {
      // Create a new AbortController for this execution
      this.abortController = new AbortController();
      
      logger.debug(`Executing service ${this.config.name}`);
      this.currentExecution = this.execute(this.abortController.signal);
      
      await this.currentExecution;
      
      // Update statistics
      const executionTime = Date.now() - startTime;
      this.updateExecutionTime(executionTime);
      this.stats.successfulRuns++;
      this.stats.lastRunTime = new Date();
      
      logger.info(`Service ${this.config.name} executed successfully in ${executionTime}ms`);
      this.emit('executionComplete', { 
        success: true, 
        executionTime 
      });
      
    } catch (error) {
      if (error.name === 'AbortError') {
        logger.info(`Service ${this.config.name} was cancelled`);
        this.emit('executionCancelled');
      } else {
        this.stats.failedRuns++;
        this.stats.lastError = error as Error;
        
        logger.error(`Service ${this.config.name} failed:`, error);
        this.emit('executionComplete', { 
          success: false, 
          error 
        });
        
        await this.onError(error as Error);
        
        if (this.config.retryOnError) {
          await this.handleRetry();
        }
      }
    } finally {
      this.currentExecution = null;
      this.abortController = null;
      
      if (this.status === ServiceStatus.RUNNING) {
        this.status = ServiceStatus.IDLE;
        this.emit('statusChanged', this.status);
      }
    }
  }

  private updateExecutionTime(time: number): void {
    this.executionTimes.push(time);
    
    // Keep only last 100 execution times
    if (this.executionTimes.length > 100) {
      this.executionTimes.shift();
    }
    
    // Calculate average
    const sum = this.executionTimes.reduce((a, b) => a + b, 0);
    this.stats.averageExecutionTime = Math.round(sum / this.executionTimes.length);
  }

  private async handleRetry(): Promise<void> {
    let retries = 0;
    const maxRetries = this.config.maxRetries || 3;
    
    while (retries < maxRetries) {
      retries++;
      logger.info(`Retrying service ${this.config.name} (attempt ${retries}/${maxRetries})`);
      
      // Exponential backoff
      await this.delay(Math.pow(2, retries) * 1000);
      
      try {
        await this.run();
        break; // Success, exit retry loop
      } catch (error) {
        if (retries === maxRetries) {
          logger.error(`Service ${this.config.name} failed after ${maxRetries} retries`);
          this.status = ServiceStatus.ERROR;
          this.emit('statusChanged', this.status);
        }
      }
    }
  }

  public async cancel(): Promise<void> {
    if (this.abortController) {
      logger.info(`Cancelling service ${this.config.name}`);
      this.abortController.abort();
      
      // Wait for current execution to complete
      if (this.currentExecution) {
        try {
          await this.currentExecution;
        } catch (error) {
          // Ignore abort errors
          if (error.name !== 'AbortError') {
            throw error;
          }
        }
      }
    }
  }

  // Getters
  public getId(): string { return this.id; }
  public getName(): string { return this.config.name; }
  public getConfig(): ServiceConfig { return { ...this.config }; }
  public getStatus(): ServiceStatus { return this.status; }
  public getStats(): ServiceStats { return { ...this.stats }; }
  
  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

Implementing the Service Registry

Now let’s create the ServiceRegistry to manage all our services:

// src/core/ServiceRegistry.ts
import { BaseService } from './BaseService';
import logger from '../utils/logger';

export class ServiceRegistry {
  private services: Map<string, BaseService>;
  private servicesByName: Map<string, Set<string>>;

  constructor() {
    this.services = new Map();
    this.servicesByName = new Map();
  }

  public register(service: BaseService): string {
    const id = service.getId();
    const name = service.getName();

    if (this.services.has(id)) {
      throw new Error(`Service with ID ${id} already registered`);
    }

    this.services.set(id, service);

    // Track services by name for grouped operations
    if (!this.servicesByName.has(name)) {
      this.servicesByName.set(name, new Set());
    }
    this.servicesByName.get(name)!.add(id);

    logger.info(`Service registered: ${name} (${id})`);
    return id;
  }

  public deregister(id: string): boolean {
    const service = this.services.get(id);
    
    if (!service) {
      logger.warn(`Attempted to deregister non-existent service: ${id}`);
      return false;
    }

    const name = service.getName();
    this.services.delete(id);

    // Remove from name mapping
    const nameSet = this.servicesByName.get(name);
    if (nameSet) {
      nameSet.delete(id);
      if (nameSet.size === 0) {
        this.servicesByName.delete(name);
      }
    }

    logger.info(`Service deregistered: ${name} (${id})`);
    return true;
  }

  public get(id: string): BaseService | undefined {
    return this.services.get(id);
  }

  public getByName(name: string): BaseService[] {
    const ids = this.servicesByName.get(name);
    if (!ids) return [];

    return Array.from(ids)
      .map(id => this.services.get(id))
      .filter(service => service !== undefined) as BaseService[];
  }

  public getAll(): BaseService[] {
    return Array.from(this.services.values());
  }

  public has(id: string): boolean {
    return this.services.has(id);
  }

  public clear(): void {
    this.services.clear();
    this.servicesByName.clear();
    logger.info('Service registry cleared');
  }

  public getStats(): Record<string, any> {
    const stats = {
      totalServices: this.services.size,
      servicesByStatus: {} as Record<string, number>,
      servicesByName: {} as Record<string, number>
    };

    // Count services by status
    for (const service of this.services.values()) {
      const status = service.getStatus();
      stats.servicesByStatus[status] = (stats.servicesByStatus[status] || 0) + 1;
    }

    // Count services by name
    for (const [name, ids] of this.servicesByName.entries()) {
      stats.servicesByName[name] = ids.size;
    }

    return stats;
  }
}

Creating Example Services

Let’s implement our three example services to demonstrate the system. First, the MessageChecker that runs every 10 seconds:

// src/services/MessageChecker.ts
import { BaseService, ServiceConfig } from '../core/BaseService';
import logger from '../utils/logger';

export class MessageChecker extends BaseService {
  private messageQueue: string[] = [];

  constructor(config: Omit<ServiceConfig, 'name'>) {
    super({
      ...config,
      name: 'MessageChecker'
    });
  }

  protected async execute(signal: AbortSignal): Promise<void> {
    // Simulate checking for messages
    await this.checkMessages(signal);
    
    if (this.messageQueue.length > 0) {
      logger.info(`Found ${this.messageQueue.length} new messages`);
      await this.processMessages(signal);
    }
  }

  private async checkMessages(signal: AbortSignal): Promise<void> {
    // Simulate API call
    await this.simulateAsync(100, signal);
    
    // Randomly add messages
    if (Math.random() > 0.7) {
      this.messageQueue.push(`Message-${Date.now()}`);
    }
  }

  private async processMessages(signal: AbortSignal): Promise<void> {
    while (this.messageQueue.length > 0 && !signal.aborted) {
      const message = this.messageQueue.shift();
      logger.debug(`Processing message: ${message}`);
      await this.simulateAsync(50, signal);
    }
  }

  private simulateAsync(ms: number, signal: AbortSignal): Promise<void> {
    return new Promise((resolve, reject) => {
      const timeout = setTimeout(resolve, ms);
      
      signal.addEventListener('abort', () => {
        clearTimeout(timeout);
        reject(new Error('Operation aborted'));
      });
    });
  }
}

Key Takeaways

In this post, we’ve built the foundation of our service scheduler system:

  • BaseService Class: Provides a robust foundation with built-in error handling, retry logic, and cancellation support
  • ServiceRegistry: Manages service registration and provides lookup capabilities
  • AbortSignal Support: Enables graceful cancellation of long-running operations
  • Event-Driven Architecture: Services emit events for monitoring and integration
  • Statistics Tracking: Built-in performance monitoring and error tracking

Next Steps

In Part 3, we’ll implement the ServiceManager that orchestrates all services, handles scheduling, and provides a unified control interface. We’ll also add support for different scheduling strategies and explore how to manage service dependencies.

The code from this tutorial is available on GitHub, and you can start experimenting with creating your own services by extending the BaseService class. Happy coding!

Navigate<< Building a Robust Service Scheduler in Node.js – Part 1: Introduction and Architecture

Written by:

265 Posts

View All Posts
Follow Me :
How to whitelist website on AdBlocker?

How to whitelist website on AdBlocker?

  1. 1 Click on the AdBlock Plus icon on the top right corner of your browser
  2. 2 Click on "Enabled on this site" from the AdBlock Plus option
  3. 3 Refresh the page and start browsing the site