mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
method opcommand, embedded processes to process
This commit is contained in:
parent
26b606d3cf
commit
833ce66c00
6 changed files with 81 additions and 14 deletions
11
example/toShip1-OpCommand.json
Normal file
11
example/toShip1-OpCommand.json
Normal file
|
@ -0,0 +1,11 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
|
||||||
|
"toNode": "ship1",
|
||||||
|
"data": ["ps"],
|
||||||
|
"method":"OpCommand",
|
||||||
|
"timeout":3,
|
||||||
|
"retries":3,
|
||||||
|
"MethodTimeout": 7
|
||||||
|
}
|
||||||
|
]
|
|
@ -56,6 +56,8 @@ type process struct {
|
||||||
configuration *Configuration
|
configuration *Configuration
|
||||||
// The new messages channel copied from *Server
|
// The new messages channel copied from *Server
|
||||||
toRingbufferCh chan<- []subjectAndMessage
|
toRingbufferCh chan<- []subjectAndMessage
|
||||||
|
// The structure who holds all processes information
|
||||||
|
processes *processes
|
||||||
}
|
}
|
||||||
|
|
||||||
// prepareNewProcess will set the the provided values and the default
|
// prepareNewProcess will set the the provided values and the default
|
||||||
|
@ -83,6 +85,7 @@ func newProcess(processes *processes, toRingbufferCh chan<- []subjectAndMessage,
|
||||||
methodsAvailable: method.GetMethodsAvailable(),
|
methodsAvailable: method.GetMethodsAvailable(),
|
||||||
toRingbufferCh: toRingbufferCh,
|
toRingbufferCh: toRingbufferCh,
|
||||||
configuration: configuration,
|
configuration: configuration,
|
||||||
|
processes: processes,
|
||||||
}
|
}
|
||||||
|
|
||||||
return proc
|
return proc
|
||||||
|
|
|
@ -47,6 +47,8 @@ func (s *server) readSocket(toRingbufferCh chan []subjectAndMessage) {
|
||||||
|
|
||||||
// Send the SAM struct to be picked up by the ring buffer.
|
// Send the SAM struct to be picked up by the ring buffer.
|
||||||
toRingbufferCh <- sam
|
toRingbufferCh <- sam
|
||||||
|
|
||||||
|
conn.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
29
server.go
29
server.go
|
@ -29,12 +29,15 @@ type processes struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
// The last processID created
|
// The last processID created
|
||||||
lastProcessID int
|
lastProcessID int
|
||||||
|
// metrics channel
|
||||||
|
metricsCh chan metricType
|
||||||
}
|
}
|
||||||
|
|
||||||
// newProcesses will prepare and return a *processes
|
// newProcesses will prepare and return a *processes
|
||||||
func newProcesses() *processes {
|
func newProcesses(metricsCh chan metricType) *processes {
|
||||||
p := processes{
|
p := processes{
|
||||||
active: make(map[processName]process),
|
active: make(map[processName]process),
|
||||||
|
metricsCh: metricsCh,
|
||||||
}
|
}
|
||||||
|
|
||||||
return &p
|
return &p
|
||||||
|
@ -81,14 +84,16 @@ func NewServer(c *Configuration) (*server, error) {
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metrics := newMetrics(c.PromHostAndPort)
|
||||||
|
|
||||||
s := &server{
|
s := &server{
|
||||||
configuration: c,
|
configuration: c,
|
||||||
nodeName: c.NodeName,
|
nodeName: c.NodeName,
|
||||||
natsConn: conn,
|
natsConn: conn,
|
||||||
netListener: nl,
|
netListener: nl,
|
||||||
processes: newProcesses(),
|
processes: newProcesses(metrics.metricsCh),
|
||||||
toRingbufferCh: make(chan []subjectAndMessage),
|
toRingbufferCh: make(chan []subjectAndMessage),
|
||||||
metrics: newMetrics(c.PromHostAndPort),
|
metrics: metrics,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the default data folder for where subscribers should
|
// Create the default data folder for where subscribers should
|
||||||
|
@ -129,7 +134,7 @@ func (s *server) Start() {
|
||||||
s.ProcessesStart()
|
s.ProcessesStart()
|
||||||
|
|
||||||
time.Sleep(time.Second * 1)
|
time.Sleep(time.Second * 1)
|
||||||
s.printProcessesMap()
|
s.processes.printProcessesMap()
|
||||||
|
|
||||||
// Start the processing of new messages from an input channel.
|
// Start the processing of new messages from an input channel.
|
||||||
s.routeMessagesToProcess("./incommmingBuffer.db", s.toRingbufferCh)
|
s.routeMessagesToProcess("./incommmingBuffer.db", s.toRingbufferCh)
|
||||||
|
@ -138,21 +143,21 @@ func (s *server) Start() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) printProcessesMap() {
|
func (p *processes) printProcessesMap() {
|
||||||
fmt.Println("--------------------------------------------------------------------------------------------")
|
fmt.Println("--------------------------------------------------------------------------------------------")
|
||||||
fmt.Printf("*** Output of processes map :\n")
|
fmt.Printf("*** Output of processes map :\n")
|
||||||
s.processes.mu.Lock()
|
p.mu.Lock()
|
||||||
for _, v := range s.processes.active {
|
for _, v := range p.active {
|
||||||
fmt.Printf("* proc - : %v, id: %v, name: %v, allowed from: %v\n", v.processKind, v.processID, v.subject.name(), v.allowedReceivers)
|
fmt.Printf("* proc - : %v, id: %v, name: %v, allowed from: %v\n", v.processKind, v.processID, v.subject.name(), v.allowedReceivers)
|
||||||
}
|
}
|
||||||
s.processes.mu.Unlock()
|
p.mu.Unlock()
|
||||||
|
|
||||||
s.metrics.metricsCh <- metricType{
|
p.metricsCh <- metricType{
|
||||||
metric: prometheus.NewGauge(prometheus.GaugeOpts{
|
metric: prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
Name: "total_running_processes",
|
Name: "total_running_processes",
|
||||||
Help: "The current number of total running processes",
|
Help: "The current number of total running processes",
|
||||||
}),
|
}),
|
||||||
value: float64(len(s.processes.active)),
|
value: float64(len(p.active)),
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("--------------------------------------------------------------------------------------------")
|
fmt.Println("--------------------------------------------------------------------------------------------")
|
||||||
|
@ -280,7 +285,7 @@ func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subject
|
||||||
|
|
||||||
// REMOVED:
|
// REMOVED:
|
||||||
//time.Sleep(time.Millisecond * 500)
|
//time.Sleep(time.Millisecond * 500)
|
||||||
s.printProcessesMap()
|
//s.printProcessesMap()
|
||||||
// Now when the process is spawned we jump back to the redo: label,
|
// Now when the process is spawned we jump back to the redo: label,
|
||||||
// and send the message to that new process.
|
// and send the message to that new process.
|
||||||
goto redo
|
goto redo
|
||||||
|
|
|
@ -12,6 +12,14 @@ func (s *server) ProcessesStart() {
|
||||||
|
|
||||||
// --- Subscriber services that can be started via flags
|
// --- Subscriber services that can be started via flags
|
||||||
|
|
||||||
|
// Start a subscriber for OPCommand messages
|
||||||
|
{
|
||||||
|
fmt.Printf("Starting OpCommand subscriber: %#v\n", s.nodeName)
|
||||||
|
sub := newSubject(OpCommand, CommandACK, s.nodeName)
|
||||||
|
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil)
|
||||||
|
go proc.spawnWorker(s)
|
||||||
|
}
|
||||||
|
|
||||||
// Start a subscriber for CLICommand messages
|
// Start a subscriber for CLICommand messages
|
||||||
if s.configuration.StartSubCLICommand.OK {
|
if s.configuration.StartSubCLICommand.OK {
|
||||||
{
|
{
|
||||||
|
|
|
@ -50,6 +50,8 @@ type Method string
|
||||||
// The constants that will be used throughout the system for
|
// The constants that will be used throughout the system for
|
||||||
// when specifying what kind of Method to send or work with.
|
// when specifying what kind of Method to send or work with.
|
||||||
const (
|
const (
|
||||||
|
// Command for client operation of the system
|
||||||
|
OpCommand Method = "OpCommand"
|
||||||
// Execute a CLI command in for example bash or cmd.
|
// Execute a CLI command in for example bash or cmd.
|
||||||
// This is a command type, so the output of the command executed
|
// This is a command type, so the output of the command executed
|
||||||
// will directly showed in the ACK message received.
|
// will directly showed in the ACK message received.
|
||||||
|
@ -114,6 +116,9 @@ const (
|
||||||
func (m Method) GetMethodsAvailable() MethodsAvailable {
|
func (m Method) GetMethodsAvailable() MethodsAvailable {
|
||||||
ma := MethodsAvailable{
|
ma := MethodsAvailable{
|
||||||
methodhandlers: map[Method]methodHandler{
|
methodhandlers: map[Method]methodHandler{
|
||||||
|
OpCommand: methodOpCommand{
|
||||||
|
commandOrEvent: CommandACK,
|
||||||
|
},
|
||||||
CLICommand: methodCLICommand{
|
CLICommand: methodCLICommand{
|
||||||
commandOrEvent: CommandACK,
|
commandOrEvent: CommandACK,
|
||||||
},
|
},
|
||||||
|
@ -187,6 +192,41 @@ type methodHandler interface {
|
||||||
getKind() CommandOrEvent
|
getKind() CommandOrEvent
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// -----
|
||||||
|
type methodOpCommand struct {
|
||||||
|
commandOrEvent CommandOrEvent
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m methodOpCommand) getKind() CommandOrEvent {
|
||||||
|
return m.commandOrEvent
|
||||||
|
}
|
||||||
|
|
||||||
|
// handler to run a CLI command with timeout context. The handler will
|
||||||
|
// return the output of the command run back to the calling publisher
|
||||||
|
// in the ack message.
|
||||||
|
func (m methodOpCommand) handler(proc process, message Message, node string) ([]byte, error) {
|
||||||
|
out := []byte{}
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case message.Data[0] == "ps":
|
||||||
|
proc.processes.mu.Lock()
|
||||||
|
for _, v := range proc.processes.active {
|
||||||
|
s := fmt.Sprintf("* proc - : %v, id: %v, name: %v, allowed from: %s\n", v.processKind, v.processID, v.subject.name(), v.allowedReceivers)
|
||||||
|
sb := []byte(s)
|
||||||
|
out = append(out, sb...)
|
||||||
|
}
|
||||||
|
proc.processes.mu.Unlock()
|
||||||
|
|
||||||
|
default:
|
||||||
|
out = []byte("error: no such OpCommand specified: " + message.Data[0])
|
||||||
|
ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v, error: %s\n---\n", node, message.ID, out))
|
||||||
|
return ackMsg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n%s", node, message.ID, out))
|
||||||
|
return ackMsg, nil
|
||||||
|
}
|
||||||
|
|
||||||
// -----
|
// -----
|
||||||
|
|
||||||
type methodCLICommand struct {
|
type methodCLICommand struct {
|
||||||
|
@ -232,8 +272,6 @@ func (m methodCLICommand) handler(proc process, message Message, node string) ([
|
||||||
return ackMsg, nil
|
return ackMsg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// -----
|
|
||||||
|
|
||||||
type methodTextLogging struct {
|
type methodTextLogging struct {
|
||||||
commandOrEvent CommandOrEvent
|
commandOrEvent CommandOrEvent
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue