1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

Moved out the error handling from the process run

This commit is contained in:
postmannen 2021-02-01 13:41:04 +01:00
parent 223f472f5b
commit ec76bd36cd
2 changed files with 27 additions and 21 deletions

View file

@ -56,7 +56,7 @@ type server struct {
processes map[node]process processes map[node]process
// The last processID created // The last processID created
lastProcessID int lastProcessID int
thisNodeName string nodeName string
} }
// newServer will prepare and return a server type // newServer will prepare and return a server type
@ -66,23 +66,12 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) {
log.Printf("error: nats.Connect failed: %v\n", err) log.Printf("error: nats.Connect failed: %v\n", err)
} }
return &server{ s := &server{
thisNodeName: nodeName, nodeName: nodeName,
natsConn: conn, natsConn: conn,
processes: make(map[node]process), processes: make(map[node]process),
}, nil }
}
func (s *server) RunPublisher() {
proc := s.prepareNewProcess("btship1")
// fmt.Printf("*** %#v\n", proc)
go s.spawnProcess(proc)
proc = s.prepareNewProcess("btship2")
// fmt.Printf("*** %#v\n", proc)
go s.spawnProcess(proc)
// start the error handling
go func() { go func() {
for { for {
@ -98,6 +87,19 @@ func (s *server) RunPublisher() {
} }
}() }()
return s, nil
}
func (s *server) RunPublisher() {
proc := s.prepareNewProcess("btship1")
// fmt.Printf("*** %#v\n", proc)
go s.spawnProcess(proc)
proc = s.prepareNewProcess("btship2")
// fmt.Printf("*** %#v\n", proc)
go s.spawnProcess(proc)
select {} select {}
} }
@ -118,6 +120,8 @@ type process struct {
errorCh chan string errorCh chan string
} }
// prepareNewProcess will set the the provided values and the default
// values for a process.
func (s *server) prepareNewProcess(nodeName string) process { func (s *server) prepareNewProcess(nodeName string) process {
// create the initial configuration for a sessions communicating with 1 host. // create the initial configuration for a sessions communicating with 1 host.
s.lastProcessID++ s.lastProcessID++
@ -131,7 +135,9 @@ func (s *server) prepareNewProcess(nodeName string) process {
return proc return proc
} }
// spawnProcess will spawn a new process // spawnProcess will spawn a new process. It will give the process
// the next available ID, and also add the process to the processes
// map.
func (s *server) spawnProcess(proc process) { func (s *server) spawnProcess(proc process) {
mu.Lock() mu.Lock()
s.processes[proc.node] = proc s.processes[proc.node] = proc
@ -151,7 +157,7 @@ func (s *server) spawnProcess(proc process) {
// simulate that we get an error, and that we can send that // simulate that we get an error, and that we can send that
// out of the process and receive it in another thread. // out of the process and receive it in another thread.
// s.processes[proc.node].errorCh <- "received an error from process: " + fmt.Sprintf("%v\n", proc.processID) s.processes[proc.node].errorCh <- "received an error from process: " + fmt.Sprintf("%v\n", proc.processID)
//fmt.Printf("%#v\n", s.processes[proc.node]) //fmt.Printf("%#v\n", s.processes[proc.node])
} }

View file

@ -18,8 +18,8 @@ func (s *server) RunSubscriber() {
// Subscribe will start up a Go routine under the hood calling the // Subscribe will start up a Go routine under the hood calling the
// callback function specified when a new message is received. // callback function specified when a new message is received.
subject := fmt.Sprintf("%s.%s.%s", s.thisNodeName, "command", "shellcommand") subject := fmt.Sprintf("%s.%s.%s", s.nodeName, "command", "shellcommand")
_, err := s.natsConn.Subscribe(subject, listenForMessage(s.natsConn, reqMsgCh, s.thisNodeName)) _, err := s.natsConn.Subscribe(subject, listenForMessage(s.natsConn, reqMsgCh, s.nodeName))
if err != nil { if err != nil {
fmt.Printf("error: Subscribe failed: %v\n", err) fmt.Printf("error: Subscribe failed: %v\n", err)
} }