1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

restructuring

This commit is contained in:
postmannen 2021-01-27 09:45:52 +01:00
parent 360089081d
commit 6e9c06b2ec
4 changed files with 197 additions and 87 deletions

View file

@ -16,12 +16,18 @@ import (
type messageType int
const (
// shellCommand, command that will just wait for an
// ack, and nothing of the output of the command are
// delivered back in the reply ack message.
// The message should contain the unique ID of the
// command.
shellCommandReturnOutput messageType = iota
// shellCommand, wait for and return the output
// of the command in the ACK message. This means
// that the command should be executed immediately
// and that we should get the confirmation that it
// was successful or not.
shellCommand messageType = iota
shellCommandReturnAck messageType = iota
// eventCommand, just wait for the ACK that the
// message is received. What action happens on the
// receiving side is up to the received to decide.
@ -34,75 +40,91 @@ type Message struct {
// The actual data in the message
// TODO: Change this to a slice instead...or maybe use an
// interface type here to handle several data types ?
Data string
Data []string
// The type of the message being sent
MessageType messageType
}
func main() {
edgeID := "btship1"
// Create a connection to nats server, and publish a message.
nc, err := nats.Connect("localhost", nil)
natsConn, err := nats.Connect("localhost", nil)
if err != nil {
log.Printf("error: nats.Connect failed: %v\n", err)
}
defer nc.Close()
defer natsConn.Close()
// The SubscribeSync used in the subscriber, will get messages that
// are sent after it started subscribing, so we start a publisher
// that sends out a message every second.
go func() {
counter := 0
// There should be on IDCounter per Subject later on.
IDCounter := 0
for {
m := Message{
ID: counter,
Data: "ls",
}
var buf bytes.Buffer
gobEnc := gob.NewEncoder(&buf)
err := gobEnc.Encode(m)
if err != nil {
fmt.Printf("error: gob.Enode failed: %v\n", err)
}
//fmt.Printf("The gob encoded message right after encoding: %v\n", buf.Bytes())
msg := &nats.Msg{
Reply: "subjectReply",
Data: buf.Bytes(),
Subject: "subject1",
}
err = nc.PublishMsg(msg)
if err != nil {
log.Printf("error: publish failed: %v\n", err)
continue
}
// Create a subscriber for the reply message.
subReply, err := nc.SubscribeSync(msg.Reply)
if err != nil {
log.Printf("error: nc.SubscribeSync failed: %v\n", err)
continue
}
// Wait up until 10 seconds for a reply,
// continue and resend if to reply received.
msgReply, err := subReply.NextMsg(time.Second * 10)
if err != nil {
log.Printf("error: subRepl.NextMsg failed: %v\n", err)
// did not receive a reply, continuing from top again
continue
}
fmt.Printf("publisher: received: %s\n", msgReply.Data)
// Increment the counter for the next message to be sent.
counter++
time.Sleep(time.Second * 1)
for {
m := Message{
ID: IDCounter,
Data: []string{"uname", "-a"},
MessageType: shellCommandReturnAck,
}
}()
select {}
// TODO: Have a channel here to return
messageDeliver(edgeID, m, natsConn)
// Increment the counter for the next message to be sent.
IDCounter++
time.Sleep(time.Second * 1)
}
}
func messageDeliver(edgeID string, message Message, natsConn *nats.Conn) {
for {
dataPayload, err := createDataPayload(message)
if err != nil {
log.Printf("error: createDataPayload: %v\n", err)
}
msg := &nats.Msg{
Subject: edgeID,
Reply: "subjectReply",
Data: dataPayload,
}
// The SubscribeSync used in the subscriber, will get messages that
// are sent after it started subscribing, so we start a publisher
// that sends out a message every second.
//
// Create a subscriber for the reply message.
subReply, err := natsConn.SubscribeSync(msg.Reply)
if err != nil {
log.Printf("error: nc.SubscribeSync failed: %v\n", err)
continue
}
// Publish message
err = natsConn.PublishMsg(msg)
if err != nil {
log.Printf("error: publish failed: %v\n", err)
continue
}
// Wait up until 10 seconds for a reply,
// continue and resend if to reply received.
msgReply, err := subReply.NextMsg(time.Second * 10)
if err != nil {
log.Printf("error: subRepl.NextMsg failed: %v\n", err)
// did not receive a reply, continuing from top again
continue
}
fmt.Printf("publisher: received: %s\n", msgReply.Data)
return
}
}
func createDataPayload(m Message) ([]byte, error) {
var buf bytes.Buffer
gobEnc := gob.NewEncoder(&buf)
err := gobEnc.Encode(m)
if err != nil {
return nil, fmt.Errorf("error: gob.Enode failed: %v", err)
}
return buf.Bytes(), nil
}

View file

@ -14,12 +14,18 @@ import (
type messageType int
const (
// shellCommand, command that will just wait for an
// ack, and nothing of the output of the command are
// delivered back in the reply ack message.
// The message should contain the unique ID of the
// command.
shellCommandReturnOutput messageType = iota
// shellCommand, wait for and return the output
// of the command in the ACK message. This means
// that the command should be executed immediately
// and that we should get the confirmation that it
// was successful or not.
shellCommand messageType = iota
shellCommandReturnAck messageType = iota
// eventCommand, just wait for the ACK that the
// message is received. What action happens on the
// receiving side is up to the received to decide.
@ -30,26 +36,53 @@ type Message struct {
// The Unique ID of the message
ID int
// The actual data in the message
Data string
Data []string
// The type of the message being sent
MessageType messageType
}
func main() {
edgeID := "btship1"
// Create a connection to nats server, and publish a message.
nc, err := nats.Connect("localhost", nil)
natsConn, err := nats.Connect("localhost", nil)
if err != nil {
log.Printf("error: nats.Connect failed: %v\n", err)
}
defer nc.Close()
defer natsConn.Close()
// Create a channel to put the data received in the subscriber callback
// function
reqMsgCh := make(chan Message)
// Subscribe will start up a Go routine calling the callback function
// specified when a new message is received.
_, err = nc.Subscribe("subject1", func(req *nats.Msg) {
// Subscribe will start up a Go routine under the hood calling the
// callback function specified when a new message is received.
_, err = natsConn.Subscribe(edgeID, messageHandler(natsConn, reqMsgCh))
if err != nil {
fmt.Printf("error: Subscribe failed: %v\n", err)
}
// Do some further processing of the actual data we received in the
// subscriber callback function.
for {
msg := <-reqMsgCh
fmt.Printf("%v\n", msg)
switch msg.MessageType {
case shellCommandReturnAck:
c := msg.Data[0]
a := msg.Data[1:]
cmd := exec.Command(c, a...)
cmd.Stdout = os.Stdout
err := cmd.Start()
if err != nil {
fmt.Printf("error: execution of command failed: %v\n", err)
}
}
}
}
func messageHandler(natsConn *nats.Conn, reqMsgCh chan Message) func(req *nats.Msg) {
return func(req *nats.Msg) {
message := Message{}
// Create a buffer to decode the gob encoded binary data back
@ -65,26 +98,6 @@ func main() {
reqMsgCh <- message
// Send a confirmation message back to the publisher
nc.Publish(req.Reply, []byte("confirmed: "+fmt.Sprint(message.ID)))
})
if err != nil {
fmt.Printf("error: Subscribe failed: %v\n", err)
}
// Do some further processing of the actual data we received in the
// subscriber callback function.
for {
msg := <-reqMsgCh
switch msg.MessageType {
case shellCommand:
cmd := exec.Command(msg.Data, "-l")
cmd.Stdout = os.Stdout
err := cmd.Start()
if err != nil {
fmt.Printf("error: execution of command failed: %v\n", err)
}
}
natsConn.Publish(req.Reply, []byte("confirmed: "+fmt.Sprint(message.ID)))
}
}

7
go.mod
View file

@ -2,4 +2,9 @@ module github.com/RaaLabs/steward
go 1.15
require github.com/nats-io/nats.go v1.10.0
require (
github.com/golang/protobuf v1.4.3 // indirect
github.com/nats-io/nats-server/v2 v2.1.9 // indirect
github.com/nats-io/nats.go v1.10.0
google.golang.org/protobuf v1.25.0 // indirect
)

70
go.sum
View file

@ -1,5 +1,33 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM=
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/jwt v1.1.0 h1:+vOlgtM0ZsF46GbmUoadq0/2rChNS45gtxHEa3H1gqM=
github.com/nats-io/jwt v1.1.0/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M=
github.com/nats-io/nats-server/v2 v2.1.9 h1:Sxr2zpaapgpBT9ElTxTVe62W+qjnhPcKY/8W5cnA/Qk=
github.com/nats-io/nats-server/v2 v2.1.9/go.mod h1:9qVyoewoYXzG1ME9ox0HwkkzyYvnlBDugfR4Gg/8uHU=
github.com/nats-io/nats.go v1.10.0 h1:L8qnKaofSfNFbXg0C5F71LdjPRnmQwSsA4ukmkt1TvY=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
@ -7,11 +35,53 @@ github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e h1:D5TXcfTk7xF7hvieo4QErS3qqCB4teTffacDWr7CI+0=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=