- Building a Robust Service Scheduler in Node.js – Part 1: Introduction and Architecture
- Building a Robust Service Scheduler in Node.js – Part 2: Core Service Infrastructure
Welcome back to our service scheduler series! In Part 1, we outlined the architecture and requirements for our robust background service system. Today, we’ll roll up our sleeves and start building the core infrastructure, beginning with the BaseService class and ServiceRegistry.
Setting Up the Project
Let’s start by initializing our Node.js project with TypeScript support:
# Create a package.json, accepting npm's defaults for every prompt.
npm init -y
# Development-only tooling: the TypeScript compiler, Node.js typings, ts-node and nodemon.
npm install --save-dev typescript @types/node ts-node nodemon
# Runtime dependencies.
# NOTE(review): the BaseService code shown in this part imports EventEmitter from
# Node's built-in 'events' module, not from eventemitter3 — confirm this package is needed.
npm install winston uuid eventemitter3
# Typings for uuid, plus Jest with its typings and TypeScript integration (ts-jest).
npm install --save-dev @types/uuid jest @types/jest ts-jest
Create a tsconfig.json file:
{
"compilerOptions": {
"target": "ES2022",
"module": "commonjs",
"lib": ["ES2022"],
"outDir": "./dist",
"rootDir": "./src",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"resolveJsonModule": true,
"declaration": true,
"declarationMap": true,
"sourceMap": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist", "tests"]
}
Creating the Logger Utility
Before we dive into services, let’s set up a robust logging system:
// src/utils/logger.ts
import winston from 'winston';
// Shorthand for the winston format factories used below.
const fmt = winston.format;

// Whether the optional file transports should be attached.
const isProduction = process.env.NODE_ENV === 'production';

/**
 * Shared application logger.
 *
 * Log entries are built as timestamped JSON (with error stacks and splat
 * interpolation); the console transport overrides that with a colorized,
 * simple format. The minimum level comes from LOG_LEVEL, defaulting to 'info'.
 */
const logger = winston.createLogger({
  level: process.env.LOG_LEVEL || 'info',
  format: fmt.combine(
    fmt.timestamp(),
    fmt.errors({ stack: true }),
    fmt.splat(),
    fmt.json()
  ),
  defaultMeta: { service: 'service-scheduler' },
  transports: [
    new winston.transports.Console({
      format: fmt.combine(fmt.colorize(), fmt.simple())
    })
  ]
});

// In production, additionally write errors to error.log and everything to combined.log.
if (isProduction) {
  const fileTargets = [
    { filename: 'error.log', level: 'error' },
    { filename: 'combined.log' }
  ];
  for (const target of fileTargets) {
    logger.add(new winston.transports.File(target));
  }
}

export default logger;
Building the BaseService Class
The BaseService class is the foundation of our system. Every service will extend this class to inherit common functionality:
// src/core/BaseService.ts
import { EventEmitter } from 'events';
import { v4 as uuidv4 } from 'uuid';
import logger from '../utils/logger';
/** Lifecycle states a service reports through BaseService.getStatus(). */
export enum ServiceStatus {
// Initial state set by the BaseService constructor; restored when a run finishes while still RUNNING.
IDLE = 'idle',
// Set by BaseService.run() for the duration of an execution.
RUNNING = 'running',
// Not assigned anywhere in the code shown in this file.
STOPPED = 'stopped',
// Intended to be set by BaseService once retrying a failed run is given up.
ERROR = 'error',
// Not assigned anywhere in the code shown in this file.
PAUSED = 'paused'
}
/** Options accepted by the BaseService constructor. */
export interface ServiceConfig {
// Human-readable service name; used in log messages and returned by getName().
name: string;
// Not read by BaseService in the code shown here — presumably consumed by a scheduler; confirm against the caller.
interval: number; // in milliseconds
// Defaults to true in the BaseService constructor; not read elsewhere in the code shown here.
enabled?: boolean;
// Whether a failed run should be retried. Defaults to true in the BaseService constructor.
retryOnError?: boolean;
// Upper bound on retry attempts. Defaults to 3 in the BaseService constructor.
maxRetries?: number;
// Free-form data attached to the service; BaseService does not read it in the code shown here.
metadata?: Record<string, any>;
}
/** Execution counters maintained by BaseService and exposed (as a shallow copy) by getStats(). */
export interface ServiceStats {
// Number of executions started, regardless of outcome.
totalRuns: number;
// Number of executions whose execute() promise resolved.
successfulRuns: number;
// Number of executions that failed with an error not treated as a cancellation.
failedRuns: number;
// Completion time of the most recent successful execution; undefined until one succeeds.
lastRunTime?: Date;
// Error from the most recent failed execution; undefined until one fails.
lastError?: Error;
// Rounded mean duration, in milliseconds, of the most recent successful executions (at most 100 are kept).
averageExecutionTime: number;
}
/**
 * Base class for background services.
 *
 * Subclasses implement {@link BaseService.execute}; callers trigger it through
 * {@link BaseService.run}, which tracks statistics, retries failures with
 * exponential backoff, and supports cancellation through an AbortSignal.
 *
 * Events emitted:
 * - `statusChanged` (status: ServiceStatus) whenever run() changes the status.
 * - `executionComplete` ({ success: true, executionTime } or { success: false, error })
 *   after every attempt that was not cancelled.
 * - `executionCancelled` when an attempt, or the wait before a retry, is aborted.
 */
export abstract class BaseService extends EventEmitter {
  protected id: string;
  protected config: ServiceConfig;
  protected status: ServiceStatus;
  protected stats: ServiceStats;
  /** Promise returned by execute() for the attempt in flight, or null between attempts. */
  protected currentExecution: Promise<void> | null = null;
  /** Controller whose signal is handed to execute(); non-null only while run() is active. */
  protected abortController: AbortController | null = null;
  /** Durations (ms) of the most recent successful attempts, capped at 100 entries. */
  private executionTimes: number[] = [];

  /**
   * @param config Service options. `enabled` defaults to true, `retryOnError`
   *   to true and `maxRetries` to 3 when not supplied.
   */
  constructor(config: ServiceConfig) {
    super();
    this.id = uuidv4();
    this.config = {
      enabled: true,
      retryOnError: true,
      maxRetries: 3,
      ...config
    };
    this.status = ServiceStatus.IDLE;
    this.stats = {
      totalRuns: 0,
      successfulRuns: 0,
      failedRuns: 0,
      averageExecutionTime: 0
    };
    logger.info(`Service ${this.config.name} (${this.id}) initialized`);
  }

  /**
   * The unit of work performed by the service.
   *
   * @param signal Aborted when cancel() is called; implementations should stop
   *   promptly and may reject once it fires.
   */
  protected abstract execute(signal: AbortSignal): Promise<void>;

  // Optional lifecycle hooks. onStart/onStop are not invoked by this class itself.
  protected async onStart(): Promise<void> {}
  protected async onStop(): Promise<void> {}
  /** Called after each failed (non-cancelled) attempt, before any retry is scheduled. */
  protected async onError(error: Error): Promise<void> {}

  /**
   * Executes the service once and, when `retryOnError` is set, retries failed
   * attempts up to `maxRetries` times with exponential backoff (2s, 4s, 8s, ...).
   *
   * Returns immediately if the service is already running. Failures of
   * execute() are recorded in the stats and reported through events rather
   * than thrown; only an error thrown by onError() or an event listener
   * propagates to the caller.
   */
  public async run(): Promise<void> {
    if (this.status === ServiceStatus.RUNNING) {
      logger.warn(`Service ${this.config.name} is already running`);
      return;
    }

    this.status = ServiceStatus.RUNNING;
    this.emit('statusChanged', this.status);

    // One controller covers every attempt and backoff wait of this run, so
    // cancel() also interrupts a pending retry.
    const controller = new AbortController();
    this.abortController = controller;

    const maxRetries = this.resolveMaxRetries();
    let finalStatus: ServiceStatus = ServiceStatus.IDLE;

    try {
      // Attempt 0 is the initial execution; attempts 1..maxRetries are retries.
      // Retrying happens here instead of by re-entering run(), which would be
      // rejected by the "already running" guard above.
      for (let attempt = 0; ; attempt++) {
        if (attempt > 0) {
          logger.info(`Retrying service ${this.config.name} (attempt ${attempt}/${maxRetries})`);
          // Exponential backoff
          await this.delay(Math.pow(2, attempt) * 1000, controller.signal);
          if (controller.signal.aborted) {
            logger.info(`Service ${this.config.name} was cancelled`);
            this.emit('executionCancelled');
            break;
          }
        }

        const outcome = await this.executeOnce(controller.signal);
        if (outcome !== 'failed') {
          break;
        }

        if (attempt >= maxRetries) {
          if (maxRetries > 0) {
            logger.error(`Service ${this.config.name} failed after ${maxRetries} retries`);
            finalStatus = ServiceStatus.ERROR;
          }
          break;
        }
      }
    } finally {
      this.currentExecution = null;
      this.abortController = null;
      // Leave the status alone if something else changed it while running.
      if (this.status === ServiceStatus.RUNNING) {
        this.status = finalStatus;
        this.emit('statusChanged', this.status);
      }
    }
  }

  /**
   * Performs a single execute() call and records its outcome in the stats.
   *
   * @returns 'succeeded', 'cancelled' (signal aborted or AbortError thrown) or 'failed'.
   */
  private async executeOnce(signal: AbortSignal): Promise<'succeeded' | 'cancelled' | 'failed'> {
    const startTime = Date.now();
    this.stats.totalRuns++;

    try {
      logger.debug(`Executing service ${this.config.name}`);
      this.currentExecution = this.execute(signal);
      await this.currentExecution;

      // Update statistics
      const executionTime = Date.now() - startTime;
      this.updateExecutionTime(executionTime);
      this.stats.successfulRuns++;
      this.stats.lastRunTime = new Date();
      logger.info(`Service ${this.config.name} executed successfully in ${executionTime}ms`);
      this.emit('executionComplete', {
        success: true,
        executionTime
      });
      return 'succeeded';
    } catch (error: unknown) {
      // A rejection after the signal fired is a cancellation even when the
      // implementation did not name its error 'AbortError'.
      if (signal.aborted || BaseService.isAbortError(error)) {
        logger.info(`Service ${this.config.name} was cancelled`);
        this.emit('executionCancelled');
        return 'cancelled';
      }

      // Thrown values are not guaranteed to be Error instances.
      const failure = error instanceof Error ? error : new Error(String(error));
      this.stats.failedRuns++;
      this.stats.lastError = failure;
      logger.error(`Service ${this.config.name} failed:`, error);
      this.emit('executionComplete', {
        success: false,
        error
      });
      await this.onError(failure);
      return 'failed';
    } finally {
      this.currentExecution = null;
    }
  }

  /** Reports whether a thrown value carries the conventional 'AbortError' name. */
  private static isAbortError(error: unknown): boolean {
    return (
      typeof error === 'object' &&
      error !== null &&
      (error as { name?: unknown }).name === 'AbortError'
    );
  }

  /**
   * Number of retries allowed for this run: 0 when retrying is disabled,
   * otherwise `maxRetries` (default 3). An explicit 0 is honoured, and
   * negative or non-finite values are treated as 0.
   */
  private resolveMaxRetries(): number {
    if (!this.config.retryOnError) {
      return 0;
    }
    const configured = this.config.maxRetries ?? 3;
    return Number.isFinite(configured) && configured > 0 ? Math.floor(configured) : 0;
  }

  /** Records a successful attempt's duration and refreshes the rolling average. */
  private updateExecutionTime(time: number): void {
    this.executionTimes.push(time);
    // Keep only last 100 execution times
    if (this.executionTimes.length > 100) {
      this.executionTimes.shift();
    }
    // Calculate average
    const sum = this.executionTimes.reduce((a, b) => a + b, 0);
    this.stats.averageExecutionTime = Math.round(sum / this.executionTimes.length);
  }

  /**
   * Aborts the active run, if any, and waits for the in-flight execute() call
   * to settle. Does nothing when the service is not running. Never rejects:
   * the outcome of the aborted attempt is reported by run().
   */
  public async cancel(): Promise<void> {
    const controller = this.abortController;
    if (!controller) {
      return;
    }

    logger.info(`Cancelling service ${this.config.name}`);
    const inFlight = this.currentExecution;
    controller.abort();

    // Wait for current execution to complete
    if (inFlight) {
      try {
        await inFlight;
      } catch {
        // The rejection is handled and logged by run(); nothing to add here.
      }
    }
  }

  // Getters
  public getId(): string { return this.id; }
  public getName(): string { return this.config.name; }
  /** Returns a shallow copy of the configuration. */
  public getConfig(): ServiceConfig { return { ...this.config }; }
  public getStatus(): ServiceStatus { return this.status; }
  /** Returns a shallow copy of the statistics. */
  public getStats(): ServiceStats { return { ...this.stats }; }

  /**
   * Resolves after `ms` milliseconds, or as soon as `signal` is aborted.
   * Never rejects; callers check `signal.aborted` to tell the two apart.
   */
  private delay(ms: number, signal?: AbortSignal): Promise<void> {
    return new Promise<void>(resolve => {
      if (signal?.aborted) {
        resolve();
        return;
      }
      const finish = (): void => {
        clearTimeout(timer);
        signal?.removeEventListener('abort', finish);
        resolve();
      };
      const timer = setTimeout(finish, ms);
      signal?.addEventListener('abort', finish, { once: true });
    });
  }
}
Implementing the Service Registry
Now let’s create the ServiceRegistry to manage all our services:
// src/core/ServiceRegistry.ts
import { BaseService } from './BaseService';
import logger from '../utils/logger';
/**
 * In-memory catalogue of services, indexed by their unique ID and grouped by
 * name so that several instances sharing one name can be looked up together.
 */
export class ServiceRegistry {
  /** Service instances keyed by ID. */
  private services: Map<string, BaseService> = new Map();
  /** For every service name, the IDs of the registered instances carrying it. */
  private servicesByName: Map<string, Set<string>> = new Map();

  /**
   * Adds a service to the registry.
   *
   * @returns The ID of the registered service.
   * @throws Error if a service with the same ID is already registered.
   */
  public register(service: BaseService): string {
    const id = service.getId();
    const name = service.getName();

    if (this.services.has(id)) {
      throw new Error(`Service with ID ${id} already registered`);
    }
    this.services.set(id, service);

    // Track services by name for grouped operations
    let idsForName = this.servicesByName.get(name);
    if (idsForName === undefined) {
      idsForName = new Set();
      this.servicesByName.set(name, idsForName);
    }
    idsForName.add(id);

    logger.info(`Service registered: ${name} (${id})`);
    return id;
  }

  /**
   * Removes a service from the registry.
   *
   * @returns true if the service was registered, false (after a warning) otherwise.
   */
  public deregister(id: string): boolean {
    const service = this.services.get(id);
    if (service === undefined) {
      logger.warn(`Attempted to deregister non-existent service: ${id}`);
      return false;
    }

    const name = service.getName();
    this.services.delete(id);

    // Drop the ID from its name group, and the group itself once it is empty.
    const idsForName = this.servicesByName.get(name);
    if (idsForName !== undefined) {
      idsForName.delete(id);
      if (idsForName.size === 0) {
        this.servicesByName.delete(name);
      }
    }

    logger.info(`Service deregistered: ${name} (${id})`);
    return true;
  }

  /** Looks up a service by ID. */
  public get(id: string): BaseService | undefined {
    return this.services.get(id);
  }

  /** Returns every registered service carrying the given name (possibly none). */
  public getByName(name: string): BaseService[] {
    const ids = this.servicesByName.get(name);
    if (ids === undefined) {
      return [];
    }

    const matches: BaseService[] = [];
    ids.forEach(id => {
      const service = this.services.get(id);
      if (service !== undefined) {
        matches.push(service);
      }
    });
    return matches;
  }

  /** Returns all registered services. */
  public getAll(): BaseService[] {
    return [...this.services.values()];
  }

  /** Reports whether a service with the given ID is registered. */
  public has(id: string): boolean {
    return this.services.has(id);
  }

  /** Removes every service from the registry. */
  public clear(): void {
    this.services.clear();
    this.servicesByName.clear();
    logger.info('Service registry cleared');
  }

  /**
   * Summarises the registry: total service count, a count per current status,
   * and a count per service name.
   */
  public getStats(): Record<string, any> {
    const servicesByStatus: Record<string, number> = {};
    const servicesByName: Record<string, number> = {};

    // Count services by status
    this.services.forEach(service => {
      const status = service.getStatus();
      servicesByStatus[status] = (servicesByStatus[status] || 0) + 1;
    });

    // Count services by name
    this.servicesByName.forEach((ids, name) => {
      servicesByName[name] = ids.size;
    });

    return {
      totalServices: this.services.size,
      servicesByStatus,
      servicesByName
    };
  }
}
Creating Example Services
Let’s implement our three example services to demonstrate the system. First, the MessageChecker that runs every 10 seconds:
// src/services/MessageChecker.ts
import { BaseService, ServiceConfig } from '../core/BaseService';
import logger from '../utils/logger';
/**
 * Example service that simulates polling for messages and processing any
 * that were found. How often it runs is determined by the `interval` in the
 * configuration passed in by the caller.
 */
export class MessageChecker extends BaseService {
  /** Messages discovered by checkMessages() and not yet processed. */
  private messageQueue: string[] = [];

  /**
   * @param config Service options; the name is fixed to 'MessageChecker'.
   */
  constructor(config: Omit<ServiceConfig, 'name'>) {
    super({
      ...config,
      name: 'MessageChecker'
    });
  }

  /** Checks for new messages and processes the queue if it is non-empty. */
  protected async execute(signal: AbortSignal): Promise<void> {
    // Simulate checking for messages
    await this.checkMessages(signal);
    if (this.messageQueue.length > 0) {
      logger.info(`Found ${this.messageQueue.length} new messages`);
      await this.processMessages(signal);
    }
  }

  /** Simulates a 100ms API call that yields a new message roughly 30% of the time. */
  private async checkMessages(signal: AbortSignal): Promise<void> {
    // Simulate API call
    await this.simulateAsync(100, signal);
    // Randomly add messages
    if (Math.random() > 0.7) {
      this.messageQueue.push(`Message-${Date.now()}`);
    }
  }

  /** Drains the queue, spending a simulated 50ms per message, until empty or aborted. */
  private async processMessages(signal: AbortSignal): Promise<void> {
    while (this.messageQueue.length > 0 && !signal.aborted) {
      const message = this.messageQueue.shift();
      logger.debug(`Processing message: ${message}`);
      await this.simulateAsync(50, signal);
    }
  }

  /**
   * Resolves after `ms` milliseconds, or rejects as soon as `signal` aborts.
   *
   * The rejection is an Error named 'AbortError' so that BaseService treats it
   * as a cancellation rather than a failure. A signal that is already aborted
   * rejects immediately, because its 'abort' event will not fire again.
   */
  private simulateAsync(ms: number, signal: AbortSignal): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      const createAbortError = (): Error => {
        const abortError = new Error('Operation aborted');
        abortError.name = 'AbortError';
        return abortError;
      };

      if (signal.aborted) {
        reject(createAbortError());
        return;
      }

      const onAbort = (): void => {
        clearTimeout(timeout);
        reject(createAbortError());
      };
      const timeout = setTimeout(() => {
        // Detach the listener so repeated calls do not pile up handlers on one signal.
        signal.removeEventListener('abort', onAbort);
        resolve();
      }, ms);
      signal.addEventListener('abort', onAbort, { once: true });
    });
  }
}
Key Takeaways
In this post, we’ve built the foundation of our service scheduler system:
- BaseService Class: Provides a robust foundation with built-in error handling, retry logic, and cancellation support
- ServiceRegistry: Manages service registration and provides lookup capabilities
- AbortSignal Support: Enables graceful cancellation of long-running operations
- Event-Driven Architecture: Services emit events for monitoring and integration
- Statistics Tracking: Built-in performance monitoring and error tracking
Next Steps
In Part 3, we’ll implement the ServiceManager that orchestrates all services, handles scheduling, and provides a unified control interface. We’ll also add support for different scheduling strategies and explore how to manage service dependencies.
The code from this tutorial is available on GitHub, and you can start experimenting with creating your own services by extending the BaseService class. Happy coding!