Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,22 @@

import java.time.LocalDateTime;

import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;

import io.mosip.kernel.auditmanager.request.AuditRequestDto;

/**
* The Audit request builder class is used to create new {@link AuditRequestDto}
* with all required fields
*
*
* @author Dharmesh Khandelwal
* @since 1.0.0
*
*/
@Service
@Scope(BeanDefinition.SCOPE_PROTOTYPE)
public class AuditRequestBuilder {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import jakarta.persistence.MappedSuperclass;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.springframework.data.domain.Persistable;

/**
* Base class for {@link Audit} with {@link #uuid} and {@link #createdAt}
Expand All @@ -19,7 +20,7 @@
@Data
@AllArgsConstructor
@MappedSuperclass
public class BaseAudit {
public class BaseAudit implements Persistable<String> {

/**
* Field for immutable universally unique identifier (UUID)
Expand All @@ -39,4 +40,14 @@ public BaseAudit() {
createdAt = LocalDateTime.now();
}

@Override
public String getId() {
return uuid;
}

@Override
public boolean isNew() {
return true;
}

}
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
package io.mosip.kernel.auditmanager.impl;

import io.mosip.kernel.core.logger.spi.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.stereotype.Service;

import io.mosip.kernel.auditmanager.entity.Audit;
import io.mosip.kernel.auditmanager.repository.AuditRepository;
import io.mosip.kernel.auditmanager.queue.AuditQueueService;
import io.mosip.kernel.auditmanager.request.AuditRequestDto;
import io.mosip.kernel.auditmanager.util.AuditUtils;
import io.mosip.kernel.core.auditmanager.spi.AuditHandler;
import org.springframework.transaction.annotation.Transactional;

/**
* Implementation of {@link AuditHandler} with function to write
Expand All @@ -23,31 +20,20 @@
@Service
public class AuditHandlerImpl implements AuditHandler<AuditRequestDto> {

/**
* Field for {@link AuditRepository} having data access operations related to
* audit
*/
@Autowired
private AuditRepository auditRepository;
private AuditQueueService auditQueueService;

/*
* (non-Javadoc)
*
*
* @see
* io.mosip.kernel.core.audit.handler.AuditHandler#writeAudit(io.mosip.kernel.
* core.audit.dto.AuditRequest)
*/
@Override
@Transactional
public boolean addAudit(AuditRequestDto auditRequest) {
try {
AuditUtils.validateAuditRequestDto(auditRequest);
Audit event = getAuditEntity(auditRequest);
auditRepository.save(event);
return true;
} catch (DataAccessException ex) {
return false;
}
AuditUtils.validateAuditRequestDto(auditRequest);
return auditQueueService.enqueue(auditRequest);
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package io.mosip.kernel.auditmanager.queue;

import io.mosip.kernel.auditmanager.entity.Audit;
import io.mosip.kernel.auditmanager.impl.AuditHandlerImpl;
import io.mosip.kernel.auditmanager.repository.AuditRepository;
import io.mosip.kernel.auditmanager.request.AuditRequestDto;
import jakarta.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.DataAccessException;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
* Scheduled batch writer that drains the {@link AuditQueueService} and persists
* records in bulk using {@code saveAll()}, reducing DB round-trips under load.
*
* <p>Retry strategy: exponential back-off (1s, 2s, 4s) up to
* {@code mosip.kernel.auditmanager.batch.retry-attempts} times before giving up.
*
* <p>Graceful shutdown: {@link #flushOnShutdown()} drains remaining queue entries
* so no audit records are silently lost when the pod terminates.
*
* @since 1.3.0
*/
@Component
public class AuditBatchWriter {

private static final Logger log = LoggerFactory.getLogger(AuditBatchWriter.class);

@Autowired
private AuditQueueService queueService;

@Autowired
private AuditRepository auditRepository;

@Autowired
private AuditHandlerImpl auditHandler;

@Value("${mosip.kernel.auditmanager.batch.size:100}")
private int batchSize;

@Value("${mosip.kernel.auditmanager.batch.retry-attempts:3}")
private int retryAttempts;

/**
* Runs every {@code flush-interval-ms} milliseconds (default 100ms).
* Uses {@code fixedDelay} so the next run starts only after the current one
* finishes, preventing concurrent writes to the DB.
*/
@Scheduled(fixedDelayString = "${mosip.kernel.auditmanager.batch.flush-interval-ms:100}")
public void flushBatch() {
List<AuditRequestDto> batch = new ArrayList<>(batchSize);
int drained = queueService.drainTo(batch, batchSize);
if (drained == 0) {
return;
}
writeBatchWithRetry(batch);
}

/**
* Called by the JVM shutdown hook (via Spring context close).
* Flushes whatever remains in the queue so in-flight audits are not lost.
*/
@PreDestroy
public void flushOnShutdown() {
log.info("Shutdown signal received — flushing remaining audit queue entries...");
List<AuditRequestDto> remaining = new ArrayList<>();
queueService.drainTo(remaining, Integer.MAX_VALUE);
if (!remaining.isEmpty()) {
log.info("Writing {} remaining audit records before shutdown", remaining.size());
writeBatchWithRetry(remaining);
}
}

private void writeBatchWithRetry(List<AuditRequestDto> batch) {
for (int attempt = 0; attempt < retryAttempts; attempt++) {
try {
List<Audit> entities = batch.stream()
.map(auditHandler::getAuditEntity)
.collect(Collectors.toList());
auditRepository.saveAll(entities);
log.debug("Persisted batch of {} audit records", entities.size());
return;
} catch (DataAccessException ex) {
if (attempt < retryAttempts - 1) {
long backoffMs = (long) Math.pow(2, attempt) * 1000;
log.warn("Batch write failed (attempt {}/{}), retrying in {}ms: {}",
attempt + 1, retryAttempts, backoffMs, ex.getMessage());
try {
Thread.sleep(backoffMs);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.error("Retry interrupted — {} audit records may be lost", batch.size());
return;
}
} else {
log.error("Batch write failed after {} attempts — {} audit records lost",
retryAttempts, batch.size(), ex);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.mosip.kernel.auditmanager.queue;

import io.mosip.kernel.auditmanager.request.AuditRequestDto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/**
* Thread-safe in-memory queue that decouples HTTP request threads from DB writes.
* Producers call {@link #enqueue} and return immediately; a background batch
* writer drains this queue on a fixed schedule.
*
* @since 1.3.0
*/
@Component
public class AuditQueueService {

private static final Logger log = LoggerFactory.getLogger(AuditQueueService.class);

private final LinkedBlockingQueue<AuditRequestDto> queue;
private final int capacity;

public AuditQueueService(
@Value("${mosip.kernel.auditmanager.queue.capacity:50000}") int capacity) {
this.capacity = capacity;
this.queue = new LinkedBlockingQueue<>(capacity);
}

/**
* Non-blocking enqueue. Returns {@code false} and logs a warning if the queue
* is full, allowing callers to degrade gracefully instead of blocking.
*/
public boolean enqueue(AuditRequestDto auditRequest) {
boolean accepted = queue.offer(auditRequest);
if (!accepted) {
log.warn("Audit queue is full (capacity={}). Dropping audit event: eventId={}",
capacity, auditRequest.getEventId());
}
return accepted;
}

/**
* Drains up to {@code maxElements} entries into the provided list.
* Thread-safe; safe to call from a single scheduler thread.
*/
public int drainTo(List<AuditRequestDto> target, int maxElements) {
return queue.drainTo(target, maxElements);
}

public int size() {
return queue.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
* Audit manager application
Expand All @@ -12,6 +13,7 @@
*/
@SpringBootApplication(scanBasePackages = { "io.mosip.kernel.auditmanager.*","${mosip.auth.adapter.impl.basepackage}"
,"io.mosip.kernel.core.logger.config"})
@EnableScheduling
public class AuditManagerBootApplication {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public class AuditManagerServiceImpl implements AuditManagerService {
@Override
public AuditResponseDto addAudit(AuditRequestDto auditRequestDto) {
AuditResponseDto auditResponseDto = new AuditResponseDto();
auditHandler.addAudit(auditRequestDto);
auditResponseDto.setStatus(true);
boolean status = auditHandler.addAudit(auditRequestDto);
auditResponseDto.setStatus(status);
return auditResponseDto;
}

Expand Down
Loading