PostgreSQL中利用驱动程序实现故障转移

假设我们已经建立了PostgreSQL双向复制 ,最好检查一下中断的情况,以及如何利用 PostgreSQL 驱动程序的本机故障转移功能。

我们将冲突解决策略更改为last_update_wins。这样,在每个数据库中的两个同时更新之间,具有最大提交时间戳的更新将被选中。

listen_addresses = '*'
port = 5432
max_connections = 20
shared_buffers = 128MB
temp_buffers = 8MB
work_mem = 4MB
wal_level = logical
max_wal_senders = 3
track_commit_timestamp = on
shared_preload_libraries = 'pglogical'
pglogical.conflict_resolution = 'last_update_wins'

我们需要通过新的变化来启动组合服务:

docker compose up

请注意,根据编程语言和驱动程序的不同,此功能可能并不总是可用。概念是,当您配置连接池以建立与数据库的连接时,您可以配置两个主机。第一个主机将是主主机,而辅助主机将是主主机离线后进行故障转移的主机。故障转移可以互换,本质上驱动程序会尝试找到第一个可用的主机。

1、Python
Python 和驱动程序psycopg2提供此功能。我们将使用 flask api 实现一个应用程序。

该应用程序将提供两个端点:
一个用于获取员工的工资,一个用于将工资增加 :

from flask import Flask 
from psycopg2.pool import SimpleConnectionPool
 
app = Flask(__name__)
 
postgreSQL_pool = SimpleConnectionPool(1, 20, user="postgres",
                                       password=
"postgres",
                                       host=
"localhost,localhost",
                                       port=
"5432,5431",
                                       database=
"postgres",
                                       options=
"-c search_path=test_schema")
 
 
@app.route('/employee/<employee_id>/salary/increment', methods=['POST'])
def increment_salary(employee_id):
    conn = postgreSQL_pool.getconn()
    cur = conn.cursor()
    cur.execute(
"""
        UPDATE employee SET salary=salary + %s WHERE id = %s;
       
""", (1, employee_id))
    conn.commit()
    cur.close()
    postgreSQL_pool.putconn(conn)
    return '', 204
 
 
@app.route('/employee/<employee_id>/salary')
def index(employee_id):
    conn = postgreSQL_pool.getconn()
    cur = conn.cursor()
    cur.execute(
"""
        SELECT salary FROM employee WHERE id=%s;
       
""", employee_id)
    salary = cur.fetchone()[0]
    cur.close()
    postgreSQL_pool.putconn(conn)
    return str(salary), 200

让我们以 SimpleConnectionPool 为例,我们可以看到两个用逗号分隔的主机(它是 localhost,因为它是我们正在运行的本地 docker compose),并且在端口部分,相应的主机端口用逗号分隔。

我们可以运行应用程序

flash run
在另一个终端上发出使用 curl 的调用

$ curl -X POST http://localhost:5000/employee/1/salary/increment
$ curl http:
//localhost:5000/employee/1/salary

总体来说,工资会增加,我们应该在获取请求中看到这一点。

现在让我们关闭一个数据库

docker compose stop postgres-b

此操作后的第一次调用将会失败,但连接将重新初始化并指向辅助主机。

% curl http://localhost:5000/employee/1/salary
<!doctype html>
<html lang=en>
<title>500 Internal Server Error</title>
<h1>Internal Server Error</h1>
<p>The server encountered an internal error and was unable to complete your request. Either the server is overloaded or there is an error in the application.</p>
%  curl http:
//localhost:5000/employee/1/salary
1254.23        
                     

2、 Spring Boot
相同的功能适用于其他驱动程序。以 Spring Boot 应用程序上的 Java 驱动程序配置为例。


spring.datasource.url=jdbc:postgresql://localhost:5432,localhost:5431/postgres?currentSchema=test_schema
spring.datasource.username=postgres
spring.datasource.password=postgres

在 jdbc url 上我们添加两个用逗号分隔的主机localhost:5432,localhost:5431

然后我们可以实现具有相同功能的应用程序。

package com.egkatzioura.psqlfailover.repository;
 
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
 
 
@Repository
public class EmployeeRepository {
 
    private final JdbcTemplate jdbcTemplate;
 
    public EmployeeRepository(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }
 
 
    public void incrementSalary(Long employeeId, float increment) {
        jdbcTemplate.update("UPDATE employee SET salary=salary+? WHERE id=?",increment, employeeId);
    }
 
    public Float fetchSalary(Long employeeId) {
        return jdbcTemplate.queryForObject(
"SELECT salary FROM employee WHERE id=?",new Object[]{employeeId},Float.class);
    }
}

 
import com.egkatzioura.psqlfailover.repository.EmployeeRepository;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
 
@RestController
public class EmployeeController {
 
    private final EmployeeRepository employeeRepository;
 
    public EmployeeController(EmployeeRepository employeeRepository) {
        this.employeeRepository = employeeRepository;
    }
 
    @PostMapping("/employee/{id}/salary/increment")
    public void incrementSalary(@PathVariable Long id) {
        employeeRepository.incrementSalary(id,1f);
    }
 
    @GetMapping(
"/employee/{id}/salary")
    public Float fetchSalary(@PathVariable Long id) {
        return employeeRepository.fetchSalary(id);
    }
}

由于复制,更改应该已经到达另一个数据库。您可以以循环方式启动和重新启动组合服务。更改将被复制,因此每次发生故障转移时数据都会在那里。

当我们启动和停止数据库时docker compose stop postgres-b,我们可以使用 curl 发出请求:

$ curl -X POST http://localhost:8080/employee/1/salary/increment
$ curl http:
//localhost:8080/employee/1/salary

最终,Java 驱动程序可以更优雅地处理故障转移。在故障转移期间,它不会在第一个请求时失败,而是会先连接到另一台主机并返回结果。

就是这样。您在 PostgreSQL 上设置了双向复制,并设法利用驱动程序功能将故障转移到不同的主机。

希望您玩得开心!