1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-18 21:59:30 +00:00

added type naming standard for subject and moved subscriber into the same package

This commit is contained in:
postmannen 2021-01-29 14:22:36 +01:00
parent b543ec589a
commit db3d907749
3 changed files with 127 additions and 5 deletions

View file

@ -1,2 +1,37 @@
# steward
Async management of Edge units.
## Concepts/Ideas
### Terminology
- Node: An installation of an operating system with an ip address
- Process: One message handler running in it's own thread with 1 subject for sending and 1 for reply.
- Message:
- Command: Something to be executed on the message received. An example can be a shell command.
- Event: Something that have happened. An example can be transfer of syslog data from a host.
### Naming
#### Subject
Subject naming are case sensitive, and can not contain the space are the tab character.
`<nodename>.<command/event>.<method>`
Nodename: Are the hostname of the device. This do not have to be resolvable via DNS, it is just a unique name for the host to receive the message.
Command/Event: Are type of message sent. `command` or `event`. Description of the differences are mentioned earlier.
Method: Are the functionality the message provide. Example could be `shellcommand` or `syslogforwarding`
##### Complete subject example
For syslog of type event to a host named "ship1"
`ship1.event.syslogforwarding`
and for a shell command of type command to a host named "ship2"
`ship2.command.shellcommand`

View file

@ -4,6 +4,7 @@ package main
import (
"bytes"
"encoding/gob"
"flag"
"fmt"
"log"
"os"
@ -57,6 +58,7 @@ type server struct {
processes map[node]process
// The last processID created
lastProcessID int
thisNodeName string
}
// newServer will prepare and return a server type
@ -66,7 +68,7 @@ func newServer(brokerAddress string) (*server, error) {
}, nil
}
func (s *server) Run() {
func (s *server) RunPublisher() {
proc := s.prepareNewProcess("btship1")
// fmt.Printf("*** %#v\n", proc)
go s.spawnProcess(proc)
@ -167,10 +169,10 @@ func messageDeliver(proc process, message Message, natsConn *nats.Conn) {
}
msg := &nats.Msg{
Subject: string(proc.node),
Subject: fmt.Sprintf("%s.%s.%s", proc.node, "command", "shellcommand"),
// Structure of the reply message are:
// reply-<node name>-pid<pid nr>
Reply: "reply-" + string(proc.node) + "-pid" + fmt.Sprint(proc.processID),
// reply.<nodename>.<message type>.<method>
Reply: "reply." + string(proc.node) + "command.shellcommand",
Data: dataPayload,
}
@ -220,11 +222,19 @@ func gobEncodePayload(m Message) ([]byte, error) {
}
func main() {
node := flag.String("node", "0", "some unique string to identify this Edge unit")
modePublisher := flag.Bool("modePublisher", false, "set to true if it should be able to publish")
modeSubscriber := flag.Bool("modeSubscriber", false, "set to true if it should be able to subscribe")
flag.Parse()
s, err := newServer("localhost")
if err != nil {
log.Printf("error: failed to connect to broker: %v\n", err)
os.Exit(1)
}
s.thisNodeName = *node
// Create a connection to nats server
s.natsConn, err = nats.Connect("localhost", nil)
if err != nil {
@ -232,5 +242,13 @@ func main() {
}
defer s.natsConn.Close()
s.Run()
if *modePublisher {
go s.RunPublisher()
}
if *modeSubscriber {
go s.RunSubscriber()
}
select {}
}

69
central/subscriber.go Normal file
View file

@ -0,0 +1,69 @@
package main
import (
"bytes"
"encoding/gob"
"fmt"
"os"
"os/exec"
"github.com/nats-io/nats.go"
)
func (s *server) RunSubscriber() {
// Create a channel to put the data received in the subscriber callback
// function
reqMsgCh := make(chan Message)
// Subscribe will start up a Go routine under the hood calling the
// callback function specified when a new message is received.
subject := fmt.Sprintf("%s.%s.%s", s.thisNodeName, "command", "shellcommand")
_, err := s.natsConn.Subscribe(subject, listenForMessage(s.natsConn, reqMsgCh, s.thisNodeName))
if err != nil {
fmt.Printf("error: Subscribe failed: %v\n", err)
}
// Do some further processing of the actual data we received in the
// subscriber callback function.
for {
msg := <-reqMsgCh
fmt.Printf("%v\n", msg)
switch msg.MessageType {
case eventReturnAck:
c := msg.Data[0]
a := msg.Data[1:]
cmd := exec.Command(c, a...)
cmd.Stdout = os.Stdout
err := cmd.Start()
if err != nil {
fmt.Printf("error: execution of command failed: %v\n", err)
}
}
}
}
// Listen for message will send an ACK message back to the sender,
// and put the received incomming message on the reqMsg channel
// for further processing.
func listenForMessage(natsConn *nats.Conn, reqMsgCh chan Message, node string) func(req *nats.Msg) {
return func(req *nats.Msg) {
message := Message{}
// Create a buffer to decode the gob encoded binary data back
// to it's original structure.
buf := bytes.NewBuffer(req.Data)
gobDec := gob.NewDecoder(buf)
err := gobDec.Decode(&message)
if err != nil {
fmt.Printf("error: gob decoding failed: %v\n", err)
}
// Put the data recived on the channel for further processing
reqMsgCh <- message
// Send a confirmation message back to the publisher
natsConn.Publish(req.Reply, []byte("confirmed from: "+node+": "+fmt.Sprint(message.ID)))
}
}