From 1f86b01978e4bbf3fa3a97c74f108fadefe1e4e2 Mon Sep 17 00:00:00 2001 From: zhengya Date: Tue, 20 May 2025 14:20:35 +0800 Subject: [PATCH] feat: refactors remote_excute --- .../grpc/synchronizer/agent_service.go | 2 +- .../controller/http/router/agent/agent_cmd.go | 2 +- .../http/service/agent/agent_cmd.go | 10 +- .../grpc/agentsynchronize/remote_execute.go | 451 +++++++++++------- .../grpc/agentsynchronize/sync_push.go | 7 +- 5 files changed, 283 insertions(+), 189 deletions(-) diff --git a/server/controller/grpc/synchronizer/agent_service.go b/server/controller/grpc/synchronizer/agent_service.go index 4c45181372e..f9e9094176a 100644 --- a/server/controller/grpc/synchronizer/agent_service.go +++ b/server/controller/grpc/synchronizer/agent_service.go @@ -132,5 +132,5 @@ func (s *AgentService) Plugin(r *api.PluginRequest, in api.Synchronizer_PluginSe } func (s *AgentService) RemoteExecute(in api.Synchronizer_RemoteExecuteServer) error { - return s.vTapEvent.RemoteExecute(in) + return s.vTapEvent.RemoteExecute.RemoteExecute(in) } diff --git a/server/controller/http/router/agent/agent_cmd.go b/server/controller/http/router/agent/agent_cmd.go index 918b2f1120f..54502752954 100644 --- a/server/controller/http/router/agent/agent_cmd.go +++ b/server/controller/http/router/agent/agent_cmd.go @@ -173,7 +173,7 @@ func forwardToServerConnectedByAgent() gin.HandlerFunc { } reverseProxy := fmt.Sprintf("http://%s:%d", newHost, common.GConfig.HTTPNodePort) - log.Infof("agnet(key: %s), node ip(%s), reverse proxy(%s), agent current controller ip(%s), controller ip(%s)", + log.Infof("agent(key: %s), node ip(%s), reverse proxy(%s), agent current controller ip(%s), controller ip(%s)", key, common.NodeIP, reverseProxy, agent.CurControllerIP, agent.ControllerIP, db.LogPrefixORGID) proxyURL, err := url.Parse(reverseProxy) diff --git a/server/controller/http/service/agent/agent_cmd.go b/server/controller/http/service/agent/agent_cmd.go index 4f4fc48f587..555ef3ee678 100644 --- a/server/controller/http/service/agent/agent_cmd.go +++ b/server/controller/http/service/agent/agent_cmd.go @@ -51,11 +51,11 @@ var ( type AgentCMDManager map[string]*CMDManager -func AgentCommandLock() { +func LockAgentCMD() { agentCMDMutex.Lock() } -func AgentCommandUnlock() { +func UnlockAgentCMD() { agentCMDMutex.Unlock() } @@ -333,19 +333,19 @@ func GetCMDAndNamespace(timeout, orgID, agentID int) (*RemoteExecResp, error) { return nil, fmt.Errorf("timeout(%vs) to get remote commands and linux namespace", timeout) case _, ok := <-cmdResp.RemoteCMDDoneCH: if !ok { - return nil, fmt.Errorf("%sagent(key: %s, name: %s) command manager is lost", key, agent.Name) + return nil, fmt.Errorf("failed to get remote commands, agent(key: %s, name: %s) command manager is lost", key, agent.Name) } resp.RemoteCommand = GetCommands(key, requestID) namespaceReq := &grpcapi.RemoteExecRequest{RequestId: &requestID, ExecType: grpcapi.ExecutionType_LIST_NAMESPACE.Enum()} manager.ExecCH <- namespaceReq case _, ok := <-cmdResp.LinuxNamespaceDoneCH: if !ok { - return nil, fmt.Errorf("%sagent(key: %s, name: %s) command manager is lost", key, agent.Name) + return nil, fmt.Errorf("failed to get linux namespaces, agent(key: %s, name: %s) command manager is lost", key, agent.Name) } resp.LinuxNamespace = GetNamespaces(key, requestID) case _, ok := <-cmdResp.ExecDoneCH: // error occurred if !ok { - return nil, fmt.Errorf("%sagent(key: %s, name: %s) command manager is lost", key, agent.Name) + return nil, fmt.Errorf("failed to execute command, agent(key: %s, name: %s) command manager is lost", key, agent.Name) } if len(GetCommands(key, requestID)) != 0 { return &RemoteExecResp{RemoteCommand: GetCommands(key, requestID)}, nil diff --git a/server/controller/trisolaris/services/grpc/agentsynchronize/remote_execute.go b/server/controller/trisolaris/services/grpc/agentsynchronize/remote_execute.go index 06f99aa9268..666084aee90 100644 --- a/server/controller/trisolaris/services/grpc/agentsynchronize/remote_execute.go +++ b/server/controller/trisolaris/services/grpc/agentsynchronize/remote_execute.go @@ -32,223 +32,314 @@ import ( ) const ( - CMD_INACTIVITY_TIMEOUT = 1 * time.Minute + cmdInactivityTimeout = 1 * time.Minute ) -func (e *AgentEvent) RemoteExecute(stream api.Synchronizer_RemoteExecuteServer) error { - key := "" - isFisrtRecv := false - var wg sync.WaitGroup - wg.Add(1) +func newRemoteExecContext(streamCtx context.Context) *remoteExecContext { + streamCtx, cancel := context.WithCancel(streamCtx) + return &remoteExecContext{ + streamCtx: streamCtx, + streamCancel: cancel, + errChan: make(chan error, 1), + initDoneChan: make(chan struct{}), + } +} + +type remoteExecContext struct { + streamCtx context.Context // stream context + streamCancel context.CancelFunc + + heartbeatCount uint32 // used to sample the agent's heartbeats + + key string // agent key, format: ip-mac + isFirstRecv bool // whether the first message has been received + manager *service.CMDManager + wg sync.WaitGroup + errChan chan error // channel to handle errors + initDoneChan chan struct{} // channel to signal initialization done +} + +func (e *RemoteExecute) RemoteExecute(stream api.Synchronizer_RemoteExecuteServer) error { + ctx := newRemoteExecContext(stream.Context()) + defer ctx.streamCancel() + + ctx.wg.Add(1) defer func() { - wg.Wait() - service.RemoveAllFromCMDManager(key) + ctx.wg.Wait() + service.RemoveAllFromCMDManager(ctx.key) }() - var manager *service.CMDManager - initDone := make(chan struct{}) + go e.receiveAndHandle(ctx, stream) - ctx, cancel := context.WithCancel(stream.Context()) - defer cancel() + <-ctx.initDoneChan + log.Infof("[remote-exec] agent(key: %s) init done", ctx.key) - errCH := make(chan error, 1) + return e.waitAndSend(ctx, stream) +} - go func() { - defer func() { - log.Infof("agent(key: %s) remote exec stream receive goroutine done", key) - wg.Done() - if r := recover(); r != nil { - buf := make([]byte, 2048) - n := runtime.Stack(buf, false) - errMsg := fmt.Sprintf("recovered in RemoteExecute: %s", buf[:n]) - log.Errorf(errMsg) - errCH <- fmt.Errorf(errMsg) - } - }() +func (e *RemoteExecute) receiveAndHandle( + ctx *remoteExecContext, + stream api.Synchronizer_RemoteExecuteServer, +) { + defer func() { + log.Infof("[remote-exec] agent(key: %s) remote exec stream receive goroutine done", ctx.key) + ctx.wg.Done() + if r := recover(); r != nil { + e.handlePanic(ctx) + } + }() - inactivityTimer := time.NewTimer(CMD_INACTIVITY_TIMEOUT) - defer inactivityTimer.Stop() + inactivityTimer := time.NewTimer(cmdInactivityTimeout) + defer inactivityTimer.Stop() - for { - select { - case <-ctx.Done(): - log.Infof("context done, agent(key: %s), context err: %v", key, ctx.Err()) - return - case <-inactivityTimer.C: - errMsg := fmt.Errorf("no message received for %vs, closing connection for agent(key: %s)", - CMD_INACTIVITY_TIMEOUT.Seconds(), key) - log.Error(errMsg) - errCH <- errMsg + for { + select { + case <-ctx.streamCtx.Done(): + log.Infof("[remote-exec] context done, agent(key: %s), context err: %v", ctx.key, ctx.streamCtx.Err()) + return + case <-inactivityTimer.C: + e.handleInactivityTimeout(ctx) + return + default: + resp, err := stream.Recv() + // Handle any errors that occur during stream reception + // if server restart, an io.EOF error may be received + if err == io.EOF { + e.handleStreamErr(ctx, err) return - default: - resp, err := stream.Recv() - // Handle any errors that occur during stream reception - // if server restart, an io.EOF error may be received - if err == io.EOF { - log.Errorf("agent(key: %s) command stream error: %v", key, err) - errCH <- err - return - } - // Attempt to stop the inactivity timer - if !inactivityTimer.Stop() { - // If the timer has already expired, drain the channel - <-inactivityTimer.C - } - // Reset the inactivity timer to its original duration - inactivityTimer.Reset(CMD_INACTIVITY_TIMEOUT) - - if resp == nil { - continue - } - log.Debugf("agent command response: %s", resp.String()) - if resp.AgentId == nil { - log.Warningf("recevie agent info from remote command is nil") - continue - } - key = resp.AgentId.GetIp() + "-" + resp.AgentId.GetMac() - if !isFisrtRecv { - isFisrtRecv = true - log.Infof("agent(key: %s) call RemoteExecute", key) - } - if manager == nil { - log.Infof("agent(key: %s) remote exec map not found, add to cmd manager", key) - manager = service.AddToCMDManagerIfNotExist(key, uint64(1)) - initDone <- struct{}{} - } - - service.AgentCommandLock() - manager = service.GetAgentCMDManagerWithoutLock(key) - if manager == nil { - log.Errorf("agent(key: %s) remote exec map not found", key) - service.AgentCommandUnlock() - continue - } - - // heartbeat - if resp.CommandResult == nil && resp.LinuxNamespaces == nil && - resp.Commands == nil && resp.Errmsg == nil { - log.Infof("agent heart beat command response: %s", resp.String()) - manager.ExecCH <- &api.RemoteExecRequest{RequestId: proto.Uint64(0)} - service.AgentCommandUnlock() - continue - } - - if err != nil { - err := fmt.Errorf("agent(key: %s) command stream error: %v", key, err) - log.Error(err) - service.AgentCommandUnlock() - continue - } - - handleResponse(resp) - service.AgentCommandUnlock() } + + e.resetTimer(inactivityTimer) + e.initCtxAndHandleResp(ctx, resp, err) } - }() + } +} - <-initDone - log.Infof("agent(key: %s) init done", key) - if manager == nil { - err := fmt.Errorf("get agent(key: %s) remote exec manager nil", key) - log.Error(err) - return err +func (e *RemoteExecute) initCtxAndHandleResp(ctx *remoteExecContext, resp *api.RemoteExecResponse, err error) { + if resp == nil { + return + } + + log.Debugf("[remote-exec] agent command response: %s", resp.String()) + + if resp.AgentId == nil { + log.Warningf("[remote-exec] get null agent info from response: %s", resp.String()) + return + } + + ctx.key = resp.AgentId.GetIp() + "-" + resp.AgentId.GetMac() + + if !ctx.isFirstRecv { + ctx.isFirstRecv = true + log.Infof("[remote-exec] agent(key: %s) called me for the first time", ctx.key) + } + + if ctx.manager == nil { + log.Infof("[remote-exec] agent(key: %s) cmd manager not found, new one manager", ctx.key) + ctx.manager = service.AddToCMDManagerIfNotExist(ctx.key, uint64(1)) + ctx.initDoneChan <- struct{}{} + } + + service.LockAgentCMD() + defer service.UnlockAgentCMD() + + ctx.manager = service.GetAgentCMDManagerWithoutLock(ctx.key) + if ctx.manager == nil { + log.Errorf("[remote-exec] agent(key: %s) cmd manager not found", ctx.key) + return + } + + if e.isHeartbeat(resp) { + e.logHeartbeat(ctx, resp) + ctx.manager.ExecCH <- &api.RemoteExecRequest{RequestId: proto.Uint64(0)} + return + } + + if err != nil { + e.handleStreamErr(ctx, err) + return + } + + e.handleResponse(ctx, resp) +} + +func (e *RemoteExecute) handleResponse(ctx *remoteExecContext, resp *api.RemoteExecResponse) { + if resp.RequestId == nil { + log.Errorf("[remote-exec] agent(key: %s) command resp request id not found", ctx.key, resp.RequestId) + return + } + + cmdResp := service.GetAgentCMDRespWithoutLock(ctx.key, *resp.RequestId) + if cmdResp == nil { + log.Errorf("[remote-exec] agent(key: %s, request id: %v) remote exec map not found", ctx.key, resp.RequestId) + return + } + + e.logResponse(resp, ctx.key) + + if resp.Errmsg != nil { + e.handleRespErrmsg(ctx.key, resp, cmdResp) + return + } + + if len(resp.LinuxNamespaces) > 0 { + e.handleRespLinuxNamespaces(ctx.key, resp, cmdResp) + return + } + + if len(resp.Commands) > 0 { + e.handleRespCommands(ctx.key, resp, cmdResp) + return } + + e.handleRespCommandResult(ctx.key, resp, cmdResp) +} + +func (e *RemoteExecute) handleRespErrmsg(key string, resp *api.RemoteExecResponse, cmdResp *service.CMDResp) { + log.Errorf("[remote-exec] agent(key: %s) run command error: %s", key, *resp.Errmsg) + service.AppendErrorMessage(key, *resp.RequestId, resp.Errmsg) + + result := resp.CommandResult + if result == nil || result.Content == nil { + cmdResp.ExecDoneCH <- struct{}{} + return + } + service.AppendContent(key, *resp.RequestId, result.Content) + + if result.Md5 != nil { + cmdResp.ExecDoneCH <- struct{}{} + return + } +} + +func (e *RemoteExecute) handleRespLinuxNamespaces(key string, resp *api.RemoteExecResponse, cmdResp *service.CMDResp) { + if len(service.GetNamespacesWithoutLock(key, *resp.RequestId)) > 0 { + service.InitNamespaces(key, *resp.RequestId, resp.LinuxNamespaces) + } else { + service.AppendNamespaces(key, *resp.RequestId, resp.LinuxNamespaces) + } + cmdResp.LinuxNamespaceDoneCH <- struct{}{} +} + +func (e *RemoteExecute) handleRespCommands(key string, resp *api.RemoteExecResponse, cmdResp *service.CMDResp) { + if len(service.GetCommandsWithoutLock(key, *resp.RequestId)) > 0 { + service.InitCommands(key, *resp.RequestId, resp.Commands) + } else { + service.AppendCommands(key, *resp.RequestId, resp.Commands) + } + cmdResp.RemoteCMDDoneCH <- struct{}{} +} + +func (e *RemoteExecute) handleRespCommandResult(key string, resp *api.RemoteExecResponse, cmdResp *service.CMDResp) { + result := resp.CommandResult + if resp.CommandResult == nil { + return + } + + if result.Content != nil { + service.AppendContent(key, *resp.RequestId, result.Content) + } + if result.Md5 != nil { + cmdResp.ExecDoneCH <- struct{}{} + return + } +} + +func (e *RemoteExecute) waitAndSend( + ctx *remoteExecContext, + stream api.Synchronizer_RemoteExecuteServer, +) error { for { - if manager == nil { - err := fmt.Errorf("agent(key: %s) remote exec map not found", key) + if ctx.manager == nil { + err := fmt.Errorf("[remote-exec] agent(key: %s) cmd manager not found", ctx.key) log.Error(err) return err } + select { - case <-ctx.Done(): - log.Infof("context done, agent(key: %s), context err: %v", key, ctx.Err()) - return ctx.Err() - case err := <-errCH: + case <-ctx.streamCtx.Done(): + log.Infof("[remote-exec] context done, agent(key: %s), context err: %v", ctx.key, ctx.streamCtx.Err()) + return ctx.streamCtx.Err() + case err := <-ctx.errChan: log.Error(err) return err - case req, ok := <-manager.ExecCH: + case req, ok := <-ctx.manager.ExecCH: if !ok { - err := fmt.Errorf("agent(key: %s) exec channel is closed", key) + err := fmt.Errorf("[remote-exec] agent(key: %s) cmd manager exec channel is closed", ctx.key) log.Error(err) return err } - b, _ := json.Marshal(req) - log.Infof("agent(key: %s) request: %s", key, string(b)) - if err := stream.Send(req); err != nil { - log.Errorf("send cmd to agent error: %s, req: %#v", err.Error(), req) + if err := e.sendRequest(ctx, stream, req); err != nil { return err } } } } -func handleResponse(resp *api.RemoteExecResponse) { - key := resp.AgentId.GetIp() + "-" + resp.AgentId.GetMac() - if resp.RequestId == nil { - log.Errorf("agent(key: %s) command resp request id not found", key, resp.RequestId) - return +func (e *RemoteExecute) handlePanic(ctx *remoteExecContext) { + buf := make([]byte, 2048) + n := runtime.Stack(buf, false) + errMsg := fmt.Sprintf("[remote-exec] recovered in RemoteExecute: %s", buf[:n]) + log.Errorf(errMsg) + ctx.errChan <- fmt.Errorf(errMsg) +} + +func (e *RemoteExecute) handleInactivityTimeout(ctx *remoteExecContext) { + errMsg := fmt.Errorf("[remote-exec] no message received for %vs, closing connection for agent(key: %s)", + cmdInactivityTimeout.Seconds(), ctx.key) + log.Error(errMsg) + ctx.errChan <- errMsg +} + +func (e *RemoteExecute) handleStreamErr(ctx *remoteExecContext, err error) { + if err == io.EOF { + log.Errorf("[remote-exec] agent(key: %s) command stream error: %v", ctx.key, err) + ctx.errChan <- err } - cmdResp := service.GetAgentCMDRespWithoutLock(key, *resp.RequestId) - if cmdResp == nil { - log.Errorf("agent(key: %s, request id: %v) remote exec map not found", key, resp.RequestId) - return +} + +// resetTimer attempts to stop the inactivity timer and reset it to its original duration. +// If the timer has already expired, it drains the channel to prevent blocking. +func (e *RemoteExecute) resetTimer(timer *time.Timer) { + if !timer.Stop() { + <-timer.C + } + timer.Reset(cmdInactivityTimeout) +} + +func (e *RemoteExecute) isHeartbeat(resp *api.RemoteExecResponse) bool { + return resp.CommandResult == nil && resp.LinuxNamespaces == nil && + resp.Commands == nil && resp.Errmsg == nil +} +func (e *RemoteExecute) needLogHeartbeat(ctx *remoteExecContext) bool { + return ctx.heartbeatCount%20 == 0 +} +func (e *RemoteExecute) logHeartbeat(ctx *remoteExecContext, resp *api.RemoteExecResponse) { + ctx.heartbeatCount++ + if e.needLogHeartbeat(ctx) { + log.Infof("[remote-exec] agent(key: %s) heartbeat count: %d", ctx.key, ctx.heartbeatCount) + log.Infof("[remote-exec] agent heartbeat command response: %s", resp.String()) + } +} + +func (e *RemoteExecute) sendRequest(ctx *remoteExecContext, stream api.Synchronizer_RemoteExecuteServer, req *api.RemoteExecRequest) error { + e.logRequest(ctx, req) + if err := stream.Send(req); err != nil { + log.Errorf("[remote-exec] failed to send cmd to agent: %s, req: %#v", err.Error(), req) + return err } + return nil +} +func (e *RemoteExecute) logResponse(resp *api.RemoteExecResponse, key string) { b, _ := json.Marshal(resp) - log.Infof("agent(key: %s) resp: %s", key, string(b)) - - switch { - case resp.Errmsg != nil: - log.Errorf("agent(key: %s) run command error: %s", - key, *resp.Errmsg) - service.AppendErrorMessage(key, *resp.RequestId, resp.Errmsg) - - result := resp.CommandResult - // get commands and linux namespace error - if result == nil { - cmdResp.ExecDoneCH <- struct{}{} - return - } - if result.Content == nil { - cmdResp.ExecDoneCH <- struct{}{} - return - } + log.Infof("[remote-exec] agent(key: %s) response: %s", key, string(b)) +} - // run command error and handle content - if result.Content != nil { - service.AppendContent(key, *resp.RequestId, result.Content) - } - if result.Md5 != nil { - cmdResp.ExecDoneCH <- struct{}{} - return - } +func (e *RemoteExecute) logRequest(ctx *remoteExecContext, req *api.RemoteExecRequest) { + if req.RequestId != nil && *req.RequestId == 0 && !e.needLogHeartbeat(ctx) { // agent heartbeat request id is 0 return - case len(resp.LinuxNamespaces) > 0: - if len(service.GetNamespacesWithoutLock(key, *resp.RequestId)) > 0 { - service.InitNamespaces(key, *resp.RequestId, resp.LinuxNamespaces) - } else { - service.AppendNamespaces(key, *resp.RequestId, resp.LinuxNamespaces) - } - cmdResp.LinuxNamespaceDoneCH <- struct{}{} - case len(resp.Commands) > 0: - if len(service.GetCommandsWithoutLock(key, *resp.RequestId)) > 0 { - service.InitCommands(key, *resp.RequestId, resp.Commands) - } else { - service.AppendCommands(key, *resp.RequestId, resp.Commands) - } - cmdResp.RemoteCMDDoneCH <- struct{}{} - default: - result := resp.CommandResult - if resp.CommandResult == nil { - return - } - - if result.Content != nil { - service.AppendContent(key, *resp.RequestId, result.Content) - } - if result.Md5 != nil { - cmdResp.ExecDoneCH <- struct{}{} - return - } } + b, _ := json.Marshal(req) + log.Infof("[remote-exec] agent(key: %s) request: %s", ctx.key, string(b)) } diff --git a/server/controller/trisolaris/services/grpc/agentsynchronize/sync_push.go b/server/controller/trisolaris/services/grpc/agentsynchronize/sync_push.go index 64dd16384b1..02dbb3d5fa6 100644 --- a/server/controller/trisolaris/services/grpc/agentsynchronize/sync_push.go +++ b/server/controller/trisolaris/services/grpc/agentsynchronize/sync_push.go @@ -52,10 +52,13 @@ var SOCKET_TYPE_TO_MESSAGE = map[string]api.SocketType{ "FILE": FILE_SOCKET, } -type AgentEvent struct{} +type RemoteExecute struct{} +type AgentEvent struct { + RemoteExecute *RemoteExecute +} func NewAgentEvent() *AgentEvent { - return &AgentEvent{} + return &AgentEvent{RemoteExecute: &RemoteExecute{}} } func (e *AgentEvent) generateUserConfig(c *vtap.VTapCache, gAgentInfo *vtap.VTapInfo, isOwnerCluster bool, orgID int) *koanf.Koanf {