1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

newSubject now uses the methods available map to figure out what kind of CommandOrEvent type it is

This commit is contained in:
postmannen 2021-04-03 07:33:03 +02:00
parent dc209d7ad3
commit f75aa73703
3 changed files with 25 additions and 15 deletions

View file

@ -4,6 +4,8 @@ import (
"bytes"
"encoding/gob"
"fmt"
"log"
"os"
)
// --- Message
@ -88,10 +90,18 @@ type Subject struct {
// newSubject will return a new variable of the type subject, and insert
// all the values given as arguments. It will also create the channel
// to receive new messages on the specific subject.
func newSubject(method Method, commandOrEvent CommandOrEvent, node string) Subject {
func newSubject(method Method, node string) Subject {
// Get the CommandOrEvent type for the Method.
ma := method.GetMethodsAvailable()
coe, ok := ma.methodhandlers[method]
if !ok {
log.Printf("error: no CommandOrEvent type specified for the method\n")
os.Exit(1)
}
return Subject{
ToNode: node,
CommandOrEvent: commandOrEvent,
CommandOrEvent: coe.getKind(),
Method: method,
messageCh: make(chan Message),
}

View file

@ -278,7 +278,7 @@ func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subject
// by using the goto at the end redo the process for this specific message.
log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName)
sub := newSubject(sam.Subject.Method, sam.Subject.CommandOrEvent, sam.Subject.ToNode)
sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil, nil)
// fmt.Printf("*** %#v\n", proc)
proc.spawnWorker(s)

View file

@ -16,7 +16,7 @@ func (s *server) ProcessesStart() {
if s.configuration.StartSubOpCommand.OK {
{
fmt.Printf("Starting OpCommand subscriber: %#v\n", s.nodeName)
sub := newSubject(OpCommand, CommandACK, s.nodeName)
sub := newSubject(OpCommand, s.nodeName)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubOpCommand.Values, nil)
go proc.spawnWorker(s)
}
@ -24,7 +24,7 @@ func (s *server) ProcessesStart() {
{
fmt.Printf("Starting OpCommandRequest subscriber: %#v\n", s.nodeName)
sub := newSubject(OpCommandRequest, CommandACK, s.nodeName)
sub := newSubject(OpCommandRequest, s.nodeName)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil)
go proc.spawnWorker(s)
}
@ -33,7 +33,7 @@ func (s *server) ProcessesStart() {
if s.configuration.StartSubCLICommand.OK {
{
fmt.Printf("Starting CLICommand subscriber: %#v\n", s.nodeName)
sub := newSubject(CLICommand, CommandACK, s.nodeName)
sub := newSubject(CLICommand, s.nodeName)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubCLICommand.Values, nil)
// fmt.Printf("*** %#v\n", proc)
go proc.spawnWorker(s)
@ -44,7 +44,7 @@ func (s *server) ProcessesStart() {
if s.configuration.StartSubTextLogging.OK {
{
fmt.Printf("Starting textlogging subscriber: %#v\n", s.nodeName)
sub := newSubject(TextLogging, EventACK, s.nodeName)
sub := newSubject(TextLogging, s.nodeName)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubTextLogging.Values, nil)
// fmt.Printf("*** %#v\n", proc)
go proc.spawnWorker(s)
@ -55,7 +55,7 @@ func (s *server) ProcessesStart() {
if s.configuration.StartSubSayHello.OK {
{
fmt.Printf("Starting SayHello subscriber: %#v\n", s.nodeName)
sub := newSubject(SayHello, EventNACK, s.nodeName)
sub := newSubject(SayHello, s.nodeName)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubSayHello.Values, nil)
proc.procFuncCh = make(chan Message)
@ -90,7 +90,7 @@ func (s *server) ProcessesStart() {
// Start a subscriber for ErrorLog messages
{
fmt.Printf("Starting ErrorLog subscriber: %#v\n", s.nodeName)
sub := newSubject(ErrorLog, EventNACK, "errorCentral")
sub := newSubject(ErrorLog, "errorCentral")
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubErrorLog.Values, nil)
go proc.spawnWorker(s)
}
@ -100,7 +100,7 @@ func (s *server) ProcessesStart() {
if s.configuration.StartSubEchoRequest.OK {
{
fmt.Printf("Starting Echo Request subscriber: %#v\n", s.nodeName)
sub := newSubject(ECHORequest, EventACK, s.nodeName)
sub := newSubject(ECHORequest, s.nodeName)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubEchoRequest.Values, nil)
go proc.spawnWorker(s)
}
@ -110,7 +110,7 @@ func (s *server) ProcessesStart() {
if s.configuration.StartSubEchoReply.OK {
{
fmt.Printf("Starting Echo Reply subscriber: %#v\n", s.nodeName)
sub := newSubject(ECHOReply, EventACK, s.nodeName)
sub := newSubject(ECHOReply, s.nodeName)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubEchoReply.Values, nil)
go proc.spawnWorker(s)
}
@ -120,7 +120,7 @@ func (s *server) ProcessesStart() {
if s.configuration.StartSubCLICommandRequest.OK {
{
fmt.Printf("Starting CLICommand Request subscriber: %#v\n", s.nodeName)
sub := newSubject(CLICommandRequest, CommandACK, s.nodeName)
sub := newSubject(CLICommandRequest, s.nodeName)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubCLICommandRequest.Values, nil)
go proc.spawnWorker(s)
}
@ -130,7 +130,7 @@ func (s *server) ProcessesStart() {
if s.configuration.StartSubCLICommandRequestNOSEQ.OK {
{
fmt.Printf("Starting CLICommand NOSEQ Request subscriber: %#v\n", s.nodeName)
sub := newSubject(CLICommandRequestNOSEQ, CommandACK, s.nodeName)
sub := newSubject(CLICommandRequestNOSEQ, s.nodeName)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubCLICommandRequestNOSEQ.Values, nil)
go proc.spawnWorker(s)
}
@ -140,7 +140,7 @@ func (s *server) ProcessesStart() {
if s.configuration.StartSubCLICommandReply.OK {
{
fmt.Printf("Starting CLICommand Reply subscriber: %#v\n", s.nodeName)
sub := newSubject(CLICommandReply, EventACK, s.nodeName)
sub := newSubject(CLICommandReply, s.nodeName)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubCLICommandReply.Values, nil)
go proc.spawnWorker(s)
}
@ -155,7 +155,7 @@ func (s *server) ProcessesStart() {
if s.configuration.StartPubSayHello != 0 {
fmt.Printf("Starting SayHello Publisher: %#v\n", s.nodeName)
sub := newSubject(SayHello, EventNACK, s.configuration.CentralNodeName)
sub := newSubject(SayHello, s.configuration.CentralNodeName)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, []node{}, nil)
// Define the procFunc to be used for the process.