mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
injected server directly procs
This commit is contained in:
parent
d69634a706
commit
f0c84f48db
7 changed files with 123 additions and 116 deletions
|
@ -40,6 +40,8 @@ NB: Nodes that don't have hello messages enabled and are not present in the hell
|
||||||
|
|
||||||
If a node is registered in the auth db but not present in the network we should throw a log message to the errorKernel so operators would be aware of such nodes.
|
If a node is registered in the auth db but not present in the network we should throw a log message to the errorKernel so operators would be aware of such nodes.
|
||||||
|
|
||||||
|
DECIDE: Hello messages should contain the public key ?
|
||||||
|
|
||||||
### Public Keys
|
### Public Keys
|
||||||
|
|
||||||
#### Central Store
|
#### Central Store
|
||||||
|
|
|
@ -211,7 +211,7 @@ func (s *server) readSocket() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send the SAM struct to be picked up by the ring buffer.
|
// Send the SAM struct to be picked up by the ring buffer.
|
||||||
s.ringBufferBulkInCh <- sams
|
s.toRingBufferCh <- sams
|
||||||
|
|
||||||
}(conn)
|
}(conn)
|
||||||
}
|
}
|
||||||
|
@ -276,7 +276,7 @@ func (s *server) readTCPListener() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send the SAM struct to be picked up by the ring buffer.
|
// Send the SAM struct to be picked up by the ring buffer.
|
||||||
s.ringBufferBulkInCh <- sam
|
s.toRingBufferCh <- sam
|
||||||
|
|
||||||
}(conn)
|
}(conn)
|
||||||
}
|
}
|
||||||
|
@ -320,7 +320,7 @@ func (s *server) readHTTPlistenerHandler(w http.ResponseWriter, r *http.Request)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send the SAM struct to be picked up by the ring buffer.
|
// Send the SAM struct to be picked up by the ring buffer.
|
||||||
s.ringBufferBulkInCh <- sam
|
s.toRingBufferCh <- sam
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
44
process.go
44
process.go
|
@ -32,6 +32,9 @@ const (
|
||||||
// process holds all the logic to handle a message type and it's
|
// process holds all the logic to handle a message type and it's
|
||||||
// method, subscription/publishin messages for a subject, and more.
|
// method, subscription/publishin messages for a subject, and more.
|
||||||
type process struct {
|
type process struct {
|
||||||
|
// server
|
||||||
|
server *server
|
||||||
|
// messageID
|
||||||
messageID int
|
messageID int
|
||||||
// the subject used for the specific process. One process
|
// the subject used for the specific process. One process
|
||||||
// can contain only one sender on a message bus, hence
|
// can contain only one sender on a message bus, hence
|
||||||
|
@ -103,29 +106,30 @@ type process struct {
|
||||||
|
|
||||||
// prepareNewProcess will set the the provided values and the default
|
// prepareNewProcess will set the the provided values and the default
|
||||||
// values for a process.
|
// values for a process.
|
||||||
func newProcess(ctx context.Context, metrics *metrics, natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, processKind processKind, procFunc func() error, signatures *signatures) process {
|
func newProcess(ctx context.Context, server *server, subject Subject, processKind processKind, procFunc func() error) process {
|
||||||
// create the initial configuration for a sessions communicating with 1 host process.
|
// create the initial configuration for a sessions communicating with 1 host process.
|
||||||
processes.lastProcessID++
|
server.processes.lastProcessID++
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
|
||||||
var method Method
|
var method Method
|
||||||
|
|
||||||
proc := process{
|
proc := process{
|
||||||
|
server: server,
|
||||||
messageID: 0,
|
messageID: 0,
|
||||||
subject: subject,
|
subject: subject,
|
||||||
node: Node(configuration.NodeName),
|
node: Node(server.configuration.NodeName),
|
||||||
processID: processes.lastProcessID,
|
processID: server.processes.lastProcessID,
|
||||||
processKind: processKind,
|
processKind: processKind,
|
||||||
methodsAvailable: method.GetMethodsAvailable(),
|
methodsAvailable: method.GetMethodsAvailable(),
|
||||||
toRingbufferCh: toRingbufferCh,
|
toRingbufferCh: server.toRingBufferCh,
|
||||||
configuration: configuration,
|
configuration: server.configuration,
|
||||||
processes: processes,
|
processes: server.processes,
|
||||||
natsConn: natsConn,
|
natsConn: server.natsConn,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
ctxCancel: cancel,
|
ctxCancel: cancel,
|
||||||
startup: newStartup(metrics, signatures),
|
startup: newStartup(server),
|
||||||
signatures: signatures,
|
signatures: server.signatures,
|
||||||
}
|
}
|
||||||
|
|
||||||
return proc
|
return proc
|
||||||
|
@ -138,7 +142,7 @@ func newProcess(ctx context.Context, metrics *metrics, natsConn *nats.Conn, proc
|
||||||
//
|
//
|
||||||
// It will give the process the next available ID, and also add the
|
// It will give the process the next available ID, and also add the
|
||||||
// process to the processes map in the server structure.
|
// process to the processes map in the server structure.
|
||||||
func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
|
func (p process) spawnWorker() {
|
||||||
// We use the full name of the subject to identify a unique
|
// We use the full name of the subject to identify a unique
|
||||||
// process. We can do that since a process can only handle
|
// process. We can do that since a process can only handle
|
||||||
// one message queue.
|
// one message queue.
|
||||||
|
@ -153,7 +157,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
|
||||||
processName := processNameGet(p.subject.name(), p.processKind)
|
processName := processNameGet(p.subject.name(), p.processKind)
|
||||||
|
|
||||||
// Add prometheus metrics for the process.
|
// Add prometheus metrics for the process.
|
||||||
p.processes.metrics.promProcessesAllRunning.With(prometheus.Labels{"processName": string(processName)})
|
p.server.metrics.promProcessesAllRunning.With(prometheus.Labels{"processName": string(processName)})
|
||||||
|
|
||||||
// Start a publisher worker, which will start a go routine (process)
|
// Start a publisher worker, which will start a go routine (process)
|
||||||
// That will take care of all the messages for the subject it owns.
|
// That will take care of all the messages for the subject it owns.
|
||||||
|
@ -176,7 +180,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
go p.publishMessages(natsConn)
|
go p.publishMessages(p.natsConn)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a subscriber worker, which will start a go routine (process)
|
// Start a subscriber worker, which will start a go routine (process)
|
||||||
|
@ -205,9 +209,9 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
|
||||||
p.processName = pn
|
p.processName = pn
|
||||||
|
|
||||||
// Add information about the new process to the started processes map.
|
// Add information about the new process to the started processes map.
|
||||||
procs.active.mu.Lock()
|
p.server.processes.active.mu.Lock()
|
||||||
procs.active.procNames[pn] = p
|
p.server.processes.active.procNames[pn] = p
|
||||||
procs.active.mu.Unlock()
|
p.server.processes.active.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// messageDeliverNats will create the Nats message with headers and payload.
|
// messageDeliverNats will create the Nats message with headers and payload.
|
||||||
|
@ -242,7 +246,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
|
||||||
log.Printf("%v\n", er)
|
log.Printf("%v\n", er)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
p.processes.metrics.promNatsDeliveredTotal.Inc()
|
p.server.metrics.promNatsDeliveredTotal.Inc()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -309,7 +313,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
|
||||||
|
|
||||||
subReply.Unsubscribe()
|
subReply.Unsubscribe()
|
||||||
|
|
||||||
p.processes.metrics.promNatsMessagesFailedACKsTotal.Inc()
|
p.server.metrics.promNatsMessagesFailedACKsTotal.Inc()
|
||||||
return
|
return
|
||||||
|
|
||||||
default:
|
default:
|
||||||
|
@ -317,7 +321,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
|
||||||
er := fmt.Errorf("max retries for message not reached, retrying sending of message with ID %v", message.ID)
|
er := fmt.Errorf("max retries for message not reached, retrying sending of message with ID %v", message.ID)
|
||||||
p.processes.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
p.processes.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||||
|
|
||||||
p.processes.metrics.promNatsMessagesMissedACKsTotal.Inc()
|
p.server.metrics.promNatsMessagesMissedACKsTotal.Inc()
|
||||||
|
|
||||||
subReply.Unsubscribe()
|
subReply.Unsubscribe()
|
||||||
continue
|
continue
|
||||||
|
@ -328,7 +332,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
|
||||||
|
|
||||||
subReply.Unsubscribe()
|
subReply.Unsubscribe()
|
||||||
|
|
||||||
p.processes.metrics.promNatsDeliveredTotal.Inc()
|
p.server.metrics.promNatsDeliveredTotal.Inc()
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
124
processes.go
124
processes.go
|
@ -17,6 +17,8 @@ type processes struct {
|
||||||
// cancel func to send cancel signal to the subscriber processes context.
|
// cancel func to send cancel signal to the subscriber processes context.
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
// The active spawned processes
|
// The active spawned processes
|
||||||
|
// server
|
||||||
|
server *server
|
||||||
active procsMap
|
active procsMap
|
||||||
// mutex to lock the map
|
// mutex to lock the map
|
||||||
// mu sync.RWMutex
|
// mu sync.RWMutex
|
||||||
|
@ -39,13 +41,14 @@ type processes struct {
|
||||||
|
|
||||||
// newProcesses will prepare and return a *processes which
|
// newProcesses will prepare and return a *processes which
|
||||||
// is map containing all the currently running processes.
|
// is map containing all the currently running processes.
|
||||||
func newProcesses(ctx context.Context, metrics *metrics, tui *tui, errorKernel *errorKernel, configuration *Configuration, signatures *signatures) *processes {
|
func newProcesses(ctx context.Context, server *server) *processes {
|
||||||
p := processes{
|
p := processes{
|
||||||
|
server: server,
|
||||||
active: *newProcsMap(),
|
active: *newProcsMap(),
|
||||||
tui: tui,
|
tui: server.tui,
|
||||||
errorKernel: errorKernel,
|
errorKernel: server.errorKernel,
|
||||||
configuration: configuration,
|
configuration: server.configuration,
|
||||||
Signatures: signatures,
|
Signatures: server.signatures,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prepare the parent context for the subscribers.
|
// Prepare the parent context for the subscribers.
|
||||||
|
@ -59,8 +62,6 @@ func newProcesses(ctx context.Context, metrics *metrics, tui *tui, errorKernel *
|
||||||
p.ctx = ctx
|
p.ctx = ctx
|
||||||
p.cancel = cancel
|
p.cancel = cancel
|
||||||
|
|
||||||
p.metrics = metrics
|
|
||||||
|
|
||||||
return &p
|
return &p
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -94,22 +95,22 @@ func (p *processes) Start(proc process) {
|
||||||
{
|
{
|
||||||
log.Printf("Starting REQOpProcessList subscriber: %#v\n", proc.node)
|
log.Printf("Starting REQOpProcessList subscriber: %#v\n", proc.node)
|
||||||
sub := newSubject(REQOpProcessList, string(proc.node))
|
sub := newSubject(REQOpProcessList, string(proc.node))
|
||||||
proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, processKindSubscriber, nil, p.Signatures)
|
proc := newProcess(proc.ctx, p.server, sub, processKindSubscriber, nil)
|
||||||
go proc.spawnWorker(proc.processes, proc.natsConn)
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
log.Printf("Starting REQOpProcessStart subscriber: %#v\n", proc.node)
|
log.Printf("Starting REQOpProcessStart subscriber: %#v\n", proc.node)
|
||||||
sub := newSubject(REQOpProcessStart, string(proc.node))
|
sub := newSubject(REQOpProcessStart, string(proc.node))
|
||||||
proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, processKindSubscriber, nil, p.Signatures)
|
proc := newProcess(proc.ctx, p.server, sub, processKindSubscriber, nil)
|
||||||
go proc.spawnWorker(proc.processes, proc.natsConn)
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
log.Printf("Starting REQOpProcessStop subscriber: %#v\n", proc.node)
|
log.Printf("Starting REQOpProcessStop subscriber: %#v\n", proc.node)
|
||||||
sub := newSubject(REQOpProcessStop, string(proc.node))
|
sub := newSubject(REQOpProcessStop, string(proc.node))
|
||||||
proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, processKindSubscriber, nil, p.Signatures)
|
proc := newProcess(proc.ctx, p.server, sub, processKindSubscriber, nil)
|
||||||
go proc.spawnWorker(proc.processes, proc.natsConn)
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a subscriber for textLogging messages
|
// Start a subscriber for textLogging messages
|
||||||
|
@ -217,12 +218,11 @@ func (p *processes) Stop() {
|
||||||
|
|
||||||
// Startup holds all the startup methods for subscribers.
|
// Startup holds all the startup methods for subscribers.
|
||||||
type startup struct {
|
type startup struct {
|
||||||
metrics *metrics
|
server *server
|
||||||
Signatures *signatures
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newStartup(metrics *metrics, signatures *signatures) *startup {
|
func newStartup(server *server) *startup {
|
||||||
s := startup{metrics: metrics, Signatures: signatures}
|
s := startup{server}
|
||||||
|
|
||||||
return &s
|
return &s
|
||||||
}
|
}
|
||||||
|
@ -231,9 +231,9 @@ func (s startup) subREQHttpGet(p process) {
|
||||||
|
|
||||||
log.Printf("Starting Http Get subscriber: %#v\n", p.node)
|
log.Printf("Starting Http Get subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQHttpGet, string(p.node))
|
sub := newSubject(REQHttpGet, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, s.Signatures)
|
proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriber, nil)
|
||||||
|
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -241,9 +241,9 @@ func (s startup) subREQHttpGetScheduled(p process) {
|
||||||
|
|
||||||
log.Printf("Starting Http Get Scheduled subscriber: %#v\n", p.node)
|
log.Printf("Starting Http Get Scheduled subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQHttpGetScheduled, string(p.node))
|
sub := newSubject(REQHttpGetScheduled, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, s.Signatures)
|
proc := newProcess(p.ctx, p.server, sub, processKindSubscriber, nil)
|
||||||
|
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -251,7 +251,7 @@ func (s startup) pubREQHello(p process) {
|
||||||
log.Printf("Starting Hello Publisher: %#v\n", p.node)
|
log.Printf("Starting Hello Publisher: %#v\n", p.node)
|
||||||
|
|
||||||
sub := newSubject(REQHello, p.configuration.CentralNodeName)
|
sub := newSubject(REQHello, p.configuration.CentralNodeName)
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindPublisher, nil, s.Signatures)
|
proc := newProcess(p.ctx, s.server, sub, processKindPublisher, nil)
|
||||||
|
|
||||||
// Define the procFunc to be used for the process.
|
// Define the procFunc to be used for the process.
|
||||||
proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error {
|
proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error {
|
||||||
|
@ -289,49 +289,49 @@ func (s startup) pubREQHello(p process) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s startup) subREQToConsole(p process) {
|
func (s startup) subREQToConsole(p process) {
|
||||||
log.Printf("Starting Text To Console subscriber: %#v\n", p.node)
|
log.Printf("Starting Text To Console subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQToConsole, string(p.node))
|
sub := newSubject(REQToConsole, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s startup) subREQTuiToConsole(p process) {
|
func (s startup) subREQTuiToConsole(p process) {
|
||||||
log.Printf("Starting Tui To Console subscriber: %#v\n", p.node)
|
log.Printf("Starting Tui To Console subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQTuiToConsole, string(p.node))
|
sub := newSubject(REQTuiToConsole, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s startup) subREQCliCommand(p process) {
|
func (s startup) subREQCliCommand(p process) {
|
||||||
log.Printf("Starting CLICommand Request subscriber: %#v\n", p.node)
|
log.Printf("Starting CLICommand Request subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQCliCommand, string(p.node))
|
sub := newSubject(REQCliCommand, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s startup) subREQPong(p process) {
|
func (s startup) subREQPong(p process) {
|
||||||
log.Printf("Starting Pong subscriber: %#v\n", p.node)
|
log.Printf("Starting Pong subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQPong, string(p.node))
|
sub := newSubject(REQPong, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s startup) subREQPing(p process) {
|
func (s startup) subREQPing(p process) {
|
||||||
log.Printf("Starting Ping Request subscriber: %#v\n", p.node)
|
log.Printf("Starting Ping Request subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQPing, string(p.node))
|
sub := newSubject(REQPing, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s startup) subREQErrorLog(p process) {
|
func (s startup) subREQErrorLog(p process) {
|
||||||
log.Printf("Starting REQErrorLog subscriber: %#v\n", p.node)
|
log.Printf("Starting REQErrorLog subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQErrorLog, "errorCentral")
|
sub := newSubject(REQErrorLog, "errorCentral")
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
// subREQHello is the handler that is triggered when we are receiving a hello
|
// subREQHello is the handler that is triggered when we are receiving a hello
|
||||||
|
@ -344,7 +344,7 @@ func (s startup) subREQErrorLog(p process) {
|
||||||
func (s startup) subREQHello(p process) {
|
func (s startup) subREQHello(p process) {
|
||||||
log.Printf("Starting Hello subscriber: %#v\n", p.node)
|
log.Printf("Starting Hello subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQHello, string(p.node))
|
sub := newSubject(REQHello, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||||
|
|
||||||
// The reason for running the say hello subscriber as a procFunc is that
|
// The reason for running the say hello subscriber as a procFunc is that
|
||||||
// a handler are not able to hold state, and we need to hold the state
|
// a handler are not able to hold state, and we need to hold the state
|
||||||
|
@ -370,101 +370,101 @@ func (s startup) subREQHello(p process) {
|
||||||
sayHelloNodes[m.FromNode] = struct{}{}
|
sayHelloNodes[m.FromNode] = struct{}{}
|
||||||
|
|
||||||
// update the prometheus metrics
|
// update the prometheus metrics
|
||||||
s.metrics.promHelloNodesTotal.Set(float64(len(sayHelloNodes)))
|
s.server.metrics.promHelloNodesTotal.Set(float64(len(sayHelloNodes)))
|
||||||
s.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime()
|
s.server.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime()
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s startup) subREQToFile(p process) {
|
func (s startup) subREQToFile(p process) {
|
||||||
log.Printf("Starting text to file subscriber: %#v\n", p.node)
|
log.Printf("Starting text to file subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQToFile, string(p.node))
|
sub := newSubject(REQToFile, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||||
|
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s startup) subREQToFileNACK(p process) {
|
func (s startup) subREQToFileNACK(p process) {
|
||||||
log.Printf("Starting text to file subscriber: %#v\n", p.node)
|
log.Printf("Starting text to file subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQToFileNACK, string(p.node))
|
sub := newSubject(REQToFileNACK, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||||
|
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s startup) subREQCopyFileFrom(p process) {
|
func (s startup) subREQCopyFileFrom(p process) {
|
||||||
log.Printf("Starting copy file from subscriber: %#v\n", p.node)
|
log.Printf("Starting copy file from subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQCopyFileFrom, string(p.node))
|
sub := newSubject(REQCopyFileFrom, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||||
|
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s startup) subREQCopyFileTo(p process) {
|
func (s startup) subREQCopyFileTo(p process) {
|
||||||
log.Printf("Starting copy file to subscriber: %#v\n", p.node)
|
log.Printf("Starting copy file to subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQCopyFileTo, string(p.node))
|
sub := newSubject(REQCopyFileTo, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||||
|
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s startup) subREQToFileAppend(p process) {
|
func (s startup) subREQToFileAppend(p process) {
|
||||||
log.Printf("Starting text logging subscriber: %#v\n", p.node)
|
log.Printf("Starting text logging subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQToFileAppend, string(p.node))
|
sub := newSubject(REQToFileAppend, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||||
|
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s startup) subREQTailFile(p process) {
|
func (s startup) subREQTailFile(p process) {
|
||||||
log.Printf("Starting tail log files subscriber: %#v\n", p.node)
|
log.Printf("Starting tail log files subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQTailFile, string(p.node))
|
sub := newSubject(REQTailFile, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||||
|
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s startup) subREQCliCommandCont(p process) {
|
func (s startup) subREQCliCommandCont(p process) {
|
||||||
log.Printf("Starting cli command with continous delivery: %#v\n", p.node)
|
log.Printf("Starting cli command with continous delivery: %#v\n", p.node)
|
||||||
sub := newSubject(REQCliCommandCont, string(p.node))
|
sub := newSubject(REQCliCommandCont, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||||
|
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s startup) subREQRelay(p process) {
|
func (s startup) subREQRelay(p process) {
|
||||||
nodeWithRelay := fmt.Sprintf("*.%v", p.node)
|
nodeWithRelay := fmt.Sprintf("*.%v", p.node)
|
||||||
log.Printf("Starting Relay: %#v\n", nodeWithRelay)
|
log.Printf("Starting Relay: %#v\n", nodeWithRelay)
|
||||||
sub := newSubject(REQRelay, string(nodeWithRelay))
|
sub := newSubject(REQRelay, string(nodeWithRelay))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||||
|
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s startup) subREQRelayInitial(p process) {
|
func (s startup) subREQRelayInitial(p process) {
|
||||||
log.Printf("Starting Relay Initial: %#v\n", p.node)
|
log.Printf("Starting Relay Initial: %#v\n", p.node)
|
||||||
sub := newSubject(REQRelayInitial, string(p.node))
|
sub := newSubject(REQRelayInitial, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||||
|
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s startup) subREQToSocket(p process) {
|
func (s startup) subREQToSocket(p process) {
|
||||||
log.Printf("Starting write to socket subscriber: %#v\n", p.node)
|
log.Printf("Starting write to socket subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQToSocket, string(p.node))
|
sub := newSubject(REQToSocket, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||||
|
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s startup) subREQPublicKey(p process) {
|
func (s startup) subREQPublicKey(p process) {
|
||||||
log.Printf("Starting get Public Key subscriber: %#v\n", p.node)
|
log.Printf("Starting get Public Key subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQPublicKey, string(p.node))
|
sub := newSubject(REQPublicKey, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||||
|
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------
|
// ---------------------------------------------------------------
|
||||||
|
@ -480,7 +480,7 @@ func (p *processes) printProcessesMap() {
|
||||||
log.Printf("* proc - pub/sub: %v, procName in map: %v , id: %v, subject: %v\n", proc.processKind, pName, proc.processID, proc.subject.name())
|
log.Printf("* proc - pub/sub: %v, procName in map: %v , id: %v, subject: %v\n", proc.processKind, pName, proc.processID, proc.subject.name())
|
||||||
}
|
}
|
||||||
|
|
||||||
p.metrics.promProcessesTotal.Set(float64(len(p.active.procNames)))
|
p.server.metrics.promProcessesTotal.Set(float64(len(p.active.procNames)))
|
||||||
|
|
||||||
p.active.mu.Unlock()
|
p.active.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
|
@ -473,8 +473,8 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str
|
||||||
|
|
||||||
// Create the process and start it.
|
// Create the process and start it.
|
||||||
sub := newSubject(method, proc.configuration.NodeName)
|
sub := newSubject(method, proc.configuration.NodeName)
|
||||||
procNew := newProcess(proc.ctx, proc.processes.metrics, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, processKindSubscriber, nil, proc.signatures)
|
procNew := newProcess(proc.ctx, proc.server, sub, processKindSubscriber, nil)
|
||||||
go procNew.spawnWorker(proc.processes, proc.natsConn)
|
go procNew.spawnWorker()
|
||||||
|
|
||||||
txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
|
txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
|
||||||
er := fmt.Errorf(txt)
|
er := fmt.Errorf(txt)
|
||||||
|
@ -564,7 +564,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove the prometheus label
|
// Remove the prometheus label
|
||||||
proc.processes.metrics.promProcessesAllRunning.Delete(prometheus.Labels{"processName": string(processName)})
|
proc.server.metrics.promProcessesAllRunning.Delete(prometheus.Labels{"processName": string(processName)})
|
||||||
|
|
||||||
txt := fmt.Sprintf("info: OpProcessStop: process stopped id: %v, method: %v on: %v", toStopProc.processID, sub, message.ToNode)
|
txt := fmt.Sprintf("info: OpProcessStop: process stopped id: %v, method: %v on: %v", toStopProc.processID, sub, message.ToNode)
|
||||||
er := fmt.Errorf(txt)
|
er := fmt.Errorf(txt)
|
||||||
|
@ -1011,7 +1011,7 @@ func (m methodREQErrorLog) getKind() Event {
|
||||||
|
|
||||||
// Handle the writing of error logs.
|
// Handle the writing of error logs.
|
||||||
func (m methodREQErrorLog) handler(proc process, message Message, node string) ([]byte, error) {
|
func (m methodREQErrorLog) handler(proc process, message Message, node string) ([]byte, error) {
|
||||||
proc.processes.metrics.promErrorMessagesReceivedTotal.Inc()
|
proc.server.metrics.promErrorMessagesReceivedTotal.Inc()
|
||||||
|
|
||||||
// If it was a request type message we want to check what the initial messages
|
// If it was a request type message we want to check what the initial messages
|
||||||
// method, so we can use that in creating the file name to store the data.
|
// method, so we can use that in creating the file name to store the data.
|
||||||
|
|
27
server.go
27
server.go
|
@ -40,12 +40,12 @@ type server struct {
|
||||||
processes *processes
|
processes *processes
|
||||||
// The name of the node
|
// The name of the node
|
||||||
nodeName string
|
nodeName string
|
||||||
// ringBufferBulkInCh are the channel where new messages in a bulk
|
// toRingBufferCh are the channel where new messages in a bulk
|
||||||
// format (slice) are put into the system.
|
// format (slice) are put into the system.
|
||||||
//
|
//
|
||||||
// In general the ringbuffer will read this
|
// In general the ringbuffer will read this
|
||||||
// channel, unfold each slice, and put single messages on the buffer.
|
// channel, unfold each slice, and put single messages on the buffer.
|
||||||
ringBufferBulkInCh chan []subjectAndMessage
|
toRingBufferCh chan []subjectAndMessage
|
||||||
// errorKernel is doing all the error handling like what to do if
|
// errorKernel is doing all the error handling like what to do if
|
||||||
// an error occurs.
|
// an error occurs.
|
||||||
errorKernel *errorKernel
|
errorKernel *errorKernel
|
||||||
|
@ -148,15 +148,14 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
|
||||||
signatures := newSignatures(configuration, errorKernel)
|
signatures := newSignatures(configuration, errorKernel)
|
||||||
// fmt.Printf(" * DEBUG: newServer: signatures contains: %+v\n", signatures)
|
// fmt.Printf(" * DEBUG: newServer: signatures contains: %+v\n", signatures)
|
||||||
|
|
||||||
s := &server{
|
s := server{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
configuration: configuration,
|
configuration: configuration,
|
||||||
nodeName: configuration.NodeName,
|
nodeName: configuration.NodeName,
|
||||||
natsConn: conn,
|
natsConn: conn,
|
||||||
StewardSocket: stewardSocket,
|
StewardSocket: stewardSocket,
|
||||||
processes: newProcesses(ctx, metrics, tuiClient, errorKernel, configuration, signatures),
|
toRingBufferCh: make(chan []subjectAndMessage),
|
||||||
ringBufferBulkInCh: make(chan []subjectAndMessage),
|
|
||||||
metrics: metrics,
|
metrics: metrics,
|
||||||
version: version,
|
version: version,
|
||||||
tui: tuiClient,
|
tui: tuiClient,
|
||||||
|
@ -166,6 +165,8 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
|
||||||
centralAuth: newCentralAuth(),
|
centralAuth: newCentralAuth(),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.processes = newProcesses(ctx, &s)
|
||||||
|
|
||||||
// Create the default data folder for where subscribers should
|
// Create the default data folder for where subscribers should
|
||||||
// write it's data, check if data folder exist, and create it if needed.
|
// write it's data, check if data folder exist, and create it if needed.
|
||||||
if _, err := os.Stat(configuration.SubscribersDataFolder); os.IsNotExist(err) {
|
if _, err := os.Stat(configuration.SubscribersDataFolder); os.IsNotExist(err) {
|
||||||
|
@ -181,7 +182,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
|
||||||
s.errorKernel.logConsoleOnlyIfDebug(er, s.configuration)
|
s.errorKernel.logConsoleOnlyIfDebug(er, s.configuration)
|
||||||
}
|
}
|
||||||
|
|
||||||
return s, nil
|
return &s, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -235,7 +236,7 @@ func (s *server) Start() {
|
||||||
s.metrics.promVersion.With(prometheus.Labels{"version": string(s.version)})
|
s.metrics.promVersion.With(prometheus.Labels{"version": string(s.version)})
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
err := s.errorKernel.start(s.ringBufferBulkInCh)
|
err := s.errorKernel.start(s.toRingBufferCh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("%v\n", err)
|
log.Printf("%v\n", err)
|
||||||
}
|
}
|
||||||
|
@ -272,7 +273,7 @@ func (s *server) Start() {
|
||||||
//
|
//
|
||||||
// NB: The context of the initial process are set in processes.Start.
|
// NB: The context of the initial process are set in processes.Start.
|
||||||
sub := newSubject(REQInitial, s.nodeName)
|
sub := newSubject(REQInitial, s.nodeName)
|
||||||
s.processInitial = newProcess(context.TODO(), s.metrics, s.natsConn, s.processes, s.ringBufferBulkInCh, s.configuration, sub, "", nil, s.signatures)
|
s.processInitial = newProcess(context.TODO(), s, sub, "", nil)
|
||||||
// Start all wanted subscriber processes.
|
// Start all wanted subscriber processes.
|
||||||
s.processes.Start(s.processInitial)
|
s.processes.Start(s.processInitial)
|
||||||
|
|
||||||
|
@ -287,7 +288,7 @@ func (s *server) Start() {
|
||||||
|
|
||||||
if s.configuration.EnableTUI {
|
if s.configuration.EnableTUI {
|
||||||
go func() {
|
go func() {
|
||||||
err := s.tui.Start(s.ctx, s.ringBufferBulkInCh)
|
err := s.tui.Start(s.ctx, s.toRingBufferCh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("%v\n", err)
|
log.Printf("%v\n", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
|
@ -392,7 +393,7 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
|
||||||
const samValueBucket string = "samValueBucket"
|
const samValueBucket string = "samValueBucket"
|
||||||
const indexValueBucket string = "indexValueBucket"
|
const indexValueBucket string = "indexValueBucket"
|
||||||
|
|
||||||
s.ringBuffer = newringBuffer(s.ctx, s.metrics, s.configuration, bufferSize, dbFileName, Node(s.nodeName), s.ringBufferBulkInCh, samValueBucket, indexValueBucket, s.errorKernel, s.processInitial)
|
s.ringBuffer = newringBuffer(s.ctx, s.metrics, s.configuration, bufferSize, dbFileName, Node(s.nodeName), s.toRingBufferCh, samValueBucket, indexValueBucket, s.errorKernel, s.processInitial)
|
||||||
|
|
||||||
ringBufferInCh := make(chan subjectAndMessage)
|
ringBufferInCh := make(chan subjectAndMessage)
|
||||||
ringBufferOutCh := make(chan samDBValueAndDelivered)
|
ringBufferOutCh := make(chan samDBValueAndDelivered)
|
||||||
|
@ -405,7 +406,7 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
|
||||||
// we loop here, unfold the slice, and put single subjectAndMessages's on
|
// we loop here, unfold the slice, and put single subjectAndMessages's on
|
||||||
// the channel to the ringbuffer.
|
// the channel to the ringbuffer.
|
||||||
go func() {
|
go func() {
|
||||||
for sams := range s.ringBufferBulkInCh {
|
for sams := range s.toRingBufferCh {
|
||||||
for _, sam := range sams {
|
for _, sam := range sams {
|
||||||
ringBufferInCh <- sam
|
ringBufferInCh <- sam
|
||||||
}
|
}
|
||||||
|
@ -497,9 +498,9 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
|
||||||
// log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName)
|
// log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName)
|
||||||
|
|
||||||
sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
|
sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
|
||||||
proc := newProcess(s.ctx, s.metrics, s.natsConn, s.processes, s.ringBufferBulkInCh, s.configuration, sub, processKindPublisher, nil, s.signatures)
|
proc := newProcess(s.ctx, s, sub, processKindPublisher, nil)
|
||||||
|
|
||||||
proc.spawnWorker(s.processes, s.natsConn)
|
proc.spawnWorker()
|
||||||
// log.Printf("info: processNewMessages: new process started, subject: %v, processID: %v\n", subjName, proc.processID)
|
// log.Printf("info: processNewMessages: new process started, subject: %v, processID: %v\n", subjName, proc.processID)
|
||||||
|
|
||||||
// Now when the process is spawned we continue,
|
// Now when the process is spawned we continue,
|
||||||
|
|
|
@ -235,7 +235,7 @@ func checkREQErrorLogTest(stewardServer *server, conf *Configuration, t *testing
|
||||||
ToNode: "somenode",
|
ToNode: "somenode",
|
||||||
}
|
}
|
||||||
|
|
||||||
p := newProcess(stewardServer.ctx, stewardServer.metrics, stewardServer.natsConn, stewardServer.processes, stewardServer.processInitial.toRingbufferCh, stewardServer.configuration, Subject{}, processKindSubscriber, nil, stewardServer.signatures)
|
p := newProcess(stewardServer.ctx, stewardServer, Subject{}, processKindSubscriber, nil)
|
||||||
|
|
||||||
stewardServer.errorKernel.errSend(p, m, fmt.Errorf("some error"))
|
stewardServer.errorKernel.errSend(p, m, fmt.Errorf("some error"))
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue