目录

pg使用触发器记录ddl

目录

pg使用触发器记录ddl

postgreSQL开放事件触发器,可以实现DDL回收站、DDL防火墙、DDL增量订阅同步等功能,灵活使用事件触发器可以减少维护成本,保护数据安全。 RDS PostgreSQL开放事件触发器,可以实现DDL回收站、DDL防火墙、DDL增量订阅同步等功能,灵活使用事件触发器可以减少维护成本,保护数据安全。

前提条件 实例版本为PostgreSQL 10、11、12云盘版。 背景信息 如果您对数据库安全有非常高的要求,可以基于事件触发器创建DDL回收站规则、DDL防火墙规则,在多个维度保护数据安全:

事前防御:防止drop table、drop index、drop database等删库删表危险操作。 事后回档:在发生意外删表后,可以从回收站找回。 原理是使用pg_get_ddl_command和pg_get_ddl_drop这两个事件触发器收集并存入DDL语句到表dts_audit.dts_tb_ddl_command中,其具体实现在函数pg_func_ddl_command()中。其中dts_audit.dts_tb_ddl_command表结构如下:

Column      |            Type             | Collation | Nullable |      Default       | Storage  | Stats target | Description 

—————–+—————————–+———–+———-+——————–+———-+————–+————- event | text | | | | extended | |
tag | text | | | | extended | | Command tag classid | oid | | | | plain | | OID of catalog the object belonged in objid | oid | | | | plain | | OID the object had within the catalog objsubid | integer | | | | plain | | Object sub-id (e.g. attribute number for columns) object_type | text | | | | extended | | Type of the object schema_name | text | | | | extended | | Name of the schema the object belonged in, if any; otherwise NULL. No quoting is applied. object_identity | text | | | | extended | | Text rendering of the object identity, schema-qualified. is_extension | boolean | | | | plain | | True if the command is part of an extension script query | text | | | | extended | | sql text username | text | | | CURRENT_USER | extended | |
datname | text | | | current_database() | extended | | client_addr | inet | | | inet_client_addr() | main | | crt_time | timestamp without time zone | | | now() | plain | |

创建语句如下:

CREATE SCHEMA IF NOT EXISTS dts_audit;
CREATE TABLE IF NOT EXISTS dts_audit.dts_tb_ddl_command ( event text,
tag text, classid oid, objid oid, objsubid int,
object_type text, schema_name text, object_identity text, is_extension bool, query text,
username text default current_user, datname text default current_database(),
client_addr inet default inet_client_addr(), crt_time timestamp default now()
);

下文将以示例的方式介绍如何实现DDL回收站、DDL防火墙、DDL增量订阅及同步等功能,您可以根据业务情况修改相关代码。

DDL回收站 执行如下命令创建表、函数和相关触发器。


RDS PostgreSQL开放事件触发器,可以实现DDL回收站、DDL防火墙、DDL增量订阅同步等功能,灵活使用事件触发器可以减少维护成本,保护数据安全。

前提条件
实例版本为PostgreSQL 10、11、12云盘版。
背景信息
如果您对数据库安全有非常高的要求,可以基于事件触发器创建DDL回收站规则、DDL防火墙规则,在多个维度保护数据安全:

事前防御:防止drop table、drop index、drop database等删库删表危险操作。
事后回档:在发生意外删表后,可以从回收站找回。
原理是使用pg_get_ddl_command和pg_get_ddl_drop这两个事件触发器收集并存入DDL语句到表dts_audit.dts_tb_ddl_command中,其具体实现在函数pg_func_ddl_command()中。其中dts_audit.dts_tb_ddl_command表结构如下:

     Column      |            Type             | Collation | Nullable |      Default       | Storage  | Stats target | Description 
-----------------+-----------------------------+-----------+----------+--------------------+----------+--------------+-------------
 event           | text                        |           |          |                    | extended |              |   
 tag             | text                        |           |          |                    | extended |              | Command tag
 classid         | oid                         |           |          |                    | plain    |              | OID of catalog the object belonged in
 objid           | oid                         |           |          |                    | plain    |              | OID the object had within the catalog
 objsubid        | integer                     |           |          |                    | plain    |              | Object sub-id (e.g. attribute number for columns)
 object_type     | text                        |           |          |                    | extended |              | Type of the object
 schema_name     | text                        |           |          |                    | extended |              | Name of the schema the object belonged in, if any; otherwise NULL. No quoting is applied.
 object_identity | text                        |           |          |                    | extended |              | Text rendering of the object identity, schema-qualified.
 is_extension    | boolean                     |           |          |                    | plain    |              | True if the command is part of an extension script
 query           | text                        |           |          |                    | extended |              | sql text
 username        | text                        |           |          | CURRENT_USER       | extended |              |  
 datname         | text                        |           |          | current_database() | extended |              | 
 client_addr     | inet                        |           |          | inet_client_addr() | main     |              | 
 crt_time        | timestamp without time zone |           |          | now()              | plain    |              |
创建语句如下:

CREATE SCHEMA IF NOT EXISTS dts_audit;
CREATE TABLE IF NOT EXISTS dts_audit.dts_tb_ddl_command ( event text,
tag text, classid oid, objid oid, objsubid int,
object_type text, schema_name text, object_identity text, is_extension bool, query text,
username text default current_user, datname text default current_database(),
client_addr inet default inet_client_addr(), crt_time timestamp default now()
);
下文将以示例的方式介绍如何实现DDL回收站、DDL防火墙、DDL增量订阅及同步等功能,您可以根据业务情况修改相关代码。

