1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-31 01:24:31 +00:00

ability to choose publishers on subscriber processes

This commit is contained in:
postmannen 2021-02-26 15:11:20 +01:00
parent dc42ce29e9
commit 82c420bc54
7 changed files with 76 additions and 20 deletions

View file

@ -0,0 +1,11 @@
[
{
"toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"],
"commandOrEvent":"CommandACK",
"method":"ShellCommand",
"timeout":3,
"retries":3
}
]

View file

@ -0,0 +1,11 @@
[
{
"toNode": "ship2",
"data": ["bash","-c","netstat -an|grep -i listen"],
"commandOrEvent":"CommandACK",
"method":"ShellCommand",
"timeout":3,
"retries":3
}
]

Binary file not shown.

View file

@ -4,6 +4,7 @@ import (
"log"
"net"
"net/http"
"os"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
@ -101,6 +102,7 @@ func (s *server) startMetrics() {
n, err := net.Listen("tcp", s.metrics.hostAndPort)
if err != nil {
log.Printf("error: failed to open prometheus listen port: %v\n", err)
os.Exit(1)
}
m := http.NewServeMux()
m.Handle("/metrics", promhttp.Handler())

View file

@ -80,7 +80,7 @@ func (s *server) processNewMessages(dbFileName string, newSAM chan []subjectAndM
log.Printf("info: did not find that specific subject, starting new process for subject: %v\n", subjName)
sub := newSubject(sam.Subject.Method, sam.Subject.CommandOrEvent, sam.Subject.ToNode)
proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindPublisher)
proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindPublisher, nil)
// fmt.Printf("*** %#v\n", proc)
go s.spawnWorkerProcess(proc)

View file

@ -114,6 +114,7 @@ func (s *server) Start() {
// starting asks to subscribe to TextLogging events.
go s.subscriberServices.startTextLogging()
// if enabled, start the sayHello I'm here service at the given interval
if s.publisherServices.sayHelloPublisher.interval != 0 {
go s.publisherServices.sayHelloPublisher.start(s.newMessagesCh, node(s.nodeName))
}
@ -177,21 +178,29 @@ type process struct {
// NB: Implementing this as an int to report for testing
errorCh chan errProcess
processKind processKind
// Who are we allowed to receive from ?
allowedReceivers map[node]struct{}
}
// prepareNewProcess will set the the provided values and the default
// values for a process.
func (s *server) processPrepareNew(subject Subject, errCh chan errProcess, processKind processKind) process {
func (s *server) processPrepareNew(subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []node) process {
// create the initial configuration for a sessions communicating with 1 host process.
s.lastProcessID++
m := make(map[node]struct{})
for _, a := range allowedReceivers {
m[a] = struct{}{}
}
proc := process{
messageID: 0,
subject: subject,
node: node(subject.ToNode),
processID: s.lastProcessID,
errorCh: errCh,
processKind: processKind,
//messageCh: make(chan Message),
messageID: 0,
subject: subject,
node: node(subject.ToNode),
processID: s.lastProcessID,
errorCh: errCh,
processKind: processKind,
allowedReceivers: m,
}
return proc
@ -296,7 +305,7 @@ func (s *server) messageDeliverNats(proc process, message Message) {
continue
}
}
log.Printf("info: publisher: received ACK for message: %s\n", msgReply.Data)
log.Printf("<--- publisher: received ACK for message: %s\n", msgReply.Data)
}
return
}
@ -311,7 +320,7 @@ func (s *server) messageDeliverNats(proc process, message Message) {
// the state of the message being processed, and then reply back to the
// correct sending process's reply, meaning so we ACK back to the correct
// publisher.
func (s *server) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *nats.Msg) {
func (s *server) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *nats.Msg, proc process) {
message := Message{}
@ -338,13 +347,36 @@ func (s *server) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
log.Printf("error: subscriberHandler: method type not available: %v\n", message.CommandOrEvent)
}
fmt.Printf("*** DEBUG: BEFORE CALLING HANDLER: ACK\n")
out, err := mf.handler(s, message, thisNode)
if err != nil {
// TODO: Send to error kernel ?
log.Printf("error: subscriberHandler: failed to execute event: %v\n", err)
out := []byte("not allowed from " + message.FromNode)
var err error
// TESTING: TO ALLOW RECEIVING ONLY FROM SPECIFIC HOSTS
_, arOK1 := proc.allowedReceivers[message.FromNode]
_, arOK2 := proc.allowedReceivers[message.FromNode]
if arOK1 || arOK2 {
out, err = mf.handler(s, message, thisNode)
if err != nil {
// TODO: Send to error kernel ?
log.Printf("error: subscriberHandler: failed to execute event: %v\n", err)
} else {
log.Printf("--- info: we don't allow receiving from: %v\n", message.FromNode)
}
}
// if message.FromNode != "central" {
// log.Printf("--- info: we don't allow receiving from: %v\n", message.FromNode)
//
// out, err = mf.handler(s, message, thisNode)
//
// if err != nil {
// // TODO: Send to error kernel ?
// log.Printf("error: subscriberHandler: failed to execute event: %v\n", err)
// }
// }
// Send a confirmation message back to the publisher
natsConn.Publish(msg.Reply, out)

View file

@ -16,7 +16,7 @@ func (s *server) subscribeMessages(proc process) {
// We start one handler per message received by using go routines here.
// This is for being able to reply back the current publisher who sent
// the message.
go s.subscriberHandler(s.natsConn, s.nodeName, msg)
go s.subscriberHandler(s.natsConn, s.nodeName, msg, proc)
})
if err != nil {
log.Printf("error: Subscribe failed: %v\n", err)
@ -28,7 +28,7 @@ func (s *server) subscribersStart() {
{
fmt.Printf("Starting shellCommand subscriber: %#v\n", s.nodeName)
sub := newSubject(ShellCommand, CommandACK, s.nodeName)
proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindSubscriber)
proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindSubscriber, []node{"central", "ship2"})
// fmt.Printf("*** %#v\n", proc)
go s.spawnWorkerProcess(proc)
}
@ -37,7 +37,7 @@ func (s *server) subscribersStart() {
{
fmt.Printf("Starting textlogging subscriber: %#v\n", s.nodeName)
sub := newSubject(TextLogging, EventACK, s.nodeName)
proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindSubscriber)
proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindSubscriber, []node{"central"})
// fmt.Printf("*** %#v\n", proc)
go s.spawnWorkerProcess(proc)
}
@ -46,7 +46,7 @@ func (s *server) subscribersStart() {
{
fmt.Printf("Starting SayHello subscriber: %#v\n", s.nodeName)
sub := newSubject(SayHello, EventNACK, s.nodeName)
proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindSubscriber)
proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"})
// fmt.Printf("*** %#v\n", proc)
go s.spawnWorkerProcess(proc)
}
@ -56,7 +56,7 @@ func (s *server) subscribersStart() {
{
fmt.Printf("Starting ErrorLog subscriber: %#v\n", s.nodeName)
sub := newSubject(ErrorLog, EventNACK, "errorCentral")
proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindSubscriber)
proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"})
// fmt.Printf("*** %#v\n", proc)
go s.spawnWorkerProcess(proc)
}