Skip to content

Commit 87ba1b7

Browse files
authored
Task state machine (diggerhq#1320)
* update state machine states for tasks
1 parent c469a2c commit 87ba1b7

File tree

10 files changed

+395
-13
lines changed

10 files changed

+395
-13
lines changed

backend/ci_backends/github_actions.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@ import (
99
)
1010

1111
type GithubActionCi struct {
12-
client *github.Client
12+
Client *github.Client
1313
}
1414

15-
func (g *GithubActionCi) TriggerWorkflow(repoOwner string, repoName string, job models.DiggerJob, jobString string, commentId int64) error {
16-
client := g.client
15+
func (g GithubActionCi) TriggerWorkflow(repoOwner string, repoName string, job models.DiggerJob, jobString string, commentId int64) error {
16+
client := g.Client
1717
log.Printf("TriggerGithubWorkflow: repoOwner: %v, repoName: %v, commentId: %v", repoOwner, repoName, commentId)
1818
_, err := client.Actions.CreateWorkflowDispatchEventByFileName(context.Background(), repoOwner, repoName, "digger_workflow.yml", github.CreateWorkflowDispatchEventRequest{
1919
Ref: job.Batch.BranchName,

backend/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,8 @@ github.com/agext/levenshtein v1.2.1/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki
285285
github.com/agext/levenshtein v1.2.2/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558=
286286
github.com/agext/levenshtein v1.2.3 h1:YB2fHEn0UJagG8T1rrWknE3ZQzWM06O8AMAatNn7lmo=
287287
github.com/agext/levenshtein v1.2.3/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558=
288+
github.com/alecthomas/kong v0.7.1 h1:azoTh0IOfwlAX3qN9sHWTxACE2oV8Bg2gAwBsMwDQY4=
289+
github.com/alecthomas/kong v0.7.1/go.mod h1:n1iCIO2xS46oE8ZfYCNDqdR0b0wZNrXAIAqro/2132U=
288290
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
289291
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
290292
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# drop the duplicate index to fix the next migration of renaming
2+
DROP INDEX "public"."idx_digger_job_id";
3+
DROP INDEX "idx_digger_run_queues_deleted_at";
4+
DROP INDEX "idx_digger_run_queue_project_id";
5+
DROP INDEX "idx_digger_run_queue_run_id";

backend/migrations/20240403155456.sql

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
-- Create index "idx_digger_job_id" to table: "digger_jobs"
2+
CREATE INDEX "idx_digger_job_id" ON "public"."digger_jobs" ("batch_id");
3+
-- Create index "idx_digger_run_batch_id" to table: "digger_run_stages"
4+
CREATE INDEX "idx_digger_run_batch_id" ON "public"."digger_run_stages" ("batch_id");
5+
-- Create "digger_run_queue_items" table
6+
CREATE TABLE "public"."digger_run_queue_items" (
7+
"id" bigserial NOT NULL,
8+
"created_at" timestamptz NULL,
9+
"updated_at" timestamptz NULL,
10+
"deleted_at" timestamptz NULL,
11+
"project_id" bigint NULL,
12+
"digger_run_id" bigint NULL,
13+
PRIMARY KEY ("id"),
14+
CONSTRAINT "fk_digger_run_queue_items_digger_run" FOREIGN KEY ("digger_run_id") REFERENCES "public"."digger_runs" ("id") ON UPDATE NO ACTION ON DELETE NO ACTION,
15+
CONSTRAINT "fk_digger_run_queue_items_project" FOREIGN KEY ("project_id") REFERENCES "public"."projects" ("id") ON UPDATE NO ACTION ON DELETE NO ACTION
16+
);
17+
-- Create index "idx_digger_run_queue_items_deleted_at" to table: "digger_run_queue_items"
18+
CREATE INDEX "idx_digger_run_queue_items_deleted_at" ON "public"."digger_run_queue_items" ("deleted_at");
19+
-- Create index "idx_digger_run_queue_project_id" to table: "digger_run_queue_items"
20+
CREATE INDEX "idx_digger_run_queue_project_id" ON "public"."digger_run_queue_items" ("project_id");
21+
-- Create index "idx_digger_run_queue_run_id" to table: "digger_run_queue_items"
22+
CREATE INDEX "idx_digger_run_queue_run_id" ON "public"."digger_run_queue_items" ("digger_run_id");
23+
-- Drop "digger_run_queues" table
24+
DROP TABLE "public"."digger_run_queues";

backend/migrations/atlas.sum

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:XKQNc29tdcumex/EDFrmq1Xlus0chqk3B9DYku62Hfs=
1+
h1:AwUIRwmx9E+aRp469t6izImEEgfQt6IHwtAKXumn4jo=
22
20231227132525.sql h1:43xn7XC0GoJsCnXIMczGXWis9d504FAWi4F1gViTIcw=
33
20240115170600.sql h1:IW8fF/8vc40+eWqP/xDK+R4K9jHJ9QBSGO6rN9LtfSA=
44
20240116123649.sql h1:R1JlUIgxxF6Cyob9HdtMqiKmx/BfnsctTl5rvOqssQw=
@@ -9,3 +9,5 @@ h1:XKQNc29tdcumex/EDFrmq1Xlus0chqk3B9DYku62Hfs=
99
20240329100957.sql h1:6IHn/Se6FwdmipMDPAPF0yChNNCuwxrEt4rgn+0gkLQ=
1010
20240329114422.sql h1:chXvrIUFNud2SdbRClWSCKXZ4MrMu0mpgE08Bou3pgk=
1111
20240402110915.sql h1:bG2Dvbzm3ZvFa29Feb0Bwj6KtAtZy1Vyuje6yV31msQ=
12+
20240403155357_drop_dup_idx.sql h1:6LyRtGfutHQompownriYYrq8us+Cdj4FTgWa7VPsXFA=
13+
20240403155456.sql h1:XJgyne416JMAV4xHA3IweHZos0ULrjFEJBqhWFjGNho=

backend/models/runs.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,14 @@ type DiggerRunStatus string
1111

1212
const (
1313
RunQueued DiggerRunStatus = "Queued"
14+
RunPendingPlan DiggerRunStatus = "Pending Plan"
15+
RunPlanning DiggerRunStatus = "Running Plan"
16+
RunPendingApproval DiggerRunStatus = "Pending Approval"
17+
RunApproved DiggerRunStatus = "Approved"
18+
RunPendingApply DiggerRunStatus = "Pending Apply"
19+
RunApplying DiggerRunStatus = "Running Apply"
1420
RunSucceeded DiggerRunStatus = "Succeeded"
1521
RunFailed DiggerRunStatus = "Failed"
16-
RunPendingApproval DiggerRunStatus = "Pending Approval"
1722
)
1823

1924
type RunType string
@@ -23,7 +28,7 @@ const (
2328
PlanOnly RunType = "Plan Only"
2429
)
2530

26-
type DiggerRunQueue struct {
31+
type DiggerRunQueueItem struct {
2732
gorm.Model
2833
ProjectId uint `gorm:"index:idx_digger_run_queue_project_id"`
2934
Project *Project
@@ -52,7 +57,7 @@ type DiggerRunStage struct {
5257
Run *DiggerRun
5358
RunID uint `gorm:"index:idx_digger_run_stage_id"`
5459
Batch *DiggerBatch
55-
BatchID *string `gorm:"index:idx_digger_job_id"`
60+
BatchID *string `gorm:"index:idx_digger_run_batch_id"`
5661
}
5762

5863
type SerializedRunStage struct {

backend/models/storage.go

Lines changed: 86 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -594,6 +594,15 @@ func (db *Database) CreateDiggerBatch(githubInstallationId int64, repoOwner stri
594594
return batch, nil
595595
}
596596

597+
func (db *Database) UpdateDiggerBatch(batch *DiggerBatch) error {
598+
result := db.GormDB.Save(batch)
599+
if result.Error != nil {
600+
return result.Error
601+
}
602+
log.Printf("batch %v has been updated successfully\n", batch.ID)
603+
return nil
604+
}
605+
597606
func (db *Database) UpdateBatchStatus(batch *DiggerBatch) error {
598607
if batch.Status == scheduler.BatchJobInvalidated || batch.Status == scheduler.BatchJobFailed || batch.Status == scheduler.BatchJobSucceeded {
599608
return nil
@@ -646,6 +655,59 @@ func (db *Database) CreateDiggerJob(batchId uuid.UUID, serializedJob []byte, wor
646655
return job, nil
647656
}
648657

658+
func (db *Database) CreateDiggerRun(Triggertype string, PrNumber int, Status DiggerRunStatus, CommitId string, DiggerConfig string, GithubInstallationId int64, RepoId uint, ProjectID uint, RunType RunType) (*DiggerRun, error) {
659+
dr := &DiggerRun{
660+
Triggertype: Triggertype,
661+
PrNumber: &PrNumber,
662+
Status: Status,
663+
CommitId: CommitId,
664+
DiggerConfig: DiggerConfig,
665+
GithubInstallationId: GithubInstallationId,
666+
RepoId: RepoId,
667+
ProjectID: ProjectID,
668+
RunType: RunType,
669+
}
670+
result := db.GormDB.Save(dr)
671+
if result.Error != nil {
672+
log.Printf("Failed to create DiggerRun: %v, error: %v\n", dr.ID, result.Error)
673+
return nil, result.Error
674+
}
675+
log.Printf("DiggerRun %v, has been created successfully\n", dr.ID)
676+
return dr, nil
677+
}
678+
679+
func (db *Database) GetDiggerRun(id uint) (*DiggerRun, error) {
680+
dr := &DiggerRun{}
681+
result := db.GormDB.Where("id=? ", id).Find(dr)
682+
if result.Error != nil {
683+
return nil, result.Error
684+
}
685+
return dr, nil
686+
}
687+
688+
func (db *Database) CreateDiggerRunQueueItem(projectId uint, diggeRrunId uint) (*DiggerRunQueueItem, error) {
689+
drq := &DiggerRunQueueItem{
690+
ProjectId: projectId,
691+
DiggerRunId: diggeRrunId,
692+
}
693+
result := db.GormDB.Save(drq)
694+
if result.Error != nil {
695+
log.Printf("Failed to create DiggerRunQueueItem: %v, error: %v\n", drq.ID, result.Error)
696+
return nil, result.Error
697+
}
698+
log.Printf("DiggerRunQueueItem %v, has been created successfully\n", drq.ID)
699+
return drq, nil
700+
}
701+
702+
func (db *Database) GetDiggerRunQueueItem(id uint) (*DiggerRunQueueItem, error) {
703+
dr := &DiggerRunQueueItem{}
704+
result := db.GormDB.Preload("Project").Preload("DiggerRun").Where("id=? ", id).Find(dr)
705+
if result.Error != nil {
706+
return nil, result.Error
707+
}
708+
return dr, nil
709+
}
710+
649711
func (db *Database) GetDiggerJobFromRunStage(stage DiggerRunStage) (*DiggerJob, error) {
650712
job := &DiggerJob{}
651713
result := db.GormDB.Take(job, "batch_id = ?", stage.BatchID)
@@ -659,8 +721,27 @@ func (db *Database) GetDiggerJobFromRunStage(stage DiggerRunStage) (*DiggerJob,
659721
return job, nil
660722
}
661723

662-
func (db *Database) GetFirstRunQueueForEveryProject() ([]DiggerRunQueue, error) {
663-
var runqueues []DiggerRunQueue
724+
func (db *Database) UpdateDiggerRun(diggerRun *DiggerRun) error {
725+
result := db.GormDB.Save(diggerRun)
726+
if result.Error != nil {
727+
return result.Error
728+
}
729+
log.Printf("diggerRun %v has been updated successfully\n", diggerRun.ID)
730+
return nil
731+
}
732+
733+
func (db *Database) DequeueRunItem(queueItem *DiggerRunQueueItem) error {
734+
log.Printf("DiggerRunQueueItem Deleting: %v", queueItem.ID)
735+
result := db.GormDB.Delete(queueItem)
736+
if result.Error != nil {
737+
return result.Error
738+
}
739+
log.Printf("diggerRunQueueItem %v has been deleted successfully\n", queueItem.ID)
740+
return nil
741+
}
742+
743+
func (db *Database) GetFirstRunQueueForEveryProject() ([]DiggerRunQueueItem, error) {
744+
var runqueues []DiggerRunQueueItem
664745
query := `WITH RankedRuns AS (
665746
SELECT
666747
digger_run_queues.digger_run_id,
@@ -690,11 +771,11 @@ WHERE
690771
}
691772

692773
// 2. Preload Project and DiggerRun for every DiggerrunQueue item (front of queue)
693-
var runqueuesWithData []DiggerRunQueue
694-
projectIds := lo.Map(runqueues, func(run DiggerRunQueue, index int) uint {
774+
var runqueuesWithData []DiggerRunQueueItem
775+
projectIds := lo.Map(runqueues, func(run DiggerRunQueueItem, index int) uint {
695776
return run.ProjectId
696777
})
697-
diggerRunIds := lo.Map(runqueues, func(run DiggerRunQueue, index int) uint {
778+
diggerRunIds := lo.Map(runqueues, func(run DiggerRunQueueItem, index int) uint {
698779
return run.DiggerRunId
699780
})
700781

backend/tasks/runs.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package main
2+
3+
import (
4+
"github.com/diggerhq/digger/backend/ci_backends"
5+
"github.com/diggerhq/digger/backend/models"
6+
orchestrator_scheduler "github.com/diggerhq/digger/libs/orchestrator/scheduler"
7+
"log"
8+
)
9+
10+
func RunQueuesStateMachine(queueItem *models.DiggerRunQueueItem, CIBackend ci_backends.CiBackend) {
11+
dr := queueItem.DiggerRun
12+
switch queueItem.DiggerRun.Status {
13+
case models.RunQueued:
14+
// trigger plan workflow (trigger the batch)
15+
// .....
16+
// change status to RunPendingPlan
17+
log.Printf("Updating run queueItem item to planning state")
18+
dr.Status = models.RunPlanning
19+
err := models.DB.UpdateDiggerRun(&dr)
20+
if err != nil {
21+
log.Printf("ERROR: Failed to update Digger Run for queueID: %v [%v %v]", queueItem.ID, queueItem.DiggerRunId, queueItem.ProjectId)
22+
}
23+
case models.RunPlanning:
24+
// Check the status of the batch
25+
batchStatus := orchestrator_scheduler.BatchJobSucceeded
26+
approvalRequired := true
27+
28+
// if failed then go straight to failed
29+
if batchStatus == orchestrator_scheduler.BatchJobFailed {
30+
dr.Status = models.RunFailed
31+
err := models.DB.UpdateDiggerRun(&dr)
32+
if err != nil {
33+
log.Printf("ERROR: Failed to update Digger Run for queueID: %v [%v %v]", queueItem.ID, queueItem.DiggerRunId, queueItem.ProjectId)
34+
}
35+
err = models.DB.DequeueRunItem(queueItem)
36+
if err != nil {
37+
log.Printf("ERROR: Failed to delete queueItem item: %v [%v %v]", queueItem.ID, queueItem.DiggerRunId, queueItem.ProjectId)
38+
}
39+
}
40+
41+
// if successful then
42+
if batchStatus == orchestrator_scheduler.BatchJobSucceeded && approvalRequired {
43+
dr.Status = models.RunPendingApproval
44+
err := models.DB.UpdateDiggerRun(&dr)
45+
if err != nil {
46+
log.Printf("ERROR: Failed to update Digger Run for queueID: %v [%v %v]", queueItem.ID, queueItem.DiggerRunId, queueItem.ProjectId)
47+
}
48+
} else {
49+
dr.Status = models.RunApproved
50+
err := models.DB.UpdateDiggerRun(&dr)
51+
if err != nil {
52+
log.Printf("ERROR: Failed to update Digger Run for queueID: %v [%v %v]", queueItem.ID, queueItem.DiggerRunId, queueItem.ProjectId)
53+
}
54+
}
55+
56+
case models.RunPendingApproval:
57+
// do nothing
58+
case models.RunApproved:
59+
// trigger apply stage workflow
60+
// ...
61+
dr.Status = models.RunApplying
62+
err := models.DB.UpdateDiggerRun(&dr)
63+
if err != nil {
64+
log.Printf("ERROR: Failed to update Digger Run for queueID: %v [%v %v]", queueItem.ID, queueItem.DiggerRunId, queueItem.ProjectId)
65+
}
66+
67+
case models.RunApplying:
68+
// Check the status of the batch
69+
batchStatus := orchestrator_scheduler.BatchJobSucceeded
70+
71+
// if failed then go straight to failed
72+
if batchStatus == orchestrator_scheduler.BatchJobFailed {
73+
dr.Status = models.RunFailed
74+
err := models.DB.UpdateDiggerRun(&dr)
75+
if err != nil {
76+
log.Printf("ERROR: Failed to update Digger Run for queueID: %v [%v %v]", queueItem.ID, queueItem.DiggerRunId, queueItem.ProjectId)
77+
}
78+
err = models.DB.DequeueRunItem(queueItem)
79+
if err != nil {
80+
log.Printf("ERROR: Failed to delete queueItem item: %v [%v %v]", queueItem.ID, queueItem.DiggerRunId, queueItem.ProjectId)
81+
}
82+
}
83+
84+
// if successful then
85+
if batchStatus == orchestrator_scheduler.BatchJobSucceeded {
86+
dr.Status = models.RunSucceeded
87+
err := models.DB.UpdateDiggerRun(&dr)
88+
if err != nil {
89+
log.Printf("ERROR: Failed to update Digger Run for queueID: %v [%v %v]", queueItem.ID, queueItem.DiggerRunId, queueItem.ProjectId)
90+
}
91+
}
92+
93+
case models.RunSucceeded:
94+
// dequeue
95+
err := models.DB.DequeueRunItem(queueItem)
96+
if err != nil {
97+
log.Printf("ERROR: Failed to delete queueItem item: %v [%v %v]", queueItem.ID, queueItem.DiggerRunId, queueItem.ProjectId)
98+
}
99+
case models.RunFailed:
100+
// dequeue
101+
err := models.DB.DequeueRunItem(queueItem)
102+
if err != nil {
103+
log.Printf("ERROR: Failed to delete queueItem item: %v [%v %v]", queueItem.ID, queueItem.DiggerRunId, queueItem.ProjectId)
104+
}
105+
default:
106+
log.Printf("WARN: Recieived unknown DiggerRunStatus: %v", queueItem.DiggerRun.Status)
107+
}
108+
}

0 commit comments

Comments
 (0)