diff --git a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go index 40b1a869977..5845086449f 100644 --- a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go +++ b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go @@ -1806,26 +1806,39 @@ func testScheduler(t *testing.T) { sqls := []string{ `alter table t1_test force`, `alter table t2_test force`, + `drop table if exists non_existent_1`, + `drop table if exists non_existent_2`, } sql := strings.Join(sqls, ";") var vuuids []string t.Run("apply schema", func(t *testing.T) { uuidList := testOnlineDDLStatement(t, createParams(sql, ddlStrategy+" --in-order-completion --postpone-completion --allow-concurrent", "vtctl", "", "", true)) // skip wait vuuids = strings.Split(uuidList, "\n") - assert.Len(t, vuuids, 2) + assert.Len(t, vuuids, 4) for _, uuid := range vuuids { waitForReadyToComplete(t, uuid, true) } + for i, uuid := range vuuids { + rs := onlineddl.ReadMigrations(t, &vtParams, uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + inOrderCompletionPendingCount := row.AsUint64("in_order_completion_pending_count", 0) + assert.EqualValues(t, uint64(i), inOrderCompletionPendingCount) + } + } t.Run("cancel 1st migration", func(t *testing.T) { onlineddl.CheckCancelMigration(t, &vtParams, shards, vuuids[0], true) status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, vuuids[0], normalWaitTime, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled) fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) onlineddl.CheckMigrationStatus(t, &vtParams, shards, vuuids[0], schema.OnlineDDLStatusCancelled) }) - t.Run("expect 2nd migration to fail", func(t *testing.T) { - status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, vuuids[1], normalWaitTime, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled) - fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) - onlineddl.CheckMigrationStatus(t, &vtParams, shards, vuuids[1], schema.OnlineDDLStatusFailed) + t.Run("expect following migrations to fail", func(t *testing.T) { + for i := 1; i < len(vuuids); i++ { + uuid := vuuids[i] + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed) + } }) }) }) diff --git a/go/vt/proto/vtctldata/vtctldata.pb.go b/go/vt/proto/vtctldata/vtctldata.pb.go index 178be66cb36..e13aaaad956 100644 --- a/go/vt/proto/vtctldata/vtctldata.pb.go +++ b/go/vt/proto/vtctldata/vtctldata.pb.go @@ -791,63 +791,64 @@ func (x *Keyspace) GetKeyspace() *topodata.Keyspace { // SchemaMigration represents a row in the schema_migrations sidecar table. type SchemaMigration struct { - state protoimpl.MessageState `protogen:"open.v1"` - Uuid string `protobuf:"bytes,1,opt,name=uuid,proto3" json:"uuid,omitempty"` - Keyspace string `protobuf:"bytes,2,opt,name=keyspace,proto3" json:"keyspace,omitempty"` - Shard string `protobuf:"bytes,3,opt,name=shard,proto3" json:"shard,omitempty"` - Schema string `protobuf:"bytes,4,opt,name=schema,proto3" json:"schema,omitempty"` - Table string `protobuf:"bytes,5,opt,name=table,proto3" json:"table,omitempty"` - MigrationStatement string `protobuf:"bytes,6,opt,name=migration_statement,json=migrationStatement,proto3" json:"migration_statement,omitempty"` - Strategy SchemaMigration_Strategy `protobuf:"varint,7,opt,name=strategy,proto3,enum=vtctldata.SchemaMigration_Strategy" json:"strategy,omitempty"` - Options string `protobuf:"bytes,8,opt,name=options,proto3" json:"options,omitempty"` - AddedAt *vttime.Time `protobuf:"bytes,9,opt,name=added_at,json=addedAt,proto3" json:"added_at,omitempty"` - RequestedAt *vttime.Time `protobuf:"bytes,10,opt,name=requested_at,json=requestedAt,proto3" json:"requested_at,omitempty"` - ReadyAt *vttime.Time `protobuf:"bytes,11,opt,name=ready_at,json=readyAt,proto3" json:"ready_at,omitempty"` - StartedAt *vttime.Time `protobuf:"bytes,12,opt,name=started_at,json=startedAt,proto3" json:"started_at,omitempty"` - LivenessTimestamp *vttime.Time `protobuf:"bytes,13,opt,name=liveness_timestamp,json=livenessTimestamp,proto3" json:"liveness_timestamp,omitempty"` - CompletedAt *vttime.Time `protobuf:"bytes,14,opt,name=completed_at,json=completedAt,proto3" json:"completed_at,omitempty"` - CleanedUpAt *vttime.Time `protobuf:"bytes,15,opt,name=cleaned_up_at,json=cleanedUpAt,proto3" json:"cleaned_up_at,omitempty"` - Status SchemaMigration_Status `protobuf:"varint,16,opt,name=status,proto3,enum=vtctldata.SchemaMigration_Status" json:"status,omitempty"` - LogPath string `protobuf:"bytes,17,opt,name=log_path,json=logPath,proto3" json:"log_path,omitempty"` - Artifacts string `protobuf:"bytes,18,opt,name=artifacts,proto3" json:"artifacts,omitempty"` - Retries uint64 `protobuf:"varint,19,opt,name=retries,proto3" json:"retries,omitempty"` - Tablet *topodata.TabletAlias `protobuf:"bytes,20,opt,name=tablet,proto3" json:"tablet,omitempty"` - TabletFailure bool `protobuf:"varint,21,opt,name=tablet_failure,json=tabletFailure,proto3" json:"tablet_failure,omitempty"` - Progress float32 `protobuf:"fixed32,22,opt,name=progress,proto3" json:"progress,omitempty"` - MigrationContext string `protobuf:"bytes,23,opt,name=migration_context,json=migrationContext,proto3" json:"migration_context,omitempty"` - DdlAction string `protobuf:"bytes,24,opt,name=ddl_action,json=ddlAction,proto3" json:"ddl_action,omitempty"` - Message string `protobuf:"bytes,25,opt,name=message,proto3" json:"message,omitempty"` - EtaSeconds int64 `protobuf:"varint,26,opt,name=eta_seconds,json=etaSeconds,proto3" json:"eta_seconds,omitempty"` - RowsCopied uint64 `protobuf:"varint,27,opt,name=rows_copied,json=rowsCopied,proto3" json:"rows_copied,omitempty"` - TableRows int64 `protobuf:"varint,28,opt,name=table_rows,json=tableRows,proto3" json:"table_rows,omitempty"` - AddedUniqueKeys uint32 `protobuf:"varint,29,opt,name=added_unique_keys,json=addedUniqueKeys,proto3" json:"added_unique_keys,omitempty"` - RemovedUniqueKeys uint32 `protobuf:"varint,30,opt,name=removed_unique_keys,json=removedUniqueKeys,proto3" json:"removed_unique_keys,omitempty"` - LogFile string `protobuf:"bytes,31,opt,name=log_file,json=logFile,proto3" json:"log_file,omitempty"` - ArtifactRetention *vttime.Duration `protobuf:"bytes,32,opt,name=artifact_retention,json=artifactRetention,proto3" json:"artifact_retention,omitempty"` - PostponeCompletion bool `protobuf:"varint,33,opt,name=postpone_completion,json=postponeCompletion,proto3" json:"postpone_completion,omitempty"` - RemovedUniqueKeyNames string `protobuf:"bytes,34,opt,name=removed_unique_key_names,json=removedUniqueKeyNames,proto3" json:"removed_unique_key_names,omitempty"` - DroppedNoDefaultColumnNames string `protobuf:"bytes,35,opt,name=dropped_no_default_column_names,json=droppedNoDefaultColumnNames,proto3" json:"dropped_no_default_column_names,omitempty"` - ExpandedColumnNames string `protobuf:"bytes,36,opt,name=expanded_column_names,json=expandedColumnNames,proto3" json:"expanded_column_names,omitempty"` - RevertibleNotes string `protobuf:"bytes,37,opt,name=revertible_notes,json=revertibleNotes,proto3" json:"revertible_notes,omitempty"` - AllowConcurrent bool `protobuf:"varint,38,opt,name=allow_concurrent,json=allowConcurrent,proto3" json:"allow_concurrent,omitempty"` - RevertedUuid string `protobuf:"bytes,39,opt,name=reverted_uuid,json=revertedUuid,proto3" json:"reverted_uuid,omitempty"` - IsView bool `protobuf:"varint,40,opt,name=is_view,json=isView,proto3" json:"is_view,omitempty"` - ReadyToComplete bool `protobuf:"varint,41,opt,name=ready_to_complete,json=readyToComplete,proto3" json:"ready_to_complete,omitempty"` - VitessLivenessIndicator int64 `protobuf:"varint,42,opt,name=vitess_liveness_indicator,json=vitessLivenessIndicator,proto3" json:"vitess_liveness_indicator,omitempty"` - UserThrottleRatio float32 `protobuf:"fixed32,43,opt,name=user_throttle_ratio,json=userThrottleRatio,proto3" json:"user_throttle_ratio,omitempty"` - SpecialPlan string `protobuf:"bytes,44,opt,name=special_plan,json=specialPlan,proto3" json:"special_plan,omitempty"` - LastThrottledAt *vttime.Time `protobuf:"bytes,45,opt,name=last_throttled_at,json=lastThrottledAt,proto3" json:"last_throttled_at,omitempty"` - ComponentThrottled string `protobuf:"bytes,46,opt,name=component_throttled,json=componentThrottled,proto3" json:"component_throttled,omitempty"` - CancelledAt *vttime.Time `protobuf:"bytes,47,opt,name=cancelled_at,json=cancelledAt,proto3" json:"cancelled_at,omitempty"` - PostponeLaunch bool `protobuf:"varint,48,opt,name=postpone_launch,json=postponeLaunch,proto3" json:"postpone_launch,omitempty"` - Stage string `protobuf:"bytes,49,opt,name=stage,proto3" json:"stage,omitempty"` // enum? - CutoverAttempts uint32 `protobuf:"varint,50,opt,name=cutover_attempts,json=cutoverAttempts,proto3" json:"cutover_attempts,omitempty"` - IsImmediateOperation bool `protobuf:"varint,51,opt,name=is_immediate_operation,json=isImmediateOperation,proto3" json:"is_immediate_operation,omitempty"` - ReviewedAt *vttime.Time `protobuf:"bytes,52,opt,name=reviewed_at,json=reviewedAt,proto3" json:"reviewed_at,omitempty"` - ReadyToCompleteAt *vttime.Time `protobuf:"bytes,53,opt,name=ready_to_complete_at,json=readyToCompleteAt,proto3" json:"ready_to_complete_at,omitempty"` - RemovedForeignKeyNames string `protobuf:"bytes,54,opt,name=removed_foreign_key_names,json=removedForeignKeyNames,proto3" json:"removed_foreign_key_names,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Uuid string `protobuf:"bytes,1,opt,name=uuid,proto3" json:"uuid,omitempty"` + Keyspace string `protobuf:"bytes,2,opt,name=keyspace,proto3" json:"keyspace,omitempty"` + Shard string `protobuf:"bytes,3,opt,name=shard,proto3" json:"shard,omitempty"` + Schema string `protobuf:"bytes,4,opt,name=schema,proto3" json:"schema,omitempty"` + Table string `protobuf:"bytes,5,opt,name=table,proto3" json:"table,omitempty"` + MigrationStatement string `protobuf:"bytes,6,opt,name=migration_statement,json=migrationStatement,proto3" json:"migration_statement,omitempty"` + Strategy SchemaMigration_Strategy `protobuf:"varint,7,opt,name=strategy,proto3,enum=vtctldata.SchemaMigration_Strategy" json:"strategy,omitempty"` + Options string `protobuf:"bytes,8,opt,name=options,proto3" json:"options,omitempty"` + AddedAt *vttime.Time `protobuf:"bytes,9,opt,name=added_at,json=addedAt,proto3" json:"added_at,omitempty"` + RequestedAt *vttime.Time `protobuf:"bytes,10,opt,name=requested_at,json=requestedAt,proto3" json:"requested_at,omitempty"` + ReadyAt *vttime.Time `protobuf:"bytes,11,opt,name=ready_at,json=readyAt,proto3" json:"ready_at,omitempty"` + StartedAt *vttime.Time `protobuf:"bytes,12,opt,name=started_at,json=startedAt,proto3" json:"started_at,omitempty"` + LivenessTimestamp *vttime.Time `protobuf:"bytes,13,opt,name=liveness_timestamp,json=livenessTimestamp,proto3" json:"liveness_timestamp,omitempty"` + CompletedAt *vttime.Time `protobuf:"bytes,14,opt,name=completed_at,json=completedAt,proto3" json:"completed_at,omitempty"` + CleanedUpAt *vttime.Time `protobuf:"bytes,15,opt,name=cleaned_up_at,json=cleanedUpAt,proto3" json:"cleaned_up_at,omitempty"` + Status SchemaMigration_Status `protobuf:"varint,16,opt,name=status,proto3,enum=vtctldata.SchemaMigration_Status" json:"status,omitempty"` + LogPath string `protobuf:"bytes,17,opt,name=log_path,json=logPath,proto3" json:"log_path,omitempty"` + Artifacts string `protobuf:"bytes,18,opt,name=artifacts,proto3" json:"artifacts,omitempty"` + Retries uint64 `protobuf:"varint,19,opt,name=retries,proto3" json:"retries,omitempty"` + Tablet *topodata.TabletAlias `protobuf:"bytes,20,opt,name=tablet,proto3" json:"tablet,omitempty"` + TabletFailure bool `protobuf:"varint,21,opt,name=tablet_failure,json=tabletFailure,proto3" json:"tablet_failure,omitempty"` + Progress float32 `protobuf:"fixed32,22,opt,name=progress,proto3" json:"progress,omitempty"` + MigrationContext string `protobuf:"bytes,23,opt,name=migration_context,json=migrationContext,proto3" json:"migration_context,omitempty"` + DdlAction string `protobuf:"bytes,24,opt,name=ddl_action,json=ddlAction,proto3" json:"ddl_action,omitempty"` + Message string `protobuf:"bytes,25,opt,name=message,proto3" json:"message,omitempty"` + EtaSeconds int64 `protobuf:"varint,26,opt,name=eta_seconds,json=etaSeconds,proto3" json:"eta_seconds,omitempty"` + RowsCopied uint64 `protobuf:"varint,27,opt,name=rows_copied,json=rowsCopied,proto3" json:"rows_copied,omitempty"` + TableRows int64 `protobuf:"varint,28,opt,name=table_rows,json=tableRows,proto3" json:"table_rows,omitempty"` + AddedUniqueKeys uint32 `protobuf:"varint,29,opt,name=added_unique_keys,json=addedUniqueKeys,proto3" json:"added_unique_keys,omitempty"` + RemovedUniqueKeys uint32 `protobuf:"varint,30,opt,name=removed_unique_keys,json=removedUniqueKeys,proto3" json:"removed_unique_keys,omitempty"` + LogFile string `protobuf:"bytes,31,opt,name=log_file,json=logFile,proto3" json:"log_file,omitempty"` + ArtifactRetention *vttime.Duration `protobuf:"bytes,32,opt,name=artifact_retention,json=artifactRetention,proto3" json:"artifact_retention,omitempty"` + PostponeCompletion bool `protobuf:"varint,33,opt,name=postpone_completion,json=postponeCompletion,proto3" json:"postpone_completion,omitempty"` + RemovedUniqueKeyNames string `protobuf:"bytes,34,opt,name=removed_unique_key_names,json=removedUniqueKeyNames,proto3" json:"removed_unique_key_names,omitempty"` + DroppedNoDefaultColumnNames string `protobuf:"bytes,35,opt,name=dropped_no_default_column_names,json=droppedNoDefaultColumnNames,proto3" json:"dropped_no_default_column_names,omitempty"` + ExpandedColumnNames string `protobuf:"bytes,36,opt,name=expanded_column_names,json=expandedColumnNames,proto3" json:"expanded_column_names,omitempty"` + RevertibleNotes string `protobuf:"bytes,37,opt,name=revertible_notes,json=revertibleNotes,proto3" json:"revertible_notes,omitempty"` + AllowConcurrent bool `protobuf:"varint,38,opt,name=allow_concurrent,json=allowConcurrent,proto3" json:"allow_concurrent,omitempty"` + RevertedUuid string `protobuf:"bytes,39,opt,name=reverted_uuid,json=revertedUuid,proto3" json:"reverted_uuid,omitempty"` + IsView bool `protobuf:"varint,40,opt,name=is_view,json=isView,proto3" json:"is_view,omitempty"` + ReadyToComplete bool `protobuf:"varint,41,opt,name=ready_to_complete,json=readyToComplete,proto3" json:"ready_to_complete,omitempty"` + VitessLivenessIndicator int64 `protobuf:"varint,42,opt,name=vitess_liveness_indicator,json=vitessLivenessIndicator,proto3" json:"vitess_liveness_indicator,omitempty"` + UserThrottleRatio float32 `protobuf:"fixed32,43,opt,name=user_throttle_ratio,json=userThrottleRatio,proto3" json:"user_throttle_ratio,omitempty"` + SpecialPlan string `protobuf:"bytes,44,opt,name=special_plan,json=specialPlan,proto3" json:"special_plan,omitempty"` + LastThrottledAt *vttime.Time `protobuf:"bytes,45,opt,name=last_throttled_at,json=lastThrottledAt,proto3" json:"last_throttled_at,omitempty"` + ComponentThrottled string `protobuf:"bytes,46,opt,name=component_throttled,json=componentThrottled,proto3" json:"component_throttled,omitempty"` + CancelledAt *vttime.Time `protobuf:"bytes,47,opt,name=cancelled_at,json=cancelledAt,proto3" json:"cancelled_at,omitempty"` + PostponeLaunch bool `protobuf:"varint,48,opt,name=postpone_launch,json=postponeLaunch,proto3" json:"postpone_launch,omitempty"` + Stage string `protobuf:"bytes,49,opt,name=stage,proto3" json:"stage,omitempty"` // enum? + CutoverAttempts uint32 `protobuf:"varint,50,opt,name=cutover_attempts,json=cutoverAttempts,proto3" json:"cutover_attempts,omitempty"` + IsImmediateOperation bool `protobuf:"varint,51,opt,name=is_immediate_operation,json=isImmediateOperation,proto3" json:"is_immediate_operation,omitempty"` + ReviewedAt *vttime.Time `protobuf:"bytes,52,opt,name=reviewed_at,json=reviewedAt,proto3" json:"reviewed_at,omitempty"` + ReadyToCompleteAt *vttime.Time `protobuf:"bytes,53,opt,name=ready_to_complete_at,json=readyToCompleteAt,proto3" json:"ready_to_complete_at,omitempty"` + RemovedForeignKeyNames string `protobuf:"bytes,54,opt,name=removed_foreign_key_names,json=removedForeignKeyNames,proto3" json:"removed_foreign_key_names,omitempty"` + InOrderCompletionPendingCount uint64 `protobuf:"varint,55,opt,name=in_order_completion_pending_count,json=inOrderCompletionPendingCount,proto3" json:"in_order_completion_pending_count,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *SchemaMigration) Reset() { @@ -1258,6 +1259,13 @@ func (x *SchemaMigration) GetRemovedForeignKeyNames() string { return "" } +func (x *SchemaMigration) GetInOrderCompletionPendingCount() uint64 { + if x != nil { + return x.InOrderCompletionPendingCount + } + return 0 +} + type Shard struct { state protoimpl.MessageState `protogen:"open.v1"` Keyspace string `protobuf:"bytes,1,opt,name=keyspace,proto3" json:"keyspace,omitempty"` @@ -17457,7 +17465,7 @@ const file_vtctldata_proto_rawDesc = "" + "\x10reference_tables\x18\x12 \x03(\tR\x0freferenceTables\"N\n" + "\bKeyspace\x12\x12\n" + "\x04name\x18\x01 \x01(\tR\x04name\x12.\n" + - "\bkeyspace\x18\x02 \x01(\v2\x12.topodata.KeyspaceR\bkeyspace\"\xb6\x13\n" + + "\bkeyspace\x18\x02 \x01(\v2\x12.topodata.KeyspaceR\bkeyspace\"\x80\x14\n" + "\x0fSchemaMigration\x12\x12\n" + "\x04uuid\x18\x01 \x01(\tR\x04uuid\x12\x1a\n" + "\bkeyspace\x18\x02 \x01(\tR\bkeyspace\x12\x14\n" + @@ -17519,7 +17527,8 @@ const file_vtctldata_proto_rawDesc = "" + "\vreviewed_at\x184 \x01(\v2\f.vttime.TimeR\n" + "reviewedAt\x12=\n" + "\x14ready_to_complete_at\x185 \x01(\v2\f.vttime.TimeR\x11readyToCompleteAt\x129\n" + - "\x19removed_foreign_key_names\x186 \x01(\tR\x16removedForeignKeyNames\"I\n" + + "\x19removed_foreign_key_names\x186 \x01(\tR\x16removedForeignKeyNames\x12H\n" + + "!in_order_completion_pending_count\x187 \x01(\x04R\x1dinOrderCompletionPendingCount\"I\n" + "\bStrategy\x12\n" + "\n" + "\x06VITESS\x10\x00\x12\n" + diff --git a/go/vt/proto/vtctldata/vtctldata_vtproto.pb.go b/go/vt/proto/vtctldata/vtctldata_vtproto.pb.go index e80b1059e67..6f189a63a0e 100644 --- a/go/vt/proto/vtctldata/vtctldata_vtproto.pb.go +++ b/go/vt/proto/vtctldata/vtctldata_vtproto.pb.go @@ -214,6 +214,7 @@ func (m *SchemaMigration) CloneVT() *SchemaMigration { r.ReviewedAt = m.ReviewedAt.CloneVT() r.ReadyToCompleteAt = m.ReadyToCompleteAt.CloneVT() r.RemovedForeignKeyNames = m.RemovedForeignKeyNames + r.InOrderCompletionPendingCount = m.InOrderCompletionPendingCount if len(m.unknownFields) > 0 { r.unknownFields = make([]byte, len(m.unknownFields)) copy(r.unknownFields, m.unknownFields) @@ -6452,6 +6453,13 @@ func (m *SchemaMigration) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.InOrderCompletionPendingCount != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.InOrderCompletionPendingCount)) + i-- + dAtA[i] = 0x3 + i-- + dAtA[i] = 0xb8 + } if len(m.RemovedForeignKeyNames) > 0 { i -= len(m.RemovedForeignKeyNames) copy(dAtA[i:], m.RemovedForeignKeyNames) @@ -22928,6 +22936,9 @@ func (m *SchemaMigration) SizeVT() (n int) { if l > 0 { n += 2 + l + protohelpers.SizeOfVarint(uint64(l)) } + if m.InOrderCompletionPendingCount != 0 { + n += 2 + protohelpers.SizeOfVarint(uint64(m.InOrderCompletionPendingCount)) + } n += len(m.unknownFields) return n } @@ -31369,6 +31380,25 @@ func (m *SchemaMigration) UnmarshalVT(dAtA []byte) error { } m.RemovedForeignKeyNames = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 55: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field InOrderCompletionPendingCount", wireType) + } + m.InOrderCompletionPendingCount = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.InOrderCompletionPendingCount |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/go/vt/sidecardb/schema/onlineddl/schema_migrations.sql b/go/vt/sidecardb/schema/onlineddl/schema_migrations.sql index 3e2a72d6ae5..15c0bd3fb69 100644 --- a/go/vt/sidecardb/schema/onlineddl/schema_migrations.sql +++ b/go/vt/sidecardb/schema/onlineddl/schema_migrations.sql @@ -16,68 +16,69 @@ limitations under the License. CREATE TABLE IF NOT EXISTS schema_migrations ( - `id` bigint unsigned NOT NULL AUTO_INCREMENT, - `migration_uuid` varchar(64) NOT NULL, - `keyspace` varchar(256) NOT NULL, - `shard` varchar(255) NOT NULL, - `mysql_schema` varchar(128) NOT NULL, - `mysql_table` varchar(128) NOT NULL, - `migration_statement` text NOT NULL, - `strategy` varchar(128) NOT NULL, - `options` varchar(8192) NOT NULL, - `added_timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, - `requested_timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, - `ready_timestamp` timestamp NULL DEFAULT NULL, - `started_timestamp` timestamp NULL DEFAULT NULL, - `liveness_timestamp` timestamp NULL DEFAULT NULL, - `completed_timestamp` timestamp(6) NULL DEFAULT NULL, - `cleanup_timestamp` timestamp NULL DEFAULT NULL, - `migration_status` varchar(128) NOT NULL, - `log_path` varchar(1024) NOT NULL, - `artifacts` text NOT NULL, - `retries` int unsigned NOT NULL DEFAULT '0', - `tablet` varchar(128) NOT NULL DEFAULT '', - `tablet_failure` tinyint unsigned NOT NULL DEFAULT '0', - `progress` float NOT NULL DEFAULT '0', - `migration_context` varchar(1024) NOT NULL DEFAULT '', - `ddl_action` varchar(16) NOT NULL DEFAULT '', - `message` text NOT NULL, - `message_timestamp` timestamp(6) NULL DEFAULT NULL, - `eta_seconds` bigint NOT NULL DEFAULT '-1', - `rows_copied` bigint unsigned NOT NULL DEFAULT '0', - `vreplication_lag_seconds` bigint unsigned NOT NULL DEFAULT '0', - `table_rows` bigint NOT NULL DEFAULT '0', - `added_unique_keys` int unsigned NOT NULL DEFAULT '0', - `removed_unique_keys` int unsigned NOT NULL DEFAULT '0', - `log_file` varchar(1024) NOT NULL DEFAULT '', - `retain_artifacts_seconds` bigint NOT NULL DEFAULT '0', - `postpone_completion` tinyint unsigned NOT NULL DEFAULT '0', - `removed_unique_key_names` text NOT NULL, - `dropped_no_default_column_names` text NOT NULL, - `expanded_column_names` text NOT NULL, - `revertible_notes` text NOT NULL, - `allow_concurrent` tinyint unsigned NOT NULL DEFAULT '0', - `reverted_uuid` varchar(64) NOT NULL DEFAULT '', - `is_view` tinyint unsigned NOT NULL DEFAULT '0', - `ready_to_complete` tinyint unsigned NOT NULL DEFAULT '0', - `vitess_liveness_indicator` bigint NOT NULL DEFAULT '0', - `user_throttle_ratio` float NOT NULL DEFAULT '0', - `special_plan` text NOT NULL, - `last_throttled_timestamp` timestamp NULL DEFAULT NULL, - `component_throttled` tinytext NOT NULL, - `reason_throttled` text NOT NULL, - `cancelled_timestamp` timestamp NULL DEFAULT NULL, - `postpone_launch` tinyint unsigned NOT NULL DEFAULT '0', - `stage` text NOT NULL, - `cutover_attempts` int unsigned NOT NULL DEFAULT '0', - `is_immediate_operation` tinyint unsigned NOT NULL DEFAULT '0', - `reviewed_timestamp` timestamp NULL DEFAULT NULL, - `ready_to_complete_timestamp` timestamp NULL DEFAULT NULL, - `shadow_analyzed_timestamp` timestamp NULL DEFAULT NULL, - `removed_foreign_key_names` text NOT NULL, - `last_cutover_attempt_timestamp` timestamp NULL DEFAULT NULL, - `force_cutover` tinyint unsigned NOT NULL DEFAULT '0', - `cutover_threshold_seconds` int unsigned NOT NULL DEFAULT '0', + `id` bigint unsigned NOT NULL AUTO_INCREMENT, + `migration_uuid` varchar(64) NOT NULL, + `keyspace` varchar(256) NOT NULL, + `shard` varchar(255) NOT NULL, + `mysql_schema` varchar(128) NOT NULL, + `mysql_table` varchar(128) NOT NULL, + `migration_statement` text NOT NULL, + `strategy` varchar(128) NOT NULL, + `options` varchar(8192) NOT NULL, + `added_timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + `requested_timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + `ready_timestamp` timestamp NULL DEFAULT NULL, + `started_timestamp` timestamp NULL DEFAULT NULL, + `liveness_timestamp` timestamp NULL DEFAULT NULL, + `completed_timestamp` timestamp(6) NULL DEFAULT NULL, + `cleanup_timestamp` timestamp NULL DEFAULT NULL, + `migration_status` varchar(128) NOT NULL, + `log_path` varchar(1024) NOT NULL, + `artifacts` text NOT NULL, + `retries` int unsigned NOT NULL DEFAULT '0', + `tablet` varchar(128) NOT NULL DEFAULT '', + `tablet_failure` tinyint unsigned NOT NULL DEFAULT '0', + `progress` float NOT NULL DEFAULT '0', + `migration_context` varchar(1024) NOT NULL DEFAULT '', + `ddl_action` varchar(16) NOT NULL DEFAULT '', + `message` text NOT NULL, + `message_timestamp` timestamp(6) NULL DEFAULT NULL, + `eta_seconds` bigint NOT NULL DEFAULT '-1', + `rows_copied` bigint unsigned NOT NULL DEFAULT '0', + `vreplication_lag_seconds` bigint unsigned NOT NULL DEFAULT '0', + `table_rows` bigint NOT NULL DEFAULT '0', + `added_unique_keys` int unsigned NOT NULL DEFAULT '0', + `removed_unique_keys` int unsigned NOT NULL DEFAULT '0', + `log_file` varchar(1024) NOT NULL DEFAULT '', + `retain_artifacts_seconds` bigint NOT NULL DEFAULT '0', + `postpone_completion` tinyint unsigned NOT NULL DEFAULT '0', + `removed_unique_key_names` text NOT NULL, + `dropped_no_default_column_names` text NOT NULL, + `expanded_column_names` text NOT NULL, + `revertible_notes` text NOT NULL, + `allow_concurrent` tinyint unsigned NOT NULL DEFAULT '0', + `reverted_uuid` varchar(64) NOT NULL DEFAULT '', + `is_view` tinyint unsigned NOT NULL DEFAULT '0', + `ready_to_complete` tinyint unsigned NOT NULL DEFAULT '0', + `vitess_liveness_indicator` bigint NOT NULL DEFAULT '0', + `user_throttle_ratio` float NOT NULL DEFAULT '0', + `special_plan` text NOT NULL, + `last_throttled_timestamp` timestamp NULL DEFAULT NULL, + `component_throttled` tinytext NOT NULL, + `reason_throttled` text NOT NULL, + `cancelled_timestamp` timestamp NULL DEFAULT NULL, + `postpone_launch` tinyint unsigned NOT NULL DEFAULT '0', + `stage` text NOT NULL, + `cutover_attempts` int unsigned NOT NULL DEFAULT '0', + `is_immediate_operation` tinyint unsigned NOT NULL DEFAULT '0', + `reviewed_timestamp` timestamp NULL DEFAULT NULL, + `ready_to_complete_timestamp` timestamp NULL DEFAULT NULL, + `shadow_analyzed_timestamp` timestamp NULL DEFAULT NULL, + `removed_foreign_key_names` text NOT NULL, + `last_cutover_attempt_timestamp` timestamp NULL DEFAULT NULL, + `force_cutover` tinyint unsigned NOT NULL DEFAULT '0', + `cutover_threshold_seconds` int unsigned NOT NULL DEFAULT '0', + `in_order_completion_pending_count` int unsigned NOT NULL DEFAULT '0', PRIMARY KEY (`id`), UNIQUE KEY `uuid_idx` (`migration_uuid`), KEY `keyspace_shard_idx` (`keyspace`(64), `shard`(64)), diff --git a/go/vt/vtctl/grpcvtctldserver/query.go b/go/vt/vtctl/grpcvtctldserver/query.go index 100e71b92c5..9b36f7729bb 100644 --- a/go/vt/vtctl/grpcvtctldserver/query.go +++ b/go/vt/vtctl/grpcvtctldserver/query.go @@ -150,6 +150,7 @@ func rowToSchemaMigration(row sqltypes.RowNamedValues) (sm *vtctldatapb.SchemaMi sm.VitessLivenessIndicator = row.AsInt64("vitess_liveness_indicator", 0) sm.UserThrottleRatio = float32(row.AsFloat64("user_throttle_ratio", 0)) sm.SpecialPlan = row.AsString("special_plan", "") + sm.InOrderCompletionPendingCount = row.AsUint64("in_order_completion_pending_count", 0) sm.LastThrottledAt, err = valueToVTTime(row.AsString("last_throttled_timestamp", "")) if err != nil { diff --git a/go/vt/vtctl/grpcvtctldserver/server.go b/go/vt/vtctl/grpcvtctldserver/server.go index 520d322ba67..614b28a3b07 100644 --- a/go/vt/vtctl/grpcvtctldserver/server.go +++ b/go/vt/vtctl/grpcvtctldserver/server.go @@ -1809,7 +1809,7 @@ func (s *VtctldServer) GetSchema(ctx context.Context, req *vtctldatapb.GetSchema } func (s *VtctldServer) GetSchemaMigrations(ctx context.Context, req *vtctldatapb.GetSchemaMigrationsRequest) (resp *vtctldatapb.GetSchemaMigrationsResponse, err error) { - span, ctx := trace.NewSpan(ctx, "VtctldServer.GetShard") + span, ctx := trace.NewSpan(ctx, "VtctldServer.GetSchemaMigrations") defer span.Finish() defer panicHandler(&err) diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index aa214bd146b..d0f12ecae78 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -3098,6 +3098,25 @@ func shouldCutOverAccordingToBackoff( return false, false } +// getInOrderCompletionPendingCount returns a count of migrations that must cut-over in-order, before the +// provided migration is able to proceed. This count is relevant only if the migration uses the +// --in-order-completion option. +func getInOrderCompletionPendingCount(onlineDDL *schema.OnlineDDL, pendingMigrationsUUIDs []string) uint64 { + if len(pendingMigrationsUUIDs) == 0 { + return 0 + } + var pendingCount uint64 + for _, pendingMigrationsUUID := range pendingMigrationsUUIDs { + if pendingMigrationsUUID == onlineDDL.UUID { + // found all migrations we must wait for if + // we found ourself in the pending list. + break + } + pendingCount++ + } + return pendingCount +} + // reviewRunningMigrations iterates migrations in 'running' state. Normally there's only one running, which was // spawned by this tablet; but vreplication migrations could also resume from failure. func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning int, cancellable []*cancellableMigration, err error) { @@ -3246,18 +3265,23 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i // understand whether "now is a good time" or "not there yet" _ = e.updateMigrationReadyToComplete(ctx, uuid, isReady) if !isReady { + // The migration is not ready yet. return nil } + if strategySetting.IsInOrderCompletion() { + pendingMigrationsCount := getInOrderCompletionPendingCount(onlineDDL, pendingMigrationsUUIDs) + if pendingMigrationsCount > 0 { + postponeCompletion = true + } + // Update in_order_completion_pending_count state if we are waiting or if we find we are no longer waiting (0). + if err = e.updateInOrderCompletionPendingCount(ctx, onlineDDL.UUID, pendingMigrationsCount); err != nil { + return err + } + } if postponeCompletion { // override. Even if migration is ready, we do not complete it. return nil } - if strategySetting.IsInOrderCompletion() { - if len(pendingMigrationsUUIDs) > 0 && pendingMigrationsUUIDs[0] != onlineDDL.UUID { - // wait for earlier pending migrations to complete - return nil - } - } shouldCutOver, shouldForceCutOver := shouldCutOverAccordingToBackoff( shouldForceCutOver, forceCutOverAfter, sinceReadyToComplete, sinceLastCutoverAttempt, cutoverAttempts, ) @@ -4063,6 +4087,22 @@ func (e *Executor) updateMigrationUserThrottleRatio(ctx context.Context, uuid st return err } +func (e *Executor) updateInOrderCompletionPendingCount( + ctx context.Context, + uuid string, + pendingCompletions uint64, +) error { + query, err := sqlparser.ParseAndBind(sqlUpdateInOrderCompletionPendingCount, + sqltypes.Uint64BindVariable(pendingCompletions), + sqltypes.StringBindVariable(uuid), + ) + if err != nil { + return err + } + _, err = e.execQuery(ctx, query) + return err +} + // retryMigrationWhere retries a migration based on a given WHERE clause func (e *Executor) retryMigrationWhere(ctx context.Context, whereExpr string) (result *sqltypes.Result, err error) { e.migrationMutex.Lock() diff --git a/go/vt/vttablet/onlineddl/executor_test.go b/go/vt/vttablet/onlineddl/executor_test.go index 91807165f93..6373b715763 100644 --- a/go/vt/vttablet/onlineddl/executor_test.go +++ b/go/vt/vttablet/onlineddl/executor_test.go @@ -26,6 +26,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/schema" ) func TestShouldCutOverAccordingToBackoff(t *testing.T) { @@ -229,3 +231,21 @@ func TestSafeMigrationCutOverThreshold(t *testing.T) { }) } } + +func TestGetInOrderCompletionPendingCount(t *testing.T) { + onlineDDL := &schema.OnlineDDL{UUID: t.Name()} + { + require.Zero(t, getInOrderCompletionPendingCount(onlineDDL, nil)) + } + { + require.Zero(t, getInOrderCompletionPendingCount(onlineDDL, []string{})) + } + { + pendingMigrationsUUIDs := []string{t.Name()} + require.Zero(t, getInOrderCompletionPendingCount(onlineDDL, pendingMigrationsUUIDs)) + } + { + pendingMigrationsUUIDs := []string{"a", "b", "c", t.Name(), "x"} + require.Equal(t, uint64(3), getInOrderCompletionPendingCount(onlineDDL, pendingMigrationsUUIDs)) + } +} diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go index 0c2f6c3cb3c..f267daf1535 100644 --- a/go/vt/vttablet/onlineddl/schema.go +++ b/go/vt/vttablet/onlineddl/schema.go @@ -139,6 +139,11 @@ const ( WHERE migration_uuid=%a ` + sqlUpdateInOrderCompletionPendingCount = `UPDATE _vt.schema_migrations + SET in_order_completion_pending_count=%a + WHERE + migration_uuid=%a + ` sqlUpdateArtifacts = `UPDATE _vt.schema_migrations SET artifacts=concat(%a, ',', artifacts), cleanup_timestamp=NULL WHERE @@ -288,7 +293,8 @@ const ( completed_timestamp=NULL, last_cutover_attempt_timestamp=NULL, shadow_analyzed_timestamp=NULL, - cleanup_timestamp=NULL + cleanup_timestamp=NULL, + in_order_completion_pending_count=0 WHERE migration_status IN ('failed', 'cancelled') AND (%s) @@ -310,7 +316,8 @@ const ( completed_timestamp=NULL, last_cutover_attempt_timestamp=NULL, shadow_analyzed_timestamp=NULL, - cleanup_timestamp=NULL + cleanup_timestamp=NULL, + in_order_completion_pending_count=0 WHERE migration_status IN ('failed', 'cancelled') AND migration_uuid=%a diff --git a/proto/vtctldata.proto b/proto/vtctldata.proto index e8d03f2e41b..c506a5ea569 100644 --- a/proto/vtctldata.proto +++ b/proto/vtctldata.proto @@ -175,6 +175,7 @@ message SchemaMigration { vttime.Time reviewed_at = 52; vttime.Time ready_to_complete_at = 53; string removed_foreign_key_names = 54; + uint64 in_order_completion_pending_count = 55; enum Strategy { option allow_alias = true; diff --git a/web/vtadmin/src/proto/vtadmin.d.ts b/web/vtadmin/src/proto/vtadmin.d.ts index 419e558161f..0c0e6654709 100644 --- a/web/vtadmin/src/proto/vtadmin.d.ts +++ b/web/vtadmin/src/proto/vtadmin.d.ts @@ -52737,6 +52737,9 @@ export namespace vtctldata { /** SchemaMigration removed_foreign_key_names */ removed_foreign_key_names?: (string|null); + + /** SchemaMigration in_order_completion_pending_count */ + in_order_completion_pending_count?: (number|Long|null); } /** Represents a SchemaMigration. */ @@ -52910,6 +52913,9 @@ export namespace vtctldata { /** SchemaMigration removed_foreign_key_names. */ public removed_foreign_key_names: string; + /** SchemaMigration in_order_completion_pending_count. */ + public in_order_completion_pending_count: (number|Long); + /** * Creates a new SchemaMigration instance using the specified properties. * @param [properties] Properties to set diff --git a/web/vtadmin/src/proto/vtadmin.js b/web/vtadmin/src/proto/vtadmin.js index 854d7430112..2220483d9a7 100644 --- a/web/vtadmin/src/proto/vtadmin.js +++ b/web/vtadmin/src/proto/vtadmin.js @@ -128997,6 +128997,7 @@ export const vtctldata = $root.vtctldata = (() => { * @property {vttime.ITime|null} [reviewed_at] SchemaMigration reviewed_at * @property {vttime.ITime|null} [ready_to_complete_at] SchemaMigration ready_to_complete_at * @property {string|null} [removed_foreign_key_names] SchemaMigration removed_foreign_key_names + * @property {number|Long|null} [in_order_completion_pending_count] SchemaMigration in_order_completion_pending_count */ /** @@ -129446,6 +129447,14 @@ export const vtctldata = $root.vtctldata = (() => { */ SchemaMigration.prototype.removed_foreign_key_names = ""; + /** + * SchemaMigration in_order_completion_pending_count. + * @member {number|Long} in_order_completion_pending_count + * @memberof vtctldata.SchemaMigration + * @instance + */ + SchemaMigration.prototype.in_order_completion_pending_count = $util.Long ? $util.Long.fromBits(0,0,true) : 0; + /** * Creates a new SchemaMigration instance using the specified properties. * @function create @@ -129578,6 +129587,8 @@ export const vtctldata = $root.vtctldata = (() => { $root.vttime.Time.encode(message.ready_to_complete_at, writer.uint32(/* id 53, wireType 2 =*/426).fork()).ldelim(); if (message.removed_foreign_key_names != null && Object.hasOwnProperty.call(message, "removed_foreign_key_names")) writer.uint32(/* id 54, wireType 2 =*/434).string(message.removed_foreign_key_names); + if (message.in_order_completion_pending_count != null && Object.hasOwnProperty.call(message, "in_order_completion_pending_count")) + writer.uint32(/* id 55, wireType 0 =*/440).uint64(message.in_order_completion_pending_count); return writer; }; @@ -129828,6 +129839,10 @@ export const vtctldata = $root.vtctldata = (() => { message.removed_foreign_key_names = reader.string(); break; } + case 55: { + message.in_order_completion_pending_count = reader.uint64(); + break; + } default: reader.skipType(tag & 7); break; @@ -130069,6 +130084,9 @@ export const vtctldata = $root.vtctldata = (() => { if (message.removed_foreign_key_names != null && message.hasOwnProperty("removed_foreign_key_names")) if (!$util.isString(message.removed_foreign_key_names)) return "removed_foreign_key_names: string expected"; + if (message.in_order_completion_pending_count != null && message.hasOwnProperty("in_order_completion_pending_count")) + if (!$util.isInteger(message.in_order_completion_pending_count) && !(message.in_order_completion_pending_count && $util.isInteger(message.in_order_completion_pending_count.low) && $util.isInteger(message.in_order_completion_pending_count.high))) + return "in_order_completion_pending_count: integer|Long expected"; return null; }; @@ -130326,6 +130344,15 @@ export const vtctldata = $root.vtctldata = (() => { } if (object.removed_foreign_key_names != null) message.removed_foreign_key_names = String(object.removed_foreign_key_names); + if (object.in_order_completion_pending_count != null) + if ($util.Long) + (message.in_order_completion_pending_count = $util.Long.fromValue(object.in_order_completion_pending_count)).unsigned = true; + else if (typeof object.in_order_completion_pending_count === "string") + message.in_order_completion_pending_count = parseInt(object.in_order_completion_pending_count, 10); + else if (typeof object.in_order_completion_pending_count === "number") + message.in_order_completion_pending_count = object.in_order_completion_pending_count; + else if (typeof object.in_order_completion_pending_count === "object") + message.in_order_completion_pending_count = new $util.LongBits(object.in_order_completion_pending_count.low >>> 0, object.in_order_completion_pending_count.high >>> 0).toNumber(true); return message; }; @@ -130417,6 +130444,11 @@ export const vtctldata = $root.vtctldata = (() => { object.reviewed_at = null; object.ready_to_complete_at = null; object.removed_foreign_key_names = ""; + if ($util.Long) { + let long = new $util.Long(0, 0, true); + object.in_order_completion_pending_count = options.longs === String ? long.toString() : options.longs === Number ? long.toNumber() : long; + } else + object.in_order_completion_pending_count = options.longs === String ? "0" : 0; } if (message.uuid != null && message.hasOwnProperty("uuid")) object.uuid = message.uuid; @@ -130541,6 +130573,11 @@ export const vtctldata = $root.vtctldata = (() => { object.ready_to_complete_at = $root.vttime.Time.toObject(message.ready_to_complete_at, options); if (message.removed_foreign_key_names != null && message.hasOwnProperty("removed_foreign_key_names")) object.removed_foreign_key_names = message.removed_foreign_key_names; + if (message.in_order_completion_pending_count != null && message.hasOwnProperty("in_order_completion_pending_count")) + if (typeof message.in_order_completion_pending_count === "number") + object.in_order_completion_pending_count = options.longs === String ? String(message.in_order_completion_pending_count) : message.in_order_completion_pending_count; + else + object.in_order_completion_pending_count = options.longs === String ? $util.Long.prototype.toString.call(message.in_order_completion_pending_count) : options.longs === Number ? new $util.LongBits(message.in_order_completion_pending_count.low >>> 0, message.in_order_completion_pending_count.high >>> 0).toNumber(true) : message.in_order_completion_pending_count; return object; };