Spring Batch

Posted by Adam on August 24, 2022
# [Introduction to Spring Batch](https://www.baeldung.com/introduction-to-spring-batch) pom.xml ```xml <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> ``` application.yml ``` # 自動建立 Spring Batch Schema spring: batch: jdbc: initialize-schema: "always" ``` # [Spring Batch using Partitioner](https://www.baeldung.com/spring-batch-partitioner) --- Spring Batch 是一個專門處理批次作業的框架,提供 **Reader(讀取)、Processor(處理)和 Writer(寫入)** 來處理大規模數據的讀取、轉換與存儲。這三個組件是 **Step** 的核心,負責處理 **Chunk**(區塊)或 **單筆資料**。 --- # 1️⃣ **ItemReader(讀取)** **ItemReader** 負責從數據源讀取資料,並將其轉換為物件,每次調用 `read()` 方法時返回一筆資料,當數據源耗盡時返回 `null`。 ## **常見 Reader 實作** | 類型 | 說明 | |------|------| | `FlatFileItemReader` | 讀取 **CSV / TXT** 文件 | | `JdbcCursorItemReader` | 使用 **JDBC** 讀取資料庫(基於游標機制) | | `JdbcPagingItemReader` | 使用 **JDBC** 分頁讀取資料庫 | | `JpaItemReader` | 使用 **JPA** 讀取資料庫 | | `MongoItemReader` | 讀取 **MongoDB** | | `StaxEventItemReader` | 讀取 **XML** 文件 | | `JsonItemReader` | 讀取 **JSON** 文件 | | `KafkaItemReader` | 讀取 **Kafka** 訊息 | | `MultiResourceItemReader` | 讀取 **多個文件** | ### **範例:讀取 CSV 文件** ```java @Bean public FlatFileItemReader<User> reader() { return new FlatFileItemReaderBuilder<User>() .name("userItemReader") .resource(new FileSystemResource("users.csv")) .delimited() .names("id", "name", "email") .fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{ setTargetType(User.class); }}) .build(); } ``` --- # 2️⃣ **ItemProcessor(處理)** **ItemProcessor** 負責資料的轉換、驗證與過濾,處理後的數據會傳遞給 **ItemWriter**。 ## **常見應用** - 數據格式轉換(如 String 轉 Integer) - 過濾資料(例如跳過無效數據) - 資料加密(如 Hash 密碼) - 調用 API 進行補充數據 ### **範例:過濾 Email 無效的使用者** ```java @Bean public ItemProcessor<User, User> processor() { return user -> user.getEmail().contains("@") ? user : null; } ``` 若 `processor()` 返回 `null`,則該筆資料不會進入 **ItemWriter**。 --- # 3️⃣ **ItemWriter(寫入)** **ItemWriter** 負責將處理後的數據批量寫入目標儲存(如資料庫、文件或 API)。 ## **常見 Writer 實作** | 類型 | 說明 | |------|------| | `FlatFileItemWriter` | 寫入 **CSV / TXT** | | `JdbcBatchItemWriter` | 使用 **JDBC** 寫入資料庫 | | `JpaItemWriter` | 使用 **JPA** 寫入資料庫 | | `MongoItemWriter` | 寫入 **MongoDB** | | `KafkaItemWriter` | 寫入 **Kafka** | | `JsonFileItemWriter` | 寫入 **JSON** 文件 | ### **範例:寫入 MySQL 資料庫** ```java @Bean public JdbcBatchItemWriter<User> writer(DataSource dataSource) { return new JdbcBatchItemWriterBuilder<User>() .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()) .sql("INSERT INTO users (id, name, email) VALUES (:id, :name, :email)") .dataSource(dataSource) .build(); } ``` --- # 🎯 **完整 Spring Batch 配置** ```java @Bean public Step userStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new StepBuilder("userStep", jobRepository) .<User, User>chunk(10, transactionManager) .reader(reader()) .processor(processor()) .writer(writer(null)) .build(); } @Bean public Job importUserJob(JobRepository jobRepository, Step userStep) { return new JobBuilder("importUserJob", jobRepository) .start(userStep) .build(); } ``` --- # ✅ **總結** | 組件 | 作用 | |------|------| | **ItemReader** | 讀取數據(CSV、DB、JSON、Kafka...) | | **ItemProcessor** | 處理數據(轉換、驗證、過濾、調用 API...) | | **ItemWriter** | 將處理後的數據寫入目的地(DB、文件、API...) | Spring Batch 透過 **Reader → Processor → Writer** 三段式處理流程,有效處理大量數據,並支援多種數據來源與目的地。 --- # 並行處理 在 **Spring Batch** 中,可以透過不同的方式來並行執行 **Step**,以提升批次作業的效能。常見的並行化方法包括 **多執行緒處理(Chunk-oriented & Tasklet-based)、多步驟併行(Split Flow)、Partitioning、Remote Chunking** 以及 **Remote Partitioning**。 --- ## 🔹 **1. 多執行緒處理(Chunk-oriented 或 Tasklet-based)** 這種方式讓 **單個 Step 內的讀取(Reader)、處理(Processor)和寫入(Writer)** 同時運行多個執行緒,以提高吞吐量。 ### **使用 `TaskExecutor` 並行處理** Spring Batch 提供 `TaskExecutor` 來啟動多執行緒模式,讓 `ItemReader`、`ItemProcessor` 和 `ItemWriter` 可以在多個執行緒中同時運行。 #### **🔹 設定 `taskExecutor`** ```java @Bean public TaskExecutor taskExecutor() { SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("spring_batch"); executor.setConcurrencyLimit(5); // 設定最大並行執行緒數量 return executor; } ``` #### **🔹 並行化 Step** ```java @Bean public Step parallelStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new StepBuilder("parallelStep", jobRepository) .<User, User>chunk(10, transactionManager) // Chunk-based .reader(reader()) .processor(processor()) .writer(writer()) .taskExecutor(taskExecutor()) // 啟用多執行緒 .build(); } ``` 👉 **適用場景**: - 需要提高單一 Step 的吞吐量。 - 但仍受限於單一數據來源(如 Reader 需要線性讀取)。 --- ## 🔹 **2. Split Flow(並行執行多個 Step)** 這種方法允許 **不同的 Step 並行執行**,提高批次作業效率。 ### **🔹 設定 `Flow` 並行執行** ```java @Bean public Flow flow1() { return new FlowBuilder<Flow>("flow1") .start(step1()) .build(); } @Bean public Flow flow2() { return new FlowBuilder<Flow>("flow2") .start(step2()) .build(); } @Bean public Job parallelJob(JobRepository jobRepository) { return new JobBuilder("parallelJob", jobRepository) .start(new FlowBuilder<Flow>("splitFlow") .split(taskExecutor()) // 啟用並行 .add(flow1(), flow2()) // 並行執行 step1 和 step2 .build() ) .end() .build(); } ``` 👉 **適用場景**: - 兩個 **彼此獨立** 的 Step 可以同時執行(例如,一個處理 CSV,另一個處理 API 資料)。 - 可以有效利用 CPU 資源,但 Step 之間 **不可有相依性**。 --- ## 🔹 **3. Partitioning(資料分區並行)** `Partitioning` 允許一個 **Step 被分割為多個子 Step**,每個子 Step 可以在不同執行緒中執行。 ### **🔹 設定 PartitionHandler** ```java @Bean public PartitionHandler partitionHandler(TaskExecutor taskExecutor) { TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler(); handler.setTaskExecutor(taskExecutor); handler.setStep(slaveStep()); handler.setGridSize(5); // 設定並行處理的分區數 return handler; } ``` ### **🔹 設定 Master-Slave Step** ```java @Bean public Step masterStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new StepBuilder("masterStep", jobRepository) .partitioner("slaveStep", partitioner()) // 設定分區 .step(slaveStep()) // 設定 slaveStep 負責處理每個分區 .partitionHandler(partitionHandler(taskExecutor())) // 設定 Partition Handler .build(); } @Bean public Step slaveStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new StepBuilder("slaveStep", jobRepository) .<User, User>chunk(10, transactionManager) .reader(reader()) .processor(processor()) .writer(writer()) .build(); } ``` 👉 **適用場景**: - 可以將 **大數據集分割成小區塊**,並行處理。 - **每個執行緒負責不同的數據區塊**,適合資料庫查詢、檔案處理等。 --- ## 🔹 **4. Remote Chunking(遠端分散式處理)** **Remote Chunking** 允許 **Master Step(負責讀取 & 分配)**,而 **Slave Step 負責處理 & 寫入**,適合分佈式系統(如 Kafka、RabbitMQ)。 ### **🔹 主要概念** - `Master Step`: - 負責讀取資料,並將其發送到遠端 Worker 進行處理。 - `Worker Step`: - 負責從消息隊列(如 Kafka / RabbitMQ)接收數據,並進行 **Processor & Writer** 處理。 👉 **適用場景**: - 當處理量過大時,讓多台機器(Worker)分攤運算。 - 可以結合 **微服務架構**,讓不同 Worker 處理不同類型的數據。 --- ## 🔹 **5. Remote Partitioning(遠端分區並行)** Remote Partitioning 類似於 Partitioning,但將不同的數據區塊分派到 **不同的機器執行**(Worker)。 ### **🔹 與 Remote Chunking 的差異** | 模式 | 適用場景 | Master 角色 | Worker 角色 | |------|--------|------------|------------| | **Remote Chunking** | **適合 CPU 密集型**(處理複雜數據) | 讀取並傳遞數據 | 負責處理與寫入 | | **Remote Partitioning** | **適合資料密集型**(大量資料分區) | 分配不同數據區塊 | 負責讀取、處理、寫入 | --- # 🔥 **總結** | 方法 | 並行化方式 | 適用場景 | |------|----------|--------| | **TaskExecutor** | 使用多執行緒處理 Step | 提高單個 Step 的吞吐量 | | **Split Flow** | 並行執行多個獨立的 Step | 適合無相依性的步驟 | | **Partitioning** | 將數據分割後並行處理 | 適合處理大數據集 | | **Remote Chunking** | 遠端機器處理 Processor & Writer | 適合 CPU 密集型工作 | | **Remote Partitioning** | 遠端機器處理 Partition & Chunk | 適合大規模數據處理 | 不同的並行方法適用於不同的批次處理需求,選擇最合適的方法可以提升整體效能 🚀