376 lines
9.7 KiB
Go
376 lines
9.7 KiB
Go
package xxl
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"io/ioutil"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
)
|
|
|
|
// Executor 执行器
|
|
type Executor interface {
|
|
// Init 初始化
|
|
Init(...Option)
|
|
// LogHandler 日志查询
|
|
LogHandler(handler LogHandler)
|
|
// Use 使用中间件
|
|
Use(middlewares ...Middleware)
|
|
// RegTask 注册任务
|
|
RegTask(pattern string, task TaskFunc)
|
|
// RunTask 运行任务
|
|
RunTask(writer http.ResponseWriter, request *http.Request)
|
|
// KillTask 杀死任务
|
|
KillTask(writer http.ResponseWriter, request *http.Request)
|
|
// TaskLog 任务日志
|
|
TaskLog(writer http.ResponseWriter, request *http.Request)
|
|
// Beat 心跳检测
|
|
Beat(writer http.ResponseWriter, request *http.Request)
|
|
// IdleBeat 忙碌检测
|
|
IdleBeat(writer http.ResponseWriter, request *http.Request)
|
|
// Run 运行服务
|
|
Run() error
|
|
// Stop 停止服务
|
|
Stop()
|
|
}
|
|
|
|
// NewExecutor 创建执行器
|
|
func NewExecutor(opts ...Option) Executor {
|
|
return newExecutor(opts...)
|
|
}
|
|
|
|
func newExecutor(opts ...Option) *executor {
|
|
options := newOptions(opts...)
|
|
e := &executor{
|
|
opts: options,
|
|
}
|
|
return e
|
|
}
|
|
|
|
type executor struct {
|
|
opts Options
|
|
address string
|
|
regList *taskList //注册任务列表
|
|
runList *taskList //正在执行任务列表
|
|
mu sync.RWMutex
|
|
log Logger
|
|
|
|
logHandler LogHandler //日志查询handler
|
|
middlewares []Middleware //中间件
|
|
}
|
|
|
|
func (e *executor) Init(opts ...Option) {
|
|
for _, o := range opts {
|
|
o(&e.opts)
|
|
}
|
|
e.log = e.opts.l
|
|
e.regList = &taskList{
|
|
data: make(map[string]*Task),
|
|
}
|
|
e.runList = &taskList{
|
|
data: make(map[string]*Task),
|
|
}
|
|
e.address = e.opts.ExecutorIp + ":" + e.opts.ExecutorPort
|
|
go e.registry()
|
|
}
|
|
|
|
// LogHandler 日志handler
|
|
func (e *executor) LogHandler(handler LogHandler) {
|
|
e.logHandler = handler
|
|
}
|
|
|
|
func (e *executor) Use(middlewares ...Middleware) {
|
|
e.middlewares = middlewares
|
|
}
|
|
|
|
func (e *executor) Run() (err error) {
|
|
// 创建路由器
|
|
mux := http.NewServeMux()
|
|
// 设置路由规则
|
|
mux.HandleFunc("/run", e.runTask)
|
|
mux.HandleFunc("/kill", e.killTask)
|
|
mux.HandleFunc("/log", e.taskLog)
|
|
mux.HandleFunc("/beat", e.beat)
|
|
mux.HandleFunc("/idleBeat", e.idleBeat)
|
|
// 创建服务器
|
|
server := &http.Server{
|
|
Addr: ":" + e.opts.ExecutorPort,
|
|
WriteTimeout: time.Second * 3,
|
|
Handler: mux,
|
|
}
|
|
// 监听端口并提供服务
|
|
e.log.Info("Starting server at " + e.address)
|
|
go server.ListenAndServe()
|
|
quit := make(chan os.Signal)
|
|
signal.Notify(quit, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGINT, syscall.SIGTERM)
|
|
<-quit
|
|
e.registryRemove()
|
|
return nil
|
|
}
|
|
|
|
func (e *executor) Stop() {
|
|
e.registryRemove()
|
|
}
|
|
|
|
// RegTask 注册任务
|
|
func (e *executor) RegTask(pattern string, task TaskFunc) {
|
|
var t = &Task{}
|
|
t.fn = e.chain(task)
|
|
e.regList.Set(pattern, t)
|
|
return
|
|
}
|
|
|
|
// 运行一个任务
|
|
func (e *executor) runTask(writer http.ResponseWriter, request *http.Request) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
req, _ := ioutil.ReadAll(request.Body)
|
|
param := &RunReq{}
|
|
err := json.Unmarshal(req, ¶m)
|
|
if err != nil {
|
|
_, _ = writer.Write(returnCall(param, FailureCode, "params err"))
|
|
e.log.Error("参数解析错误:" + string(req))
|
|
return
|
|
}
|
|
e.log.Info("任务参数:%v", param)
|
|
if !e.regList.Exists(param.ExecutorHandler) {
|
|
_, _ = writer.Write(returnCall(param, FailureCode, "Task not registered"))
|
|
e.log.Error("任务[" + Int64ToStr(param.JobID) + "]没有注册:" + param.ExecutorHandler)
|
|
return
|
|
}
|
|
|
|
//阻塞策略处理
|
|
if e.runList.Exists(Int64ToStr(param.JobID)) {
|
|
if param.ExecutorBlockStrategy == coverEarly { //覆盖之前调度
|
|
oldTask := e.runList.Get(Int64ToStr(param.JobID))
|
|
if oldTask != nil {
|
|
oldTask.Cancel()
|
|
e.runList.Del(Int64ToStr(oldTask.Id))
|
|
}
|
|
} else { //单机串行,丢弃后续调度 都进行阻塞
|
|
_, _ = writer.Write(returnCall(param, FailureCode, "There are tasks running"))
|
|
e.log.Error("任务[" + Int64ToStr(param.JobID) + "]已经在运行了:" + param.ExecutorHandler)
|
|
return
|
|
}
|
|
}
|
|
|
|
cxt := context.Background()
|
|
task := e.regList.Get(param.ExecutorHandler)
|
|
if param.ExecutorTimeout > 0 {
|
|
task.Ext, task.Cancel = context.WithTimeout(cxt, time.Duration(param.ExecutorTimeout)*time.Second)
|
|
} else {
|
|
task.Ext, task.Cancel = context.WithCancel(cxt)
|
|
}
|
|
task.Id = param.JobID
|
|
task.Name = param.ExecutorHandler
|
|
task.Param = param
|
|
task.log = e.log
|
|
|
|
e.runList.Set(Int64ToStr(task.Id), task)
|
|
go task.Run(func(code int64, msg string) {
|
|
e.callback(task, code, msg)
|
|
})
|
|
e.log.Info("任务[" + Int64ToStr(param.JobID) + "]开始执行:" + param.ExecutorHandler)
|
|
_, _ = writer.Write(returnGeneral())
|
|
}
|
|
|
|
// 删除一个任务
|
|
func (e *executor) killTask(writer http.ResponseWriter, request *http.Request) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
req, _ := ioutil.ReadAll(request.Body)
|
|
param := &killReq{}
|
|
_ = json.Unmarshal(req, ¶m)
|
|
if !e.runList.Exists(Int64ToStr(param.JobID)) {
|
|
_, _ = writer.Write(returnKill(param, FailureCode))
|
|
e.log.Error("任务[" + Int64ToStr(param.JobID) + "]没有运行")
|
|
return
|
|
}
|
|
task := e.runList.Get(Int64ToStr(param.JobID))
|
|
task.Cancel()
|
|
e.runList.Del(Int64ToStr(param.JobID))
|
|
_, _ = writer.Write(returnGeneral())
|
|
}
|
|
|
|
// 任务日志
|
|
func (e *executor) taskLog(writer http.ResponseWriter, request *http.Request) {
|
|
var res *LogRes
|
|
data, err := ioutil.ReadAll(request.Body)
|
|
req := &LogReq{}
|
|
if err != nil {
|
|
e.log.Error("日志请求失败:" + err.Error())
|
|
reqErrLogHandler(writer, req, err)
|
|
return
|
|
}
|
|
err = json.Unmarshal(data, &req)
|
|
if err != nil {
|
|
e.log.Error("日志请求解析失败:" + err.Error())
|
|
reqErrLogHandler(writer, req, err)
|
|
return
|
|
}
|
|
e.log.Info("日志请求参数:%+v", req)
|
|
if e.logHandler != nil {
|
|
res = e.logHandler(req)
|
|
} else {
|
|
res = defaultLogHandler(req)
|
|
}
|
|
str, _ := json.Marshal(res)
|
|
_, _ = writer.Write(str)
|
|
}
|
|
|
|
// 心跳检测
|
|
func (e *executor) beat(writer http.ResponseWriter, request *http.Request) {
|
|
e.log.Info("心跳检测")
|
|
_, _ = writer.Write(returnGeneral())
|
|
}
|
|
|
|
// 忙碌检测
|
|
func (e *executor) idleBeat(writer http.ResponseWriter, request *http.Request) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
defer request.Body.Close()
|
|
req, _ := ioutil.ReadAll(request.Body)
|
|
param := &idleBeatReq{}
|
|
err := json.Unmarshal(req, ¶m)
|
|
if err != nil {
|
|
_, _ = writer.Write(returnIdleBeat(FailureCode))
|
|
e.log.Error("参数解析错误:" + string(req))
|
|
return
|
|
}
|
|
if e.runList.Exists(Int64ToStr(param.JobID)) {
|
|
_, _ = writer.Write(returnIdleBeat(FailureCode))
|
|
e.log.Error("idleBeat任务[" + Int64ToStr(param.JobID) + "]正在运行")
|
|
return
|
|
}
|
|
e.log.Info("忙碌检测任务参数:%v", param)
|
|
_, _ = writer.Write(returnGeneral())
|
|
}
|
|
|
|
// 注册执行器到调度中心
|
|
func (e *executor) registry() {
|
|
|
|
t := time.NewTimer(time.Second * 0) //初始立即执行
|
|
defer t.Stop()
|
|
req := &Registry{
|
|
RegistryGroup: "EXECUTOR",
|
|
RegistryKey: e.opts.RegistryKey,
|
|
RegistryValue: "http://" + e.address,
|
|
}
|
|
param, err := json.Marshal(req)
|
|
if err != nil {
|
|
log.Fatal("执行器注册信息解析失败:" + err.Error())
|
|
}
|
|
for {
|
|
<-t.C
|
|
t.Reset(time.Second * time.Duration(20)) //20秒心跳防止过期
|
|
func() {
|
|
result, err := e.post("/api/registry", string(param))
|
|
if err != nil {
|
|
e.log.Error("执行器注册失败1:" + err.Error())
|
|
return
|
|
}
|
|
defer result.Body.Close()
|
|
body, err := ioutil.ReadAll(result.Body)
|
|
if err != nil {
|
|
e.log.Error("执行器注册失败2:" + err.Error())
|
|
return
|
|
}
|
|
res := &res{}
|
|
_ = json.Unmarshal(body, &res)
|
|
if res.Code != SuccessCode {
|
|
e.log.Error("执行器注册失败3:" + string(body))
|
|
return
|
|
}
|
|
//e.log.Info("执行器注册成功:" + string(body))
|
|
}()
|
|
|
|
}
|
|
}
|
|
|
|
// 执行器注册摘除
|
|
func (e *executor) registryRemove() {
|
|
t := time.NewTimer(time.Second * 0) //初始立即执行
|
|
defer t.Stop()
|
|
req := &Registry{
|
|
RegistryGroup: "EXECUTOR",
|
|
RegistryKey: e.opts.RegistryKey,
|
|
RegistryValue: "http://" + e.address,
|
|
}
|
|
param, err := json.Marshal(req)
|
|
if err != nil {
|
|
e.log.Error("执行器摘除失败:" + err.Error())
|
|
return
|
|
}
|
|
res, err := e.post("/api/registryRemove", string(param))
|
|
if err != nil {
|
|
e.log.Error("执行器摘除失败:" + err.Error())
|
|
return
|
|
}
|
|
defer res.Body.Close()
|
|
body, err := ioutil.ReadAll(res.Body)
|
|
e.log.Info("执行器摘除成功:" + string(body))
|
|
}
|
|
|
|
// 回调任务列表
|
|
func (e *executor) callback(task *Task, code int64, msg string) {
|
|
e.runList.Del(Int64ToStr(task.Id))
|
|
res, err := e.post("/api/callback", string(returnCall(task.Param, code, msg)))
|
|
if err != nil {
|
|
e.log.Error("callback err : ", err.Error())
|
|
return
|
|
}
|
|
defer res.Body.Close()
|
|
body, err := ioutil.ReadAll(res.Body)
|
|
if err != nil {
|
|
e.log.Error("callback ReadAll err : ", err.Error())
|
|
return
|
|
}
|
|
e.log.Info("任务回调成功:" + string(body))
|
|
}
|
|
|
|
// post
|
|
func (e *executor) post(action, body string) (resp *http.Response, err error) {
|
|
request, err := http.NewRequest("POST", e.opts.ServerAddr+action, strings.NewReader(body))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
request.Header.Set("Content-Type", "application/json;charset=UTF-8")
|
|
request.Header.Set("XXL-JOB-ACCESS-TOKEN", e.opts.AccessToken)
|
|
client := http.Client{
|
|
Timeout: e.opts.Timeout,
|
|
}
|
|
return client.Do(request)
|
|
}
|
|
|
|
// RunTask 运行任务
|
|
func (e *executor) RunTask(writer http.ResponseWriter, request *http.Request) {
|
|
e.runTask(writer, request)
|
|
}
|
|
|
|
// KillTask 删除任务
|
|
func (e *executor) KillTask(writer http.ResponseWriter, request *http.Request) {
|
|
e.killTask(writer, request)
|
|
}
|
|
|
|
// TaskLog 任务日志
|
|
func (e *executor) TaskLog(writer http.ResponseWriter, request *http.Request) {
|
|
e.taskLog(writer, request)
|
|
}
|
|
|
|
// Beat 心跳检测
|
|
func (e *executor) Beat(writer http.ResponseWriter, request *http.Request) {
|
|
e.beat(writer, request)
|
|
}
|
|
|
|
// IdleBeat 忙碌检测
|
|
func (e *executor) IdleBeat(writer http.ResponseWriter, request *http.Request) {
|
|
e.idleBeat(writer, request)
|
|
}
|