From 091797ad1a07abeb513f2c3dea3fa26610036a43 Mon Sep 17 00:00:00 2001 From: postmannen Date: Mon, 25 Jan 2021 15:23:00 +0100 Subject: [PATCH] initial commit --- central/main.go | 87 +++++++++++++++++++++++++++++++++++++++++++++++++ edge/main.go | 58 +++++++++++++++++++++++++++++++++ go.mod | 3 ++ 3 files changed, 148 insertions(+) create mode 100644 central/main.go create mode 100644 edge/main.go create mode 100644 go.mod diff --git a/central/main.go b/central/main.go new file mode 100644 index 0000000..57d8f4d --- /dev/null +++ b/central/main.go @@ -0,0 +1,87 @@ +// Same as 02 example, but using PublishMsg method instead of Request method +// for publishing. + +package main + +import ( + "bytes" + "encoding/gob" + "fmt" + "log" + "time" + + "github.com/nats-io/nats.go" +) + +type Message struct { + ID int // The Unique ID of the message + Data string // The actual data in the message +} + +func main() { + // Create a connection to nats server, and publish a message. + nc, err := nats.Connect("localhost", nil) + if err != nil { + log.Printf("error: nats.Connect failed: %v\n", err) + } + defer nc.Close() + + // The SubscribeSync used in the subscriber, will get messages that + // are sent after it started subscribing, so we start a publisher + // that sends out a message every second. + go func() { + counter := 0 + + for { + m := Message{ + ID: counter, + Data: "just some data", + } + + var buf bytes.Buffer + gobEnc := gob.NewEncoder(&buf) + err := gobEnc.Encode(m) + if err != nil { + fmt.Printf("error: gob.Enode failed: %v\n", err) + } + + //fmt.Printf("The gob encoded message right after encoding: %v\n", buf.Bytes()) + + msg := &nats.Msg{ + Reply: "subjectReply", + Data: buf.Bytes(), + Subject: "subject1", + } + + err = nc.PublishMsg(msg) + if err != nil { + log.Printf("error: publish failed: %v\n", err) + continue + } + + // Create a subscriber for the reply message. + subReply, err := nc.SubscribeSync(msg.Reply) + if err != nil { + log.Printf("error: nc.SubscribeSync failed: %v\n", err) + continue + } + + // Wait up until 10 seconds for a reply, + // continue and resend if to reply received. + msgReply, err := subReply.NextMsg(time.Second * 10) + if err != nil { + log.Printf("error: subRepl.NextMsg failed: %v\n", err) + // did not receive a reply, continuing from top again + continue + } + fmt.Printf("publisher: received: %s\n", msgReply.Data) + + // Increment the counter for the next message to be sent. + counter++ + time.Sleep(time.Second * 1) + } + }() + + select {} + +} diff --git a/edge/main.go b/edge/main.go new file mode 100644 index 0000000..593b717 --- /dev/null +++ b/edge/main.go @@ -0,0 +1,58 @@ +package main + +import ( + "bytes" + "encoding/gob" + "fmt" + "log" + + "github.com/nats-io/nats.go" +) + +type Message struct { + ID int // The Unique ID of the message + Data string // The actual data in the message +} + +func main() { + // Create a connection to nats server, and publish a message. + nc, err := nats.Connect("localhost", nil) + if err != nil { + log.Printf("error: nats.Connect failed: %v\n", err) + } + defer nc.Close() + + // Create a channel to put the data received in the subscriber callback + // function + reqMsgCh := make(chan Message) + + // Subscribe will start up a Go routine calling the callback function + // specified when a new message is received. + _, err = nc.Subscribe("subject1", func(req *nats.Msg) { + message := Message{} + + // Create a buffer to decode the gob encoded binary data back + // to it's original structure. + buf := bytes.NewBuffer(req.Data) + gobDec := gob.NewDecoder(buf) + err := gobDec.Decode(&message) + if err != nil { + fmt.Printf("error: gob decoding failed: %v\n", err) + } + + // Put the data recived on the channel for further processing + reqMsgCh <- message + + // Send a confirmation message back to the publisher + nc.Publish(req.Reply, []byte("confirmed: "+fmt.Sprint(message.ID))) + }) + if err != nil { + fmt.Printf("error: Subscribe failed: %v\n", err) + } + + // Do some further processing of the actual data we received in the + // subscriber callback function. + for { + fmt.Printf("subcriber: received data = %#v\n", <-reqMsgCh) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..eaa6afe --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/RaaLabs/steward + +go 1.15