Skip to content

Commit 0a003bc

Browse files
author
willzhen
committed
Migrate code
1 parent 000217c commit 0a003bc

17 files changed

+1381
-0
lines changed

container/combination_container.go

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package container
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
lighttaskscheduler "github.com/memory-overflow/light-task-scheduler"
8+
memeorycontainer "github.com/memory-overflow/light-task-scheduler/container/memory_container"
9+
persistcontainer "github.com/memory-overflow/light-task-scheduler/container/persist_container"
10+
)
11+
12+
// combinationContainer 组合 MemeoryContainer 和 Persistcontainer 的容器
13+
// 整合两种容器的优点,既能够通过内存实现快写快读,又能够通过DB实现可持久化
14+
type combinationContainer struct {
15+
memeoryContainer memeorycontainer.MemeoryContainer
16+
persistContainer persistcontainer.PersistContainer
17+
}
18+
19+
// MakeCombinationContainer 构造队列型任务容器
20+
func MakeCombinationContainer(memeoryContainer memeorycontainer.MemeoryContainer,
21+
persistContainer persistcontainer.PersistContainer) *combinationContainer {
22+
com := &combinationContainer{
23+
memeoryContainer: memeoryContainer,
24+
persistContainer: persistContainer,
25+
}
26+
ctx := context.Background()
27+
if tasks, err := persistContainer.GetRunningTask(ctx); err == nil {
28+
for _, t := range tasks {
29+
memeoryContainer.AddRunningTask(ctx, t)
30+
}
31+
}
32+
for {
33+
batchSize := 1000
34+
if tasks, err := persistContainer.GetWaitingTask(ctx, int32(batchSize)); err == nil {
35+
for _, t := range tasks {
36+
memeoryContainer.AddTask(ctx, t)
37+
}
38+
if len(tasks) < batchSize {
39+
break
40+
}
41+
} else {
42+
break
43+
}
44+
}
45+
return com
46+
}
47+
48+
// AddTask 添加任务
49+
func (c *combinationContainer) AddTask(ctx context.Context, task lighttaskscheduler.Task) (err error) {
50+
if err = c.memeoryContainer.AddTask(ctx, task); err != nil {
51+
return fmt.Errorf("memeoryContainer AddTask error: %v", err)
52+
}
53+
defer func() {
54+
if err != nil {
55+
c.memeoryContainer.ToDeleteStatus(ctx, &task)
56+
}
57+
}()
58+
if err = c.persistContainer.AddTask(ctx, task); err != nil {
59+
return fmt.Errorf("persistContainer AddTask error: %v", err)
60+
}
61+
return nil
62+
}
63+
64+
// GetRunningTask 获取运行中的任务
65+
func (c *combinationContainer) GetRunningTask(ctx context.Context) (tasks []lighttaskscheduler.Task, err error) {
66+
return c.memeoryContainer.GetRunningTask(ctx)
67+
}
68+
69+
// GetRunningTaskCount 获取运行中的任务数
70+
func (c *combinationContainer) GetRunningTaskCount(ctx context.Context) (count int32, err error) {
71+
return c.memeoryContainer.GetRunningTaskCount(ctx)
72+
}
73+
74+
// GetWaitingTask 获取等待中的任务
75+
func (c *combinationContainer) GetWaitingTask(ctx context.Context, limit int32) (tasks []lighttaskscheduler.Task, err error) {
76+
return c.memeoryContainer.GetWaitingTask(ctx, limit)
77+
}
78+
79+
// ToRunningStatus 转移到运行中的状态
80+
func (c *combinationContainer) ToRunningStatus(ctx context.Context, task *lighttaskscheduler.Task) (
81+
newTask *lighttaskscheduler.Task, err error) {
82+
if newTask, err = c.persistContainer.ToRunningStatus(ctx, task); err != nil {
83+
return newTask, fmt.Errorf("persistContainer ToRunningStatus error: %v", err)
84+
}
85+
if newTask, err = c.memeoryContainer.ToRunningStatus(ctx, task); err != nil {
86+
return newTask, fmt.Errorf("memeoryContainer ToRunningStatus error: %v", err)
87+
}
88+
return newTask, nil
89+
}
90+
91+
// ToExportStatus 转移到停止状态
92+
func (c *combinationContainer) ToStopStatus(ctx context.Context, task *lighttaskscheduler.Task) (
93+
newTask *lighttaskscheduler.Task, err error) {
94+
95+
if newTask, err = c.persistContainer.ToStopStatus(ctx, task); err != nil {
96+
return newTask, fmt.Errorf("persistContainer ToStopStatus error: %v", err)
97+
}
98+
if newTask, err = c.memeoryContainer.ToStopStatus(ctx, task); err != nil {
99+
return newTask, fmt.Errorf("memeoryContainer ToRunningStatus error: %v", err)
100+
}
101+
return newTask, nil
102+
}
103+
104+
// ToExportStatus 转移到删除状态
105+
func (c *combinationContainer) ToDeleteStatus(ctx context.Context, task *lighttaskscheduler.Task) (
106+
newTask *lighttaskscheduler.Task, err error) {
107+
if newTask, err = c.persistContainer.ToDeleteStatus(ctx, task); err != nil {
108+
return newTask, fmt.Errorf("persistContainer ToStopStatus error: %v", err)
109+
}
110+
if newTask, err = c.memeoryContainer.ToDeleteStatus(ctx, task); err != nil {
111+
return newTask, fmt.Errorf("memeoryContainer ToRunningStatus error: %v", err)
112+
}
113+
return newTask, nil
114+
}
115+
116+
// ToFailedStatus 转移到失败状态
117+
func (c *combinationContainer) ToFailedStatus(ctx context.Context, task *lighttaskscheduler.Task, reason error) (
118+
newTask *lighttaskscheduler.Task, err error) {
119+
if newTask, err = c.persistContainer.ToFailedStatus(ctx, task, reason); err != nil {
120+
return newTask, fmt.Errorf("persistContainer ToFailedStatus error: %v", err)
121+
}
122+
if newTask, err = c.memeoryContainer.ToFailedStatus(ctx, task, reason); err != nil {
123+
return newTask, fmt.Errorf("memeoryContainer ToFailedStatus error: %v", err)
124+
}
125+
return newTask, nil
126+
}
127+
128+
// ToExportStatus 转移到数据导出状态
129+
func (c *combinationContainer) ToExportStatus(ctx context.Context, task *lighttaskscheduler.Task) (
130+
newTask *lighttaskscheduler.Task, err error) {
131+
if newTask, err = c.persistContainer.ToExportStatus(ctx, task); err != nil {
132+
return newTask, fmt.Errorf("persistContainer ToExportStatus error: %v", err)
133+
}
134+
if newTask, err = c.memeoryContainer.ToExportStatus(ctx, task); err != nil {
135+
return newTask, fmt.Errorf("memeoryContainer ToExportStatus error: %v", err)
136+
}
137+
return newTask, nil
138+
}
139+
140+
// ToSuccessStatus 转移到执行成功状态
141+
func (c *combinationContainer) ToSuccessStatus(ctx context.Context, task *lighttaskscheduler.Task) (
142+
newTask *lighttaskscheduler.Task, err error) {
143+
if newTask, err = c.persistContainer.ToSuccessStatus(ctx, task); err != nil {
144+
return newTask, fmt.Errorf("persistContainer ToSuccessStatus error: %v", err)
145+
}
146+
if newTask, err = c.memeoryContainer.ToSuccessStatus(ctx, task); err != nil {
147+
return newTask, fmt.Errorf("memeoryContainer ToSuccessStatus error: %v", err)
148+
}
149+
return newTask, nil
150+
}
151+
152+
// UpdateRunningTaskStatus 更新执行中的任务状态
153+
func (c *combinationContainer) UpdateRunningTaskStatus(ctx context.Context,
154+
task *lighttaskscheduler.Task, status lighttaskscheduler.AsyncTaskStatus) error {
155+
if err := c.persistContainer.UpdateRunningTaskStatus(ctx, task, status); err != nil {
156+
return fmt.Errorf("persistContainer UpdateRunningTaskStatus error: %v", err)
157+
}
158+
if err := c.memeoryContainer.UpdateRunningTaskStatus(ctx, task, status); err != nil {
159+
return fmt.Errorf("memeoryContainer UpdateRunningTaskStatus error: %v", err)
160+
}
161+
return nil
162+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package memeorycontainer
2+
3+
import (
4+
"context"
5+
6+
lighttaskscheduler "github.com/memory-overflow/light-task-scheduler"
7+
)
8+
9+
// MemeoryContainer 内存型任务容器,优先:快读快写,缺点:不可持久化,
10+
type MemeoryContainer interface {
11+
lighttaskscheduler.TaskContainer
12+
13+
// AddRunningTask 向容器添加正在运行中的任务
14+
// 对于某些可持久化任务,调度器如果因为某些原因退出,需要从 db 中恢复状态,这个接口用来向容器中添加恢复前还在执行中的任务
15+
AddRunningTask(ctx context.Context, task lighttaskscheduler.Task) (err error)
16+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package memeorycontainer
2+
3+
import (
4+
"context"
5+
6+
lighttaskscheduler "github.com/memory-overflow/light-task-scheduler"
7+
)
8+
9+
// orderedMapContainer OrderedMap 作为容器,支持任务优先级,多进程数据无法共享数据
10+
type orderedMapContainer struct {
11+
MemeoryContainer
12+
// TODO
13+
}
14+
15+
// MakeOrderedMapContainer 构造队列型任务容器
16+
func MakeOrderedMapContainer() *orderedMapContainer {
17+
// TODO
18+
return &orderedMapContainer{}
19+
}
20+
21+
// AddTask 添加任务
22+
func (o *orderedMapContainer) AddTask(ctx context.Context, task lighttaskscheduler.Task) (err error) {
23+
// TODO
24+
return nil
25+
}
26+
27+
// GetRunningTask 获取运行中的任务
28+
func (o *orderedMapContainer) GetRunningTask(ctx context.Context) (tasks []lighttaskscheduler.Task, err error) {
29+
// TODO
30+
return tasks, err
31+
}
32+
33+
// GetRunningTaskCount 获取运行中的任务数
34+
func (o *orderedMapContainer) GetRunningTaskCount(ctx context.Context) (count int32, err error) {
35+
// TODO
36+
return 0, nil
37+
}
38+
39+
// GetWaitingTask 获取等待中的任务
40+
func (o *orderedMapContainer) GetWaitingTask(ctx context.Context, limit int32) (tasks []lighttaskscheduler.Task, err error) {
41+
// TODO
42+
return tasks, nil
43+
}
44+
45+
// ToRunningStatus 转移到运行中的状态
46+
func (o *orderedMapContainer) ToRunningStatus(ctx context.Context, task *lighttaskscheduler.Task) (
47+
newTask *lighttaskscheduler.Task, err error) {
48+
// TODO
49+
return task, nil
50+
}
51+
52+
// ToExportStatus 转移到停止状态
53+
func (o *orderedMapContainer) ToStopStatus(ctx context.Context, task *lighttaskscheduler.Task) (
54+
newTask *lighttaskscheduler.Task, err error) {
55+
// TODO
56+
return task, nil
57+
}
58+
59+
// ToExportStatus 转移到删除状态
60+
func (o *orderedMapContainer) ToDeleteStatus(ctx context.Context, task *lighttaskscheduler.Task) (
61+
newTask *lighttaskscheduler.Task, err error) {
62+
// TODO
63+
return task, nil
64+
}
65+
66+
// ToFailedStatus 转移到失败状态
67+
func (o *orderedMapContainer) ToFailedStatus(ctx context.Context, task *lighttaskscheduler.Task, reason error) (
68+
newTask *lighttaskscheduler.Task, err error) {
69+
// TODO
70+
return
71+
}
72+
73+
// ToExportStatus 转移到数据导出状态
74+
func (o *orderedMapContainer) ToExportStatus(ctx context.Context, task *lighttaskscheduler.Task) (
75+
newTask *lighttaskscheduler.Task, err error) {
76+
// TODO
77+
return task, nil
78+
}
79+
80+
// ToSuccessStatus 转移到执行成功状态
81+
func (o *orderedMapContainer) ToSuccessStatus(ctx context.Context, task *lighttaskscheduler.Task) (
82+
newTask *lighttaskscheduler.Task, err error) {
83+
// TODO
84+
return task, nil
85+
}
86+
87+
// UpdateRunningTaskStatus 更新执行中的任务状态
88+
func (o *orderedMapContainer) UpdateRunningTaskStatus(ctx context.Context,
89+
task *lighttaskscheduler.Task, status lighttaskscheduler.AsyncTaskStatus) error {
90+
// TODO
91+
return nil
92+
}

0 commit comments

Comments
 (0)