Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions dkron/concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,117 @@ func TestConcurrencyForbid_MultipleNodes(t *testing.T) {
assert.Equal(t, "node-2", runningExecs[0].NodeName)
}

// TestConcurrencyForbid_StaleExecutionCleanup tests that stale executions in storage
// are automatically cleaned up when they exceed the stale threshold and are not
// found in active in-memory executions. This addresses the issue where a crashed
// node leaves a "running" execution in storage that permanently blocks the job.
func TestConcurrencyForbid_StaleExecutionCleanup(t *testing.T) {
log := getTestLogger()
s, err := NewStore(log, otel.Tracer("test"))
require.NoError(t, err)
defer s.Shutdown() // nolint: errcheck

ctx := context.Background()

// Create a test job with forbid concurrency
testJob := &Job{
Name: "stale-exec-job",
Schedule: "@every 10s",
Executor: "shell",
ExecutorConfig: map[string]string{"command": "/bin/true"},
Disabled: false,
Concurrency: ConcurrencyForbid,
}

err = s.SetJob(ctx, testJob, true)
require.NoError(t, err)

// Simulate a stale execution: started long ago, never finished
staleExecution := &Execution{
JobName: "stale-exec-job",
StartedAt: time.Now().UTC().Add(-5 * time.Hour), // Started 5 hours ago
FinishedAt: time.Time{}, // Never finished
Success: false,
Output: "running",
NodeName: "crashed-node",
Group: time.Now().UTC().Add(-5 * time.Hour).UnixNano(),
Attempt: 1,
}

_, err = s.SetExecution(ctx, staleExecution)
require.NoError(t, err)

// Verify the stale execution is detected as running
runningExecs, err := s.GetRunningExecutions(ctx, "stale-exec-job")
assert.NoError(t, err)
assert.Len(t, runningExecs, 1, "Should detect the stale execution as running")
assert.Equal(t, "crashed-node", runningExecs[0].NodeName)

// Verify the execution exceeds the stale threshold
assert.True(t, time.Since(runningExecs[0].StartedAt) > DefaultStaleExecutionThreshold,
"Stale execution should exceed the default threshold")

// Now simulate cleanup: mark the stale execution as done (as isRunnable would do)
staleExecution.FinishedAt = time.Now()
staleExecution.Success = false
staleExecution.Output += "\nExecution marked as failed: detected as stale (not active on any node)"
_, err = s.SetExecutionDone(ctx, staleExecution)
require.NoError(t, err)

// Verify no more running executions
runningExecs, err = s.GetRunningExecutions(ctx, "stale-exec-job")
assert.NoError(t, err)
assert.Empty(t, runningExecs, "Should have no running executions after cleanup")
}

// TestConcurrencyForbid_RecentExecutionNotCleaned tests that recent "running" executions
// (within the stale threshold) are NOT cleaned up, preserving the concurrency block.
func TestConcurrencyForbid_RecentExecutionNotCleaned(t *testing.T) {
log := getTestLogger()
s, err := NewStore(log, otel.Tracer("test"))
require.NoError(t, err)
defer s.Shutdown() // nolint: errcheck

ctx := context.Background()

// Create a test job with forbid concurrency
testJob := &Job{
Name: "recent-exec-job",
Schedule: "@every 10s",
Executor: "shell",
ExecutorConfig: map[string]string{"command": "/bin/sleep 60"},
Disabled: false,
Concurrency: ConcurrencyForbid,
}

err = s.SetJob(ctx, testJob, true)
require.NoError(t, err)

// Simulate a recent running execution (within threshold)
recentExecution := &Execution{
JobName: "recent-exec-job",
StartedAt: time.Now().UTC().Add(-10 * time.Minute), // Started 10 minutes ago
FinishedAt: time.Time{}, // Not finished
Success: false,
Output: "running",
NodeName: "active-node",
Group: time.Now().UTC().Add(-10 * time.Minute).UnixNano(),
Attempt: 1,
}

_, err = s.SetExecution(ctx, recentExecution)
require.NoError(t, err)

// Verify the execution is detected as running
runningExecs, err := s.GetRunningExecutions(ctx, "recent-exec-job")
assert.NoError(t, err)
assert.Len(t, runningExecs, 1, "Should detect the recent execution as running")

// Verify the execution does NOT exceed the stale threshold
assert.True(t, time.Since(runningExecs[0].StartedAt) < DefaultStaleExecutionThreshold,
"Recent execution should NOT exceed the default threshold")
}

