| package internal |
|
|
| import ( |
| "bufio" |
| "bytes" |
| "fmt" |
| "io" |
| "log/slog" |
| "net/http" |
| "os" |
| "os/exec" |
| "strconv" |
| "strings" |
| "sync/atomic" |
| "syscall" |
| "time" |
|
|
| "github.com/go-resty/resty/v2" |
| "github.com/gogf/gf/container/gmap" |
| "github.com/google/uuid" |
| ) |
|
|
// Worker holds the bookkeeping for one worker process managed by this package.
type Worker struct {
	// ChannelName identifies the worker; it is the key used when the worker
	// is removed from the workers map and the prefix on its output lines.
	ChannelName string

	// HttpServerPort is the local port update posts commands to.
	HttpServerPort int32

	// LogFile is the file the process output is appended to when Log2Stdout
	// is false.
	LogFile string

	// Log2Stdout routes the process output to this process's stdout/stderr
	// instead of LogFile.
	Log2Stdout bool

	// PropertyJsonFile is passed to the worker executable as --property.
	PropertyJsonFile string

	// GraphName is not read anywhere in this file.
	// NOTE(review): presumably the graph the worker runs — confirm with callers.
	GraphName string

	// Pid is the PID of the launched shell, set by start; stop signals the
	// process group -Pid.
	Pid int

	// QuitTimeoutSeconds is how long after UpdateTs the worker may live before
	// timeoutWorkers stops it.
	QuitTimeoutSeconds int

	// CreateTs and UpdateTs are Unix timestamps in seconds.
	CreateTs int64
	UpdateTs int64
}
|
|
| type WorkerUpdateReq struct { |
| RequestId string `form:"request_id,omitempty" json:"request_id,omitempty"` |
| ChannelName string `form:"channel_name,omitempty" json:"channel_name,omitempty"` |
| Collection string `form:"collection,omitempty" json:"collection"` |
| FileName string `form:"filename,omitempty" json:"filename"` |
| Path string `form:"path,omitempty" json:"path,omitempty"` |
| Ten *WorkerUpdateReqTen `form:"_ten,omitempty" json:"_ten,omitempty"` |
| } |
|
|
// WorkerUpdateReqTen is the nested "_ten" object of a WorkerUpdateReq.
// Both fields are omitted from the form/JSON encoding when empty.
type WorkerUpdateReqTen struct {
	// Name is encoded as "name".
	Name string `form:"name,omitempty" json:"name,omitempty"`

	// Type is encoded as "type".
	Type string `form:"type,omitempty" json:"type,omitempty"`
}
|
|
const (
	// workerCleanSleepSeconds is the pause, in seconds, between two passes of
	// the timeoutWorkers loop. It is deliberately untyped so it can be
	// multiplied by time.Second.
	workerCleanSleepSeconds = 5

	// workerExec is the executable that start runs for every worker.
	workerExec = "/app/agents/bin/start"

	// workerHttpServerUrl is the scheme and host of a worker's HTTP server;
	// the per-worker port is appended by update.
	workerHttpServerUrl = "http://127.0.0.1"
)
|
|
| var ( |
| workers = gmap.New(true) |
| httpServerPort = httpServerPortMin |
| httpServerPortMin = int32(10000) |
| httpServerPortMax = int32(30000) |
| ) |
|
|
| func newWorker(channelName string, logFile string, log2Stdout bool, propertyJsonFile string) *Worker { |
| return &Worker{ |
| ChannelName: channelName, |
| LogFile: logFile, |
| Log2Stdout: log2Stdout, |
| PropertyJsonFile: propertyJsonFile, |
| QuitTimeoutSeconds: 60, |
| CreateTs: time.Now().Unix(), |
| UpdateTs: time.Now().Unix(), |
| } |
| } |
|
|
| func getHttpServerPort() int32 { |
| if atomic.LoadInt32(&httpServerPort) > httpServerPortMax { |
| atomic.StoreInt32(&httpServerPort, httpServerPortMin) |
| } |
|
|
| atomic.AddInt32(&httpServerPort, 1) |
| return httpServerPort |
| } |
|
|
| |
// PrefixWriter is an io.Writer that tags every line written through it with
// "[prefix] " before forwarding it to the underlying writer.
type PrefixWriter struct {
	prefix string
	writer io.Writer
}

// Write splits p into lines and forwards each one to the underlying writer as
// "[prefix] <line>\n", one underlying Write call per line.
//
// Lines are separated by '\n'; a single trailing '\r' is dropped from each
// line. A final fragment without a terminating newline is emitted as its own
// line, so callers that split a logical line across two Write calls get two
// prefixed lines.
//
// Lines are split with bytes.Cut rather than bufio.Scanner, so there is no
// 64 KiB per-line limit and no intermediate string copy of p.
//
// On success it returns len(p), nil. If the underlying writer fails, it
// returns that error together with the number of bytes of p that were fully
// forwarded before the failing line, so the count never exceeds len(p) as
// the io.Writer contract requires.
func (pw *PrefixWriter) Write(p []byte) (n int, err error) {
	tag := "[" + pw.prefix + "] "

	var out []byte // scratch buffer reused for every line
	rest := p
	for len(rest) > 0 {
		line, tail, _ := bytes.Cut(rest, []byte{'\n'})
		line = bytes.TrimSuffix(line, []byte{'\r'})

		out = append(out[:0], tag...)
		out = append(out, line...)
		out = append(out, '\n')

		if _, err := pw.writer.Write(out); err != nil {
			return len(p) - len(rest), err
		}
		rest = tail
	}

	return len(p), nil
}
|
|
| |
// isInProcessGroup reports whether the process pid currently belongs to the
// process group pgid. A failed lookup (for example because the process no
// longer exists) is reported as false.
func isInProcessGroup(pid, pgid int) bool {
	if group, err := syscall.Getpgid(pid); err == nil {
		return group == pgid
	}
	return false
}
|
|
| func (w *Worker) start(req *StartReq) (err error) { |
| shell := fmt.Sprintf("cd /app/agents && %s --property %s", workerExec, w.PropertyJsonFile) |
| slog.Info("Worker start", "requestId", req.RequestId, "shell", shell, logTag) |
| cmd := exec.Command("sh", "-c", shell) |
| cmd.SysProcAttr = &syscall.SysProcAttr{ |
| Setpgid: true, |
| } |
|
|
| var stdoutWriter, stderrWriter io.Writer |
| var logFile *os.File |
|
|
| if w.Log2Stdout { |
| |
| stdoutWriter = os.Stdout |
| stderrWriter = os.Stderr |
| } else { |
| |
| logFile, err := os.OpenFile(w.LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) |
| if err != nil { |
| slog.Error("Failed to open log file", "err", err, "requestId", req.RequestId, logTag) |
| |
| } |
|
|
| |
| stdoutWriter = logFile |
| stderrWriter = logFile |
| } |
|
|
| |
| stdoutPrefixWriter := &PrefixWriter{ |
| prefix: "-", |
| writer: stdoutWriter, |
| } |
| stderrPrefixWriter := &PrefixWriter{ |
| prefix: "-", |
| writer: stderrWriter, |
| } |
|
|
| cmd.Stdout = stdoutPrefixWriter |
| cmd.Stderr = stderrPrefixWriter |
|
|
| if err = cmd.Start(); err != nil { |
| slog.Error("Worker start failed", "err", err, "requestId", req.RequestId, logTag) |
| return |
| } |
|
|
| pid := cmd.Process.Pid |
|
|
| |
| shell = fmt.Sprintf("pgrep -P %d", pid) |
| slog.Info("Worker get pid", "requestId", req.RequestId, "shell", shell, logTag) |
|
|
| var subprocessPid int |
| for i := 0; i < 10; i++ { |
| output, err := exec.Command("sh", "-c", shell).CombinedOutput() |
| if err == nil { |
| subprocessPid, err = strconv.Atoi(strings.TrimSpace(string(output))) |
| if err == nil && subprocessPid > 0 && isInProcessGroup(subprocessPid, cmd.Process.Pid) { |
| break |
| } |
| } |
| slog.Warn("Worker get pid failed, retrying...", "attempt", i+1, "pid", pid, "subpid", subprocessPid, "requestId", req.RequestId, logTag) |
| time.Sleep(1000 * time.Millisecond) |
| } |
|
|
| |
| stdoutPrefixWriter.prefix = w.ChannelName |
| stderrPrefixWriter.prefix = w.ChannelName |
| w.Pid = pid |
|
|
| |
| go func() { |
| err := cmd.Wait() |
| if err != nil { |
| slog.Error("Worker process failed", "err", err, "requestId", req.RequestId, logTag) |
| } else { |
| slog.Info("Worker process completed successfully", "requestId", req.RequestId, logTag) |
| } |
| |
| if logFile != nil { |
| logFile.Close() |
| } |
|
|
| |
| workers.Remove(w.ChannelName) |
|
|
| }() |
|
|
| return |
| } |
|
|
| func (w *Worker) stop(requestId string, channelName string) (err error) { |
| slog.Info("Worker stop start", "channelName", channelName, "requestId", requestId, "pid", w.Pid, logTag) |
|
|
| |
| |
| err = syscall.Kill(-w.Pid, syscall.SIGKILL) |
| if err != nil { |
| slog.Error("Worker kill failed", "err", err, "channelName", channelName, "worker", w, "requestId", requestId, logTag) |
| return |
| } |
|
|
| workers.Remove(channelName) |
|
|
| slog.Info("Worker stop end", "channelName", channelName, "worker", w, "requestId", requestId, logTag) |
| return |
| } |
|
|
| func (w *Worker) update(req *WorkerUpdateReq) (err error) { |
| slog.Info("Worker update start", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) |
|
|
| var res *resty.Response |
|
|
| defer func() { |
| if err != nil { |
| slog.Error("Worker update error", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag) |
| } |
| }() |
|
|
| workerUpdateUrl := fmt.Sprintf("%s:%d/cmd", workerHttpServerUrl, w.HttpServerPort) |
| res, err = HttpClient.R(). |
| SetHeader("Content-Type", "application/json"). |
| SetBody(req). |
| Post(workerUpdateUrl) |
| if err != nil { |
| return |
| } |
|
|
| if res.StatusCode() != http.StatusOK { |
| return fmt.Errorf("%s, status: %d", codeErrHttpStatusNotOk.msg, res.StatusCode()) |
| } |
|
|
| slog.Info("Worker update end", "channelName", req.ChannelName, "worker", w, "requestId", req.RequestId, logTag) |
| return |
| } |
|
|
| |
// getRunningWorkerPIDs returns the set of PIDs of processes whose "ps aux"
// line contains "bin/worker --property" (lines that mention grep itself are
// filtered out).
//
// It returns nil when the shell pipeline fails, which includes the case
// where grep matches nothing and therefore exits non-zero.
func getRunningWorkerPIDs() map[int]struct{} {
	var stdout bytes.Buffer

	ps := exec.Command("sh", "-c", `ps aux | grep "bin/worker --property" | grep -v grep`)
	ps.Stdout = &stdout
	if ps.Run() != nil {
		return nil
	}

	pids := map[int]struct{}{}
	for _, row := range strings.Split(stdout.String(), "\n") {
		cols := strings.Fields(row)
		if len(cols) < 2 {
			continue
		}
		// The second column of "ps aux" output is the PID.
		if pid, convErr := strconv.Atoi(cols[1]); convErr == nil {
			pids[pid] = struct{}{}
		}
	}
	return pids
}
|
|
| |
// killProcess sends SIGKILL to pid and logs whether the signal could be
// delivered. The failure is logged only; nothing is returned to the caller.
func killProcess(pid int) {
	if err := syscall.Kill(pid, syscall.SIGKILL); err != nil {
		slog.Info("Failed to kill process", "pid", pid, "error", err)
		return
	}
	slog.Info("Successfully killed process", "pid", pid)
}
|
|
| func timeoutWorkers() { |
| for { |
| for _, channelName := range workers.Keys() { |
| worker := workers.Get(channelName).(*Worker) |
|
|
| |
| if worker.QuitTimeoutSeconds == WORKER_TIMEOUT_INFINITY { |
| continue |
| } |
|
|
| nowTs := time.Now().Unix() |
| if worker.UpdateTs+int64(worker.QuitTimeoutSeconds) < nowTs { |
| if err := worker.stop(uuid.New().String(), channelName.(string)); err != nil { |
| slog.Error("Timeout worker stop failed", "err", err, "channelName", channelName, logTag) |
| continue |
| } |
|
|
| slog.Info("Timeout worker stop success", "channelName", channelName, "worker", worker, "nowTs", nowTs, logTag) |
| } |
| } |
|
|
| slog.Debug("Worker timeout check", "sleep", workerCleanSleepSeconds, logTag) |
| time.Sleep(workerCleanSleepSeconds * time.Second) |
| } |
| } |
|
|
| func CleanWorkers() { |
| |
| for _, channelName := range workers.Keys() { |
| worker := workers.Get(channelName).(*Worker) |
| if err := worker.stop(uuid.New().String(), channelName.(string)); err != nil { |
| slog.Error("Worker cleanWorker failed", "err", err, "channelName", channelName, logTag) |
| continue |
| } |
|
|
| slog.Info("Worker cleanWorker success", "channelName", channelName, "worker", worker, logTag) |
| } |
|
|
| |
| runningPIDs := getRunningWorkerPIDs() |
|
|
| |
| workerMap := make(map[int]*Worker) |
| for _, channelName := range workers.Keys() { |
| worker := workers.Get(channelName).(*Worker) |
| workerMap[worker.Pid] = worker |
| } |
|
|
| |
| for pid := range runningPIDs { |
| if _, exists := workerMap[pid]; !exists { |
| slog.Info("Killing redundant process", "pid", pid) |
| killProcess(pid) |
| } |
| } |
| } |
|
|