Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions go/vt/proto/vtctldata/vtctldata.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 46 additions & 0 deletions go/vt/proto/vtctldata/vtctldata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion go/vt/sidecardb/schema/onlineddl/schema_migrations.sql
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,12 @@ CREATE TABLE IF NOT EXISTS schema_migrations
`is_immediate_operation` tinyint unsigned NOT NULL DEFAULT '0',
`reviewed_timestamp` timestamp NULL DEFAULT NULL,
`ready_to_complete_timestamp` timestamp NULL DEFAULT NULL,
`shadow_analyzed_timestamp` timestamp NULL DEFAULT NULL,
`shadow_analyzed_timestamp` timestamp NULL DEFAULT NULL,
`removed_foreign_key_names` text NOT NULL,
`last_cutover_attempt_timestamp` timestamp NULL DEFAULT NULL,
`force_cutover` tinyint unsigned NOT NULL DEFAULT '0',
`cutover_threshold_seconds` int unsigned NOT NULL DEFAULT '0',
`dependent_migrations` text NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uuid_idx` (`migration_uuid`),
KEY `keyspace_shard_idx` (`keyspace`(64), `shard`(64)),
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtctl/grpcvtctldserver/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func rowToSchemaMigration(row sqltypes.RowNamedValues) (sm *vtctldatapb.SchemaMi
sm.VitessLivenessIndicator = row.AsInt64("vitess_liveness_indicator", 0)
sm.UserThrottleRatio = float32(row.AsFloat64("user_throttle_ratio", 0))
sm.SpecialPlan = row.AsString("special_plan", "")
sm.DependentMigrations = row.AsString("dependent_migrations", "")

sm.LastThrottledAt, err = valueToVTTime(row.AsString("last_throttled_timestamp", ""))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1807,7 +1807,7 @@ func (s *VtctldServer) GetSchema(ctx context.Context, req *vtctldatapb.GetSchema
}

func (s *VtctldServer) GetSchemaMigrations(ctx context.Context, req *vtctldatapb.GetSchemaMigrationsRequest) (resp *vtctldatapb.GetSchemaMigrationsResponse, err error) {
span, ctx := trace.NewSpan(ctx, "VtctldServer.GetShard")
span, ctx := trace.NewSpan(ctx, "VtctldServer.GetSchemaMigrations")
defer span.Finish()

defer panicHandler(&err)
Expand Down
87 changes: 79 additions & 8 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3098,6 +3098,20 @@ func shouldCutOverAccordingToBackoff(
return false, false
}

// getDependentMigrations returns a slice of migrations that must cut-over in-order, before the provided
// migration.
func getDependentMigrations(onlineDDL *schema.OnlineDDL, pendingMigrationsUUIDs []string) []string {
dependentMigrations := make([]string, 0, len(pendingMigrationsUUIDs))
for _, pendingMigrationsUUID := range pendingMigrationsUUIDs {
if pendingMigrationsUUID == onlineDDL.UUID {
// found all dependencies if we found ourself
break
}
dependentMigrations = append(dependentMigrations, pendingMigrationsUUID)
}
return dependentMigrations
}

// reviewRunningMigrations iterates migrations in 'running' state. Normally there's only one running, which was
// spawned by this tablet; but vreplication migrations could also resume from failure.
func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning int, cancellable []*cancellableMigration, err error) {
Expand Down Expand Up @@ -3245,19 +3259,23 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
// In the case of a postponed migration, we will not complete it, but the user will
// understand whether "now is a good time" or "not there yet"
_ = e.updateMigrationReadyToComplete(ctx, uuid, isReady)
var postponeInOrderCompletion bool
if strategySetting.IsInOrderCompletion() {
if len(pendingMigrationsUUIDs) > 0 && pendingMigrationsUUIDs[0] != onlineDDL.UUID {
if err = e.updateDependentMigrations(ctx, onlineDDL.UUID, getDependentMigrations(onlineDDL, pendingMigrationsUUIDs)); err != nil {
return err
}
postponeInOrderCompletion = true
}
}
if !isReady {
// The migration is not ready yet.
return nil
}
if postponeCompletion {
if postponeCompletion || postponeInOrderCompletion {
// override. Even if migration is ready, we do not complete it.
return nil
}
if strategySetting.IsInOrderCompletion() {
if len(pendingMigrationsUUIDs) > 0 && pendingMigrationsUUIDs[0] != onlineDDL.UUID {
// wait for earlier pending migrations to complete
return nil
}
}
shouldCutOver, shouldForceCutOver := shouldCutOverAccordingToBackoff(
shouldForceCutOver, forceCutOverAfter, sinceReadyToComplete, sinceLastCutoverAttempt, cutoverAttempts,
)
Expand Down Expand Up @@ -3773,8 +3791,17 @@ func (e *Executor) updateMigrationStatusFailedOrCancelled(ctx context.Context, u
}

func (e *Executor) updateMigrationStatus(ctx context.Context, uuid string, status schema.OnlineDDLStatus) error {
var sqlUpdate string
switch status {
case schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled:
// sqlUpdateMigrationStatusFinal represents a final update to migration status.
sqlUpdate = sqlUpdateMigrationStatusFinal
default:
sqlUpdate = sqlUpdateMigrationStatus
}

log.Infof("updateMigrationStatus: transitioning migration: %s into status: %s", uuid, string(status))
query, err := sqlparser.ParseAndBind(sqlUpdateMigrationStatus,
query, err := sqlparser.ParseAndBind(sqlUpdate,
sqltypes.StringBindVariable(string(status)),
sqltypes.StringBindVariable(uuid),
)
Expand Down Expand Up @@ -4063,6 +4090,25 @@ func (e *Executor) updateMigrationUserThrottleRatio(ctx context.Context, uuid st
return err
}

func (e *Executor) updateDependentMigrations(
ctx context.Context,
uuid string,
dependentMigrations []string,
) error {
if len(dependentMigrations) == 0 {
return nil
}
query, err := sqlparser.ParseAndBind(sqlUpdateDependentMigrations,
sqltypes.StringBindVariable(strings.Join(dependentMigrations, ",")),
sqltypes.StringBindVariable(uuid),
)
if err != nil {
return err
}
_, err = e.execQuery(ctx, query)
return err
}

// retryMigrationWhere retries a migration based on a given WHERE clause
func (e *Executor) retryMigrationWhere(ctx context.Context, whereExpr string) (result *sqltypes.Result, err error) {
e.migrationMutex.Lock()
Expand Down Expand Up @@ -4090,8 +4136,23 @@ func (e *Executor) RetryMigration(ctx context.Context, uuid string) (result *sql
e.migrationMutex.Lock()
defer e.migrationMutex.Unlock()

onlineDDL, _, err := e.readMigration(ctx, uuid)
if err != nil {
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, err.Error())
}

var dependentMigrations []string
if onlineDDL.StrategySetting().IsInOrderCompletion() {
pendingMigrationsUUIDs, err := e.readPendingMigrationsUUIDs(ctx)
if err != nil {
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, err.Error())
}
dependentMigrations = getDependentMigrations(onlineDDL, pendingMigrationsUUIDs)
}

query, err := sqlparser.ParseAndBind(sqlRetryMigration,
sqltypes.StringBindVariable(e.TabletAliasString()),
sqltypes.StringBindVariable(strings.Join(dependentMigrations, ",")),
sqltypes.StringBindVariable(uuid),
)
if err != nil {
Expand Down Expand Up @@ -4577,6 +4638,15 @@ func (e *Executor) SubmitMigration(
}
log.Infof("SubmitMigration: request to submit migration %s; action=%s, table=%s", onlineDDL.UUID, actionStr, onlineDDL.Table)

var dependentMigrations []string
if onlineDDL.StrategySetting().IsInOrderCompletion() {
pendingMigrationsUUIDs, err := e.readPendingMigrationsUUIDs(ctx)
if err != nil {
return nil, err
}
dependentMigrations = getDependentMigrations(onlineDDL, pendingMigrationsUUIDs)
}

revertedUUID, _ := onlineDDL.GetRevertUUID(e.env.Environment().Parser()) // Empty value if the migration is not actually a REVERT. Safe to ignore error.
retainArtifactsSeconds := int64((retainOnlineDDLTables).Seconds())
if retainArtifacts, _ := onlineDDL.StrategySetting().RetainArtifactsDuration(); retainArtifacts != 0 {
Expand Down Expand Up @@ -4612,6 +4682,7 @@ func (e *Executor) SubmitMigration(
sqltypes.BoolBindVariable(allowConcurrentMigration),
sqltypes.StringBindVariable(revertedUUID),
sqltypes.BoolBindVariable(onlineDDL.IsView(e.env.Environment().Parser())),
sqltypes.StringBindVariable(strings.Join(dependentMigrations, ",")),
)
if err != nil {
return nil, err
Expand Down
22 changes: 18 additions & 4 deletions go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ const (
postpone_completion,
allow_concurrent,
reverted_uuid,
is_view
is_view,
dependent_migrations
) VALUES (
%a, %a, %a, %a, %a, %a, %a, %a, %a, NOW(6), %a, %a, %a, %a, %a, %a, %a, %a, %a, %a
%a, %a, %a, %a, %a, %a, %a, %a, %a, NOW(6), %a, %a, %a, %a, %a, %a, %a, %a, %a, %a, %a
)`

sqlSelectQueuedMigrations = `SELECT
Expand All @@ -66,9 +67,16 @@ const (
WHERE
migration_uuid=%a
`
sqlUpdateMigrationStatusFinal = `UPDATE _vt.schema_migrations
SET migration_status=%a,
dependent_migrations=''
WHERE
migration_uuid=%a
`
sqlUpdateMigrationStatusFailedOrCancelled = `UPDATE _vt.schema_migrations
SET migration_status=IF(cancelled_timestamp IS NULL, 'failed', 'cancelled'),
completed_timestamp=NOW(6)
completed_timestamp=NOW(6),
dependent_migrations=''
WHERE
migration_uuid=%a
`
Expand Down Expand Up @@ -139,6 +147,11 @@ const (
WHERE
migration_uuid=%a
`
sqlUpdateDependentMigrations = `UPDATE _vt.schema_migrations
SET dependent_migrations=%a
WHERE
migration_uuid=%a
`
sqlUpdateArtifacts = `UPDATE _vt.schema_migrations
SET artifacts=concat(%a, ',', artifacts), cleanup_timestamp=NULL
WHERE
Expand Down Expand Up @@ -310,7 +323,8 @@ const (
completed_timestamp=NULL,
last_cutover_attempt_timestamp=NULL,
shadow_analyzed_timestamp=NULL,
cleanup_timestamp=NULL
cleanup_timestamp=NULL,
dependent_migrations=%a
WHERE
migration_status IN ('failed', 'cancelled')
AND migration_uuid=%a
Expand Down
1 change: 1 addition & 0 deletions proto/vtctldata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ message SchemaMigration {
vttime.Time reviewed_at = 52;
vttime.Time ready_to_complete_at = 53;
string removed_foreign_key_names = 54;
string dependent_migrations = 55;

enum Strategy {
option allow_alias = true;
Expand Down
6 changes: 6 additions & 0 deletions web/vtadmin/src/proto/vtadmin.d.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading