mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
removed errorCh from newProcess arguments
This commit is contained in:
parent
77b89cdfb9
commit
5fa3f593e7
7 changed files with 29 additions and 34 deletions
11
process.go
11
process.go
|
@ -42,13 +42,7 @@ type process struct {
|
|||
// NB: Might not be needed later on.
|
||||
node Node
|
||||
// The processID for the current process
|
||||
processID int
|
||||
// errorCh is the same channel the errorKernel uses to
|
||||
// read incomming errors. By having this channel available
|
||||
// within a process we can send errors to the error kernel,
|
||||
// the EK desided what to do, and sends the action about
|
||||
// what to do back to the process where the error came from.
|
||||
errorCh chan errorEvent
|
||||
processID int
|
||||
processKind processKind
|
||||
// Who are we allowed to receive from ?
|
||||
// allowedReceivers map[Node]struct{}
|
||||
|
@ -91,7 +85,7 @@ type process struct {
|
|||
|
||||
// prepareNewProcess will set the the provided values and the default
|
||||
// values for a process.
|
||||
func newProcess(ctx context.Context, metrics *metrics, natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errorEvent, processKind processKind, procFunc func() error, signatures *signatures) process {
|
||||
func newProcess(ctx context.Context, metrics *metrics, natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, processKind processKind, procFunc func() error, signatures *signatures) process {
|
||||
// create the initial configuration for a sessions communicating with 1 host process.
|
||||
processes.lastProcessID++
|
||||
|
||||
|
@ -104,7 +98,6 @@ func newProcess(ctx context.Context, metrics *metrics, natsConn *nats.Conn, proc
|
|||
subject: subject,
|
||||
node: Node(configuration.NodeName),
|
||||
processID: processes.lastProcessID,
|
||||
errorCh: errCh,
|
||||
processKind: processKind,
|
||||
methodsAvailable: method.GetMethodsAvailable(),
|
||||
toRingbufferCh: toRingbufferCh,
|
||||
|
|
42
processes.go
42
processes.go
|
@ -94,21 +94,21 @@ func (p *processes) Start(proc process) {
|
|||
{
|
||||
log.Printf("Starting REQOpProcessList subscriber: %#v\n", proc.node)
|
||||
sub := newSubject(REQOpProcessList, string(proc.node))
|
||||
proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil, p.Signatures)
|
||||
proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, processKindSubscriber, nil, p.Signatures)
|
||||
go proc.spawnWorker(proc.processes, proc.natsConn)
|
||||
}
|
||||
|
||||
{
|
||||
log.Printf("Starting REQOpProcessStart subscriber: %#v\n", proc.node)
|
||||
sub := newSubject(REQOpProcessStart, string(proc.node))
|
||||
proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil, p.Signatures)
|
||||
proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, processKindSubscriber, nil, p.Signatures)
|
||||
go proc.spawnWorker(proc.processes, proc.natsConn)
|
||||
}
|
||||
|
||||
{
|
||||
log.Printf("Starting REQOpProcessStop subscriber: %#v\n", proc.node)
|
||||
sub := newSubject(REQOpProcessStop, string(proc.node))
|
||||
proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil, p.Signatures)
|
||||
proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, processKindSubscriber, nil, p.Signatures)
|
||||
go proc.spawnWorker(proc.processes, proc.natsConn)
|
||||
}
|
||||
|
||||
|
@ -220,7 +220,7 @@ func (s startup) subREQHttpGet(p process) {
|
|||
|
||||
log.Printf("Starting Http Get subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQHttpGet, string(p.node))
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, s.Signatures)
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, s.Signatures)
|
||||
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
|
||||
|
@ -230,7 +230,7 @@ func (s startup) pubREQHello(p process) {
|
|||
log.Printf("Starting Hello Publisher: %#v\n", p.node)
|
||||
|
||||
sub := newSubject(REQHello, p.configuration.CentralNodeName)
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindPublisher, nil, s.Signatures)
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindPublisher, nil, s.Signatures)
|
||||
|
||||
// Define the procFunc to be used for the process.
|
||||
proc.procFunc = procFunc(
|
||||
|
@ -275,42 +275,42 @@ func (s startup) pubREQHello(p process) {
|
|||
func (s startup) subREQToConsole(p process) {
|
||||
log.Printf("Starting Text To Console subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQToConsole, string(p.node))
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
||||
func (s startup) subREQTuiToConsole(p process) {
|
||||
log.Printf("Starting Tui To Console subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQTuiToConsole, string(p.node))
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
||||
func (s startup) subREQCliCommand(p process) {
|
||||
log.Printf("Starting CLICommand Request subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQCliCommand, string(p.node))
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
||||
func (s startup) subREQPong(p process) {
|
||||
log.Printf("Starting Pong subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQPong, string(p.node))
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
||||
func (s startup) subREQPing(p process) {
|
||||
log.Printf("Starting Ping Request subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQPing, string(p.node))
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
||||
func (s startup) subREQErrorLog(p process) {
|
||||
log.Printf("Starting REQErrorLog subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQErrorLog, "errorCentral")
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
||||
|
@ -324,7 +324,7 @@ func (s startup) subREQErrorLog(p process) {
|
|||
func (s startup) subREQHello(p process) {
|
||||
log.Printf("Starting Hello subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQHello, string(p.node))
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
||||
|
||||
// The reason for running the say hello subscriber as a procFunc is that
|
||||
// a handler are not able to hold state, and we need to hold the state
|
||||
|
@ -361,7 +361,7 @@ func (s startup) subREQHello(p process) {
|
|||
func (s startup) subREQToFile(p process) {
|
||||
log.Printf("Starting text to file subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQToFile, string(p.node))
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
||||
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
@ -369,7 +369,7 @@ func (s startup) subREQToFile(p process) {
|
|||
func (s startup) subREQCopyFileFrom(p process) {
|
||||
log.Printf("Starting copy file from subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQCopyFileFrom, string(p.node))
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
||||
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
@ -377,7 +377,7 @@ func (s startup) subREQCopyFileFrom(p process) {
|
|||
func (s startup) subREQCopyFileTo(p process) {
|
||||
log.Printf("Starting copy file to subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQCopyFileTo, string(p.node))
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
||||
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
@ -385,7 +385,7 @@ func (s startup) subREQCopyFileTo(p process) {
|
|||
func (s startup) subREQToFileAppend(p process) {
|
||||
log.Printf("Starting text logging subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQToFileAppend, string(p.node))
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
||||
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
@ -393,7 +393,7 @@ func (s startup) subREQToFileAppend(p process) {
|
|||
func (s startup) subREQTailFile(p process) {
|
||||
log.Printf("Starting tail log files subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQTailFile, string(p.node))
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
||||
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
@ -401,7 +401,7 @@ func (s startup) subREQTailFile(p process) {
|
|||
func (s startup) subREQCliCommandCont(p process) {
|
||||
log.Printf("Starting cli command with continous delivery: %#v\n", p.node)
|
||||
sub := newSubject(REQCliCommandCont, string(p.node))
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
||||
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
@ -410,7 +410,7 @@ func (s startup) subREQRelay(p process) {
|
|||
nodeWithRelay := fmt.Sprintf("*.%v", p.node)
|
||||
log.Printf("Starting Relay: %#v\n", nodeWithRelay)
|
||||
sub := newSubject(REQRelay, string(nodeWithRelay))
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
||||
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
@ -418,7 +418,7 @@ func (s startup) subREQRelay(p process) {
|
|||
func (s startup) subREQRelayInitial(p process) {
|
||||
log.Printf("Starting Relay Initial: %#v\n", p.node)
|
||||
sub := newSubject(REQRelayInitial, string(p.node))
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
||||
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
@ -426,7 +426,7 @@ func (s startup) subREQRelayInitial(p process) {
|
|||
func (s startup) subREQToSocket(p process) {
|
||||
log.Printf("Starting write to socket subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQToSocket, string(p.node))
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
||||
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
|
|
@ -459,7 +459,7 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str
|
|||
|
||||
// Create the process and start it.
|
||||
sub := newSubject(method, proc.configuration.NodeName)
|
||||
procNew := newProcess(proc.ctx, proc.processes.metrics, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil, proc.signatures)
|
||||
procNew := newProcess(proc.ctx, proc.processes.metrics, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, processKindSubscriber, nil, proc.signatures)
|
||||
go procNew.spawnWorker(proc.processes, proc.natsConn)
|
||||
|
||||
txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
|
||||
|
|
|
@ -254,7 +254,7 @@ func (s *server) Start() {
|
|||
//
|
||||
// NB: The context of the initial process are set in processes.Start.
|
||||
sub := newSubject(REQInitial, s.nodeName)
|
||||
s.processInitial = newProcess(context.TODO(), s.metrics, s.natsConn, s.processes, s.ringBufferBulkInCh, s.configuration, sub, s.errorKernel.errorCh, "", nil, s.signatures)
|
||||
s.processInitial = newProcess(context.TODO(), s.metrics, s.natsConn, s.processes, s.ringBufferBulkInCh, s.configuration, sub, "", nil, s.signatures)
|
||||
// Start all wanted subscriber processes.
|
||||
s.processes.Start(s.processInitial)
|
||||
|
||||
|
@ -479,7 +479,7 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
|
|||
// log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName)
|
||||
|
||||
sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
|
||||
proc := newProcess(s.ctx, s.metrics, s.natsConn, s.processes, s.ringBufferBulkInCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil, s.signatures)
|
||||
proc := newProcess(s.ctx, s.metrics, s.natsConn, s.processes, s.ringBufferBulkInCh, s.configuration, sub, processKindPublisher, nil, s.signatures)
|
||||
|
||||
proc.spawnWorker(s.processes, s.natsConn)
|
||||
// log.Printf("info: processNewMessages: new process started, subject: %v, processID: %v\n", subjName, proc.processID)
|
||||
|
|
1
signing/private.key
Normal file
1
signing/private.key
Normal file
|
@ -0,0 +1 @@
|
|||
hqSDUsuHUyRlBvHUL5KFThj6Cne3cSBzirGry3wu1Yj9z4SvsEjqXl5a4FXlAk5QDT1j/egYWZUxqGLte+j0pA
|
1
signing/public.key
Normal file
1
signing/public.key
Normal file
|
@ -0,0 +1 @@
|
|||
/c+Er7BI6l5eWuBV5QJOUA09Y/3oGFmVMahi7Xvo9KQ
|
|
@ -234,7 +234,7 @@ func checkREQErrorLogTest(stewardServer *server, conf *Configuration, t *testing
|
|||
ToNode: "somenode",
|
||||
}
|
||||
|
||||
p := newProcess(stewardServer.ctx, stewardServer.metrics, stewardServer.natsConn, stewardServer.processes, stewardServer.processInitial.toRingbufferCh, stewardServer.configuration, Subject{}, stewardServer.errorKernel.errorCh, processKindSubscriber, nil, stewardServer.signatures)
|
||||
p := newProcess(stewardServer.ctx, stewardServer.metrics, stewardServer.natsConn, stewardServer.processes, stewardServer.processInitial.toRingbufferCh, stewardServer.configuration, Subject{}, processKindSubscriber, nil, stewardServer.signatures)
|
||||
|
||||
stewardServer.errorKernel.errSend(p, m, fmt.Errorf("some error"))
|
||||
|
||||
|
|
Loading…
Reference in a new issue