// TestConcurrencyAllow_NotAffected tests that allow concurrency is not affected by our changes
func TestConcurrencyAllow_NotAffected(t *testing.T) {
log := getTestLogger()
Expand Down
92 changes: 71 additions & 21 deletions dkron/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ const (

// HashSymbol is the "magic" character used in scheduled to be replaced with a value based on job name
HashSymbol = "~"

// DefaultStaleExecutionThreshold is the duration after which a "running" execution
// in storage that is not found in active in-memory executions is considered stale.
// Stale executions are automatically cleaned up to prevent permanently blocking
// jobs with concurrency=forbid.
DefaultStaleExecutionThreshold = 4 * time.Hour
)

var (
Expand Down Expand Up @@ -421,27 +427,7 @@ func (j *Job) isRunnable(logger *logrus.Entry) bool {
}

if j.Concurrency == ConcurrencyForbid {
// Check for running executions in persistent storage first
// This is the source of truth and survives node restarts
ctx := context.Background()
runningExecs, err := j.Agent.Store.GetRunningExecutions(ctx, j.Name)
if err != nil {
logger.WithError(err).Error("job: Error querying for running executions in storage")
return false
}

if len(runningExecs) > 0 {
logger.WithFields(logrus.Fields{
"job": j.Name,
"concurrency": j.Concurrency,
"job_status": j.Status,
"running_count": len(runningExecs),
}).Info("job: Skipping concurrent execution (found running executions in storage)")
return false
}

// Also check in-memory activeExecutions as a secondary check
// This catches executions that just started and may not be in storage yet
// Check in-memory active executions first - these are definitely running
exs, err := j.Agent.GetActiveExecutions()
if err != nil {
logger.WithError(err).Error("job: Error querying for active executions")
Expand All @@ -458,6 +444,70 @@ func (j *Job) isRunnable(logger *logrus.Entry) bool {
return false
}
}

// Check persistent storage for running executions
// This catches executions that might be running on nodes after a leader change
ctx := context.Background()
runningExecs, err := j.Agent.Store.GetRunningExecutions(ctx, j.Name)
if err != nil {
logger.WithError(err).Error("job: Error querying for running executions in storage")
return false
}

for _, exec := range runningExecs {
// Execution is in storage but not in active memory.
// If it has been running longer than the stale threshold, clean it up.
runningFor := time.Now().UTC().Sub(exec.StartedAt)
if runningFor > DefaultStaleExecutionThreshold {
logger.WithFields(logrus.Fields{
"job": j.Name,
"execution": exec.Key(),
"node": exec.NodeName,
"started_at": exec.StartedAt,
"running_for": runningFor.String(),
}).Warn("job: Cleaning up stale execution from storage")

exec.FinishedAt = time.Now().UTC()
exec.Success = false
exec.Output += "\nExecution marked as failed: detected as stale (not active on any node)"

execDoneReq := &proto.ExecutionDoneRequest{
Execution: exec.ToProto(),
}
cmd, err := Encode(ExecutionDoneType, execDoneReq)
if err != nil {
logger.WithError(err).WithFields(logrus.Fields{
"execution": exec.Key(),
"node": exec.NodeName,
}).Error("job: Error encoding stale execution cleanup")
continue
}
af := j.Agent.RaftApply(cmd)
if af != nil {
if err := af.Error(); err != nil {
logger.WithError(err).WithFields(logrus.Fields{
"execution": exec.Key(),
"node": exec.NodeName,
}).Error("job: Error applying stale execution cleanup")
}
}
continue
}

// Execution is not in active memory but hasn't exceeded the stale threshold.
// Conservatively block to avoid potential concurrent execution.
logger.WithFields(logrus.Fields{
"job": j.Name,
"concurrency": j.Concurrency,
"job_status": j.Status,
"running_count": len(runningExecs),
"execution": exec.Key(),
"node": exec.NodeName,
"started_at": exec.StartedAt,
"running_for": runningFor.String(),
}).Info("job: Skipping concurrent execution (found running execution in storage)")
return false
}
}

return true
Expand Down
Loading