365 lines
11 KiB
TypeScript
365 lines
11 KiB
TypeScript
import { Test, TestingModule } from '@nestjs/testing';
|
|
import { UnifiedQueueService } from '../../src/core/queue/unifiedQueueService';
|
|
import { DatabaseQueueProvider } from '../../src/core/queue/databaseQueueProvider';
|
|
import { Logger } from '@nestjs/common';
|
|
import { getRepositoryToken } from '@nestjs/typeorm';
|
|
import { Repository } from 'typeorm';
|
|
import { JobEntity } from '../../src/core/queue/entities/job.entity';
|
|
import { EventEntity } from '../../src/core/queue/entities/event.entity';
|
|
import {
|
|
TASK_QUEUE_PROVIDER,
|
|
EVENT_BUS_PROVIDER,
|
|
} from '../../src/core/interfaces/queue.interface';
|
|
|
|
describe('Queue System Unit Tests', () => {
|
|
let unifiedQueueService: UnifiedQueueService;
|
|
let databaseQueueProvider: DatabaseQueueProvider;
|
|
let mockJobRepository: jest.Mocked<Repository<JobEntity>>;
|
|
let mockEventRepository: jest.Mocked<Repository<EventEntity>>;
|
|
|
|
beforeEach(async () => {
|
|
// Create mock repositories
|
|
mockJobRepository = {
|
|
create: jest.fn(),
|
|
save: jest.fn(),
|
|
find: jest.fn(),
|
|
findOne: jest.fn(),
|
|
update: jest.fn(),
|
|
delete: jest.fn(),
|
|
createQueryBuilder: jest.fn(),
|
|
} as any;
|
|
|
|
mockEventRepository = {
|
|
create: jest.fn(),
|
|
save: jest.fn(),
|
|
find: jest.fn(),
|
|
findOne: jest.fn(),
|
|
update: jest.fn(),
|
|
delete: jest.fn(),
|
|
createQueryBuilder: jest.fn(),
|
|
} as any;
|
|
|
|
const module: TestingModule = await Test.createTestingModule({
|
|
providers: [
|
|
UnifiedQueueService,
|
|
DatabaseQueueProvider,
|
|
{
|
|
provide: getRepositoryToken(JobEntity),
|
|
useValue: mockJobRepository,
|
|
},
|
|
{
|
|
provide: getRepositoryToken(EventEntity),
|
|
useValue: mockEventRepository,
|
|
},
|
|
{
|
|
provide: TASK_QUEUE_PROVIDER,
|
|
useExisting: DatabaseQueueProvider,
|
|
},
|
|
{
|
|
provide: EVENT_BUS_PROVIDER,
|
|
useExisting: DatabaseQueueProvider,
|
|
},
|
|
{
|
|
provide: Logger,
|
|
useValue: {
|
|
log: jest.fn(),
|
|
error: jest.fn(),
|
|
warn: jest.fn(),
|
|
debug: jest.fn(),
|
|
},
|
|
},
|
|
],
|
|
}).compile();
|
|
|
|
unifiedQueueService = module.get<UnifiedQueueService>(UnifiedQueueService);
|
|
databaseQueueProvider = module.get<DatabaseQueueProvider>(
|
|
DatabaseQueueProvider,
|
|
);
|
|
});
|
|
|
|
describe('UnifiedQueueService', () => {
|
|
it('should be defined', () => {
|
|
expect(unifiedQueueService).toBeDefined();
|
|
});
|
|
|
|
it('should have all required methods', () => {
|
|
expect(typeof unifiedQueueService.addTask).toBe('function');
|
|
expect(typeof unifiedQueueService.processTask).toBe('function');
|
|
expect(typeof unifiedQueueService.publishEvent).toBe('function');
|
|
expect(typeof unifiedQueueService.publishEvents).toBe('function');
|
|
expect(typeof unifiedQueueService.getQueueStatus).toBe('function');
|
|
expect(typeof unifiedQueueService.pauseTaskQueue).toBe('function');
|
|
expect(typeof unifiedQueueService.resumeTaskQueue).toBe('function');
|
|
expect(typeof unifiedQueueService.cleanTaskQueue).toBe('function');
|
|
expect(typeof unifiedQueueService.close).toBe('function');
|
|
});
|
|
|
|
it('should validate task options', () => {
|
|
const validOptions = {
|
|
data: { test: 'data' },
|
|
priority: 1,
|
|
delay: 0,
|
|
attempts: 3,
|
|
};
|
|
|
|
expect(() => {
|
|
// This should not throw
|
|
const options = validOptions;
|
|
expect(options.data).toBeDefined();
|
|
expect(typeof options.priority).toBe('number');
|
|
expect(typeof options.delay).toBe('number');
|
|
expect(typeof options.attempts).toBe('number');
|
|
}).not.toThrow();
|
|
});
|
|
|
|
it('should validate event structure', () => {
|
|
const validEvent = {
|
|
eventType: 'test.event',
|
|
aggregateId: 'test-123',
|
|
aggregateType: 'Test',
|
|
version: '1.0',
|
|
occurredAt: new Date().toISOString(),
|
|
tenantId: 'tenant-1',
|
|
idempotencyKey: 'key-123',
|
|
traceId: 'trace-123',
|
|
data: { test: 'data' },
|
|
};
|
|
|
|
expect(validEvent.eventType).toBeDefined();
|
|
expect(validEvent.aggregateId).toBeDefined();
|
|
expect(validEvent.aggregateType).toBeDefined();
|
|
expect(validEvent.version).toBeDefined();
|
|
expect(validEvent.occurredAt).toBeDefined();
|
|
expect(validEvent.tenantId).toBeDefined();
|
|
expect(validEvent.idempotencyKey).toBeDefined();
|
|
expect(validEvent.traceId).toBeDefined();
|
|
expect(validEvent.data).toBeDefined();
|
|
});
|
|
});
|
|
|
|
describe('DatabaseQueueProvider', () => {
|
|
it('should be defined', () => {
|
|
expect(databaseQueueProvider).toBeDefined();
|
|
});
|
|
|
|
it('should have all required methods', () => {
|
|
expect(typeof databaseQueueProvider.add).toBe('function');
|
|
expect(typeof databaseQueueProvider.process).toBe('function');
|
|
expect(typeof databaseQueueProvider.getStatus).toBe('function');
|
|
expect(typeof databaseQueueProvider.pause).toBe('function');
|
|
expect(typeof databaseQueueProvider.resume).toBe('function');
|
|
expect(typeof databaseQueueProvider.clean).toBe('function');
|
|
expect(typeof databaseQueueProvider.publish).toBe('function');
|
|
expect(typeof databaseQueueProvider.subscribe).toBe('function');
|
|
expect(typeof databaseQueueProvider.close).toBe('function');
|
|
});
|
|
|
|
it('should create job entity correctly', async () => {
|
|
const mockJob = {
|
|
id: 1,
|
|
queue_name: 'test-queue',
|
|
job_type: 'test-job',
|
|
payload: { test: 'data' },
|
|
status: 'pending',
|
|
priority: 1,
|
|
attempts: 0,
|
|
max_attempts: 3,
|
|
created_at: Date.now(),
|
|
updated_at: Date.now(),
|
|
scheduled_at: Date.now(),
|
|
processed_at: null,
|
|
failed_at: null,
|
|
error_message: null,
|
|
};
|
|
|
|
mockJobRepository.create.mockReturnValue(mockJob as any);
|
|
mockJobRepository.save.mockResolvedValue(mockJob as any);
|
|
|
|
const result = await databaseQueueProvider.add(
|
|
'test-queue',
|
|
'test-job',
|
|
{ test: 'data' },
|
|
{
|
|
priority: 1,
|
|
delay: 0,
|
|
attempts: 3,
|
|
},
|
|
);
|
|
|
|
expect(mockJobRepository.create).toHaveBeenCalled();
|
|
expect(mockJobRepository.save).toHaveBeenCalled();
|
|
expect(result).toBeDefined();
|
|
});
|
|
|
|
it('should create event entity correctly', async () => {
|
|
const mockEvent = {
|
|
id: 1,
|
|
event_type: 'test.event',
|
|
aggregate_id: 'test-123',
|
|
aggregate_type: 'Test',
|
|
version: '1.0',
|
|
occurred_at: new Date().toISOString(),
|
|
tenant_id: 'tenant-1',
|
|
idempotency_key: 'key-123',
|
|
trace_id: 'trace-123',
|
|
data: { test: 'data' },
|
|
created_at: Date.now(),
|
|
};
|
|
|
|
mockEventRepository.create.mockReturnValue(mockEvent as any);
|
|
mockEventRepository.save.mockResolvedValue(mockEvent as any);
|
|
|
|
const event = {
|
|
eventType: 'test.event',
|
|
aggregateId: 'test-123',
|
|
aggregateType: 'Test',
|
|
version: '1.0',
|
|
occurredAt: new Date().toISOString(),
|
|
tenantId: 'tenant-1',
|
|
idempotencyKey: 'key-123',
|
|
traceId: 'trace-123',
|
|
data: { test: 'data' },
|
|
};
|
|
|
|
await databaseQueueProvider.publish(event);
|
|
|
|
expect(mockEventRepository.create).toHaveBeenCalled();
|
|
expect(mockEventRepository.save).toHaveBeenCalled();
|
|
});
|
|
});
|
|
|
|
describe('Service Integration', () => {
|
|
it('should have all required services available', () => {
|
|
expect(unifiedQueueService).toBeDefined();
|
|
expect(databaseQueueProvider).toBeDefined();
|
|
});
|
|
});
|
|
|
|
describe('Error Handling', () => {
|
|
it('should handle database connection errors gracefully', async () => {
|
|
mockJobRepository.save.mockRejectedValue(
|
|
new Error('Database connection failed'),
|
|
);
|
|
|
|
try {
|
|
await databaseQueueProvider.add(
|
|
'test-queue',
|
|
'test-job',
|
|
{ test: 'data' },
|
|
{
|
|
priority: 1,
|
|
delay: 0,
|
|
attempts: 3,
|
|
},
|
|
);
|
|
} catch (error) {
|
|
expect(error).toBeInstanceOf(Error);
|
|
expect(error.message).toBe('Database connection failed');
|
|
}
|
|
});
|
|
|
|
it('should handle invalid event data', () => {
|
|
const invalidEvent = {
|
|
// Missing required fields
|
|
eventType: 'test.event',
|
|
// aggregateId: missing
|
|
// aggregateType: missing
|
|
version: '1.0',
|
|
occurredAt: new Date().toISOString(),
|
|
data: { test: 'data' },
|
|
};
|
|
|
|
// Test validation logic
|
|
expect(invalidEvent.eventType).toBeDefined();
|
|
expect(invalidEvent.version).toBeDefined();
|
|
expect(invalidEvent.occurredAt).toBeDefined();
|
|
expect(invalidEvent.data).toBeDefined();
|
|
|
|
// These should be undefined, indicating invalid event
|
|
expect((invalidEvent as any).aggregateId).toBeUndefined();
|
|
expect((invalidEvent as any).aggregateType).toBeUndefined();
|
|
});
|
|
|
|
it('should handle invalid task options', () => {
|
|
const invalidOptions = {
|
|
data: { test: 'data' },
|
|
priority: 'high', // Should be number
|
|
delay: -1, // Should be non-negative
|
|
attempts: 0, // Should be positive
|
|
};
|
|
|
|
// Test validation logic
|
|
expect(typeof invalidOptions.priority).toBe('string'); // Invalid
|
|
expect(invalidOptions.delay).toBeLessThan(0); // Invalid
|
|
expect(invalidOptions.attempts).toBe(0); // Invalid
|
|
});
|
|
});
|
|
|
|
describe('Performance and Scalability', () => {
|
|
it('should handle multiple concurrent operations', async () => {
|
|
const operations = [];
|
|
|
|
// Simulate multiple concurrent task additions
|
|
for (let i = 0; i < 10; i++) {
|
|
mockJobRepository.save.mockResolvedValueOnce({
|
|
id: i,
|
|
queue_name: 'concurrent-queue',
|
|
job_type: 'concurrent-job',
|
|
payload: { index: i },
|
|
status: 'pending',
|
|
} as any);
|
|
|
|
operations.push(
|
|
databaseQueueProvider.add(
|
|
'concurrent-queue',
|
|
'concurrent-job',
|
|
{ index: i },
|
|
{
|
|
priority: 1,
|
|
delay: 0,
|
|
attempts: 3,
|
|
},
|
|
),
|
|
);
|
|
}
|
|
|
|
const results = await Promise.all(operations);
|
|
expect(results).toHaveLength(10);
|
|
expect(mockJobRepository.save).toHaveBeenCalledTimes(10);
|
|
});
|
|
|
|
it('should handle batch event publishing', async () => {
|
|
const events = [];
|
|
|
|
for (let i = 0; i < 5; i++) {
|
|
events.push({
|
|
eventType: 'batch.event',
|
|
aggregateId: `batch-${i}`,
|
|
aggregateType: 'Batch',
|
|
version: '1.0',
|
|
occurredAt: new Date().toISOString(),
|
|
tenantId: 'tenant-1',
|
|
idempotencyKey: `batch-key-${i}`,
|
|
traceId: `batch-trace-${i}`,
|
|
data: { index: i },
|
|
});
|
|
|
|
mockEventRepository.save.mockResolvedValueOnce({
|
|
id: i,
|
|
event_type: 'batch.event',
|
|
aggregate_id: `batch-${i}`,
|
|
data: { index: i },
|
|
} as any);
|
|
}
|
|
|
|
// Test batch publishing
|
|
const publishPromises = events.map((event) =>
|
|
databaseQueueProvider.publish(event),
|
|
);
|
|
await Promise.all(publishPromises);
|
|
|
|
expect(mockEventRepository.save).toHaveBeenCalledTimes(5);
|
|
});
|
|
});
|
|
});
|