1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

added sessions

This commit is contained in:
postmannen 2021-01-27 14:02:57 +01:00
parent 6e9c06b2ec
commit 7bf4b356dd
2 changed files with 77 additions and 31 deletions

View file

@ -8,6 +8,7 @@ import (
"encoding/gob"
"fmt"
"log"
"os"
"time"
"github.com/nats-io/nats.go"
@ -21,17 +22,16 @@ const (
// delivered back in the reply ack message.
// The message should contain the unique ID of the
// command.
shellCommandReturnOutput messageType = iota
commandReturnOutput messageType = iota
// shellCommand, wait for and return the output
// of the command in the ACK message. This means
// that the command should be executed immediately
// and that we should get the confirmation that it
// was successful or not.
shellCommandReturnAck messageType = iota
eventReturnAck messageType = iota
// eventCommand, just wait for the ACK that the
// message is received. What action happens on the
// receiving side is up to the received to decide.
eventCommand messageType = iota
)
type Message struct {
@ -45,38 +45,61 @@ type Message struct {
MessageType messageType
}
func main() {
edgeID := "btship1"
// Create a connection to nats server, and publish a message.
natsConn, err := nats.Connect("localhost", nil)
if err != nil {
log.Printf("error: nats.Connect failed: %v\n", err)
// session are represent the communication to one individual host
type session struct {
messageID int
subject string
hostID hostID
}
type hostID string
type server struct {
natsConn *nats.Conn
sessions map[hostID]session
}
func newServer(brokerAddress string) (*server, error) {
return &server{
sessions: make(map[hostID]session),
}, nil
}
func (s *server) Run() error {
// create the initial configuration for a sessions communicating with 1 host.
session := session{
messageID: 0,
hostID: "btship1",
}
defer natsConn.Close()
// There should be on IDCounter per Subject later on.
IDCounter := 0
s.sessions["btship1"] = session
// Loop creating one new message every second to simulate getting new
// messages to deliver.
for {
m := Message{
ID: IDCounter,
Data: []string{"uname", "-a"},
MessageType: shellCommandReturnAck,
}
// TODO: Have a channel here to return
messageDeliver(edgeID, m, natsConn)
m := getMessageToDeliver()
m.ID = s.sessions["btship1"].messageID
messageDeliver("btship1", m, s.natsConn)
// Increment the counter for the next message to be sent.
IDCounter++
session.messageID++
s.sessions["btship1"] = session
time.Sleep(time.Second * 1)
}
}
// get MessageToDeliver will pick up the next message to be created.
// TODO: read this from local file or rest or....?
func getMessageToDeliver() Message {
return Message{
Data: []string{"uname", "-a"},
MessageType: eventReturnAck,
}
}
func messageDeliver(edgeID string, message Message, natsConn *nats.Conn) {
for {
dataPayload, err := createDataPayload(message)
dataPayload, err := gobEncodePayload(message)
if err != nil {
log.Printf("error: createDataPayload: %v\n", err)
}
@ -118,7 +141,9 @@ func messageDeliver(edgeID string, message Message, natsConn *nats.Conn) {
}
}
func createDataPayload(m Message) ([]byte, error) {
// gobEncodePayload will encode the message structure along with its
// valued in gob binary format.
func gobEncodePayload(m Message) ([]byte, error) {
var buf bytes.Buffer
gobEnc := gob.NewEncoder(&buf)
err := gobEnc.Encode(m)
@ -128,3 +153,19 @@ func createDataPayload(m Message) ([]byte, error) {
return buf.Bytes(), nil
}
func main() {
s, err := newServer("localhost")
if err != nil {
log.Printf("error: failed to connect to broker: %v\n", err)
os.Exit(1)
}
// Create a connection to nats server, and publish a message.
s.natsConn, err = nats.Connect("localhost", nil)
if err != nil {
log.Printf("error: nats.Connect failed: %v\n", err)
}
defer s.natsConn.Close()
s.Run()
}

View file

@ -3,6 +3,7 @@ package main
import (
"bytes"
"encoding/gob"
"flag"
"fmt"
"log"
"os"
@ -19,17 +20,16 @@ const (
// delivered back in the reply ack message.
// The message should contain the unique ID of the
// command.
shellCommandReturnOutput messageType = iota
commandReturnOutput messageType = iota
// shellCommand, wait for and return the output
// of the command in the ACK message. This means
// that the command should be executed immediately
// and that we should get the confirmation that it
// was successful or not.
shellCommandReturnAck messageType = iota
eventReturnAck messageType = iota
// eventCommand, just wait for the ACK that the
// message is received. What action happens on the
// receiving side is up to the received to decide.
eventCommand messageType = iota
)
type Message struct {
@ -42,7 +42,9 @@ type Message struct {
}
func main() {
edgeID := "btship1"
edgeID := flag.String("edgeID", "0", "some unique string to identify this Edge unit")
flag.Parse()
// Create a connection to nats server, and publish a message.
natsConn, err := nats.Connect("localhost", nil)
if err != nil {
@ -56,7 +58,7 @@ func main() {
// Subscribe will start up a Go routine under the hood calling the
// callback function specified when a new message is received.
_, err = natsConn.Subscribe(edgeID, messageHandler(natsConn, reqMsgCh))
_, err = natsConn.Subscribe(*edgeID, listenForMessage(natsConn, reqMsgCh))
if err != nil {
fmt.Printf("error: Subscribe failed: %v\n", err)
}
@ -67,7 +69,7 @@ func main() {
msg := <-reqMsgCh
fmt.Printf("%v\n", msg)
switch msg.MessageType {
case shellCommandReturnAck:
case eventReturnAck:
c := msg.Data[0]
a := msg.Data[1:]
cmd := exec.Command(c, a...)
@ -81,7 +83,10 @@ func main() {
}
}
func messageHandler(natsConn *nats.Conn, reqMsgCh chan Message) func(req *nats.Msg) {
// Listen for message will send an ACK message back to the sender,
// and put the received incomming message on the reqMsg channel
// for further processing.
func listenForMessage(natsConn *nats.Conn, reqMsgCh chan Message) func(req *nats.Msg) {
return func(req *nats.Msg) {
message := Message{}