Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/sources/s3/checkpointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (p *Checkpointer) Reset() {
type ResumeInfo struct {
CurrentBucket string `json:"current_bucket"` // Current bucket being scanned
StartAfter string `json:"start_after"` // Last processed object key
Role string `json:"role"` // Role used for scanning
}

// ResumePoint retrieves the last saved checkpoint state if one exists.
Expand All @@ -121,7 +122,7 @@ func (p *Checkpointer) ResumePoint(ctx context.Context) (ResumeInfo, error) {
return resume, nil
}

return ResumeInfo{CurrentBucket: resumeInfo.CurrentBucket, StartAfter: resumeInfo.StartAfter}, nil
return ResumeInfo{CurrentBucket: resumeInfo.CurrentBucket, StartAfter: resumeInfo.StartAfter, Role: resumeInfo.Role}, nil
}

// Complete marks the entire scanning operation as finished and clears the resume state.
Expand Down Expand Up @@ -215,7 +216,7 @@ func (p *Checkpointer) updateCheckpoint(bucket string, role string, lastKey stri
return nil
}

encoded, err := json.Marshal(&ResumeInfo{CurrentBucket: bucket, StartAfter: lastKey})
encoded, err := json.Marshal(&ResumeInfo{CurrentBucket: bucket, StartAfter: lastKey, Role: role})
if err != nil {
return fmt.Errorf("failed to encode resume info: %w", err)
}
Expand Down
180 changes: 180 additions & 0 deletions pkg/sources/s3/checkpointer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,56 @@ func TestCheckpointerResumption(t *testing.T) {
assert.Equal(t, "key-11", finalResumeInfo.StartAfter)
}

func TestCheckpointerResumptionWithRole(t *testing.T) {
ctx := context.Background()

// First scan - process 6 objects then interrupt.
initialProgress := &sources.Progress{}
tracker := NewCheckpointer(ctx, initialProgress)
role := "test-role"

firstPage := &s3.ListObjectsV2Output{
Contents: make([]s3types.Object, 12), // Total of 12 objects
}
for i := range 12 {
key := fmt.Sprintf("key-%d", i)
firstPage.Contents[i] = s3types.Object{Key: &key}
}

// Process first 6 objects.
for i := range 6 {
err := tracker.UpdateObjectCompletion(ctx, i, "test-bucket", role, firstPage.Contents)
assert.NoError(t, err)
}

// Verify resume info is set correctly.
resumeInfo, err := tracker.ResumePoint(ctx)
require.NoError(t, err)
assert.Equal(t, "test-bucket", resumeInfo.CurrentBucket)
assert.Equal(t, "key-5", resumeInfo.StartAfter)
assert.Equal(t, role, resumeInfo.Role)

// Resume scan with existing progress.
resumeTracker := NewCheckpointer(ctx, initialProgress)

resumePage := &s3.ListObjectsV2Output{
Contents: firstPage.Contents[6:], // Remaining 6 objects
}

// Process remaining objects.
for i := range len(resumePage.Contents) {
err := resumeTracker.UpdateObjectCompletion(ctx, i, "test-bucket", role, resumePage.Contents)
assert.NoError(t, err)
}

// Verify final resume info.
finalResumeInfo, err := resumeTracker.ResumePoint(ctx)
require.NoError(t, err)
assert.Equal(t, "test-bucket", finalResumeInfo.CurrentBucket)
assert.Equal(t, "key-11", finalResumeInfo.StartAfter)
assert.Equal(t, role, finalResumeInfo.Role)
}

func TestCheckpointerReset(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -111,6 +161,13 @@ func TestGetResumePoint(t *testing.T) {
},
expectedResumeInfo: ResumeInfo{CurrentBucket: "test-bucket", StartAfter: "test-key"},
},
{
name: "valid resume info with role",
progress: &sources.Progress{
EncodedResumeInfo: `{"current_bucket":"test-bucket","start_after":"test-key","role":"test-role"}`,
},
expectedResumeInfo: ResumeInfo{CurrentBucket: "test-bucket", StartAfter: "test-key", Role: "test-role"},
},
{
name: "empty encoded resume info",
progress: &sources.Progress{EncodedResumeInfo: ""},
Expand All @@ -121,6 +178,13 @@ func TestGetResumePoint(t *testing.T) {
EncodedResumeInfo: `{"current_bucket":"","start_after":"test-key"}`,
},
},
{
name: "no role in resume info",
progress: &sources.Progress{
EncodedResumeInfo: `{"current_bucket":"test-bucket","start_after":"test-key"}`,
},
expectedResumeInfo: ResumeInfo{CurrentBucket: "test-bucket", StartAfter: "test-key", Role: ""},
},
{
name: "unmarshal error",
progress: &sources.Progress{
Expand Down Expand Up @@ -257,6 +321,122 @@ func TestCheckpointerUpdate(t *testing.T) {
})
}
}
func TestCheckpointerUpdateWithRole(t *testing.T) {
role := "test-role"
tests := []struct {
name string
description string
completedIdx int
pageSize int
preCompleted []int
expectedKey string
expectedRole string
expectedLowestIncomplete int
}{
{
name: "first object completed",
description: "Basic case - completing first object",
completedIdx: 0,
pageSize: 3,
expectedKey: "key-0",
expectedRole: role,
expectedLowestIncomplete: 1,
},
{
name: "completing missing middle",
description: "Completing object when previous is done",
completedIdx: 1,
pageSize: 3,
preCompleted: []int{0},
expectedKey: "key-1",
expectedRole: role,
expectedLowestIncomplete: 2,
},
{
name: "all objects completed in order",
description: "Completing final object in sequence",
completedIdx: 2,
pageSize: 3,
preCompleted: []int{0, 1},
expectedKey: "key-2",
expectedRole: role,
expectedLowestIncomplete: 3,
},
{
name: "out of order completion before lowest",
description: "Completing object before current lowest incomplete - should not affect checkpoint",
completedIdx: 1,
pageSize: 4,
preCompleted: []int{0, 2, 3},
expectedKey: "key-3",
expectedRole: role,
expectedLowestIncomplete: 4,
},
{
name: "last index in max page",
description: "Edge case - maximum page size boundary",
completedIdx: 999,
pageSize: 1000,
preCompleted: func() []int {
indices := make([]int, 999)
for i := range indices {
indices[i] = i
}
return indices
}(),
expectedKey: "key-999",
expectedRole: role,
expectedLowestIncomplete: 1000,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

ctx := context.Background()
progress := new(sources.Progress)
tracker := &Checkpointer{
progress: progress,
completedObjects: make([]bool, tt.pageSize),
completionOrder: make([]int, 0, tt.pageSize),
lowestIncompleteIdx: 0,
}

page := &s3.ListObjectsV2Output{Contents: make([]s3types.Object, tt.pageSize)}
for i := range tt.pageSize {
key := fmt.Sprintf("key-%d", i)
page.Contents[i] = s3types.Object{Key: &key}
}

// Setup pre-completed objects.
for _, idx := range tt.preCompleted {
tracker.completedObjects[idx] = true
tracker.completionOrder = append(tracker.completionOrder, idx)
}

// Find the correct lowest incomplete index after pre-completion.
for i := range tt.pageSize {
if !tracker.completedObjects[i] {
tracker.lowestIncompleteIdx = i
break
}
}

err := tracker.UpdateObjectCompletion(ctx, tt.completedIdx, "test-bucket", role, page.Contents)
assert.NoError(t, err, "Unexpected error updating progress")

var info ResumeInfo
err = json.Unmarshal([]byte(progress.EncodedResumeInfo), &info)
assert.NoError(t, err, "Failed to decode resume info")
assert.Equal(t, tt.expectedKey, info.StartAfter, "Incorrect resume point")
assert.Equal(t, tt.expectedRole, info.Role, "Incorrect role")

assert.Equal(t, tt.expectedLowestIncomplete, tracker.lowestIncompleteIdx,
"Incorrect lowest incomplete index")
})
}
}

func TestCheckpointerUpdateUnitScan(t *testing.T) {
ctx := context.Background()
Expand Down
6 changes: 5 additions & 1 deletion pkg/sources/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ type resumePosition struct {
startAfter string // The last processed object key within the bucket
isNewScan bool // True if we're starting a fresh scan
exactMatch bool // True if we found the exact bucket we were previously processing
role string // The role used during the previous scan
}

// determineResumePosition calculates where to resume scanning from based on the last saved checkpoint
Expand Down Expand Up @@ -282,6 +283,7 @@ func determineResumePosition(ctx context.Context, tracker *Checkpointer, buckets
startAfter: resumePoint.StartAfter,
index: startIdx,
exactMatch: found,
role: resumePoint.Role,
}
}

Expand All @@ -306,12 +308,14 @@ func (s *Source) scanBuckets(
"Resume bucket no longer available, starting from closest position",
"original_bucket", pos.bucket,
"position", pos.index,
"role", pos.role,
)
default:
ctx.Logger().Info(
"Resuming scan from previous scan's bucket",
"bucket", pos.bucket,
"position", pos.index,
"role", pos.role,
)
}

Expand All @@ -327,7 +331,7 @@ func (s *Source) scanBuckets(
)

var startAfter *string
if bucket == pos.bucket && pos.startAfter != "" {
if bucket == pos.bucket && pos.startAfter != "" && role == pos.role {
startAfter = &pos.startAfter
ctx.Logger().V(3).Info(
"Resuming bucket scan",
Expand Down
Loading