How to Backfill Data in an Existing Database

Problem Statement

You have a data set stored in a SQL or NoSQL table in which a few fields are either null or outdated. You need to populate those fields in bulk with a backfill job, because millions of records are affected.

Given Conditions

One or more fields must be updated in an existing table (in System A) that holds millions of records.
The table has a key that uniquely identifies each row.
Another system (System B) is the source of truth for the data we want to backfill.
System B can expose interfaces that transfer that data to a calling party.


Approach 1 – Push Mechanism (Using Kafka)

System B reads the IDs of all affected records (for example, from a text file) and publishes one event per record to a Kafka topic.
System A consumes those events and updates the outdated records in its table.


Approach 2 – Pull Mechanism (REST API Based)

System B already has a REST API that returns the data for a requested record.
System A runs a batch job that calls this REST API for many records in parallel and updates the records in its own database.



Detailed Discussion of Approach 2 Using Spring Boot and Java 17


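Below is sample Spring Boot code, built around a sample Employee class, that calls System B's employee REST endpoint and fills in the missing data in System A. The job works through the table in batches ordered by ID. It stores the last processed ID as a checkpoint in a job-tracking table so each run resumes where the previous one stopped, and it uses ShedLock so that only one application instance runs the scheduled job at a time.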


Details of System A

1) Employee class that holds the records to be backfilled

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Employee {
private int id;
private String name;
private String department;
private double salary;
private String email;
}

# Task job table that keeps track of the last processed index (record ID) of the table.

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Checkpoint row for the resumable backfill job.
 *
 * <p>{@code TaskJobCompletionRepository} reads and writes {@code lastSuccessIndex} for the row
 * whose {@code jobId} is {@code 'employeeJob'}; each run continues with IDs greater than it.
 *
 * <p>NOTE(review): the repository is declared as {@code JpaRepository<TaskJobCompletion, String>},
 * so this class presumably needs {@code @Entity} and {@code @Id} on {@code jobId}; neither is
 * shown here. {@code timestamp} is not written by the repository's update query.
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class TaskJobCompletion {

    /** Identifier of the job this checkpoint belongs to (the repository uses "employeeJob"). */
    private String jobId;

    /** ID of the last record in the most recently completed batch. */
    private long lastSuccessIndex;

    /** Timestamp column; its unit and update policy are not established in this listing. */
    private long timestamp;
}

Employee DTO in System A

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Employee payload as System A receives it from System B's {@code /employee} endpoint.
 *
 * <p>{@code EmployeeService.fetchEmployeeData} maps the HTTP response onto this class, and
 * {@code EmployeeRepository.updateEmployee} copies every field, including {@code address},
 * onto the matching {@code Employee} row.
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class EmployeeDto {

    // Key of the Employee row to update.
    private int id;

    // Profile fields overwritten from System B.
    private String name;
    private String department;
    private double salary;
    private String email;

    // The field the backfill exists to populate; stays null when the response carries no address.
    private String address;
}

# Employee Service Class in System A – Client side

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

@Service
public class EmployeeService {

@Autowired
private RestTemplate restTemplate;

public EmployeeDto fetchEmployeeData(int employeeId) {
return restTemplate.getForObject(“http://localhost:8080/employee?employeeId=” + employeeId, EmployeeDto.class);
}
}

# Repository class in System A that keeps track of the last processed index of the data

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;

@Repository
public interface TaskJobCompletionRepository extends JpaRepository<TaskJobCompletion, String> {

@Query(“SELECT t.lastSuccessIndex FROM TaskJobCompletion t WHERE t.jobId = ’employeeJob'”)
long findLastProcessedIndex();

@Modifying
@Transactional
@Query(“UPDATE TaskJobCompletion t SET t.lastSuccessIndex = :newLastProcessedIndex WHERE t.jobId = ’employeeJob'”)
void updateLastProcessedIndex(long newLastProcessedIndex);
}

# Employee Repository Class

import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;

@Repository
public interface EmployeeRepository extends JpaRepository<Employee, Integer> {

@Query(“SELECT e FROM Employee e WHERE (e.address IS NULL OR e.address = ”) AND e.id > :lastProcessedIndex ORDER BY e.id ASC”)
List<Employee> findEmployeesWithNullOrBlankAddress(long lastProcessedIndex, int limit);

@Modifying
@Query(“UPDATE Employee e SET e.name = :#{#employeeDto.name}, e.department = :#{#employeeDto.department}, e.salary = :#{#employeeDto.salary}, e.email = :#{#employeeDto.email}, e.address = :#{#employeeDto.address} WHERE e.id = :#{#employeeDto.id}”)
void updateEmployee(EmployeeDto employeeDto);
}

# ShedLock Config Class

import net.javacrumbs.shedlock.spring.annotation.EnableSchedulerLock;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider;

@Configuration
@EnableSchedulerLock(defaultLockAtMostFor = “PT30M”)
public class ShedLockConfig {

@Bean
public JdbcTemplateLockProvider lockProvider(JdbcTemplate jdbcTemplate) {
return new JdbcTemplateLockProvider(
JdbcTemplateLockProvider.Configuration.builder()
.withJdbcTemplate(jdbcTemplate)
.usingDbTime() // Works with PostgreSQL, MySQL, MariaDB, MS SQL, Oracle, HSQLDB, H2, and others
.build()
);
}
}

# Cron job worker class that fetches records from System B, updates the Employee table, and then advances the checkpoint in the job table.

import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@Component
public class CronJob {

@Autowired
private TaskJobCompletionRepository taskJobCompletionRepository;

@Autowired
private EmployeeRepository employeeRepository;

@Autowired
private EmployeeService employeeService;

@Scheduled(cron = “0 */5 * * * ?”) // Runs every 5 minutes
@SchedulerLock(name = “CronJob_runJob”, lockAtMostFor = “PT30M”, lockAtLeastFor = “PT5M”)
public void runJob() {
long lastProcessedIndex = taskJobCompletionRepository.findLastProcessedIndex();
List<Employee> employees = employeeRepository.findEmployeesWithNullOrBlankAddress(lastProcessedIndex, 1000);

CompletableFuture.runAsync(() -> {
employees.forEach(employee -> {
EmployeeDto employeeDto = employeeService.fetchEmployeeData(employee.getId());
employeeRepository.updateEmployee(employeeDto);
});
if (!employees.isEmpty()) {
long newLastProcessedIndex = employees.get(employees.size() – 1).getId();
taskJobCompletionRepository.updateLastProcessedIndex(newLastProcessedIndex);
}
}).join();
}
}



Details Of System B

# Employee DTO returned in responses from System B

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class EmployeeDto {
private int id;
private String name;
private String department;
private double salary;
private String email;
}

Rest Controller deployed in System B

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(“/employee”)
public class EmployeeController {

@GetMapping
public EmployeeDto getEmployee(@RequestParam int employeeId) {
// For demonstration, returning a dummy EmployeeDto
return new EmployeeDto(employeeId, “John Doe”, “Engineering”, 75000.0, “john.doe@example.com”);
}
}

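# Service class in System B (note: the listing below repeats the REST controller shown above rather than a separate service class)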

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(“/employee”)
public class EmployeeController {

@GetMapping
public EmployeeDto getEmployee(@RequestParam int employeeId) {
// For demonstration, returning a dummy EmployeeDto
return new EmployeeDto(employeeId, “John Doe”, “Engineering”, 75000.0, “john.doe@example.com”);
}
}

Leave a Comment

This site uses Akismet to reduce spam. Learn how your comment data is processed.