# [Introduction to Spring Batch](https://www.baeldung.com/introduction-to-spring-batch)
pom.xml
```xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
```
application.yml
```
# 自動建立 Spring Batch Schema
spring:
batch:
jdbc:
initialize-schema: "always"
```
# [Spring Batch using Partitioner](https://www.baeldung.com/spring-batch-partitioner)
---
Spring Batch 是一個專門處理批次作業的框架,提供 **Reader(讀取)、Processor(處理)和 Writer(寫入)** 來處理大規模數據的讀取、轉換與存儲。這三個組件是 **Step** 的核心,負責處理 **Chunk**(區塊)或 **單筆資料**。
---
# 1️⃣ **ItemReader(讀取)**
**ItemReader** 負責從數據源讀取資料,並將其轉換為物件,每次調用 `read()` 方法時返回一筆資料,當數據源耗盡時返回 `null`。
## **常見 Reader 實作**
| 類型 | 說明 |
|------|------|
| `FlatFileItemReader` | 讀取 **CSV / TXT** 文件 |
| `JdbcCursorItemReader` | 使用 **JDBC** 讀取資料庫(基於游標機制) |
| `JdbcPagingItemReader` | 使用 **JDBC** 分頁讀取資料庫 |
| `JpaItemReader` | 使用 **JPA** 讀取資料庫 |
| `MongoItemReader` | 讀取 **MongoDB** |
| `StaxEventItemReader` | 讀取 **XML** 文件 |
| `JsonItemReader` | 讀取 **JSON** 文件 |
| `KafkaItemReader` | 讀取 **Kafka** 訊息 |
| `MultiResourceItemReader` | 讀取 **多個文件** |
### **範例:讀取 CSV 文件**
```java
@Bean
public FlatFileItemReader<User> reader() {
return new FlatFileItemReaderBuilder<User>()
.name("userItemReader")
.resource(new FileSystemResource("users.csv"))
.delimited()
.names("id", "name", "email")
.fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
setTargetType(User.class);
}})
.build();
}
```
---
# 2️⃣ **ItemProcessor(處理)**
**ItemProcessor** 負責資料的轉換、驗證與過濾,處理後的數據會傳遞給 **ItemWriter**。
## **常見應用**
- 數據格式轉換(如 String 轉 Integer)
- 過濾資料(例如跳過無效數據)
- 資料加密(如 Hash 密碼)
- 調用 API 進行補充數據
### **範例:過濾 Email 無效的使用者**
```java
@Bean
public ItemProcessor<User, User> processor() {
return user -> user.getEmail().contains("@") ? user : null;
}
```
若 `processor()` 返回 `null`,則該筆資料不會進入 **ItemWriter**。
---
# 3️⃣ **ItemWriter(寫入)**
**ItemWriter** 負責將處理後的數據批量寫入目標儲存(如資料庫、文件或 API)。
## **常見 Writer 實作**
| 類型 | 說明 |
|------|------|
| `FlatFileItemWriter` | 寫入 **CSV / TXT** |
| `JdbcBatchItemWriter` | 使用 **JDBC** 寫入資料庫 |
| `JpaItemWriter` | 使用 **JPA** 寫入資料庫 |
| `MongoItemWriter` | 寫入 **MongoDB** |
| `KafkaItemWriter` | 寫入 **Kafka** |
| `JsonFileItemWriter` | 寫入 **JSON** 文件 |
### **範例:寫入 MySQL 資料庫**
```java
@Bean
public JdbcBatchItemWriter<User> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<User>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO users (id, name, email) VALUES (:id, :name, :email)")
.dataSource(dataSource)
.build();
}
```
---
# 🎯 **完整 Spring Batch 配置**
```java
@Bean
public Step userStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("userStep", jobRepository)
.<User, User>chunk(10, transactionManager)
.reader(reader())
.processor(processor())
.writer(writer(null))
.build();
}
@Bean
public Job importUserJob(JobRepository jobRepository, Step userStep) {
return new JobBuilder("importUserJob", jobRepository)
.start(userStep)
.build();
}
```
---
# ✅ **總結**
| 組件 | 作用 |
|------|------|
| **ItemReader** | 讀取數據(CSV、DB、JSON、Kafka...) |
| **ItemProcessor** | 處理數據(轉換、驗證、過濾、調用 API...) |
| **ItemWriter** | 將處理後的數據寫入目的地(DB、文件、API...) |
Spring Batch 透過 **Reader → Processor → Writer** 三段式處理流程,有效處理大量數據,並支援多種數據來源與目的地。
---
# 並行處理
在 **Spring Batch** 中,可以透過不同的方式來並行執行 **Step**,以提升批次作業的效能。常見的並行化方法包括 **多執行緒處理(Chunk-oriented & Tasklet-based)、多步驟併行(Split Flow)、Partitioning、Remote Chunking** 以及 **Remote Partitioning**。
---
## 🔹 **1. 多執行緒處理(Chunk-oriented 或 Tasklet-based)**
這種方式讓 **單個 Step 內的讀取(Reader)、處理(Processor)和寫入(Writer)** 同時運行多個執行緒,以提高吞吐量。
### **使用 `TaskExecutor` 並行處理**
Spring Batch 提供 `TaskExecutor` 來啟動多執行緒模式,讓 `ItemReader`、`ItemProcessor` 和 `ItemWriter` 可以在多個執行緒中同時運行。
#### **🔹 設定 `taskExecutor`**
```java
@Bean
public TaskExecutor taskExecutor() {
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("spring_batch");
executor.setConcurrencyLimit(5); // 設定最大並行執行緒數量
return executor;
}
```
#### **🔹 並行化 Step**
```java
@Bean
public Step parallelStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("parallelStep", jobRepository)
.<User, User>chunk(10, transactionManager) // Chunk-based
.reader(reader())
.processor(processor())
.writer(writer())
.taskExecutor(taskExecutor()) // 啟用多執行緒
.build();
}
```
👉 **適用場景**:
- 需要提高單一 Step 的吞吐量。
- 但仍受限於單一數據來源(如 Reader 需要線性讀取)。
---
## 🔹 **2. Split Flow(並行執行多個 Step)**
這種方法允許 **不同的 Step 並行執行**,提高批次作業效率。
### **🔹 設定 `Flow` 並行執行**
```java
@Bean
public Flow flow1() {
return new FlowBuilder<Flow>("flow1")
.start(step1())
.build();
}
@Bean
public Flow flow2() {
return new FlowBuilder<Flow>("flow2")
.start(step2())
.build();
}
@Bean
public Job parallelJob(JobRepository jobRepository) {
return new JobBuilder("parallelJob", jobRepository)
.start(new FlowBuilder<Flow>("splitFlow")
.split(taskExecutor()) // 啟用並行
.add(flow1(), flow2()) // 並行執行 step1 和 step2
.build()
)
.end()
.build();
}
```
👉 **適用場景**:
- 兩個 **彼此獨立** 的 Step 可以同時執行(例如,一個處理 CSV,另一個處理 API 資料)。
- 可以有效利用 CPU 資源,但 Step 之間 **不可有相依性**。
---
## 🔹 **3. Partitioning(資料分區並行)**
`Partitioning` 允許一個 **Step 被分割為多個子 Step**,每個子 Step 可以在不同執行緒中執行。
### **🔹 設定 PartitionHandler**
```java
@Bean
public PartitionHandler partitionHandler(TaskExecutor taskExecutor) {
TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
handler.setTaskExecutor(taskExecutor);
handler.setStep(slaveStep());
handler.setGridSize(5); // 設定並行處理的分區數
return handler;
}
```
### **🔹 設定 Master-Slave Step**
```java
@Bean
public Step masterStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("masterStep", jobRepository)
.partitioner("slaveStep", partitioner()) // 設定分區
.step(slaveStep()) // 設定 slaveStep 負責處理每個分區
.partitionHandler(partitionHandler(taskExecutor())) // 設定 Partition Handler
.build();
}
@Bean
public Step slaveStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("slaveStep", jobRepository)
.<User, User>chunk(10, transactionManager)
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
```
👉 **適用場景**:
- 可以將 **大數據集分割成小區塊**,並行處理。
- **每個執行緒負責不同的數據區塊**,適合資料庫查詢、檔案處理等。
---
## 🔹 **4. Remote Chunking(遠端分散式處理)**
**Remote Chunking** 允許 **Master Step(負責讀取 & 分配)**,而 **Slave Step 負責處理 & 寫入**,適合分佈式系統(如 Kafka、RabbitMQ)。
### **🔹 主要概念**
- `Master Step`:
- 負責讀取資料,並將其發送到遠端 Worker 進行處理。
- `Worker Step`:
- 負責從消息隊列(如 Kafka / RabbitMQ)接收數據,並進行 **Processor & Writer** 處理。
👉 **適用場景**:
- 當處理量過大時,讓多台機器(Worker)分攤運算。
- 可以結合 **微服務架構**,讓不同 Worker 處理不同類型的數據。
---
## 🔹 **5. Remote Partitioning(遠端分區並行)**
Remote Partitioning 類似於 Partitioning,但將不同的數據區塊分派到 **不同的機器執行**(Worker)。
### **🔹 與 Remote Chunking 的差異**
| 模式 | 適用場景 | Master 角色 | Worker 角色 |
|------|--------|------------|------------|
| **Remote Chunking** | **適合 CPU 密集型**(處理複雜數據) | 讀取並傳遞數據 | 負責處理與寫入 |
| **Remote Partitioning** | **適合資料密集型**(大量資料分區) | 分配不同數據區塊 | 負責讀取、處理、寫入 |
---
# 🔥 **總結**
| 方法 | 並行化方式 | 適用場景 |
|------|----------|--------|
| **TaskExecutor** | 使用多執行緒處理 Step | 提高單個 Step 的吞吐量 |
| **Split Flow** | 並行執行多個獨立的 Step | 適合無相依性的步驟 |
| **Partitioning** | 將數據分割後並行處理 | 適合處理大數據集 |
| **Remote Chunking** | 遠端機器處理 Processor & Writer | 適合 CPU 密集型工作 |
| **Remote Partitioning** | 遠端機器處理 Partition & Chunk | 適合大規模數據處理 |
不同的並行方法適用於不同的批次處理需求,選擇最合適的方法可以提升整體效能 🚀