diff --git a/central/main.go b/central/main.go index ab0f9d7..752911b 100644 --- a/central/main.go +++ b/central/main.go @@ -16,12 +16,18 @@ import ( type messageType int const ( + // shellCommand, command that will just wait for an + // ack, and nothing of the output of the command are + // delivered back in the reply ack message. + // The message should contain the unique ID of the + // command. + shellCommandReturnOutput messageType = iota // shellCommand, wait for and return the output // of the command in the ACK message. This means // that the command should be executed immediately // and that we should get the confirmation that it // was successful or not. - shellCommand messageType = iota + shellCommandReturnAck messageType = iota // eventCommand, just wait for the ACK that the // message is received. What action happens on the // receiving side is up to the received to decide. @@ -34,75 +40,91 @@ type Message struct { // The actual data in the message // TODO: Change this to a slice instead...or maybe use an // interface type here to handle several data types ? - Data string + Data []string // The type of the message being sent MessageType messageType } func main() { + edgeID := "btship1" // Create a connection to nats server, and publish a message. - nc, err := nats.Connect("localhost", nil) + natsConn, err := nats.Connect("localhost", nil) if err != nil { log.Printf("error: nats.Connect failed: %v\n", err) } - defer nc.Close() + defer natsConn.Close() - // The SubscribeSync used in the subscriber, will get messages that - // are sent after it started subscribing, so we start a publisher - // that sends out a message every second. - go func() { - counter := 0 + // There should be on IDCounter per Subject later on. + IDCounter := 0 - for { - m := Message{ - ID: counter, - Data: "ls", - } - - var buf bytes.Buffer - gobEnc := gob.NewEncoder(&buf) - err := gobEnc.Encode(m) - if err != nil { - fmt.Printf("error: gob.Enode failed: %v\n", err) - } - - //fmt.Printf("The gob encoded message right after encoding: %v\n", buf.Bytes()) - - msg := &nats.Msg{ - Reply: "subjectReply", - Data: buf.Bytes(), - Subject: "subject1", - } - - err = nc.PublishMsg(msg) - if err != nil { - log.Printf("error: publish failed: %v\n", err) - continue - } - - // Create a subscriber for the reply message. - subReply, err := nc.SubscribeSync(msg.Reply) - if err != nil { - log.Printf("error: nc.SubscribeSync failed: %v\n", err) - continue - } - - // Wait up until 10 seconds for a reply, - // continue and resend if to reply received. - msgReply, err := subReply.NextMsg(time.Second * 10) - if err != nil { - log.Printf("error: subRepl.NextMsg failed: %v\n", err) - // did not receive a reply, continuing from top again - continue - } - fmt.Printf("publisher: received: %s\n", msgReply.Data) - - // Increment the counter for the next message to be sent. - counter++ - time.Sleep(time.Second * 1) + for { + m := Message{ + ID: IDCounter, + Data: []string{"uname", "-a"}, + MessageType: shellCommandReturnAck, } - }() - select {} + // TODO: Have a channel here to return + messageDeliver(edgeID, m, natsConn) + + // Increment the counter for the next message to be sent. + IDCounter++ + time.Sleep(time.Second * 1) + } } + +func messageDeliver(edgeID string, message Message, natsConn *nats.Conn) { + for { + dataPayload, err := createDataPayload(message) + if err != nil { + log.Printf("error: createDataPayload: %v\n", err) + } + + msg := &nats.Msg{ + Subject: edgeID, + Reply: "subjectReply", + Data: dataPayload, + } + + // The SubscribeSync used in the subscriber, will get messages that + // are sent after it started subscribing, so we start a publisher + // that sends out a message every second. + // + // Create a subscriber for the reply message. + subReply, err := natsConn.SubscribeSync(msg.Reply) + if err != nil { + log.Printf("error: nc.SubscribeSync failed: %v\n", err) + continue + } + + // Publish message + err = natsConn.PublishMsg(msg) + if err != nil { + log.Printf("error: publish failed: %v\n", err) + continue + } + + // Wait up until 10 seconds for a reply, + // continue and resend if to reply received. + msgReply, err := subReply.NextMsg(time.Second * 10) + if err != nil { + log.Printf("error: subRepl.NextMsg failed: %v\n", err) + // did not receive a reply, continuing from top again + continue + } + fmt.Printf("publisher: received: %s\n", msgReply.Data) + return + } +} + +func createDataPayload(m Message) ([]byte, error) { + var buf bytes.Buffer + gobEnc := gob.NewEncoder(&buf) + err := gobEnc.Encode(m) + if err != nil { + return nil, fmt.Errorf("error: gob.Enode failed: %v", err) + } + + return buf.Bytes(), nil +} diff --git a/edge/main.go b/edge/main.go index bd8fb5e..e0773d9 100644 --- a/edge/main.go +++ b/edge/main.go @@ -14,12 +14,18 @@ import ( type messageType int const ( + // shellCommand, command that will just wait for an + // ack, and nothing of the output of the command are + // delivered back in the reply ack message. + // The message should contain the unique ID of the + // command. + shellCommandReturnOutput messageType = iota // shellCommand, wait for and return the output // of the command in the ACK message. This means // that the command should be executed immediately // and that we should get the confirmation that it // was successful or not. - shellCommand messageType = iota + shellCommandReturnAck messageType = iota // eventCommand, just wait for the ACK that the // message is received. What action happens on the // receiving side is up to the received to decide. @@ -30,26 +36,53 @@ type Message struct { // The Unique ID of the message ID int // The actual data in the message - Data string + Data []string // The type of the message being sent MessageType messageType } func main() { + edgeID := "btship1" // Create a connection to nats server, and publish a message. - nc, err := nats.Connect("localhost", nil) + natsConn, err := nats.Connect("localhost", nil) if err != nil { log.Printf("error: nats.Connect failed: %v\n", err) } - defer nc.Close() + defer natsConn.Close() // Create a channel to put the data received in the subscriber callback // function reqMsgCh := make(chan Message) - // Subscribe will start up a Go routine calling the callback function - // specified when a new message is received. - _, err = nc.Subscribe("subject1", func(req *nats.Msg) { + // Subscribe will start up a Go routine under the hood calling the + // callback function specified when a new message is received. + _, err = natsConn.Subscribe(edgeID, messageHandler(natsConn, reqMsgCh)) + if err != nil { + fmt.Printf("error: Subscribe failed: %v\n", err) + } + + // Do some further processing of the actual data we received in the + // subscriber callback function. + for { + msg := <-reqMsgCh + fmt.Printf("%v\n", msg) + switch msg.MessageType { + case shellCommandReturnAck: + c := msg.Data[0] + a := msg.Data[1:] + cmd := exec.Command(c, a...) + cmd.Stdout = os.Stdout + err := cmd.Start() + if err != nil { + fmt.Printf("error: execution of command failed: %v\n", err) + } + } + + } +} + +func messageHandler(natsConn *nats.Conn, reqMsgCh chan Message) func(req *nats.Msg) { + return func(req *nats.Msg) { message := Message{} // Create a buffer to decode the gob encoded binary data back @@ -65,26 +98,6 @@ func main() { reqMsgCh <- message // Send a confirmation message back to the publisher - nc.Publish(req.Reply, []byte("confirmed: "+fmt.Sprint(message.ID))) - }) - if err != nil { - fmt.Printf("error: Subscribe failed: %v\n", err) - } - - // Do some further processing of the actual data we received in the - // subscriber callback function. - for { - msg := <-reqMsgCh - - switch msg.MessageType { - case shellCommand: - cmd := exec.Command(msg.Data, "-l") - cmd.Stdout = os.Stdout - err := cmd.Start() - if err != nil { - fmt.Printf("error: execution of command failed: %v\n", err) - } - } - + natsConn.Publish(req.Reply, []byte("confirmed: "+fmt.Sprint(message.ID))) } } diff --git a/go.mod b/go.mod index 22e564e..09e4830 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,9 @@ module github.com/RaaLabs/steward go 1.15 -require github.com/nats-io/nats.go v1.10.0 +require ( + github.com/golang/protobuf v1.4.3 // indirect + github.com/nats-io/nats-server/v2 v2.1.9 // indirect + github.com/nats-io/nats.go v1.10.0 + google.golang.org/protobuf v1.25.0 // indirect +) diff --git a/go.sum b/go.sum index 3dd11f0..5a36776 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,33 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= +github.com/nats-io/jwt v1.1.0 h1:+vOlgtM0ZsF46GbmUoadq0/2rChNS45gtxHEa3H1gqM= +github.com/nats-io/jwt v1.1.0/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M= +github.com/nats-io/nats-server/v2 v2.1.9 h1:Sxr2zpaapgpBT9ElTxTVe62W+qjnhPcKY/8W5cnA/Qk= +github.com/nats-io/nats-server/v2 v2.1.9/go.mod h1:9qVyoewoYXzG1ME9ox0HwkkzyYvnlBDugfR4Gg/8uHU= github.com/nats-io/nats.go v1.10.0 h1:L8qnKaofSfNFbXg0C5F71LdjPRnmQwSsA4ukmkt1TvY= github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= @@ -7,11 +35,53 @@ github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA= github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e h1:D5TXcfTk7xF7hvieo4QErS3qqCB4teTffacDWr7CI+0= +golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=