diff --git a/central_auth.go b/central_auth.go index 319205e..3a3a6ef 100644 --- a/central_auth.go +++ b/central_auth.go @@ -49,7 +49,7 @@ func newCentralAuth(configuration *Configuration, errorKernel *errorKernel) *cen // Only assign from storage to in memory map if the storage contained any values. if keys != nil { - c.nodePublicKeys.keyMap = keys + c.nodePublicKeys.KeyMap = keys for k, v := range keys { log.Printf("info: public keys db contains: %v, %v\n", k, []byte(v)) } @@ -64,7 +64,7 @@ func (c *centralAuth) addPublicKey(proc process, msg Message) { c.nodePublicKeys.mu.Lock() // Check if a key for the current node already exists in the map. - existingKey, ok := c.nodePublicKeys.keyMap[msg.FromNode] + existingKey, ok := c.nodePublicKeys.KeyMap[msg.FromNode] if ok && existingKey == string(msg.Data) { fmt.Printf(" * key value for node %v is the same, doing nothing\n", msg.FromNode) @@ -73,7 +73,7 @@ func (c *centralAuth) addPublicKey(proc process, msg Message) { } // New key - c.nodePublicKeys.keyMap[msg.FromNode] = string(msg.Data) + c.nodePublicKeys.KeyMap[msg.FromNode] = string(msg.Data) c.nodePublicKeys.mu.Unlock() // Add key to persistent storage. @@ -188,13 +188,13 @@ func (c *centralAuth) dbDumpPublicKey() (map[Node]string, error) { // The keys will be written to a k/v store for persistence. type nodePublicKeys struct { mu sync.Mutex - keyMap map[Node]string + KeyMap map[Node]string } // newNodePublicKeys will return a prepared type of nodePublicKeys. func newNodePublicKeys(configuration *Configuration) *nodePublicKeys { n := nodePublicKeys{ - keyMap: make(map[Node]string), + KeyMap: make(map[Node]string), } return &n diff --git a/configuration_flags.go b/configuration_flags.go index 46fc5f6..1261552 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -41,6 +41,8 @@ type Configuration struct { NatsReconnectJitter int // NatsReconnectJitterTLS in seconds NatsReconnectJitterTLS int + // PublicKeysGetInterval in seconds + PublicKeysGetInterval int // The number of the profiling port ProfilingPort string // host and port for prometheus listener, e.g. localhost:2112 @@ -87,8 +89,13 @@ type Configuration struct { // Make the current node send hello messages to central at given interval in seconds StartPubREQHello int + // Publisher for asking central for public keys + StartPubREQPublicKeysGet bool + // Subscriber for receiving reqests to get public keys registered on central + StartSubREQPublicKeysGet bool + // Subscriber for receiving updates of public keys from central + StartSubREQPublicKeysPut bool // Start the central error logger. - // Takes a comma separated string of nodes to receive from or "*" for all nodes. StartSubREQErrorLog bool // Subscriber for hello messages StartSubREQHello bool @@ -139,6 +146,7 @@ type ConfigurationFromFile struct { NatsConnectRetryInterval *int NatsReconnectJitter *int NatsReconnectJitterTLS *int + PublicKeysGetInterval *int ProfilingPort *string PromHostAndPort *string DefaultMessageTimeout *int @@ -161,6 +169,9 @@ type ConfigurationFromFile struct { EnableDebug *bool StartPubREQHello *int + StartPubREQPublicKeysGet *bool + StartSubREQPublicKeysGet *bool + StartSubREQPublicKeysPut *bool StartSubREQErrorLog *bool StartSubREQHello *bool StartSubREQToFileAppend *bool @@ -200,6 +211,7 @@ func newConfigurationDefaults() Configuration { NatsConnectRetryInterval: 10, NatsReconnectJitter: 100, NatsReconnectJitterTLS: 1, + PublicKeysGetInterval: 60, ProfilingPort: "", PromHostAndPort: "", DefaultMessageTimeout: 10, @@ -222,6 +234,9 @@ func newConfigurationDefaults() Configuration { EnableDebug: false, StartPubREQHello: 30, + StartPubREQPublicKeysGet: true, + StartSubREQPublicKeysGet: false, + StartSubREQPublicKeysPut: true, StartSubREQErrorLog: false, StartSubREQHello: true, StartSubREQToFileAppend: true, @@ -308,6 +323,11 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration { } else { conf.NatsReconnectJitterTLS = *cf.NatsReconnectJitterTLS } + if cf.PublicKeysGetInterval == nil { + conf.PublicKeysGetInterval = cd.PublicKeysGetInterval + } else { + conf.PublicKeysGetInterval = *cf.PublicKeysGetInterval + } if cf.ProfilingPort == nil { conf.ProfilingPort = cd.ProfilingPort } else { @@ -416,6 +436,21 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration { } else { conf.StartPubREQHello = *cf.StartPubREQHello } + if cf.StartPubREQPublicKeysGet == nil { + conf.StartPubREQPublicKeysGet = cd.StartPubREQPublicKeysGet + } else { + conf.StartPubREQPublicKeysGet = *cf.StartPubREQPublicKeysGet + } + if cf.StartSubREQPublicKeysGet == nil { + conf.StartSubREQPublicKeysGet = cd.StartSubREQPublicKeysGet + } else { + conf.StartSubREQPublicKeysGet = *cf.StartSubREQPublicKeysGet + } + if cf.StartSubREQPublicKeysPut == nil { + conf.StartSubREQPublicKeysPut = cd.StartSubREQPublicKeysPut + } else { + conf.StartSubREQPublicKeysPut = *cf.StartSubREQPublicKeysPut + } if cf.StartSubREQErrorLog == nil { conf.StartSubREQErrorLog = cd.StartSubREQErrorLog } else { @@ -540,6 +575,7 @@ func (c *Configuration) CheckFlags() error { flag.IntVar(&c.NatsConnectRetryInterval, "natsConnectRetryInterval", fc.NatsConnectRetryInterval, "default nats retry connect interval in seconds.") flag.IntVar(&c.NatsReconnectJitter, "natsReconnectJitter", fc.NatsReconnectJitter, "default nats ReconnectJitter interval in milliseconds.") flag.IntVar(&c.NatsReconnectJitterTLS, "natsReconnectJitterTLS", fc.NatsReconnectJitterTLS, "default nats ReconnectJitterTLS interval in seconds.") + flag.IntVar(&c.PublicKeysGetInterval, "publicKeysGetInterval", fc.PublicKeysGetInterval, "default interval in seconds for asking the central for public keys") flag.StringVar(&c.ProfilingPort, "profilingPort", fc.ProfilingPort, "The number of the profiling port") flag.StringVar(&c.PromHostAndPort, "promHostAndPort", fc.PromHostAndPort, "host and port for prometheus listener, e.g. localhost:2112") flag.IntVar(&c.DefaultMessageTimeout, "defaultMessageTimeout", fc.DefaultMessageTimeout, "default message timeout in seconds. This can be overridden on the message level") @@ -563,6 +599,9 @@ func (c *Configuration) CheckFlags() error { flag.IntVar(&c.StartPubREQHello, "startPubREQHello", fc.StartPubREQHello, "Make the current node send hello messages to central at given interval in seconds") + flag.BoolVar(&c.StartPubREQPublicKeysGet, "startPubREQPublicKeysGet", fc.StartPubREQPublicKeysGet, "true/false") + flag.BoolVar(&c.StartSubREQPublicKeysGet, "startSubREQPublicKeysGet", fc.StartSubREQPublicKeysGet, "true/false") + flag.BoolVar(&c.StartSubREQPublicKeysPut, "startSubREQPublicKeysPut", fc.StartSubREQPublicKeysPut, "true/false") flag.BoolVar(&c.StartSubREQErrorLog, "startSubREQErrorLog", fc.StartSubREQErrorLog, "true/false") flag.BoolVar(&c.StartSubREQHello, "startSubREQHello", fc.StartSubREQHello, "true/false") flag.BoolVar(&c.StartSubREQToFileAppend, "startSubREQToFileAppend", fc.StartSubREQToFileAppend, "true/false") diff --git a/doc/concept/auth/REQPublicKeysGet.drawio b/doc/concept/auth/REQPublicKeysGet.drawio new file mode 100644 index 0000000..9ca9a4b --- /dev/null +++ b/doc/concept/auth/REQPublicKeysGet.drawio @@ -0,0 +1 @@ +5Vjfc9soEP5rNHP34I5+26+Jk7SdXO/Spp0mj1hCElMkXIRjK3/9LQJZQih1k8ZNr/figV12ge9bdtdygmW5e83RunjHUkwd3013TnDm+P4icuFXCholiD1fCXJOUiXyesE1ucdaqO3yDUlxbSwUjFFB1qYwYVWFE2HIEOdsay7LGDV3XaMcW4LrBFFb+pmkotBSz3V7xRtM8kKMLrxCyZecs02l96tYhZWmRJ0bvbQuUMq2A1Fw7gRLzphQo3K3xFSi2iGm7C4e0O6PzHElvsfg682pn324fH9/e7P41NCw8P7KZl6g3NwhutFY6NOKpgOnvR6WXlwnON0WRODrNUqkdgvRALJClBRmHgxTVBftWjnJWCU02b601XthLvDuwVt4e2wg2jArseANLNl1kRUpEx1pHbrbnjZ/oWXFgLFwroVIh0q+d91jBgMN22Mg9P9jEO6j+iUgdOf1RXJB08XZ23/ug0v05iRezuLDCOIqPZEPHWYJRXVNEhM0E+EHIPFdnBqJwMZoAEI0gUEn45giQe7M9DGFi97hihE4yZ6CyDMpiIMRtDXb8ARrq+GjHjnywwOOBOI5Fpajlqb9tZ/O3Pz/xpwFePxE5qwQGDt6Puam05bF3NVmRQmkHi4J4CzBNZzkxPFjChCerkAc53L04fx9uzS5xM1rLOwFtoTjNW3eYVGwtHXpDn1cbSZ8SDTcGkPBT+VNKoE5HPUbidU7nFifI3eOWPOCieQZT4Tf0cqPXX3+hu7MAgquLEw0asHZF7xklHGQtI0LlBtC6UiEKMkr+XixJAEEEkACvdOJVpQkTeU2k/Cbr/sI1cx3Q4ORyCZkio9xnnw2PuyGagk34ROx+9tSMg/cX4qS0KLkerOqE05WPyvXfWQXhAKxOpW/ZAaL/VEGCycy2HyCnnF9ejZ6IoseC/UfainGEY53RNzolXJ8K9e9ivTsbKfN2knTTeAJNzfDycBKTnuzdtbZTfYuk/2N6hsOF2zVFhyK9pdqj8JRffTdJ7ZHYTwK0+jntkf2X5J6mDN+l/5lKjdPJuejvf6pfxAKzHqNKgOt+OtGfiw5TVQ1lLmV56s/JNKOD7u7g9Gfjq5mrnz+swyVhDbKBhyhct0qgyCUEGN6h2X9tDSmk7pNItIFQL4b6dQppbJivJRUD9VbDaXUh+qcrZJiAZExg6smpMon7WWbMNMVX6p10TfUBIKo0u6HR2uV0HxUdQZOO/fyE5VesGU8NXcfmvdft2YjzP0o2mM9GvfIp6ReU6RRJxUlg40zypAYHqgjd6ryDl4YxJiKi678HqU09OXgdlAoDpQG71il4WDKj7/55uEoXmi98iMWgXk0/kb3xCIQLMxsFXxnEYAIQM1g2VouqB9x4IXxKRUGyuPIunPPsqzGj6xCMO0//arl/Zf14Pxf \ No newline at end of file diff --git a/doc/concept/auth/~$REQPublicKeysGet.drawio.dtmp b/doc/concept/auth/~$REQPublicKeysGet.drawio.dtmp new file mode 100644 index 0000000..45672e7 --- /dev/null +++ b/doc/concept/auth/~$REQPublicKeysGet.drawio.dtmp @@ -0,0 +1 @@ +5Vjfc9soEP5rNNN7cEe/7dfESdpO2ru06c0lj1hCElMkXIRjK3/9LQJZQih1ksaX3vTFA7vLAt+37K7lBMty946jdfGJpZg6vpvunODM8f1F5MKvFDRKEHu+EuScpErk9YJrco+1UK/LNyTFtWEoGKOCrE1hwqoKJ8KQIc7Z1jTLGDV3XaMcW4LrBFFb+g9JRaGlnuv2iveY5IUYXXiFkm85Z5tK71exCitNiTo32rQuUMq2A1Fw7gRLzphQo3K3xFSi2iGm1l08oN0fmeNKPGbB95tTP/ty+fn+9mbxd0PDwvuYzbxAublDdKOx0KcVTQdOez0svbhOcLotiMDXa5RI7RaiAWSFKCnMPBimqC5aWznJWCU02b5cq/fCXODdg7fw9thAtGFWYsEbMNl1kRWpJTrSOnS3PW3+QsuKAWPhXAuRDpV877rHDAYatqdA6P/PINxH9WtA6M7ri+SCpouzD3/dB5fo/Um8nMWHEcRVeiIfOswSiuqaJCZoJsIPQOK7ODUSgY3RAIRoAoNOxjFFgtyZ6WMKF73DFSNwkj0FkWdSEAcjaGu24QnWq4aPeuTIDw84EojnWFiOWpr2134+c/PfjTkL8PiZzFkhMHb0csxNpy2LuavNihJIPVwSwFmCazjJiePHFCA8XYE4zuXoy/nn1jS5xM07LGwDeRO3xlCsU3mKSmAO29h2toTjNW0+YVGwtN3aHe51tRE/SKze4cT6ErlzxJoXTCTPeCL8jlZ+7OrzJ3RnFlBwZWGiUQvOvuElo4yDpG1coNwQSkciREleyceLJZEgkAAS6J1OtKIkaSq3mYTffN1HqGa+GxqMRDYhU3yM8+SL8WE3VEu4CZfx/7tQMg/cX4qS0KLkerOqE05WP5/rHpfDvrILQoHY6vUzWOyPMlg4kcHmE/SM69OL0RNZ9Fio/1RLMY5wvCPiRlvK8a20exvp2dlOL2snTTeBJ9zcDCeDVXLaL2tn3brJ3mWyv1F9w+GCrdqCQ9H+Wu1ROKqPvvvM9iiMR2Ea/bftkf2XpB7mDKsveWwP9Iv1L1O5eTI5H+31T/2DUGDWa1QZaMXfN/JjyWmiqqHMrTxfvZFIOz7s7g5Gfzi6mrny+c8yVBLaqDXgCJXrVhkEoYQY0zss66elMZ3UbRKRLgDy3UinTimVFeOlpHqo3moopT5U52yVFAuIjBlcNSFVPrletgkzXfGlWhd9Q00giCrtfni0VgnNR1Vn4LRzLz9RaYMt46m5+3B5/3VrNsLcj6I91qNxj3xK6jVFGnVSUTLYOKMMieGBOnKnKu/ghUGMqbjoyu9RSkNfDm4HheJAafCOVRoOpvz4h28ejuKF1is/YhGYR+NvdM8sAsHCzFbBI4sARABqBmZraVA/4cAL41MqDJTH0erOPcuyGj+xCsG0//SrzPsv68H5vw== \ No newline at end of file diff --git a/process.go b/process.go index d23ff89..45a0b8f 100644 --- a/process.go +++ b/process.go @@ -101,6 +101,8 @@ type process struct { startup *startup // Signatures signatures *signatures + // centralAuth + centralAuth *centralAuth // errorKernel errorKernel *errorKernel // metrics @@ -133,6 +135,7 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin ctxCancel: cancel, startup: newStartup(server), signatures: server.signatures, + centralAuth: server.centralAuth, errorKernel: server.errorKernel, metrics: server.metrics, } diff --git a/processes.go b/processes.go index 6923963..9a8a9fe 100644 --- a/processes.go +++ b/processes.go @@ -114,62 +114,50 @@ func (p *processes) Start(proc process) { go proc.spawnWorker() } - // Start a subscriber for textLogging messages if proc.configuration.StartSubREQToFileAppend { proc.startup.subREQToFileAppend(proc) } - // Start a subscriber for text to file messages if proc.configuration.StartSubREQToFile { proc.startup.subREQToFile(proc) } - // Start a subscriber for text to file messages if proc.configuration.StartSubREQToFileNACK { proc.startup.subREQToFileNACK(proc) } - // Start a subscriber for reading file to copy if proc.configuration.StartSubREQCopyFileFrom { proc.startup.subREQCopyFileFrom(proc) } - // Start a subscriber for writing copied file to disk if proc.configuration.StartSubREQCopyFileTo { proc.startup.subREQCopyFileTo(proc) } - // Start a subscriber for Hello messages if proc.configuration.StartSubREQHello { proc.startup.subREQHello(proc) } if proc.configuration.StartSubREQErrorLog { - // Start a subscriber for REQErrorLog messages proc.startup.subREQErrorLog(proc) } - // Start a subscriber for Ping Request messages if proc.configuration.StartSubREQPing { proc.startup.subREQPing(proc) } - // Start a subscriber for REQPong messages if proc.configuration.StartSubREQPong { proc.startup.subREQPong(proc) } - // Start a subscriber for REQCliCommand messages if proc.configuration.StartSubREQCliCommand { proc.startup.subREQCliCommand(proc) } - // Start a subscriber for CLICommandReply messages if proc.configuration.StartSubREQToConsole { proc.startup.subREQToConsole(proc) } - // Start a subscriber for CLICommandReply messages if proc.configuration.EnableTUI { proc.startup.subREQTuiToConsole(proc) } @@ -178,7 +166,18 @@ func (p *processes) Start(proc process) { proc.startup.pubREQHello(proc) } - // Start a subscriber for Http Get Requests + if proc.configuration.StartPubREQPublicKeysGet { + proc.startup.pubREQPublicKeysGet(proc) + } + + if proc.configuration.StartSubREQPublicKeysGet { + proc.startup.subREQPublicKeysGet(proc) + } + + if proc.configuration.StartSubREQPublicKeysPut { + proc.startup.subREQPublicKeysPut(proc) + } + if proc.configuration.StartSubREQHttpGet { proc.startup.subREQHttpGet(proc) } @@ -300,6 +299,68 @@ func (s startup) pubREQHello(p process) { go proc.spawnWorker() } +// pubREQPublicKeysGet defines the startup of a publisher that will send REQPublicKeysGet +// to central server and ask for publics keys, and to get them deliver back with a request +// of type pubREQPublicKeysPut. +func (s startup) pubREQPublicKeysGet(p process) { + log.Printf("Starting PublicKeysGet Publisher: %#v\n", p.node) + + sub := newSubject(REQPublicKeysGet, p.configuration.CentralNodeName) + proc := newProcess(p.ctx, s.server, sub, processKindPublisher, nil) + + // Define the procFunc to be used for the process. + proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error { + // TODO: replace this with a separate timer for the request type. + ticker := time.NewTicker(time.Second * time.Duration(p.configuration.PublicKeysGetInterval)) + for { + + m := Message{ + FileName: "publickeysget.log", + Directory: "publickeysget", + ToNode: Node(p.configuration.CentralNodeName), + FromNode: Node(p.node), + // Data: []byte(d), + Method: REQPublicKeysGet, + ReplyMethod: REQPublicKeysPut, + ACKTimeout: proc.configuration.DefaultMessageTimeout, + Retries: 1, + } + + sam, err := newSubjectAndMessage(m) + if err != nil { + // In theory the system should drop the message before it reaches here. + p.errorKernel.errSend(p, m, err) + log.Printf("error: ProcessesStart: %v\n", err) + } + proc.toRingbufferCh <- []subjectAndMessage{sam} + + select { + case <-ticker.C: + case <-ctx.Done(): + er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name()) + // sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + log.Printf("%v\n", er) + return nil + } + } + } + go proc.spawnWorker() +} + +func (s startup) subREQPublicKeysGet(p process) { + log.Printf("Starting Public keys get subscriber: %#v\n", p.node) + sub := newSubject(REQPublicKeysGet, string(p.node)) + proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) + go proc.spawnWorker() +} + +func (s startup) subREQPublicKeysPut(p process) { + log.Printf("Starting Public keys put subscriber: %#v\n", p.node) + sub := newSubject(REQPublicKeysPut, string(p.node)) + proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) + go proc.spawnWorker() +} + func (s startup) subREQToConsole(p process) { log.Printf("Starting Text To Console subscriber: %#v\n", p.node) sub := newSubject(REQToConsole, string(p.node)) @@ -377,7 +438,7 @@ func (s startup) subREQHello(p process) { s.centralAuth.addPublicKey(proc, m) // update the prometheus metrics - s.metrics.promHelloNodesTotal.Set(float64(len(s.server.centralAuth.nodePublicKeys.keyMap))) + s.metrics.promHelloNodesTotal.Set(float64(len(s.server.centralAuth.nodePublicKeys.KeyMap))) s.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime() } diff --git a/requests.go b/requests.go index 846a332..707ff53 100644 --- a/requests.go +++ b/requests.go @@ -36,6 +36,7 @@ import ( "bufio" "bytes" "context" + "encoding/json" "fmt" "io" "net/http" @@ -133,8 +134,12 @@ const ( REQRelayInitial Method = "REQRelayInitial" // REQNone is used when there should be no reply. REQNone Method = "REQNone" - // REQPublicKey will get the public ed25519 certificate from a node. + // REQPublicKey will get the public ed25519 key from a node. REQPublicKey Method = "REQPublicKey" + // REQPublicKeysGet will get all the public keys from central. + REQPublicKeysGet Method = "REQPublicKeysGet" + // REQPublicKeysPut will put all the public received from central. + REQPublicKeysPut Method = "REQPublicKeysPut" ) // The mapping of all the method constants specified, what type @@ -219,6 +224,12 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { REQPublicKey: methodREQPublicKey{ event: EventACK, }, + REQPublicKeysGet: methodREQPublicKeysGet{ + event: EventNACK, + }, + REQPublicKeysPut: methodREQPublicKeysPut{ + event: EventNACK, + }, }, } @@ -332,6 +343,7 @@ func newReplyMessage(proc process, message Message, outData []byte) { // Create a new message for the reply, and put it on the // ringbuffer to be published. + // TODO: Check that we still got all the fields present that are needed here. newMsg := Message{ ToNode: message.FromNode, FromNode: message.ToNode, @@ -2025,6 +2037,115 @@ func (m methodREQPublicKey) handler(proc process, message Message, node string) // ---- +type methodREQPublicKeysGet struct { + event Event +} + +func (m methodREQPublicKeysGet) getKind() Event { + return m.event +} + +// Handler to get all the public ed25519 keys from a central server. +func (m methodREQPublicKeysGet) handler(proc process, message Message, node string) ([]byte, error) { + // Get a context with the timeout specified in message.MethodTimeout. + + // TODO: + // - Since this is implemented as a NACK message we could implement a + // metric thats shows the last time a node did a key request. + // - We could also implement a metrics on the receiver showing the last + // time a node had done an update. + + ctx, _ := getContextForMethodTimeout(proc.ctx, message) + + proc.processes.wg.Add(1) + go func() { + defer proc.processes.wg.Done() + outCh := make(chan []byte) + + go func() { + select { + case <-ctx.Done(): + // TODO: Should we receive a hash of he current keys from the node here + // to verify if we need to update or not ? + case outCh <- []byte{}: + } + }() + + select { + case <-ctx.Done(): + // case out := <-outCh: + case <-outCh: + b, err := json.Marshal(proc.centralAuth.nodePublicKeys.KeyMap) + if err != nil { + er := fmt.Errorf("error: REQPublicKeysGet, failed to marshal keys map: %v", err) + proc.errorKernel.errSend(proc, message, er) + } + + newReplyMessage(proc, message, b) + } + }() + + // NB: We're not sending an ACK message for this request type. + return nil, nil +} + +// ---- + +type methodREQPublicKeysPut struct { + event Event +} + +func (m methodREQPublicKeysPut) getKind() Event { + return m.event +} + +// Handler to put the public key replies received from a central server. +func (m methodREQPublicKeysPut) handler(proc process, message Message, node string) ([]byte, error) { + // Get a context with the timeout specified in message.MethodTimeout. + + // TODO: + // - Since this is implemented as a NACK message we could implement a + // metric thats shows the last time keys were updated. + + // TODO: Define a subscriber for this Request type in startups. + + ctx, _ := getContextForMethodTimeout(proc.ctx, message) + + proc.processes.wg.Add(1) + go func() { + defer proc.processes.wg.Done() + outCh := make(chan []byte) + + go func() { + select { + case <-ctx.Done(): + // TODO: Should we receive a hash of he current keys from the node her ? + case outCh <- []byte{}: + } + }() + + select { + // case proc.toRingbufferCh <- []subjectAndMessage{sam}: + case <-ctx.Done(): + case <-outCh: + keys := make(map[Node]string) + json.Unmarshal(message.Data, &keys) + + fmt.Printf(" *** RECEIVED KEYS: %v\n", keys) + + // Prepare and queue for sending a new message with the output + // of the action executed. + // newReplyMessage(proc, message, out) + } + }() + + // Send back an ACK message. + // ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return nil, nil +} + +// ---- + // ---- Template that can be used for creating request methods // func (m methodREQCopyFileTo) handler(proc process, message Message, node string) ([]byte, error) {