Skip to content

Add trigger configuration to backfill.Task #931

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 57 additions & 5 deletions pkg/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"database/sql"
"errors"
"fmt"
"strings"
"time"

"github.com/lib/pq"
Expand All @@ -22,13 +23,16 @@
// Task represents a backfill task for a specific table from an operation.
type Task struct {
table *schema.Table
triggers []TriggerConfig
triggers []OperationTrigger
}

// Job is a collection of all tables that need to be backfilled and their associated triggers.
type Job struct {
Tables []*schema.Table
triggers []TriggerConfig
schemaName string
latestSchema string
triggers map[string]triggerConfig

Tables []*schema.Table
}

type Backfill struct {
Expand All @@ -38,16 +42,56 @@

type CallbackFn func(done int64, total int64)

func NewTask(table *schema.Table, triggers ...TriggerConfig) *Task {

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / code generation

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / dead code check

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / test (pg: 15.3, schema: public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / test (pg: 15.3, schema: public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / test (pg: 16.4, schema: non_public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / test (pg: 16.4, schema: non_public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / test (pg: 14.8, schema: non_public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / test (pg: 14.8, schema: non_public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / test (pg: 16.4, schema: public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / test (pg: 16.4, schema: public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / test (pg: latest, schema: non_public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / test (pg: latest, schema: non_public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / test (pg: 17.0, schema: non_public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / test (pg: 17.0, schema: non_public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / test (pg: 17.0, schema: public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / test (pg: 17.0, schema: public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / test (pg: 15.3, schema: non_public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / test (pg: 15.3, schema: non_public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / test (pg: latest, schema: public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / test (pg: latest, schema: public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / test (pg: 14.8, schema: public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / test (pg: 14.8, schema: public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / examples (pg: 16.4, schema: public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / examples (pg: 15.3, schema: public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / lint

undefined: TriggerConfig) (typecheck)

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / lint

undefined: TriggerConfig (typecheck)

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / lint

undefined: TriggerConfig) (typecheck)

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / lint

undefined: TriggerConfig) (typecheck)

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / examples (pg: 14.8, schema: public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / examples (pg: 14.8, schema: non_public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / examples (pg: latest, schema: public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / examples (pg: 17.0, schema: non_public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / examples (pg: 15.3, schema: non_public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / examples (pg: latest, schema: non_public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / examples (pg: 16.4, schema: non_public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / examples (pg: 17.0, schema: public)

undefined: TriggerConfig

Check failure on line 45 in pkg/backfill/backfill.go

View workflow job for this annotation

GitHub Actions / cross-platform build

undefined: TriggerConfig
return &Task{
table: table,
triggers: triggers,
}
}

func NewJob(schemaName, latestSchema string) *Job {
return &Job{
schemaName: schemaName,
latestSchema: latestSchema,
triggers: make(map[string]triggerConfig, 0),
Tables: make([]*schema.Table, 0),
}
}

func (j *Job) AddTask(t *Task) {
j.Tables = append(j.Tables, t.table)
j.triggers = append(j.triggers, t.triggers...)

for _, trigger := range t.triggers {
if tg, exists := j.triggers[trigger.Name]; exists {
tg.SQL = append(tg.SQL, rewriteTriggerSQL(trigger.SQL, findColumnName(tg.Columns, tg.PhysicalColumn), tg.PhysicalColumn))
j.triggers[trigger.Name] = tg
} else {
j.triggers[trigger.Name] = triggerConfig{
Name: trigger.Name,
Direction: trigger.Direction,
Columns: trigger.Columns,
SchemaName: j.schemaName,
TableName: t.table.Name,
PhysicalColumn: trigger.PhysicalColumn,
LatestSchema: j.latestSchema,
SQL: []string{trigger.SQL},
NeedsBackfillColumn: CNeedsBackfillColumn,
}
}
}
}

func rewriteTriggerSQL(sql string, from, to string) string {
return strings.ReplaceAll(sql, from, fmt.Sprintf("NEW.%s", pq.QuoteIdentifier(to)))
}

func findColumnName(columns map[string]*schema.Column, columnName string) string {
for name, col := range columns {
if col.Name == columnName {
return name
}
}
return columnName
}

// New creates a new backfill operation with the given options. The backfill is
Expand All @@ -63,7 +107,15 @@

// CreateTriggers creates the triggers for the tables before starting the backfill.
func (bf *Backfill) CreateTriggers(ctx context.Context, j *Job) error {
// Not yet implemented, triggers are loaded during the Start method.
for _, trigger := range j.triggers {
a := &createTriggerAction{
conn: bf.conn,
cfg: trigger,
}
if err := a.execute(ctx); err != nil {
return fmt.Errorf("creating trigger %q: %w", trigger.Name, err)
}
}
return nil
}

Expand Down
91 changes: 89 additions & 2 deletions pkg/backfill/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@
package backfill

import (
"bytes"
"context"
"database/sql"
"fmt"
"html/template"

"github.com/lib/pq"
"github.com/xataio/pgroll/pkg/db"
"github.com/xataio/pgroll/pkg/migrations/templates"
"github.com/xataio/pgroll/pkg/schema"
)

Expand All @@ -13,18 +22,96 @@ const (
TriggerDirectionDown TriggerDirection = "down"
)

type TriggerConfig struct {
type triggerConfig struct {
Name string
Direction TriggerDirection
Columns map[string]*schema.Column
SchemaName string
TableName string
PhysicalColumn string
LatestSchema string
SQL string
SQL []string
NeedsBackfillColumn string
}

type OperationTrigger struct {
Name string
Direction TriggerDirection
Columns map[string]*schema.Column
TableName string
PhysicalColumn string
SQL string
}

type createTriggerAction struct {
conn db.DB
cfg triggerConfig
}

func (a *createTriggerAction) execute(ctx context.Context) error {
// Parenthesize the up/down SQL if it's not parenthesized already
for i, sql := range a.cfg.SQL {
if len(sql) > 0 && sql[0] != '(' {
a.cfg.SQL[i] = "(" + sql + ")"
}
}

a.cfg.NeedsBackfillColumn = CNeedsBackfillColumn

funcSQL, err := buildFunction(a.cfg)
if err != nil {
return err
}

triggerSQL, err := buildTrigger(a.cfg)
if err != nil {
return err
}

return a.conn.WithRetryableTransaction(ctx, func(ctx context.Context, tx *sql.Tx) error {
_, err := a.conn.ExecContext(ctx,
fmt.Sprintf("ALTER TABLE %s ADD COLUMN IF NOT EXISTS %s boolean DEFAULT true",
pq.QuoteIdentifier(a.cfg.TableName),
pq.QuoteIdentifier(CNeedsBackfillColumn)))
if err != nil {
return err
}

_, err = a.conn.ExecContext(ctx, funcSQL)
if err != nil {
return err
}

_, err = a.conn.ExecContext(ctx, triggerSQL)
return err
})
}

func buildFunction(cfg triggerConfig) (string, error) {
return executeTemplate("function", templates.Function, cfg)
}

func buildTrigger(cfg triggerConfig) (string, error) {
return executeTemplate("trigger", templates.Trigger, cfg)
}

func executeTemplate(name, content string, cfg triggerConfig) (string, error) {
tmpl := template.Must(template.
New(name).
Funcs(template.FuncMap{
"ql": pq.QuoteLiteral,
"qi": pq.QuoteIdentifier,
}).
Parse(content))

buf := bytes.Buffer{}
if err := tmpl.Execute(&buf, cfg); err != nil {
return "", err
}

return buf.String(), nil
}

// TriggerFunctionName returns the name of the trigger function
// for a given table and column.
func TriggerFunctionName(tableName, columnName string) string {
Expand Down
14 changes: 5 additions & 9 deletions pkg/migrations/op_add_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,22 +102,18 @@

var task *backfill.Task
if o.Up != "" {
err := NewCreateTriggerAction(conn,
backfill.TriggerConfig{
var triggers []backfill.OperationTrigger
triggers = append(triggers,
backfill.OperationTrigger{
Name: backfill.TriggerName(o.Table, o.Column.Name),
Direction: backfill.TriggerDirectionUp,
Columns: table.Columns,
SchemaName: s.Name,
LatestSchema: latestSchema,
TableName: table.Name,
PhysicalColumn: TemporaryName(o.Column.Name),
SQL: o.Up,
},
).Execute(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create trigger: %w", err)
}
task = backfill.NewTask(table)
)
task = backfill.NewTask(table, triggers...)

Check failure on line 116 in pkg/migrations/op_add_column.go

View workflow job for this annotation

GitHub Actions / dead code check

cannot use triggers (variable of type []backfill.OperationTrigger) as []invalid type value in argument to backfill.NewTask

Check failure on line 116 in pkg/migrations/op_add_column.go

View workflow job for this annotation

GitHub Actions / dead code check

cannot use triggers (variable of type []backfill.OperationTrigger) as []invalid type value in argument to backfill.NewTask
}

tmpColumn := toSchemaColumn(o.Column)
Expand Down
29 changes: 10 additions & 19 deletions pkg/migrations/op_alter_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,17 @@ func (o *OpAlterColumn) Start(ctx context.Context, l Logger, conn db.DB, latestS
}

// Add a trigger to copy values from the old column to the new, rewriting values using the `up` SQL.
err := NewCreateTriggerAction(conn,
backfill.TriggerConfig{
var triggers []backfill.OperationTrigger
triggers = append(triggers,
backfill.OperationTrigger{
Name: backfill.TriggerName(o.Table, o.Column),
Direction: backfill.TriggerDirectionUp,
Columns: table.Columns,
SchemaName: s.Name,
LatestSchema: latestSchema,
TableName: table.Name,
Columns: table.Columns,
PhysicalColumn: TemporaryName(o.Column),
SQL: o.upSQLForOperations(ops),
},
).Execute(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create up trigger: %w", err)
}
)

// Add the new column to the internal schema representation. This is done
// here, before creation of the down trigger, so that the trigger can declare
Expand All @@ -65,21 +61,16 @@ func (o *OpAlterColumn) Start(ctx context.Context, l Logger, conn db.DB, latestS
})

// Add a trigger to copy values from the new column to the old.
err = NewCreateTriggerAction(conn,
backfill.TriggerConfig{
triggers = append(triggers,
backfill.OperationTrigger{
Name: backfill.TriggerName(o.Table, TemporaryName(o.Column)),
Direction: backfill.TriggerDirectionDown,
Columns: table.Columns,
LatestSchema: latestSchema,
SchemaName: s.Name,
TableName: table.Name,
Columns: table.Columns,
PhysicalColumn: oldPhysicalColumn,
SQL: o.downSQLForOperations(ops),
},
).Execute(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create down trigger: %w", err)
}
)

// perform any operation specific start steps
for _, op := range ops {
Expand All @@ -88,7 +79,7 @@ func (o *OpAlterColumn) Start(ctx context.Context, l Logger, conn db.DB, latestS
}
}

return backfill.NewTask(table), nil
return backfill.NewTask(table, triggers...), nil
}

func (o *OpAlterColumn) Complete(ctx context.Context, l Logger, conn db.DB, s *schema.Schema) error {
Expand Down
25 changes: 8 additions & 17 deletions pkg/migrations/op_create_constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,19 @@ func (o *OpCreateConstraint) Start(ctx context.Context, l Logger, conn db.DB, la
}

// Setup triggers
var triggers []backfill.OperationTrigger
for _, colName := range o.Columns {
upSQL := o.Up[colName]
err := NewCreateTriggerAction(conn,
backfill.TriggerConfig{
triggers = append(triggers,
backfill.OperationTrigger{
Name: backfill.TriggerName(o.Table, colName),
Direction: backfill.TriggerDirectionUp,
Columns: table.Columns,
SchemaName: s.Name,
LatestSchema: latestSchema,
TableName: table.Name,
PhysicalColumn: TemporaryName(colName),
SQL: upSQL,
},
).Execute(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create up trigger: %w", err)
}
)

// Add the new column to the internal schema representation. This is done
// here, before creation of the down trigger, so that the trigger can declare
Expand All @@ -70,24 +66,19 @@ func (o *OpCreateConstraint) Start(ctx context.Context, l Logger, conn db.DB, la
})

downSQL := o.Down[colName]
err = NewCreateTriggerAction(conn,
backfill.TriggerConfig{
triggers = append(triggers,
backfill.OperationTrigger{
Name: backfill.TriggerName(o.Table, TemporaryName(colName)),
Direction: backfill.TriggerDirectionDown,
Columns: table.Columns,
LatestSchema: latestSchema,
SchemaName: s.Name,
TableName: table.Name,
PhysicalColumn: oldPhysicalColumn,
SQL: downSQL,
},
).Execute(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create down trigger: %w", err)
}
)
}

task := backfill.NewTask(table)
task := backfill.NewTask(table, triggers...)

switch o.Type {
case OpCreateConstraintTypeUnique, OpCreateConstraintTypePrimaryKey:
Expand Down
17 changes: 8 additions & 9 deletions pkg/migrations/op_drop_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,18 @@
func (o *OpDropColumn) Start(ctx context.Context, l Logger, conn db.DB, latestSchema string, s *schema.Schema) (*backfill.Task, error) {
l.LogOperationStart(o)

var triggers []backfill.OperationTrigger
if o.Down != "" {
err := NewCreateTriggerAction(conn,
backfill.TriggerConfig{
triggers = append(triggers,
backfill.OperationTrigger{
Name: backfill.TriggerName(o.Table, o.Column),
Direction: backfill.TriggerDirectionDown,
Columns: s.GetTable(o.Table).Columns,
SchemaName: s.Name,
LatestSchema: latestSchema,
TableName: s.GetTable(o.Table).Name,
PhysicalColumn: o.Column,
SQL: o.Down,
},
).Execute(ctx)
if err != nil {
return nil, err
}
)
}

table := s.GetTable(o.Table)
Expand All @@ -46,7 +42,10 @@
}

s.GetTable(o.Table).RemoveColumn(o.Column)
return nil, nil

return &backfill.Task{
Triggers: triggers,

Check failure on line 47 in pkg/migrations/op_drop_column.go

View workflow job for this annotation

GitHub Actions / dead code check

unknown field Triggers in struct literal of type backfill.Task, but does have unexported triggers

Check failure on line 47 in pkg/migrations/op_drop_column.go

View workflow job for this annotation

GitHub Actions / dead code check

unknown field Triggers in struct literal of type backfill.Task, but does have unexported triggers
}, nil
}

func (o *OpDropColumn) Complete(ctx context.Context, l Logger, conn db.DB, s *schema.Schema) error {
Expand Down
Loading
Loading