Stream triggers v5

PGD introduces new types of triggers that you can use for additional data processing on the downstream/target node.

  • Conflict triggers
  • Transform triggers

Together, these types of triggers are known as stream triggers.

Stream triggers are designed to be trigger-like in syntax. They leverage the PostgreSQL BEFORE trigger architecture and are likely to have performance characteristics similar to those of PostgreSQL BEFORE triggers.

Multiple trigger definitions can use one trigger function, just as with normal PostgreSQL triggers. A trigger function is a program defined in this form: CREATE FUNCTION ... RETURNS TRIGGER. Creating the trigger doesn't require use of the CREATE TRIGGER command. Instead, create stream triggers using the special PGD functions bdr.create_conflict_trigger() and bdr.create_transform_trigger().
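
As a minimal sketch of sharing one trigger function between trigger definitions, the following registers the same function as a transform trigger on two tables. The tables orders and invoices, their text column note, and the function and trigger names are hypothetical; only bdr.create_transform_trigger() and bdr.trigger_get_row() come from this page.

```sql
-- Hypothetical trigger function shared by two stream triggers.
CREATE OR REPLACE FUNCTION trim_note_func()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
    SOURCE_NEW record;
BEGIN
    SOURCE_NEW := bdr.trigger_get_row('SOURCE_NEW');
    -- Assumes both tables have a text column named note.
    SOURCE_NEW.note := btrim(SOURCE_NEW.note);
    RETURN SOURCE_NEW;
END;
$$;

-- No CREATE TRIGGER statement is involved; the PGD function creates the trigger.
SELECT bdr.create_transform_trigger('trim_note', ARRAY['INSERT', 'UPDATE'],
    'orders', 'trim_note_func()');
SELECT bdr.create_transform_trigger('trim_note', ARRAY['INSERT', 'UPDATE'],
    'invoices', 'trim_note_func()');
```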

Once created, the trigger is visible in the catalog table pg_trigger. The stream triggers are marked as tgisinternal = true and tgenabled = 'D' and have the name suffix '_bdrc' or '_bdrt'. The view bdr.triggers provides information on the triggers in relation to the table, the name of the procedure that is being executed, the event that triggers it, and the trigger type.
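
A quick way to check these catalog details, sketched for a hypothetical table named orders. The pg_trigger columns are standard PostgreSQL. The columns of bdr.triggers aren't listed on this page, so the second query selects all of them.

```sql
-- Stream triggers on the table, recognized by their name suffix.
-- The backslash escapes the underscore, which LIKE otherwise treats as a wildcard.
SELECT tgname, tgenabled, tgisinternal
FROM pg_trigger
WHERE tgrelid = 'orders'::regclass
  AND (tgname LIKE '%\_bdrc' OR tgname LIKE '%\_bdrt');

-- The PGD view with the table, procedure, event, and trigger type.
SELECT * FROM bdr.triggers;
```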

Stream triggers aren't enabled for normal SQL processing. Because of this, the ALTER TABLE ... ENABLE TRIGGER is blocked for stream triggers in both its specific name variant and the ALL variant. This mechanism prevents the trigger from executing as a normal SQL trigger.

These triggers execute on the downstream or target node. There's no option for them to execute on the origin node. However, you might want to consider the use of row_filter expressions on the origin.

Also, any DML that is applied while executing a stream trigger isn't replicated to other PGD nodes and doesn't trigger the execution of standard local triggers. This is intentional. You can use it, for example, to log changes or conflicts captured by a stream trigger into a table that is crash-safe and specific to that node. See Stream triggers examples for a working example.

Trigger execution during Apply

Transform triggers execute first, once for each incoming change in the triggering table. These triggers fire before we attempt to locate a matching target row, allowing a very wide range of transforms to be applied efficiently and consistently.

Next, for UPDATE and DELETE changes, we locate the target row. If there's no target row, then no further processing occurs for those change types.

We then execute any normal triggers that previously were explicitly enabled as replica triggers at table-level:

ALTER TABLE tablename
ENABLE REPLICA TRIGGER trigger_name;

We then decide whether a potential conflict exists. If so, we then call any conflict trigger that exists for that table.

Missing column conflict resolution

Before transform triggers are executed, PostgreSQL tries to match the incoming tuple against the row-type of the target table.

Any column that exists on the input row but not on the target table triggers a conflict of type target_column_missing. Conversely, a column existing on the target table but not in the incoming row triggers a source_column_missing conflict. The default resolutions for those two conflict types are respectively ignore_if_null and use_default_value.

This is relevant in the context of rolling schema upgrades, for example, if the new version of the schema introduces a new column. When replicating from an old version of the schema to a new one, the source column is missing, and the use_default_value strategy is appropriate, as it populates the newly introduced column with the default value.

However, when replicating from a node having the new schema version to a node having the old one, the column is missing from the target table. The ignore_if_null resolver isn't appropriate for a rolling upgrade because it breaks replication as soon as the user inserts a tuple with a non-NULL value in the new column in any of the upgraded nodes.

In view of this example, the appropriate setting for rolling schema upgrades is to configure each node to apply the ignore resolver in case of a target_column_missing conflict.

You can do this with the following query, which you must execute separately on each node. Replace node1 with the actual node name.

SELECT bdr.alter_node_set_conflict_resolver('node1',
    'target_column_missing', 'ignore');

Data loss and divergence risk

Setting the conflict resolver to ignore can lead to data loss and cluster divergence.

Consider the following example: table t exists on nodes 1 and 2, but its column col exists only on node 1.

If the conflict resolver is set to ignore, then there can be rows on node 1 where col isn't null, for example, (pk=1, col=100). That row is replicated to node 2, and the value in column col is discarded, for example, (pk=1).

If column col is then added to the table on node 2, it is at first set to NULL on all existing rows, and the row considered above becomes (pk=1, col=NULL). The row having pk=1 is no longer identical on all nodes, and the cluster is therefore divergent.

The default ignore_if_null resolver isn't affected by this risk because any row replicated to node 2 has col=NULL.

Based on this example, we recommend running LiveCompare against the whole cluster at the end of a rolling schema upgrade where the ignore resolver was used. This practice helps to ensure that you detect and fix any divergence.

Terminology of row-types

We use these row-types:

  • SOURCE_OLD is the row before update, that is, the key.
  • SOURCE_NEW is the new row coming from another node.
  • TARGET is the row that exists on the node already, that is, the conflicting row.

Conflict triggers

Conflict triggers execute when a conflict is detected by PGD. They decide what happens when the conflict has occurred.

  • If the trigger function returns a row, the action is applied to the target.
  • If the trigger function returns a NULL row, the action is skipped.

For example, if the trigger is called for a DELETE, the trigger returns NULL if it wants to skip the DELETE. If you want the DELETE to proceed, then return a row value: either SOURCE_OLD or TARGET works. When the conflicting operation is either INSERT or UPDATE, and the chosen resolution is the deletion of the conflicting row, the trigger must explicitly perform the deletion and return NULL. The trigger function can perform other SQL actions as it chooses, but those actions are only applied locally, not replicated.
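
The return-value rules above can be sketched as follows. The table accounts, its primary key id, its boolean column closed, and the resolution policy itself are hypothetical and serve only to show the three outcomes: skipping a DELETE, deleting the conflicting row explicitly, and applying the incoming row.

```sql
CREATE OR REPLACE FUNCTION accounts_conflict_func()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
    SOURCE_NEW record;
BEGIN
    IF (TG_OP = 'DELETE') THEN
        -- Returning NULL skips the incoming DELETE.
        RETURN NULL;
    END IF;

    SOURCE_NEW := bdr.trigger_get_row('SOURCE_NEW');

    -- Hypothetical policy: an incoming row flagged as closed removes
    -- the conflicting local row instead of overwriting it.
    IF (SOURCE_NEW.closed) THEN
        -- The trigger performs the deletion itself. This DML is local only.
        DELETE FROM accounts WHERE id = SOURCE_NEW.id;
        RETURN NULL;
    END IF;

    -- Returning a row applies it to the target.
    RETURN SOURCE_NEW;
END;
$$;
```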

When a real data conflict occurs between two or more nodes, two or more concurrent changes are occurring. When we apply those changes, the conflict resolution occurs independently on each node. This means the conflict resolution occurs once on each node and can occur with a significant time difference between them. As a result, communication between the multiple executions of the conflict trigger isn't possible. It is the responsibility of the author of the conflict trigger to ensure that the trigger gives exactly the same result for all related events. Otherwise, data divergence occurs. Technical Support recommends that you formally test all conflict triggers using the isolationtester tool supplied with PGD.

Warning
  • You can specify multiple conflict triggers on a single table, but they must match a distinct event. That is, each conflict must match only a single conflict trigger.
  • We don't recommend multiple triggers matching the same event on the same table. They might result in inconsistent behavior and will not be allowed in a future release.

If the same conflict trigger matches more than one event, you can use the TG_OP variable in the trigger to identify the operation that produced the conflict.

By default, PGD detects conflicts by observing a change of replication origin for a row. Hence, a conflict trigger can be called even when only one change is occurring. Since, in this case, there's no real conflict, this conflict detection mechanism can generate false-positive conflicts. The conflict trigger must handle all of those identically.

In some cases, timestamp conflict detection doesn't detect a conflict at all. For example, in a concurrent UPDATE/DELETE where the DELETE occurs just after the UPDATE, any nodes that see first the UPDATE and then the DELETE don't see any conflict. If no conflict is seen, the conflict trigger is never called. In the same situation but using row version conflict detection, a conflict is seen, which a conflict trigger can then handle.

The trigger function has access to additional state information as well as the data row involved in the conflict, depending on the operation type:

  • On INSERT, conflict triggers can access the SOURCE_NEW row from the source and TARGET row.
  • On UPDATE, conflict triggers can access the SOURCE_OLD and SOURCE_NEW row from the source and TARGET row.
  • On DELETE, conflict triggers can access the SOURCE_OLD row from the source and TARGET row.

You can use the function bdr.trigger_get_row() to retrieve SOURCE_OLD, SOURCE_NEW, or TARGET rows, if a value exists for that operation.

Changes to conflict triggers happen transactionally and are protected by global DML locks during replication of the configuration change, similarly to how some variants of ALTER TABLE are handled.

Updating primary keys inside a conflict trigger can sometimes lead to unique constraint violation errors due to a difference in timing of execution. Hence, avoid updating primary keys in conflict triggers.

Transform triggers

These triggers are similar to conflict triggers, except they are executed for every row on the data stream against the specific table. The behavior of return values and the exposed variables is similar, but transform triggers execute before a target row is identified, so there is no TARGET row.

You can specify multiple transform triggers on each table in PGD. Transform triggers execute in alphabetical order.

A transform trigger can filter away rows, and it can do additional operations as needed. It can alter the values of any column or set them to NULL. The return value decides the further action taken:

  • If the trigger function returns a row, it's applied to the target.
  • If the trigger function returns a NULL row, there's no further action to perform. Any transform triggers that haven't yet executed for that change are skipped.
  • The trigger function can perform other actions as it chooses.
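
A minimal sketch of a transform trigger that filters some rows away and blanks out a column on the rest. The table customers and its columns status and email are hypothetical, as is the filtering rule.

```sql
CREATE OR REPLACE FUNCTION customers_transform_func()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
    SOURCE_OLD record;
    SOURCE_NEW record;
BEGIN
    IF (TG_OP = 'DELETE') THEN
        -- Let deletes through unchanged.
        SOURCE_OLD := bdr.trigger_get_row('SOURCE_OLD');
        RETURN SOURCE_OLD;
    END IF;

    SOURCE_NEW := bdr.trigger_get_row('SOURCE_NEW');

    -- Filter away rows marked as test data (hypothetical rule).
    IF (SOURCE_NEW.status = 'test') THEN
        RETURN NULL;
    END IF;

    -- Set a column to NULL before the row is applied.
    SOURCE_NEW.email := NULL;
    RETURN SOURCE_NEW;
END;
$$;

SELECT bdr.create_transform_trigger('customers_transform',
    ARRAY['INSERT', 'UPDATE', 'DELETE'],
    'customers', 'customers_transform_func()');
```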

The trigger function has access to additional state information as well as the rows involved in the change:

  • On INSERT, transform triggers can access the SOURCE_NEW row from the source.
  • On UPDATE, transform triggers can access the SOURCE_OLD and SOURCE_NEW row from the source.
  • On DELETE, transform triggers can access the SOURCE_OLD row from the source.

You can use the function bdr.trigger_get_row() to retrieve SOURCE_OLD or SOURCE_NEW rows. TARGET row isn't available, since this type of trigger executes before such a target row is identified, if any.

Transform triggers look very similar to normal BEFORE row triggers but have these important differences:

  • A transform trigger gets called for every incoming change. BEFORE triggers aren't called at all for UPDATE and DELETE changes if a matching row in a table isn't found.

  • Transform triggers are called before partition table routing occurs.

  • Transform triggers have access to the lookup key via SOURCE_OLD, which isn't available to normal SQL triggers.

Stream triggers variables

Both conflict triggers and transform triggers have access to information about rows and metadata by way of the predefined variables provided by the trigger API and additional information functions provided by PGD.

In PL/pgSQL, you can use the predefined variables that follow.

TG_NAME

Data type name. This variable contains the name of the trigger actually fired. The actual trigger name has a '_bdrt' or '_bdrc' suffix (depending on trigger type) compared to the name provided during trigger creation.

TG_WHEN

Data type text. This variable says BEFORE for both conflict and transform triggers. You can get the stream trigger type by calling the bdr.trigger_get_type() information function. See bdr.trigger_get_type.

TG_LEVEL

Data type text: a string of ROW.

TG_OP

Data type text: a string of INSERT, UPDATE, or DELETE identifying the operation for which the trigger was fired.

TG_RELID

Data type oid: the object ID of the table that caused the trigger invocation.

TG_TABLE_NAME

Data type name: the name of the table that caused the trigger invocation.

TG_TABLE_SCHEMA

Data type name: the name of the schema of the table that caused the trigger invocation. For partitioned tables, this is the name of the root table.

TG_NARGS

Data type integer: the number of arguments given to the trigger function in the bdr.create_conflict_trigger() or bdr.create_transform_trigger() statement.

TG_ARGV[]

Data type array of text: the arguments from the bdr.create_conflict_trigger() or bdr.create_transform_trigger() statement. The index counts from 0. Invalid indexes (less than 0 or greater than or equal to TG_NARGS) result in a NULL value.
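
The following sketch shows one way to use TG_NARGS and TG_ARGV in a stream trigger. The table customers, its column region, and the function and trigger names are hypothetical. The argument is passed through the args parameter of bdr.create_transform_trigger().

```sql
CREATE OR REPLACE FUNCTION keep_region_func()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
    SOURCE_NEW record;
BEGIN
    SOURCE_NEW := bdr.trigger_get_row('SOURCE_NEW');

    -- Without an argument, pass the row through unchanged.
    IF (TG_NARGS < 1) THEN
        RETURN SOURCE_NEW;
    END IF;

    -- TG_ARGV[0] is the first element of the args array, as text.
    IF (SOURCE_NEW.region IS DISTINCT FROM TG_ARGV[0]) THEN
        RETURN NULL;  -- drop rows for other regions
    END IF;

    RETURN SOURCE_NEW;
END;
$$;

-- Only INSERT is listed, so SOURCE_NEW is always available.
SELECT bdr.create_transform_trigger('keep_region', ARRAY['INSERT'],
    'customers', 'keep_region_func()', ARRAY['eu']);
```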

Information functions

bdr.trigger_get_row

This function returns the contents of a trigger row specified by an identifier as a RECORD. This function returns NULL if called inappropriately, for example, when called with SOURCE_NEW and the operation type (TG_OP) is DELETE.

Synopsis

bdr.trigger_get_row(row_id text)

Parameters

  • row_id Identifier of the row. Can be any of SOURCE_NEW, SOURCE_OLD, and TARGET, depending on the trigger type and operation (see description of individual trigger types).

bdr.trigger_get_committs

This function returns the commit timestamp of a trigger row specified by an identifier. If not available because a row is frozen or isn't available, returns NULL. Always returns NULL for row identifier SOURCE_OLD.

Synopsis

bdr.trigger_get_committs(row_id text)

Parameters

  • row_id Identifier of the row. Can be any of SOURCE_NEW, SOURCE_OLD, and TARGET, depending on trigger type and operation (see description of individual trigger types).

bdr.trigger_get_xid

This function returns the local transaction id of a TARGET row specified by an identifier. If not available because a row is frozen or isn't available, returns NULL. Always returns NULL for SOURCE_OLD and SOURCE_NEW row identifiers.

Available only for conflict triggers.

Synopsis

bdr.trigger_get_xid(row_id text)

Parameters

  • row_id Identifier of the row. Can be any of SOURCE_NEW, SOURCE_OLD, and TARGET, depending on trigger type and operation (see description of individual trigger types).

bdr.trigger_get_type

This function returns the current trigger type, which can be CONFLICT or TRANSFORM. Returns null if called outside a stream trigger.

Synopsis

bdr.trigger_get_type()

bdr.trigger_get_conflict_type

This function returns the current conflict type if called inside a conflict trigger. Otherwise, returns NULL.

See Conflict types for possible return values of this function.

Synopsis

bdr.trigger_get_conflict_type()
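
As a sketch, a single function registered as both a conflict trigger and a transform trigger can report which role it's running in. The function name is hypothetical. The function writes to the server log and then lets the incoming change proceed.

```sql
CREATE OR REPLACE FUNCTION log_stream_event_func()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
    SOURCE record;
BEGIN
    IF (TG_OP = 'DELETE') THEN
        SOURCE := bdr.trigger_get_row('SOURCE_OLD');
    ELSE
        SOURCE := bdr.trigger_get_row('SOURCE_NEW');
    END IF;

    -- The conflict type is NULL when running as a transform trigger.
    RAISE LOG '% trigger % on %.%: op=%, conflict type=%',
        bdr.trigger_get_type(), TG_NAME, TG_TABLE_SCHEMA, TG_TABLE_NAME,
        TG_OP, bdr.trigger_get_conflict_type();

    RETURN SOURCE;
END;
$$;
```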

bdr.trigger_get_origin_node_id

This function returns the node id corresponding to the origin for the trigger row_id passed in as argument. If the origin isn't valid (which means the row originated locally), returns the node id of the source or target node, depending on the trigger row argument. Always returns NULL for row identifier SOURCE_OLD. You can use this function to define conflict triggers to always favor a trusted source node.

Synopsis

bdr.trigger_get_origin_node_id(row_id text)

Parameters

  • row_id Identifier of the row. Can be any of SOURCE_NEW, SOURCE_OLD, and TARGET, depending on trigger type and operation (see description of individual trigger types).

bdr.ri_fkey_on_del_trigger

When called as a BEFORE trigger, this function uses FOREIGN KEY information to avoid FK anomalies.

Synopsis

bdr.ri_fkey_on_del_trigger()

Row contents

The SOURCE_NEW, SOURCE_OLD, and TARGET contents depend on the operation, REPLICA IDENTITY setting of a table, and the contents of the target table.

The TARGET row is available only in conflict triggers. The TARGET row contains data only if a row was found when applying UPDATE or DELETE in the target table. If the row isn't found, the TARGET is NULL.
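
Because TARGET can be NULL, a conflict trigger should check it before reading any of its fields. A minimal sketch with a hypothetical function name:

```sql
CREATE OR REPLACE FUNCTION check_target_func()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
    SOURCE record;
    TARGET record;
BEGIN
    TARGET := bdr.trigger_get_row('TARGET');

    IF (TG_OP = 'DELETE') THEN
        SOURCE := bdr.trigger_get_row('SOURCE_OLD');
    ELSE
        SOURCE := bdr.trigger_get_row('SOURCE_NEW');
    END IF;

    -- Don't read TARGET.<column> unless a target row was found.
    IF (TARGET IS NULL) THEN
        RAISE LOG 'no target row found for % on %', TG_OP, TG_TABLE_NAME;
    END IF;

    RETURN SOURCE;
END;
$$;
```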

Triggers notes

Execution order for triggers:

  • Transform triggers Execute once for each incoming row on the target.
  • Normal triggers Execute once per row.
  • Conflict triggers Execute once per row where a conflict exists.

Stream triggers manipulation interfaces

You can create stream triggers only on tables with REPLICA IDENTITY FULL or tables without any columns to which TOAST applies.
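
For a table that has TOAST-able columns, you can set the replica identity with standard PostgreSQL DDL before creating the stream trigger. The table name orders is hypothetical.

```sql
ALTER TABLE orders REPLICA IDENTITY FULL;

-- Check the current setting: 'f' is FULL, 'd' is DEFAULT,
-- 'i' is USING INDEX, and 'n' is NOTHING.
SELECT relname, relreplident
FROM pg_class
WHERE oid = 'orders'::regclass;
```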

bdr.create_conflict_trigger

This function creates a new conflict trigger.

Synopsis

bdr.create_conflict_trigger(trigger_name text,
                            events text[],
                            relation regclass,
                            function regprocedure,
                            args text[] DEFAULT '{}')

Parameters

  • trigger_name Name of the new trigger.
  • events Array of events on which to fire this trigger. Valid values are 'INSERT', 'UPDATE', and 'DELETE'.
  • relation Relation to fire this trigger for.
  • function The function to execute.
  • args Optional. Specifies the array of parameters the trigger function receives on execution (contents of TG_ARGV variable).

Notes

This function uses the same replication mechanism as DDL statements. This means that the replication is affected by the ddl filters configuration.

The function takes a global DML lock on the relation on which the trigger is being created.

This function is transactional. You can roll back the effects with the ROLLBACK of the transaction. The changes are visible to the current transaction.

Similar to normal PostgreSQL triggers, the bdr.create_conflict_trigger function requires TRIGGER privilege on the relation and EXECUTE privilege on the function. This applies with a bdr.backwards_compatibility of 30619 or above. Additional security rules apply in PGD to all triggers including conflict triggers. See Security and roles.
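
A sketch of the transactional behavior described in these notes. It assumes a hypothetical table orders and a trigger function orders_conflict_func() that already exists.

```sql
BEGIN;

SELECT bdr.create_conflict_trigger('orders_conflict',
    ARRAY['INSERT', 'UPDATE'],
    'orders', 'orders_conflict_func()');

-- The new trigger is visible inside the creating transaction.
SELECT tgname
FROM pg_trigger
WHERE tgrelid = 'orders'::regclass
  AND tgname LIKE '%\_bdrc';

-- Rolling back removes the trigger again.
ROLLBACK;
```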

bdr.create_transform_trigger

This function creates a transform trigger.

Synopsis

bdr.create_transform_trigger(trigger_name text,
                             events text[],
                             relation regclass,
                             function regprocedure,
                             args text[] DEFAULT '{}')

Parameters

  • trigger_name Name of the new trigger.
  • events Array of events on which to fire this trigger. Valid values are 'INSERT', 'UPDATE', and 'DELETE'.
  • relation Relation to fire this trigger for.
  • function The function to execute.
  • args Optional. Specifies the array of parameters the trigger function receives on execution (contents of TG_ARGV variable).

Notes

This function uses the same replication mechanism as DDL statements. This means that the replication is affected by the ddl filters configuration.

The function takes a global DML lock on the relation on which the trigger is being created.

This function is transactional. You can roll back the effects with the ROLLBACK of the transaction. The changes are visible to the current transaction.

Similarly to normal PostgreSQL triggers, the bdr.create_transform_trigger function requires the TRIGGER privilege on the relation and EXECUTE privilege on the function. Additional security rules apply in PGD to all triggers including transform triggers. See Security and roles.
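
A sketch of granting the privileges named above before creating a transform trigger. The role app_owner, the table orders, and the function orders_transform_func() are hypothetical and assumed to exist. The additional PGD security rules aren't covered here.

```sql
-- Run as the table owner or a superuser.
GRANT TRIGGER ON TABLE orders TO app_owner;
GRANT EXECUTE ON FUNCTION orders_transform_func() TO app_owner;

-- Run as app_owner.
SELECT bdr.create_transform_trigger('orders_transform',
    ARRAY['INSERT', 'UPDATE', 'DELETE'],
    'orders', 'orders_transform_func()');
```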

bdr.drop_trigger

This function removes an existing stream trigger (either conflict or transform).

Synopsis

bdr.drop_trigger(trigger_name text,
                 relation regclass,
                 ifexists boolean DEFAULT false)

Parameters

  • trigger_name Name of an existing trigger.
  • relation The relation the trigger is defined for.
  • ifexists When set to true, this function ignores missing triggers.

Notes

This function uses the same replication mechanism as DDL statements. This means that the replication is affected by the ddl filters configuration.

The function takes a global DML lock on the relation on which the trigger is defined.

This function is transactional. You can roll back the effects with the ROLLBACK of the transaction. The changes are visible to the current transaction.

Only the owner of the relation can execute the bdr.drop_trigger function.
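
A short usage sketch with a hypothetical trigger orders_transform on a hypothetical table orders. It assumes that trigger_name is the name given when the trigger was created, without the '_bdrc' or '_bdrt' suffix.

```sql
-- Raises an error if the trigger doesn't exist.
SELECT bdr.drop_trigger('orders_transform', 'orders');

-- With ifexists set to true, a missing trigger is ignored.
SELECT bdr.drop_trigger('orders_transform', 'orders', true);
```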

Stream triggers examples

A conflict trigger that provides behavior similar to the update_if_newer conflict resolver:

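CREATE OR REPLACE FUNCTION update_if_newer_trig_func()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
    SOURCE record;
    TARGET record;
BEGIN
    -- Fetch both versions of the row involved in the conflict.
    SOURCE := bdr.trigger_get_row('SOURCE_NEW');
    TARGET := bdr.trigger_get_row('TARGET');

    -- Keep whichever version has the later commit timestamp.
    IF (bdr.trigger_get_committs('TARGET') >
        bdr.trigger_get_committs('SOURCE_NEW')) THEN
        RETURN TARGET;
    ELSE
        RETURN SOURCE;
    END IF;
END;
$$;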

A conflict trigger that applies a delta change on a counter column and uses SOURCE_NEW for all other columns:

CREATE OR REPLACE FUNCTION delta_count_trg_func()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
    DELTA bigint;
    SOURCE_OLD record;
    SOURCE_NEW record;
    TARGET record;
BEGIN
    SOURCE_OLD := bdr.trigger_get_row('SOURCE_OLD');
    SOURCE_NEW := bdr.trigger_get_row('SOURCE_NEW');
    TARGET := bdr.trigger_get_row('TARGET');

    DELTA := SOURCE_NEW.counter - SOURCE_OLD.counter;
    SOURCE_NEW.counter := TARGET.counter + DELTA;

    RETURN SOURCE_NEW;
END;
$$;

A transform trigger that logs all changes to a log table instead of applying them:

CREATE OR REPLACE FUNCTION log_change()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
    SOURCE_NEW record;
    SOURCE_OLD record;
    COMMITTS timestamptz;
BEGIN
    SOURCE_NEW := bdr.trigger_get_row('SOURCE_NEW');
    SOURCE_OLD := bdr.trigger_get_row('SOURCE_OLD');
    COMMITTS := bdr.trigger_get_committs('SOURCE_NEW');

    IF (TG_OP = 'INSERT') THEN
        INSERT INTO log SELECT 'I', COMMITTS, row_to_json(SOURCE_NEW);
    ELSIF (TG_OP = 'UPDATE') THEN
        INSERT INTO log SELECT 'U', COMMITTS, row_to_json(SOURCE_NEW);
    ELSIF (TG_OP = 'DELETE') THEN
        INSERT INTO log SELECT 'D', COMMITTS, row_to_json(SOURCE_OLD);
    END IF;

    RETURN NULL; -- do not apply the change
END;
$$;
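
The function above assumes that a table named log already exists. The following sketch shows one possible shape for that table and how the function might be registered. The column names and the table orders are hypothetical. The column types are chosen to match the three values the function inserts.

```sql
-- One row per logged change: operation code, commit timestamp, row contents.
CREATE TABLE log (
    op       text,
    committs timestamptz,
    change   json
);

-- Log every incoming change on the hypothetical orders table instead of applying it.
SELECT bdr.create_transform_trigger('log_change',
    ARRAY['INSERT', 'UPDATE', 'DELETE'],
    'orders', 'log_change()');
```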

This example shows a conflict trigger that implements trusted source conflict detection, also known as trusted site, preferred node, or Always Wins resolution. This uses the bdr.trigger_get_origin_node_id() function to provide a solution that works with three or more nodes.

CREATE OR REPLACE FUNCTION test_conflict_trigger()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
	SOURCE	record;
	TARGET	record;

	TRUSTED_NODE	bigint;
	SOURCE_NODE		bigint;
	TARGET_NODE		bigint;
BEGIN
	TARGET := bdr.trigger_get_row('TARGET');
	IF (TG_OP = 'DELETE') THEN
		SOURCE := bdr.trigger_get_row('SOURCE_OLD');
	ELSE
		SOURCE := bdr.trigger_get_row('SOURCE_NEW');
	END IF;

	TRUSTED_NODE := current_setting('customer.trusted_node_id');

	SOURCE_NODE := bdr.trigger_get_origin_node_id('SOURCE_NEW');
	TARGET_NODE := bdr.trigger_get_origin_node_id('TARGET');

	IF (TRUSTED_NODE = SOURCE_NODE) THEN
		RETURN SOURCE;
	ELSIF (TRUSTED_NODE = TARGET_NODE) THEN
		RETURN TARGET;
	ELSE
		RETURN NULL; -- do not apply the change
	END IF;
END;
$$;
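
The function reads the trusted node from the custom setting customer.trusted_node_id, so that setting must be defined wherever changes are applied. The following is a sketch only. The database name appdb, the table orders, and the node id value are hypothetical, and the query assumes that the bdr.node catalog exposes node_id and node_name columns. Setting a custom parameter at the database level might require superuser privileges, and already-running sessions and workers might not see it until they restart.

```sql
-- Find the id of the node to trust (column names are an assumption).
SELECT node_id, node_name FROM bdr.node;

-- Store the trusted node id so that new sessions can read it
-- with current_setting('customer.trusted_node_id').
ALTER DATABASE appdb SET customer.trusted_node_id = '1234567890';

-- Register the function for all conflict events on the table.
SELECT bdr.create_conflict_trigger('trusted_source',
    ARRAY['INSERT', 'UPDATE', 'DELETE'],
    'orders', 'test_conflict_trigger()');
```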