1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-05 06:46:48 +00:00

added types for ACK and NACK messages

This commit is contained in:
postmannen 2021-02-17 18:59:49 +01:00
parent 529cc48440
commit 8dea13b850
8 changed files with 36 additions and 12 deletions

View file

@ -20,8 +20,10 @@ type CommandOrEvent string
func (c CommandOrEvent) GetCommandOrEventAvailable() CommandOrEventAvailable {
ma := CommandOrEventAvailable{
topics: map[CommandOrEvent]struct{}{
Command: {},
Event: {},
CommandACK: {},
CommandNACK: {},
EventACK: {},
EventNACK: {},
},
}
@ -34,12 +36,17 @@ const (
// delivered back in the reply ack message.
// The message should contain the unique ID of the
// command.
Command CommandOrEvent = "command"
CommandACK CommandOrEvent = "commandACK"
// Same as above, but No ACK.
CommandNACK CommandOrEvent = "commandNACK"
// Same as above, but No ACK
// Event, wait for and return the ACK message. This means
// that the command should be executed immediately
// and that we should get the confirmation if it
// was successful or not.
Event CommandOrEvent = "event"
EventACK CommandOrEvent = "eventACK"
// Same as above, but No ACK.
EventNACK CommandOrEvent = "eventNACK"
// eventCommand, just wait for the ACK that the
// message is received. What action happens on the
// receiving side is up to the received to decide.

View file

@ -2,7 +2,7 @@
{
"toNode": "central",
"data": ["some message sent from a ship for testing\n"],
"commandOrEvent":"event",
"commandOrEvent":"eventACK",
"method":"textLogging"
}
]

View file

@ -3,7 +3,7 @@
"toNode": "ship1",
"data": ["bash","-c","ls -l ../"],
"commandOrEvent":"command",
"commandOrEvent":"commandACK",
"method":"shellCommand"
}

View file

@ -2,7 +2,7 @@
{
"toNode": "ship2",
"data": ["bash","-c","tree ../"],
"commandOrEvent":"command",
"commandOrEvent":"commandACK",
"method":"shellCommand"
}

Binary file not shown.

View file

@ -55,7 +55,8 @@ type server struct {
logCh chan []byte
// used to check if the methods specified in message is valid
methodsAvailable MethodsAvailable
// used to check if the commandOrEvent specified in message is valid
// Map who holds the command and event types available.
// Used to check if the commandOrEvent specified in message is valid
commandOrEventAvailable CommandOrEventAvailable
}
@ -104,7 +105,7 @@ func (s *server) Start() {
// Start a subscriber for shellCommand messages
{
fmt.Printf("nodeName: %#v\n", s.nodeName)
sub := newSubject(s.nodeName, "command", "shellCommand")
sub := newSubject(s.nodeName, CommandACK, "shellCommand")
proc := s.processPrepareNew(sub, s.errorCh, processKindSubscriber)
// fmt.Printf("*** %#v\n", proc)
go s.processSpawnWorker(proc)
@ -113,7 +114,7 @@ func (s *server) Start() {
// Start a subscriber for textLogging messages
{
fmt.Printf("nodeName: %#v\n", s.nodeName)
sub := newSubject(s.nodeName, "event", "textLogging")
sub := newSubject(s.nodeName, EventACK, "textLogging")
proc := s.processPrepareNew(sub, s.errorCh, processKindSubscriber)
// fmt.Printf("*** %#v\n", proc)
go s.processSpawnWorker(proc)
@ -454,7 +455,8 @@ func (s *server) subscriberHandler(natsConn *nats.Conn, node string, msg *nats.M
// that there was a problem like missing method to handle a specific
// method etc.
switch {
case message.CommandOrEvent == Command || message.CommandOrEvent == Event:
case message.CommandOrEvent == CommandACK || message.CommandOrEvent == EventACK:
fmt.Printf("* message.CommandOrEvent received was = %v\n", message.CommandOrEvent)
mf, ok := s.methodsAvailable.CheckIfExists(message.Method)
if !ok {
// TODO: Check how errors should be handled here!!!
@ -469,6 +471,8 @@ func (s *server) subscriberHandler(natsConn *nats.Conn, node string, msg *nats.M
// Send a confirmation message back to the publisher
natsConn.Publish(msg.Reply, out)
case message.CommandOrEvent == CommandNACK || message.CommandOrEvent == EventNACK:
fmt.Printf("* message.CommandOrEvent received was = %v\n", message.CommandOrEvent)
default:
log.Printf("info: did not find that specific type of command: %#v\n", message.CommandOrEvent)
}

View file

@ -199,6 +199,9 @@ func (r *ringBuffer) dumpBucket(bucket string) ([]samDBValue, error) {
err := r.db.View(func(tx *bolt.Tx) error {
bu := tx.Bucket([]byte(bucket))
if bu == nil {
return fmt.Errorf("error: dumpBucket: tx.bucket returned nil")
}
// For each element found in the DB, unmarshal, and put on slice.
bu.ForEach(func(k, v []byte) error {
@ -224,6 +227,10 @@ func (r *ringBuffer) dumpBucket(bucket string) ([]samDBValue, error) {
return nil
})
if err != nil {
return nil, err
}
return samDBValues, err
}
@ -238,7 +245,7 @@ func (r *ringBuffer) printBucketContent(bucket string) error {
var vv samDBValue
err := json.Unmarshal(v, &vv)
if err != nil {
log.Printf("error: dumpBucket json.Umarshal failed: %v\n", err)
log.Printf("error: printBucketContent json.Umarshal failed: %v\n", err)
}
fmt.Printf("k: %s, v: %v\n", k, vv)
return nil

View file

@ -20,3 +20,9 @@ some message sent from a ship for testing
some message sent from a ship for testing
some message sent from a ship for testing
some message sent from a ship for testing
some message sent from a ship for testing
some message sent from a ship for testing
some message sent from a ship for testing
some message sent from a ship for testing
some message sent from a ship for testing
some message sent from a ship for testing