1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

new req types for proccesslist and processstart

This commit is contained in:
postmannen 2021-09-20 06:40:34 +02:00
parent c9eb267f4e
commit 389e263c59
3 changed files with 118 additions and 2 deletions

View file

@ -55,6 +55,10 @@ type Configuration struct {
ErrorMessageTimeout int
// Retries for error messages.
ErrorMessageRetries int
// NOTE:
// Op commands will not be specified as a flag since they can't be turned off.
// Make the current node send hello messages to central at given interval in seconds
StartPubREQHello int
// Start the central error logger.

View file

@ -55,7 +55,7 @@ func (p *processes) Start(proc process) {
// --- Subscriber services that can be started via flags
// Allways start an REQOpCommand subscriber
// Allways start the listeners for Op commands
{
log.Printf("Starting REQOpCommand subscriber: %#v\n", proc.node)
sub := newSubject(REQOpCommand, string(proc.node))
@ -63,6 +63,20 @@ func (p *processes) Start(proc process) {
go proc.spawnWorker(proc.processes, proc.natsConn)
}
{
log.Printf("Starting REQOpProcessList subscriber: %#v\n", proc.node)
sub := newSubject(REQOpProcessList, string(proc.node))
proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil)
go proc.spawnWorker(proc.processes, proc.natsConn)
}
{
log.Printf("Starting REQOpProcessStart subscriber: %#v\n", proc.node)
sub := newSubject(REQOpProcessStart, string(proc.node))
proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil)
go proc.spawnWorker(proc.processes, proc.natsConn)
}
// Start a subscriber for textLogging messages
if proc.configuration.StartSubREQToFileAppend {
proc.startup.subREQToFileAppend(proc)

View file

@ -64,6 +64,10 @@ const (
// command to execute shall be given in the data field of the
// message as string value. For example "ps".
REQOpCommand Method = "REQOpCommand"
// Get a list of all the running processes.
REQOpProcessList Method = "REQOpProcessList"
// Start up a process.
REQOpProcessStart Method = "REQOpProcessStart"
// Execute a CLI command in for example bash or cmd.
// This is an event type, where a message will be sent to a
// node with the command to execute and an ACK will be replied
@ -134,6 +138,12 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
REQOpCommand: methodREQOpCommand{
commandOrEvent: CommandACK,
},
REQOpProcessList: methodREQOpProcessList{
commandOrEvent: CommandACK,
},
REQOpProcessStart: methodREQOpProcessStart{
commandOrEvent: CommandACK,
},
REQCliCommand: methodREQCliCommand{
commandOrEvent: CommandACK,
},
@ -488,7 +498,95 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
return ackMsg, nil
}
//----
// ---- New operations
// --- OpProcessList
type methodREQOpProcessList struct {
commandOrEvent CommandOrEvent
}
func (m methodREQOpProcessList) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handle Op Process List
func (m methodREQOpProcessList) handler(proc process, message Message, node string) ([]byte, error) {
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
out := []byte{}
proc.processes.mu.Lock()
// Loop the the processes map, and find all that is active to
// be returned in the reply message.
for _, idMap := range proc.processes.active {
for _, v := range idMap {
s := fmt.Sprintf("%v, process: %v, id: %v, name: %v, allowed from: %s\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), v.processKind, v.processID, v.subject.name(), v.allowedReceivers)
sb := []byte(s)
out = append(out, sb...)
}
}
proc.processes.mu.Unlock()
newReplyMessage(proc, message, out)
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// --- OpProcessStart
type methodREQOpProcessStart struct {
commandOrEvent CommandOrEvent
}
func (m methodREQOpProcessStart) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handle Op Process Start
func (m methodREQOpProcessStart) handler(proc process, message Message, node string) ([]byte, error) {
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
out := []byte{}
// We need to create a tempory method type to look up the kind for the
// real method for the message.
var mt Method
m := message.MethodArgs[0]
method := Method(m)
tmpH := mt.getHandler(Method(method))
if tmpH == nil {
er := fmt.Errorf("error: OpProcessStart: no such request type defined: %v" + m)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return
}
// Create the process and start it.
sub := newSubject(method, proc.configuration.NodeName)
procNew := newProcess(proc.ctx, proc.processes.metrics, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil)
go procNew.spawnWorker(proc.processes, proc.natsConn)
er := fmt.Errorf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
// TODO: How should this look like ?
out = []byte(er.Error())
newReplyMessage(proc, message, out)
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ----
type methodREQToFileAppend struct {
commandOrEvent CommandOrEvent