From 82c420bc54a59037d0960054509087a173799985 Mon Sep 17 00:00:00 2001
From: postmannen <postmannen@gmail.com>
Date: Fri, 26 Feb 2021 15:11:20 +0100
Subject: [PATCH] ability to choose publishers on subscriber processes

---
 example/toShip1FromShip2.json |  11 +++++++
 example/toShip2FromShip1.json |  11 +++++++
 incommmingBuffer.db           | Bin 32768 -> 32768 bytes
 prometheus.go                 |   2 ++
 publisher.go                  |   2 +-
 server.go                     |  60 ++++++++++++++++++++++++++--------
 subscribers.go                |  10 +++---
 7 files changed, 76 insertions(+), 20 deletions(-)
 create mode 100644 example/toShip1FromShip2.json
 create mode 100644 example/toShip2FromShip1.json

diff --git a/example/toShip1FromShip2.json b/example/toShip1FromShip2.json
new file mode 100644
index 0000000..26ac4f4
--- /dev/null
+++ b/example/toShip1FromShip2.json
@@ -0,0 +1,11 @@
+[
+    {
+        
+        "toNode": "ship1",
+        "data": ["bash","-c","netstat -an|grep -i listen"],
+        "commandOrEvent":"CommandACK",
+        "method":"ShellCommand",
+        "timeout":3,
+        "retries":3
+    }
+]
\ No newline at end of file
diff --git a/example/toShip2FromShip1.json b/example/toShip2FromShip1.json
new file mode 100644
index 0000000..f0d30f5
--- /dev/null
+++ b/example/toShip2FromShip1.json
@@ -0,0 +1,11 @@
+[
+    {
+        
+        "toNode": "ship2",
+        "data": ["bash","-c","netstat -an|grep -i listen"],
+        "commandOrEvent":"CommandACK",
+        "method":"ShellCommand",
+        "timeout":3,
+        "retries":3
+    }
+]
\ No newline at end of file
diff --git a/incommmingBuffer.db b/incommmingBuffer.db
index 46610ce36f6e8a5e31c00bfa597c5f7024182360..961180e7071496e9073045dca00e686079a3474f 100644
GIT binary patch
delta 310
zcmZo@U}|V!nqVQp&Hw=+P<rQzZ66}r1-EWiG}ypDNdTcB6skbNHhAferkG<81q!81
zMi!gP6(;kGGlLbdK$WpVX*QrbZUH7g#>sjL?m)GZ%N?xvWEdG3jLfT*JYAHmjLawR
zcTiw7oczEc$l6dzM=3K!$;v=SDJ8KaQOPP=DJiiy11PGS3?%YWONvVpOB8ey^J>zI
zQVSGxGZk_&i%U}Tlwv18bTFGd(NRIbSVyTSwWKIBwOGl@xVE--W1ueIqyR3c`<S5a
dg1DA}0pz~>{EBQ)VF3n+1iCVo&58l{<pBkKK_CDC

delta 238
zcmZo@U}|V!nqV<m!GJ^H4+8`Q{I=E5h<MVsS<zqv|0DsRWB`Z2U#R3q7v@KgU%kEo
zkyI#UvM||Pt}vOOp9!ph1*(j7vtq(?{z(oxOg4;@^%UHJ$|jdPSn+W&GB8*eS1WnC
zC|Ow;Pu}mKz-To2fkTj{k&=#5W{Q%PfsRs2Vo9QsRkTu4VsQpgR5uw&loX|=D(LCy
zE5%NJ=wLQ^v7-X7k&aSPYDrOMYO#`);pPjD27D6}geC=WL0tlI4+8@;&?WcfHyb2y
G2mk<neLnpF

