mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-18 13:49:29 +00:00
rewrote so the processes map is based on the subject name instead of the node name
This commit is contained in:
parent
9fae25e19b
commit
0aa74de868
2 changed files with 31 additions and 17 deletions
|
@ -22,7 +22,7 @@ func main() {
|
|||
}
|
||||
|
||||
if *modePublisher {
|
||||
go s.RunPublisher()
|
||||
go s.PublisherStart()
|
||||
}
|
||||
|
||||
if *modeSubscriber {
|
||||
|
|
46
publisher.go
46
publisher.go
|
@ -53,7 +53,7 @@ type Message struct {
|
|||
type server struct {
|
||||
natsConn *nats.Conn
|
||||
// TODO: sessions should probably hold a slice/map of processes ?
|
||||
processes map[node]process
|
||||
processes map[subjectName]process
|
||||
// The last processID created
|
||||
lastProcessID int
|
||||
nodeName string
|
||||
|
@ -69,7 +69,7 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) {
|
|||
s := &server{
|
||||
nodeName: nodeName,
|
||||
natsConn: conn,
|
||||
processes: make(map[node]process),
|
||||
processes: make(map[subjectName]process),
|
||||
}
|
||||
|
||||
go func() {
|
||||
|
@ -91,7 +91,7 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) {
|
|||
|
||||
}
|
||||
|
||||
func (s *server) RunPublisher() {
|
||||
func (s *server) PublisherStart() {
|
||||
// start the checking of files for input messages
|
||||
fileReadCh := make((chan []byte))
|
||||
go getMessagesFromFile("./", "inmsg.txt", fileReadCh)
|
||||
|
@ -114,9 +114,9 @@ func (s *server) RunPublisher() {
|
|||
method: "shellcommand",
|
||||
domain: "shell",
|
||||
}
|
||||
proc := s.prepareNewProcess(sub)
|
||||
proc := s.processPrepareNew(sub)
|
||||
// fmt.Printf("*** %#v\n", proc)
|
||||
go s.spawnProcess(proc)
|
||||
go s.processSpawn(proc)
|
||||
}
|
||||
|
||||
{
|
||||
|
@ -126,9 +126,9 @@ func (s *server) RunPublisher() {
|
|||
method: "shellcommand",
|
||||
domain: "shell",
|
||||
}
|
||||
proc := s.prepareNewProcess(sub)
|
||||
proc := s.processPrepareNew(sub)
|
||||
// fmt.Printf("*** %#v\n", proc)
|
||||
go s.spawnProcess(proc)
|
||||
go s.processSpawn(proc)
|
||||
}
|
||||
|
||||
select {}
|
||||
|
@ -153,6 +153,12 @@ type subject struct {
|
|||
domain string
|
||||
}
|
||||
|
||||
type subjectName string
|
||||
|
||||
func (s subject) name() subjectName {
|
||||
return subjectName(fmt.Sprintf("%s.%s.%s.%s", s.node, s.messageType, s.method, s.domain))
|
||||
}
|
||||
|
||||
// process are represent the communication to one individual host
|
||||
type process struct {
|
||||
messageID int
|
||||
|
@ -168,11 +174,14 @@ type process struct {
|
|||
// errorCh is used to report errors from a process
|
||||
// NB: Implementing this as an int to report for testing
|
||||
errorCh chan string
|
||||
// messageCh are the channel where we put the message we want
|
||||
// a process to send
|
||||
messageCh chan Message
|
||||
}
|
||||
|
||||
// prepareNewProcess will set the the provided values and the default
|
||||
// values for a process.
|
||||
func (s *server) prepareNewProcess(subject subject) process {
|
||||
func (s *server) processPrepareNew(subject subject) process {
|
||||
// create the initial configuration for a sessions communicating with 1 host process.
|
||||
s.lastProcessID++
|
||||
proc := process{
|
||||
|
@ -181,6 +190,7 @@ func (s *server) prepareNewProcess(subject subject) process {
|
|||
node: node(subject.node),
|
||||
processID: s.lastProcessID,
|
||||
errorCh: make(chan string),
|
||||
messageCh: make(chan Message),
|
||||
}
|
||||
|
||||
return proc
|
||||
|
@ -189,11 +199,15 @@ func (s *server) prepareNewProcess(subject subject) process {
|
|||
// spawnProcess will spawn a new process. It will give the process
|
||||
// the next available ID, and also add the process to the processes
|
||||
// map.
|
||||
func (s *server) spawnProcess(proc process) {
|
||||
func (s *server) processSpawn(proc process) {
|
||||
mu.Lock()
|
||||
s.processes[proc.node] = proc
|
||||
s.processes[proc.subject.name()] = proc
|
||||
mu.Unlock()
|
||||
|
||||
for k, v := range s.processes {
|
||||
fmt.Printf("DEBUG: k=%v, v=%v \n", k, v)
|
||||
}
|
||||
|
||||
// Loop creating one new message every second to simulate getting new
|
||||
// messages to deliver.
|
||||
//
|
||||
|
@ -204,17 +218,17 @@ func (s *server) spawnProcess(proc process) {
|
|||
// messages from the message-pickup-process.
|
||||
for {
|
||||
m := getMessageToDeliver()
|
||||
m.ID = s.processes[proc.node].messageID
|
||||
m.ID = s.processes[proc.subject.name()].messageID
|
||||
messageDeliver(proc, m, s.natsConn)
|
||||
|
||||
// Increment the counter for the next message to be sent.
|
||||
proc.messageID++
|
||||
s.processes[proc.node] = proc
|
||||
s.processes[proc.subject.name()] = proc
|
||||
time.Sleep(time.Second * 1)
|
||||
|
||||
// NB: simulate that we get an error, and that we can send that
|
||||
// out of the process and receive it in another thread.
|
||||
s.processes[proc.node].errorCh <- "received an error from process: " + fmt.Sprintf("%v\n", proc.processID)
|
||||
s.processes[proc.subject.name()].errorCh <- "received an error from process: " + fmt.Sprintf("%v\n", proc.processID)
|
||||
|
||||
//fmt.Printf("%#v\n", s.processes[proc.node])
|
||||
}
|
||||
|
@ -237,11 +251,11 @@ func messageDeliver(proc process, message Message, natsConn *nats.Conn) {
|
|||
}
|
||||
|
||||
msg := &nats.Msg{
|
||||
Subject: fmt.Sprintf("%s.%s.%s.%s", proc.node, proc.subject.messageType, proc.subject.method, proc.subject.domain),
|
||||
Subject: string(proc.subject.name()),
|
||||
// Subject: fmt.Sprintf("%s.%s.%s", proc.node, "command", "shellcommand"),
|
||||
// Structure of the reply message are:
|
||||
// reply.<nodename>.<message type>.<method>
|
||||
Reply: fmt.Sprintf("repply.%s.%s.%s.%s", proc.node, proc.subject.messageType, proc.subject.method, proc.subject.domain),
|
||||
Reply: fmt.Sprintf("reply.%s", proc.subject.name()),
|
||||
Data: dataPayload,
|
||||
}
|
||||
|
||||
|
@ -267,7 +281,7 @@ func messageDeliver(proc process, message Message, natsConn *nats.Conn) {
|
|||
// continue and resend if to reply received.
|
||||
msgReply, err := subReply.NextMsg(time.Second * 10)
|
||||
if err != nil {
|
||||
log.Printf("error: subRepl.NextMsg failed for node=%v pid=%v: %v\n", proc.node, proc.processID, err)
|
||||
log.Printf("error: subRepl.NextMsg failed for node=%v, subject=%v: %v\n", proc.node, proc.subject.name(), err)
|
||||
// did not receive a reply, continuing from top again
|
||||
continue
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue