1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-31 01:24:31 +00:00

add service for publishing hello I'm here messages

This commit is contained in:
postmannen 2021-02-26 09:02:53 +01:00
parent f736212307
commit dc42ce29e9
5 changed files with 74 additions and 6 deletions

View file

@ -1,8 +1,10 @@
# steward
Async management of Edge Cloud units.
Command And Control anything asynchronously.
The idea is to build and use a pure message passing architecture for the control traffic back and forth from nodes, where a node can be a server, some other host system, or a container living in the cloud, or...other. The message passing backend used is <https://nats.io>
Send shell commands to control your servers by passing a message that will have guaranteed delivery if/when the subsribing node is available. Or for example send logs or metrics from an end node back to a central log subscriber.
The idea is to build and use a pure message passing architecture for the commands back and forth from nodes, with guaranteed delivery. A node can be a server running any host operating system, a container living in the cloud somewhere, a rapsberry pi, or something else that needs to be controlled that have an operating system installed . The message passing backend used is <https://nats.io>
```text
┌─────────────────┐
@ -68,7 +70,6 @@ Example: We probably want an ACK when sending some shellcommand to be executed,
- Prometheus exporters for Metrics
- More will come. In active development.
## Concepts/Ideas
@ -262,6 +263,6 @@ You can save the content to myfile.JSON and append it to `inmsg.txt`
The content of `inmsg.txt` will be erased as messages a processed.
## Overvview
## Overview
![overview](steward.svg)

View file

@ -19,6 +19,7 @@ func main() {
centralErrorLogger := flag.Bool("centralErrorLogger", false, "set to true if this is the node that should receive the error log's from other nodes")
defaultMessageTimeout := flag.Int("defaultMessageTimeout", 10, "default message timeout in seconds. This can be overridden on the message level")
defaultMessageRetries := flag.Int("defaultMessageRetries", 0, "default amount of retries that will be done before a message is thrown away, and out of the system")
publisherServiceSayhello := flag.Int("publisherServiceSayhello", 0, "Make the current node send hello messages to central at given interval in seconds")
flag.Parse()
// Start profiling if profiling port is specified
@ -29,7 +30,7 @@ func main() {
}
s, err := steward.NewServer(*brokerAddress, *nodeName, *promHostAndPort, *centralErrorLogger, *defaultMessageTimeout, *defaultMessageRetries)
s, err := steward.NewServer(*brokerAddress, *nodeName, *promHostAndPort, *centralErrorLogger, *defaultMessageTimeout, *defaultMessageRetries, *publisherServiceSayhello)
if err != nil {
log.Printf("error: failed to connect to broker: %v\n", err)
os.Exit(1)

Binary file not shown.

59
publisher-services.go Normal file
View file

@ -0,0 +1,59 @@
package steward
import (
"fmt"
"time"
)
type publisherServices struct {
sayHelloPublisher sayHelloPublisher
}
func newPublisherServices(sayHelloInterval int) *publisherServices {
ps := publisherServices{
sayHelloPublisher: sayHelloPublisher{
interval: sayHelloInterval,
},
}
return &ps
}
// ---
type sayHelloPublisher struct {
interval int
}
func (s *sayHelloPublisher) start(newMessagesCh chan<- []subjectAndMessage, fromNode node) {
go func() {
for {
sam := s.createMsg(fromNode)
newMessagesCh <- []subjectAndMessage{sam}
time.Sleep(time.Second * time.Duration(s.interval))
}
}()
}
// Will prepare a subject and message with the content
// of the error
func (s *sayHelloPublisher) createMsg(FromNode node) subjectAndMessage {
// TESTING: Creating an error message to send to errorCentral
m := fmt.Sprintf("Hello from %v\n", FromNode)
sam := subjectAndMessage{
Subject: Subject{
ToNode: "errorCentral",
CommandOrEvent: EventNACK,
Method: ErrorLog,
},
Message: Message{
ToNode: "errorCentral",
FromNode: FromNode,
Data: []string{m},
CommandOrEvent: EventNACK,
Method: SayHello,
},
}
return sam
}

View file

@ -55,6 +55,8 @@ type server struct {
// central logger.
subscriberServices *subscriberServices
// Is this the central error logger ?
// collection of the publisher services and the types to control them
publisherServices *publisherServices
centralErrorLogger bool
// default message timeout in seconds. This can be overridden on the message level
defaultMessageTimeout int
@ -63,7 +65,7 @@ type server struct {
}
// newServer will prepare and return a server type
func NewServer(brokerAddress string, nodeName string, promHostAndPort string, centralErrorLogger bool, defaultMessageTimeout int, defaultMessageRetries int) (*server, error) {
func NewServer(brokerAddress string, nodeName string, promHostAndPort string, centralErrorLogger bool, defaultMessageTimeout int, defaultMessageRetries int, sayHelloInterval int) (*server, error) {
conn, err := nats.Connect(brokerAddress, nil)
if err != nil {
log.Printf("error: nats.Connect failed: %v\n", err)
@ -81,6 +83,7 @@ func NewServer(brokerAddress string, nodeName string, promHostAndPort string, ce
commandOrEventAvailable: coe.GetCommandOrEventAvailable(),
metrics: newMetrics(promHostAndPort),
subscriberServices: newSubscriberServices(),
publisherServices: newPublisherServices(sayHelloInterval),
centralErrorLogger: centralErrorLogger,
defaultMessageTimeout: defaultMessageTimeout,
defaultMessageRetries: defaultMessageRetries,
@ -111,6 +114,10 @@ func (s *server) Start() {
// starting asks to subscribe to TextLogging events.
go s.subscriberServices.startTextLogging()
if s.publisherServices.sayHelloPublisher.interval != 0 {
go s.publisherServices.sayHelloPublisher.start(s.newMessagesCh, node(s.nodeName))
}
// Start up the predefined subscribers.
// TODO: What to subscribe on should be handled via flags, or config
// files.