DDL回收站
执行如下命令创建表、函数和相关触发器。
/* external/rds_ddl_pulication/rds_ddl_pulication--1.0.sql */

--create schema
CREATE SCHEMA IF NOT EXISTS dts_audit;
--create table for ddl record
CREATE TABLE IF NOT EXISTS dts_audit.dts_tb_ddl_command ( event text,
tag text, classid oid, objid oid, objsubid int,
object_type text, schema_name text, object_identity text, is_extension bool, query text,
username text default current_user, datname text default current_database(), client_addr inet default inet_client_addr(), crt_time timestamp default now()
);
-- create function for event triggers
create or replace function dts_audit.dts_func_ddl_command() returns event_trigger as $$
declare v1 text;
is_superuser bool = false;
r record; 
begin
    -- we don't record  ddl command from superusers 
    select u.rolsuper into is_superuser from pg_catalog.pg_roles u where u.rolname = SESSION_USER;
    if is_superuser then
        return;
    end if;
    select query into v1 from pg_stat_activity where pid=pg_backend_pid();
    -- RAISE NOTICE 'ddl event:%, command:%', tg_event, tg_tag;
    -- NB:since ddl_command_end cannot collect the details of the drop statement, we use sql_drop
    if TG_EVENT='ddl_command_end' then
        SELECT * into r FROM pg_event_trigger_ddl_commands(); 
        if r.classid > 0 then
            insert into dts_audit.dts_tb_ddl_command(event, tag, classid, objid, objsubid, object_type, schema_name, object_identity, is_extension, query)
            values(TG_EVENT, TG_TAG, r.classid, r.objid, r.objsubid, r.object_type, r.schema_name, r.object_identity, r.in_extension, v1);
         end if; 
    end if;
    if TG_EVENT='sql_drop' then
        -- To avoid repeated collection, we filtered 'ALTER TABLE' and 'ALTER FOREIGN TABLE'
        if TG_TAG != 'ALTER TABLE' and TG_TAG != 'ALTER FOREIGN TABLE' then
            SELECT * into r FROM pg_event_trigger_dropped_objects();
            insert into dts_audit.dts_tb_ddl_command(event, tag, classid, objid, objsubid, object_type, schema_name, object_identity, is_extension, query)
            values(TG_EVENT, TG_TAG, r.classid, r.objid, r.objsubid, r.object_type, r.schema_name, r.object_identity, 'f', v1);
        end if; 
    end if;
end;
$$ language plpgsql strict;
-- ddl_command_end event trigger
CREATE EVENT TRIGGER pg_get_ddl_command on ddl_command_end EXECUTE PROCEDURE dts_audit.dts_func_ddl_command();
-- pg_get_ddl_drop event trigger
CREATE EVENT TRIGGER pg_get_ddl_drop on sql_drop EXECUTE PROCEDURE dts_audit.dts_func_ddl_command();
-- grant privileges to all user
GRANT USAGE ON SCHEMA dts_audit TO PUBLIC;
GRANT SELECT, INSERT ON TABLE dts_audit.dts_tb_ddl_command TO PUBLIC;

执行一个DDL语句,测试能否记录变更。

create table tb_test(id int);
select * from dts_audit.dts_tb_ddl_command;

DDL防火墙 您可以根据业务需求创建事件触发器,使用ddl_command_start事件类型,可以阻止相应的DDL语句执行。

创建触发器函数

CREATE OR REPLACE FUNCTION abort1()
  RETURNS event_trigger
 LANGUAGE plpgsql
  AS $$
BEGIN
  if current_user = 'test1' then
    RAISE EXCEPTION 'event:%, command:%', tg_event, tg_tag;
  end if;
 END;
$$;

创建触发器阻止创建和删除表的语句

create event trigger b on ddl_command_start when TAG IN ('CREATE TABLE', 'DROP TABLE') execute procedure abort1();

DDL增量订阅同步 在发布端我们将已经执行了的DDL语句存储在dts_audit.dts_tb_ddl_command中。订阅端可以读取记录进行同步。

在发布端执行发布命令。

CREATE PUBLICATION my_ddl_publication FOR TABLE ONLY dts_audit.dts_tb_ddl_command;

在订阅端创建相同的表。

CREATE SCHEMA IF NOT EXISTS dts_audit;
CREATE TABLE IF NOT EXISTS dts_audit.dts_tb_ddl_command ( event text,
tag text, classid oid, objid oid, objsubid int,
object_type text, schema_name text, object_identity text, is_extension bool, query text,
username text, datname text, client_addr inet , crt_time timestamp );

在订阅端创建订阅。

CREATE SUBSCRIPTION my_ddl_subscriptin CONNECTION 'host=*** port=*** user=*** password=***  dbname=**' PUBLICATION my_ddl_publication;

示例

CREATE SUBSCRIPTION my_ddl_subscriptin CONNECTION 'host=pgm-bpxxxxx.pg.rds.aliyuncs.com port=1433 user=test1 password=xxxxx  dbname=testdb1' PUBLICATION my_ddl_publication;

在订阅端针对dts_audit.dts_tb_ddl_command表创建相应的触发器,实现DDL增量同步。