假设我们已经建立了PostgreSQL双向复制 ,最好检查一下中断的情况,以及如何利用 PostgreSQL 驱动程序的本机故障转移功能。
我们将冲突解决策略更改为last_update_wins。这样,在每个数据库中的两个同时更新之间,具有最大提交时间戳的更新将被选中。
listen_addresses = '*' port = 5432 max_connections = 20 shared_buffers = 128MB temp_buffers = 8MB work_mem = 4MB wal_level = logical max_wal_senders = 3 track_commit_timestamp = on shared_preload_libraries = 'pglogical' pglogical.conflict_resolution = 'last_update_wins'
|
我们需要通过新的变化来启动组合服务:
docker compose up
请注意,根据编程语言和驱动程序的不同,此功能可能并不总是可用。概念是,当您配置连接池以建立与数据库的连接时,您可以配置两个主机。第一个主机将是主主机,而辅助主机将是主主机离线后进行故障转移的主机。故障转移可以互换,本质上驱动程序会尝试找到第一个可用的主机。
1、Python
Python 和驱动程序psycopg2提供此功能。我们将使用 flask api 实现一个应用程序。
该应用程序将提供两个端点:
一个用于获取员工的工资,一个用于将工资增加 :
from flask import Flask from psycopg2.pool import SimpleConnectionPool app = Flask(__name__) postgreSQL_pool = SimpleConnectionPool(1, 20, user="postgres", password="postgres", host="localhost,localhost", port="5432,5431", database="postgres", options="-c search_path=test_schema") @app.route('/employee/<employee_id>/salary/increment', methods=['POST']) def increment_salary(employee_id): conn = postgreSQL_pool.getconn() cur = conn.cursor() cur.execute(""" UPDATE employee SET salary=salary + %s WHERE id = %s; """, (1, employee_id)) conn.commit() cur.close() postgreSQL_pool.putconn(conn) return '', 204 @app.route('/employee/<employee_id>/salary') def index(employee_id): conn = postgreSQL_pool.getconn() cur = conn.cursor() cur.execute(""" SELECT salary FROM employee WHERE id=%s; """, employee_id) salary = cur.fetchone()[0] cur.close() postgreSQL_pool.putconn(conn) return str(salary), 200
|
让我们以 SimpleConnectionPool 为例,我们可以看到两个用逗号分隔的主机(它是 localhost,因为它是我们正在运行的本地 docker compose),并且在端口部分,相应的主机端口用逗号分隔。
我们可以运行应用程序
flash run
在另一个终端上发出使用 curl 的调用
$ curl -X POST http://localhost:5000/employee/1/salary/increment $ curl http://localhost:5000/employee/1/salary
|
总体来说,工资会增加,我们应该在获取请求中看到这一点。现在让我们关闭一个数据库
docker compose stop postgres-b
此操作后的第一次调用将会失败,但连接将重新初始化并指向辅助主机。
% curl http://localhost:5000/employee/1/salary <!doctype html> <html lang=en> <title>500 Internal Server Error</title> <h1>Internal Server Error</h1> <p>The server encountered an internal error and was unable to complete your request. Either the server is overloaded or there is an error in the application.</p> % curl http://localhost:5000/employee/1/salary 1254.23
|
2、 Spring Boot
相同的功能适用于其他驱动程序。以 Spring Boot 应用程序上的 Java 驱动程序配置为例。
spring.datasource.url=jdbc:postgresql://localhost:5432,localhost:5431/postgres?currentSchema=test_schema spring.datasource.username=postgres spring.datasource.password=postgres
|
在 jdbc url 上我们添加两个用逗号分隔的主机localhost:5432,localhost:5431然后我们可以实现具有相同功能的应用程序。
package com.egkatzioura.psqlfailover.repository; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; @Repository public class EmployeeRepository { private final JdbcTemplate jdbcTemplate; public EmployeeRepository(JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; } public void incrementSalary(Long employeeId, float increment) { jdbcTemplate.update("UPDATE employee SET salary=salary+? WHERE id=?",increment, employeeId); } public Float fetchSalary(Long employeeId) { return jdbcTemplate.queryForObject("SELECT salary FROM employee WHERE id=?",new Object[]{employeeId},Float.class); } }
|
import com.egkatzioura.psqlfailover.repository.EmployeeRepository; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class EmployeeController { private final EmployeeRepository employeeRepository; public EmployeeController(EmployeeRepository employeeRepository) { this.employeeRepository = employeeRepository; } @PostMapping("/employee/{id}/salary/increment") public void incrementSalary(@PathVariable Long id) { employeeRepository.incrementSalary(id,1f); } @GetMapping("/employee/{id}/salary") public Float fetchSalary(@PathVariable Long id) { return employeeRepository.fetchSalary(id); } }
|
由于复制,更改应该已经到达另一个数据库。您可以以循环方式启动和重新启动组合服务。更改将被复制,因此每次发生故障转移时数据都会在那里。当我们启动和停止数据库时docker compose stop postgres-b,我们可以使用 curl 发出请求:
$ curl -X POST http://localhost:8080/employee/1/salary/increment $ curl http://localhost:8080/employee/1/salary
|
最终,Java 驱动程序可以更优雅地处理故障转移。在故障转移期间,它不会在第一个请求时失败,而是会先连接到另一台主机并返回结果。就是这样。您在 PostgreSQL 上设置了双向复制,并设法利用驱动程序功能将故障转移到不同的主机。
希望您玩得开心!