diff --git a/prometheus.go b/prometheus.go
index eb12586..7962e50 100644
--- a/prometheus.go
+++ b/prometheus.go
@@ -4,6 +4,7 @@ import (
 	"log"
 	"net"
 	"net/http"
+	"os"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -101,6 +102,7 @@ func (s *server) startMetrics() {
 	n, err := net.Listen("tcp", s.metrics.hostAndPort)
 	if err != nil {
 		log.Printf("error: failed to open prometheus listen port: %v\n", err)
+		os.Exit(1)
 	}
 	m := http.NewServeMux()
 	m.Handle("/metrics", promhttp.Handler())
diff --git a/publisher.go b/publisher.go
index 7ee04d0..ae36b2e 100644
--- a/publisher.go
+++ b/publisher.go
@@ -80,7 +80,7 @@ func (s *server) processNewMessages(dbFileName string, newSAM chan []subjectAndM
 				log.Printf("info: did not find that specific subject, starting new process for subject: %v\n", subjName)
 
 				sub := newSubject(sam.Subject.Method, sam.Subject.CommandOrEvent, sam.Subject.ToNode)
-				proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindPublisher)
+				proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindPublisher, nil)
 				// fmt.Printf("*** %#v\n", proc)
 				go s.spawnWorkerProcess(proc)
 
diff --git a/server.go b/server.go
index 189162a..ff12d43 100644
--- a/server.go
+++ b/server.go
@@ -114,6 +114,7 @@ func (s *server) Start() {
 	// starting asks to subscribe to TextLogging events.
 	go s.subscriberServices.startTextLogging()
 
+	// if enabled, start the sayHello I'm here service at the given interval
 	if s.publisherServices.sayHelloPublisher.interval != 0 {
 		go s.publisherServices.sayHelloPublisher.start(s.newMessagesCh, node(s.nodeName))
 	}
@@ -177,21 +178,29 @@ type process struct {
 	// NB: Implementing this as an int to report for testing
 	errorCh     chan errProcess
 	processKind processKind
+	// Who are we allowed to receive from ?
+	allowedReceivers map[node]struct{}
 }
 
 // prepareNewProcess will set the the provided values and the default
 // values for a process.
-func (s *server) processPrepareNew(subject Subject, errCh chan errProcess, processKind processKind) process {
+func (s *server) processPrepareNew(subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []node) process {
 	// create the initial configuration for a sessions communicating with 1 host process.
 	s.lastProcessID++
+
+	m := make(map[node]struct{})
+	for _, a := range allowedReceivers {
+		m[a] = struct{}{}
+	}
+
 	proc := process{
-		messageID:   0,
-		subject:     subject,
-		node:        node(subject.ToNode),
-		processID:   s.lastProcessID,
-		errorCh:     errCh,
-		processKind: processKind,
-		//messageCh: make(chan Message),
+		messageID:        0,
+		subject:          subject,
+		node:             node(subject.ToNode),
+		processID:        s.lastProcessID,
+		errorCh:          errCh,
+		processKind:      processKind,
+		allowedReceivers: m,
 	}
 
 	return proc
@@ -296,7 +305,7 @@ func (s *server) messageDeliverNats(proc process, message Message) {
 					continue
 				}
 			}
-			log.Printf("info: publisher: received ACK for message: %s\n", msgReply.Data)
+			log.Printf("<--- publisher: received ACK for message: %s\n", msgReply.Data)
 		}
 		return
 	}
@@ -311,7 +320,7 @@ func (s *server) messageDeliverNats(proc process, message Message) {
 // the state of the message being processed, and then reply back to the
 // correct sending process's reply, meaning so we ACK back to the correct
 // publisher.
-func (s *server) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *nats.Msg) {
+func (s *server) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *nats.Msg, proc process) {
 
 	message := Message{}
 
@@ -338,13 +347,36 @@ func (s *server) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
 			log.Printf("error: subscriberHandler: method type not available: %v\n", message.CommandOrEvent)
 		}
 		fmt.Printf("*** DEBUG: BEFORE CALLING HANDLER: ACK\n")
-		out, err := mf.handler(s, message, thisNode)
 
-		if err != nil {
-			// TODO: Send to error kernel ?
-			log.Printf("error: subscriberHandler: failed to execute event: %v\n", err)
+		out := []byte("not allowed from " + message.FromNode)
+		var err error
+
+		// TESTING: TO ALLOW RECEIVING ONLY FROM SPECIFIC HOSTS
+		_, arOK1 := proc.allowedReceivers[message.FromNode]
+		_, arOK2 := proc.allowedReceivers[message.FromNode]
+
+		if arOK1 || arOK2 {
+			out, err = mf.handler(s, message, thisNode)
+
+			if err != nil {
+				// TODO: Send to error kernel ?
+				log.Printf("error: subscriberHandler: failed to execute event: %v\n", err)
+			} else {
+				log.Printf("--- info: we don't allow receiving from: %v\n", message.FromNode)
+			}
 		}
 
+		// if message.FromNode != "central" {
+		// 	log.Printf("--- info: we don't allow receiving from: %v\n", message.FromNode)
+		//
+		// 	out, err = mf.handler(s, message, thisNode)
+		//
+		// 	if err != nil {
+		// 		// TODO: Send to error kernel ?
+		// 		log.Printf("error: subscriberHandler: failed to execute event: %v\n", err)
+		// 	}
+		// }
+
 		// Send a confirmation message back to the publisher
 		natsConn.Publish(msg.Reply, out)
 
diff --git a/subscribers.go b/subscribers.go
index d3472ce..7de9afb 100644
--- a/subscribers.go
+++ b/subscribers.go
@@ -16,7 +16,7 @@ func (s *server) subscribeMessages(proc process) {
 		// We start one handler per message received by using go routines here.
 		// This is for being able to reply back the current publisher who sent
 		// the message.
-		go s.subscriberHandler(s.natsConn, s.nodeName, msg)
+		go s.subscriberHandler(s.natsConn, s.nodeName, msg, proc)
 	})
 	if err != nil {
 		log.Printf("error: Subscribe failed: %v\n", err)
@@ -28,7 +28,7 @@ func (s *server) subscribersStart() {
 	{
 		fmt.Printf("Starting shellCommand subscriber: %#v\n", s.nodeName)
 		sub := newSubject(ShellCommand, CommandACK, s.nodeName)
-		proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindSubscriber)
+		proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindSubscriber, []node{"central", "ship2"})
 		// fmt.Printf("*** %#v\n", proc)
 		go s.spawnWorkerProcess(proc)
 	}
@@ -37,7 +37,7 @@ func (s *server) subscribersStart() {
 	{
 		fmt.Printf("Starting textlogging subscriber: %#v\n", s.nodeName)
 		sub := newSubject(TextLogging, EventACK, s.nodeName)
-		proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindSubscriber)
+		proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindSubscriber, []node{"central"})
 		// fmt.Printf("*** %#v\n", proc)
 		go s.spawnWorkerProcess(proc)
 	}
@@ -46,7 +46,7 @@ func (s *server) subscribersStart() {
 	{
 		fmt.Printf("Starting SayHello subscriber: %#v\n", s.nodeName)
 		sub := newSubject(SayHello, EventNACK, s.nodeName)
-		proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindSubscriber)
+		proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"})
 		// fmt.Printf("*** %#v\n", proc)
 		go s.spawnWorkerProcess(proc)
 	}
@@ -56,7 +56,7 @@ func (s *server) subscribersStart() {
 		{
 			fmt.Printf("Starting ErrorLog subscriber: %#v\n", s.nodeName)
 			sub := newSubject(ErrorLog, EventNACK, "errorCentral")
-			proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindSubscriber)
+			proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"})
 			// fmt.Printf("*** %#v\n", proc)
 			go s.spawnWorkerProcess(proc)
 		}