diff --git a/central/main.go b/central/main.go index 752911b..e538041 100644 --- a/central/main.go +++ b/central/main.go @@ -8,6 +8,7 @@ import ( "encoding/gob" "fmt" "log" + "os" "time" "github.com/nats-io/nats.go" @@ -21,17 +22,16 @@ const ( // delivered back in the reply ack message. // The message should contain the unique ID of the // command. - shellCommandReturnOutput messageType = iota + commandReturnOutput messageType = iota // shellCommand, wait for and return the output // of the command in the ACK message. This means // that the command should be executed immediately // and that we should get the confirmation that it // was successful or not. - shellCommandReturnAck messageType = iota + eventReturnAck messageType = iota // eventCommand, just wait for the ACK that the // message is received. What action happens on the // receiving side is up to the received to decide. - eventCommand messageType = iota ) type Message struct { @@ -45,38 +45,61 @@ type Message struct { MessageType messageType } -func main() { - edgeID := "btship1" - // Create a connection to nats server, and publish a message. - natsConn, err := nats.Connect("localhost", nil) - if err != nil { - log.Printf("error: nats.Connect failed: %v\n", err) +// session are represent the communication to one individual host +type session struct { + messageID int + subject string + hostID hostID +} + +type hostID string + +type server struct { + natsConn *nats.Conn + sessions map[hostID]session +} + +func newServer(brokerAddress string) (*server, error) { + return &server{ + sessions: make(map[hostID]session), + }, nil +} + +func (s *server) Run() error { + // create the initial configuration for a sessions communicating with 1 host. + session := session{ + messageID: 0, + hostID: "btship1", } - defer natsConn.Close() - - // There should be on IDCounter per Subject later on. - IDCounter := 0 + s.sessions["btship1"] = session + // Loop creating one new message every second to simulate getting new + // messages to deliver. for { - m := Message{ - ID: IDCounter, - Data: []string{"uname", "-a"}, - MessageType: shellCommandReturnAck, - } - - // TODO: Have a channel here to return - messageDeliver(edgeID, m, natsConn) + m := getMessageToDeliver() + m.ID = s.sessions["btship1"].messageID + messageDeliver("btship1", m, s.natsConn) // Increment the counter for the next message to be sent. - IDCounter++ + session.messageID++ + s.sessions["btship1"] = session time.Sleep(time.Second * 1) } } +// get MessageToDeliver will pick up the next message to be created. +// TODO: read this from local file or rest or....? +func getMessageToDeliver() Message { + return Message{ + Data: []string{"uname", "-a"}, + MessageType: eventReturnAck, + } +} + func messageDeliver(edgeID string, message Message, natsConn *nats.Conn) { for { - dataPayload, err := createDataPayload(message) + dataPayload, err := gobEncodePayload(message) if err != nil { log.Printf("error: createDataPayload: %v\n", err) } @@ -118,7 +141,9 @@ func messageDeliver(edgeID string, message Message, natsConn *nats.Conn) { } } -func createDataPayload(m Message) ([]byte, error) { +// gobEncodePayload will encode the message structure along with its +// valued in gob binary format. +func gobEncodePayload(m Message) ([]byte, error) { var buf bytes.Buffer gobEnc := gob.NewEncoder(&buf) err := gobEnc.Encode(m) @@ -128,3 +153,19 @@ func createDataPayload(m Message) ([]byte, error) { return buf.Bytes(), nil } + +func main() { + s, err := newServer("localhost") + if err != nil { + log.Printf("error: failed to connect to broker: %v\n", err) + os.Exit(1) + } + // Create a connection to nats server, and publish a message. + s.natsConn, err = nats.Connect("localhost", nil) + if err != nil { + log.Printf("error: nats.Connect failed: %v\n", err) + } + defer s.natsConn.Close() + + s.Run() +} diff --git a/edge/main.go b/edge/main.go index e0773d9..3c4c337 100644 --- a/edge/main.go +++ b/edge/main.go @@ -3,6 +3,7 @@ package main import ( "bytes" "encoding/gob" + "flag" "fmt" "log" "os" @@ -19,17 +20,16 @@ const ( // delivered back in the reply ack message. // The message should contain the unique ID of the // command. - shellCommandReturnOutput messageType = iota + commandReturnOutput messageType = iota // shellCommand, wait for and return the output // of the command in the ACK message. This means // that the command should be executed immediately // and that we should get the confirmation that it // was successful or not. - shellCommandReturnAck messageType = iota + eventReturnAck messageType = iota // eventCommand, just wait for the ACK that the // message is received. What action happens on the // receiving side is up to the received to decide. - eventCommand messageType = iota ) type Message struct { @@ -42,7 +42,9 @@ type Message struct { } func main() { - edgeID := "btship1" + edgeID := flag.String("edgeID", "0", "some unique string to identify this Edge unit") + flag.Parse() + // Create a connection to nats server, and publish a message. natsConn, err := nats.Connect("localhost", nil) if err != nil { @@ -56,7 +58,7 @@ func main() { // Subscribe will start up a Go routine under the hood calling the // callback function specified when a new message is received. - _, err = natsConn.Subscribe(edgeID, messageHandler(natsConn, reqMsgCh)) + _, err = natsConn.Subscribe(*edgeID, listenForMessage(natsConn, reqMsgCh)) if err != nil { fmt.Printf("error: Subscribe failed: %v\n", err) } @@ -67,7 +69,7 @@ func main() { msg := <-reqMsgCh fmt.Printf("%v\n", msg) switch msg.MessageType { - case shellCommandReturnAck: + case eventReturnAck: c := msg.Data[0] a := msg.Data[1:] cmd := exec.Command(c, a...) @@ -81,7 +83,10 @@ func main() { } } -func messageHandler(natsConn *nats.Conn, reqMsgCh chan Message) func(req *nats.Msg) { +// Listen for message will send an ACK message back to the sender, +// and put the received incomming message on the reqMsg channel +// for further processing. +func listenForMessage(natsConn *nats.Conn, reqMsgCh chan Message) func(req *nats.Msg) { return func(req *nats.Msg) { message := Message{}