Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow pg_partman to be installed in any schema #310

Merged
merged 9 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pgmq-extension/pgmq.control
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
comment = 'A lightweight message queue. Like AWS SQS and RSMQ but on Postgres.'
default_version = '1.4.3'
default_version = '1.4.4'
module_pathname = '$libdir/pgmq'
schema = 'pgmq'
relocatable = false
Expand Down
310 changes: 310 additions & 0 deletions pgmq-extension/sql/pgmq--1.4.3--1.4.4.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,310 @@
CREATE OR REPLACE FUNCTION pgmq._get_pg_partman_schema()
RETURNS TEXT AS $$
SELECT
extnamespace::regnamespace::text
FROM
pg_extension
WHERE
extname = 'pg_partman';
$$ LANGUAGE SQL;


CREATE OR REPLACE FUNCTION pgmq.drop_queue(queue_name TEXT, partitioned BOOLEAN DEFAULT FALSE)
RETURNS BOOLEAN AS $$
DECLARE
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
fq_qtable TEXT := 'pgmq.' || qtable;
atable TEXT := pgmq.format_table_name(queue_name, 'a');
fq_atable TEXT := 'pgmq.' || atable;
BEGIN
EXECUTE FORMAT(
$QUERY$
ALTER EXTENSION pgmq DROP TABLE pgmq.%I
$QUERY$,
qtable
);

EXECUTE FORMAT(
$QUERY$
ALTER EXTENSION pgmq DROP TABLE pgmq.%I
$QUERY$,
atable
);

EXECUTE FORMAT(
$QUERY$
DROP TABLE IF EXISTS pgmq.%I
$QUERY$,
qtable
);

EXECUTE FORMAT(
$QUERY$
DROP TABLE IF EXISTS pgmq.%I
$QUERY$,
atable
);

IF EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_name = 'meta' and table_schema = 'pgmq'
) THEN
EXECUTE FORMAT(
$QUERY$
DELETE FROM pgmq.meta WHERE queue_name = %L
$QUERY$,
queue_name
);
END IF;

IF partitioned THEN
EXECUTE FORMAT(
$QUERY$
DELETE FROM %I.part_config where parent_table in (%L, %L)
$QUERY$,
pgmq._get_pg_partman_schema(), fq_qtable, fq_atable
);
END IF;

RETURN TRUE;
END;
$$ LANGUAGE plpgsql;


CREATE OR REPLACE FUNCTION pgmq.create_partitioned(
queue_name TEXT,
partition_interval TEXT DEFAULT '10000',
retention_interval TEXT DEFAULT '100000'
)
RETURNS void AS $$
DECLARE
partition_col TEXT;
a_partition_col TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
atable TEXT := pgmq.format_table_name(queue_name, 'a');
fq_qtable TEXT := 'pgmq.' || qtable;
fq_atable TEXT := 'pgmq.' || atable;
BEGIN
PERFORM pgmq.validate_queue_name(queue_name);
PERFORM pgmq._ensure_pg_partman_installed();
SELECT pgmq._get_partition_col(partition_interval) INTO partition_col;

EXECUTE FORMAT(
$QUERY$
CREATE TABLE IF NOT EXISTS pgmq.%I (
msg_id BIGINT GENERATED ALWAYS AS IDENTITY,
read_ct INT DEFAULT 0 NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
vt TIMESTAMP WITH TIME ZONE NOT NULL,
message JSONB
) PARTITION BY RANGE (%I)
$QUERY$,
qtable, partition_col
);

IF NOT pgmq._belongs_to_pgmq(qtable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable);
END IF;

-- https://github.com/pgpartman/pg_partman/blob/master/doc/pg_partman.md
-- p_parent_table - the existing parent table. MUST be schema qualified, even if in public schema.
EXECUTE FORMAT(
$QUERY$
SELECT %I.create_parent(
p_parent_table := %L,
p_control := %L,
p_interval := %L,
p_type := case
when pgmq._get_pg_partman_major_version() = 5 then 'range'
else 'native'
end
)
$QUERY$,
pgmq._get_pg_partman_schema(),
fq_qtable,
partition_col,
partition_interval
);

EXECUTE FORMAT(
$QUERY$
CREATE INDEX IF NOT EXISTS %I ON pgmq.%I (%I);
$QUERY$,
qtable || '_part_idx', qtable, partition_col
);

EXECUTE FORMAT(
$QUERY$
UPDATE %I.part_config
SET
retention = %L,
retention_keep_table = false,
retention_keep_index = true,
automatic_maintenance = 'on'
WHERE parent_table = %L;
$QUERY$,
pgmq._get_pg_partman_schema(),
retention_interval,
'pgmq.' || qtable
);

EXECUTE FORMAT(
$QUERY$
INSERT INTO pgmq.meta (queue_name, is_partitioned, is_unlogged)
VALUES (%L, true, false)
ON CONFLICT
DO NOTHING;
$QUERY$,
queue_name
);

IF partition_col = 'enqueued_at' THEN
a_partition_col := 'archived_at';
ELSE
a_partition_col := partition_col;
END IF;

EXECUTE FORMAT(
$QUERY$
CREATE TABLE IF NOT EXISTS pgmq.%I (
msg_id BIGINT NOT NULL,
read_ct INT DEFAULT 0 NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
vt TIMESTAMP WITH TIME ZONE NOT NULL,
message JSONB
) PARTITION BY RANGE (%I);
$QUERY$,
atable, a_partition_col
);

IF NOT pgmq._belongs_to_pgmq(atable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable);
END IF;

-- https://github.com/pgpartman/pg_partman/blob/master/doc/pg_partman.md
-- p_parent_table - the existing parent table. MUST be schema qualified, even if in public schema.
EXECUTE FORMAT(
$QUERY$
SELECT %I.create_parent(
p_parent_table := %L,
p_control := %L,
p_interval := %L,
p_type := case
when pgmq._get_pg_partman_major_version() = 5 then 'range'
else 'native'
end
)
$QUERY$,
pgmq._get_pg_partman_schema(),
fq_atable,
a_partition_col,
partition_interval
);

EXECUTE FORMAT(
$QUERY$
UPDATE %I.part_config
SET
retention = %L,
retention_keep_table = false,
retention_keep_index = true,
automatic_maintenance = 'on'
WHERE parent_table = %L;
$QUERY$,
pgmq._get_pg_partman_schema(),
retention_interval,
'pgmq.' || atable
);

EXECUTE FORMAT(
$QUERY$
CREATE INDEX IF NOT EXISTS %I ON pgmq.%I (archived_at);
$QUERY$,
'archived_at_idx_' || queue_name, atable
);

END;
$$ LANGUAGE plpgsql;


CREATE OR REPLACE FUNCTION pgmq.convert_archive_partitioned(
table_name TEXT,
partition_interval TEXT DEFAULT '10000',
retention_interval TEXT DEFAULT '100000',
leading_partition INT DEFAULT 10
)
RETURNS void AS $$
DECLARE
a_table_name TEXT := pgmq.format_table_name(table_name, 'a');
a_table_name_old TEXT := pgmq.format_table_name(table_name, 'a') || '_old';
qualified_a_table_name TEXT := format('pgmq.%I', a_table_name);
BEGIN

PERFORM c.relkind
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relname = a_table_name
AND c.relkind = 'p';

IF FOUND THEN
RAISE NOTICE 'Table %s is already partitioned', a_table_name;
RETURN;
END IF;

PERFORM c.relkind
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relname = a_table_name
AND c.relkind = 'r';

IF NOT FOUND THEN
RAISE NOTICE 'Table %s does not exists', a_table_name;
RETURN;
END IF;

EXECUTE 'ALTER TABLE ' || qualified_a_table_name || ' RENAME TO ' || a_table_name_old;

EXECUTE format( 'CREATE TABLE pgmq.%I (LIKE pgmq.%I including all) PARTITION BY RANGE (msg_id)', a_table_name, a_table_name_old );

EXECUTE 'ALTER INDEX pgmq.archived_at_idx_' || table_name || ' RENAME TO archived_at_idx_' || table_name || '_old';
EXECUTE 'CREATE INDEX archived_at_idx_'|| table_name || ' ON ' || qualified_a_table_name ||'(archived_at)';

-- https://github.com/pgpartman/pg_partman/blob/master/doc/pg_partman.md
-- p_parent_table - the existing parent table. MUST be schema qualified, even if in public schema.
EXECUTE FORMAT(
$QUERY$
SELECT %I.create_parent(
p_parent_table := %L,
p_control := 'msg_id',
p_interval := %L,
p_type := case
when pgmq._get_pg_partman_major_version() = 5 then 'range'
else 'native'
end
)
$QUERY$,
pgmq._get_pg_partman_schema(),
qualified_a_table_name,
partition_interval
);

EXECUTE FORMAT(
$QUERY$
UPDATE %I.part_config
SET
retention = %L,
retention_keep_table = false,
retention_keep_index = false,
infinite_time_partitions = true
WHERE
parent_table = %L;
$QUERY$,
pgmq._get_pg_partman_schema(),
retention_interval,
qualified_a_table_name
);

END;
$$ LANGUAGE plpgsql;
Loading
Loading