diff --git a/.gitignore b/.gitignore index bdfeaf8..e0c919d 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,4 @@ doc/concept/via/README.md notes.txt toolbox/ signing/ +frontend/ diff --git a/doc/.$drawings.drawio.bkp b/doc/.$drawings.drawio.bkp new file mode 100644 index 0000000..5ad8d24 --- /dev/null +++ b/doc/.$drawings.drawio.bkp @@ -0,0 +1,301 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/doc/drawings.drawio b/doc/drawings.drawio new file mode 100644 index 0000000..5ad8d24 --- /dev/null +++ b/doc/drawings.drawio @@ -0,0 +1,301 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/doc/portforward-ssh.svg b/doc/portforward-ssh.svg new file mode 100644 index 0000000..4d31d85 --- /dev/null +++ b/doc/portforward-ssh.svg @@ -0,0 +1,4 @@ + + + +
ctrl
node1
ctrl
node2
Start TCP Listener 
localhost:10022
Connect TCP
192.168.1.99:22
TCP packet encapsulated in NATS message
TCP packet encapsulated in NATS message
NATS
Server
Initial message to
connect a tcp stream from node1 on tcp port 10022
to
node2 tcp 192.168.1.99:22
and allow tcp forwarding
1
2
SSH Client
connect to
localhost:10022
SSH Server
at
192.168.1.99
SSH Connection
\ No newline at end of file diff --git a/doc/request_dummy.go.example_method b/doc/request_dummy.go.example_method new file mode 100644 index 0000000..4d41146 --- /dev/null +++ b/doc/request_dummy.go.example_method @@ -0,0 +1,335 @@ +// request_dummy.go +// +// request_dummy.go are put here as an example for how a process via it's +// method can spawn new sub processes. The only purpose of the 'main' method +// is to to prepare and start one or more sub processes that will handle the +// actual logic. +// +// Sub processes can be either short lived to do some work, like copying a +// file, or long lived like a server that listens for incoming connections +// to serve a web page. + +package ctrl + +import ( + "context" + "fmt" + "log/slog" + "strconv" + "time" + + "github.com/fxamacker/cbor/v2" + "github.com/google/uuid" +) + +type dummyInitialData struct { + UUID string + SourceNode Node + SourceSubMethod Method + SourcePort string + DestinationNode Node + DestinationSubMethod Method + DestinationPort string + MaxSessionTimeout int +} + +// The method that will handle the initial message, and setup whats needed for an outbound connection. 
+func methodDummySrc(proc process, message Message, node string) ([]byte, error) { + + go func() { + defer proc.processes.wg.Done() + + // Message example to start an outbound connection + // --- + // - toNode: + // - node1 + // method: dummySrc + // methodArgs: + // - node1 # destination node + // - 8080 # destination port + // - 100 # max session timeout + // replymethod: console + + const ( + arg0_DestinationNode = 0 + arg1_DestinationPort = 1 + arg2_MaxSessionTimeout = 2 + ) + + wantMethodArgs := "want: (0)destination-node, (1)destination-port, (2)max-session-timeout" + + pid := dummyInitialData{ + UUID: uuid.New().String(), + SourceNode: proc.node, + } + + proc.processes.wg.Add(1) + if len(message.MethodArgs) < 2 { + slog.Error("methodDummySrc: got <2 number methodArgs", "want", wantMethodArgs) + return + } + + // Destination node + if message.MethodArgs[arg0_DestinationNode] == "" { + slog.Error("methodDummySrc: no destination node specified in method args", "want", wantMethodArgs) + return + } + pid.DestinationNode = Node(message.MethodArgs[arg0_DestinationNode]) + + // Destination port + if message.MethodArgs[arg1_DestinationPort] == "" { + slog.Error("methodDummySrc: no destination port specified in method args", "want", wantMethodArgs) + return + } + pid.DestinationPort = message.MethodArgs[arg1_DestinationPort] + + // Max session timeout + if message.MethodArgs[arg2_MaxSessionTimeout] == "" { + slog.Error("methodDummySrc: no max session time specified in method args", "want", wantMethodArgs) + return + } + n, err := strconv.Atoi(message.MethodArgs[arg2_MaxSessionTimeout]) + if err != nil { + slog.Error("methodDummySrc: unable to convert max session timeout from string to int", "error", err) + return + } + pid.MaxSessionTimeout = n + + // Create a child context to use with the procFunc with timeout set to the max allowed total copy time + // specified in the message. 
+ var ctx context.Context + var cancel context.CancelFunc + func() { + ctx, cancel = context.WithTimeout(proc.ctx, time.Second*time.Duration(pid.MaxSessionTimeout)) + }() + + srcSubProcessName := fmt.Sprintf("%v.%v", SUBDummySrc, pid.UUID) + pid.SourceSubMethod = Method(srcSubProcessName) + + // Create a new subprocess that will handle the network source side. + subject := newSubjectNoVerifyHandler(pid.SourceSubMethod, node) + dummySrcSubProc := newSubProcess(ctx, proc.server, subject) + + // Attach a procfunc to the sub process that will do the actual logic with the network source port. + // + // TODO: We need to initiate the network connection here to + // get hold of the source port, and send that over to the + // destination node. + // NB: assign pid.SourcePort a value. + dummySrcSubProc.procFunc = dummySrcSubProcFunc(pid, message, cancel) + + // Assign a handler to the sub process for receiving messages for the subprocess. + dummySrcSubProc.handler = dummySrcSubHandler() + + // Start sub process. The process will be killed when the context expires. + go dummySrcSubProc.start() + + // Prepare the naming for the dst method here so we can have all the + // information in the pid from the beginning at both ends and not have + // to generate naming on the destination node. + dstSubProcessName := fmt.Sprintf("%v.%v", SUBDummyDst, pid.UUID) + pid.DestinationSubMethod = Method(dstSubProcessName) + + fmt.Printf("DEBUG: methodDummySrc, pid: %+v\n", pid) + + // Marshal the data payload to send to the dst. + cb, err := cbor.Marshal(pid) + if err != nil { + er := fmt.Errorf("error: methodDummySrc: cbor marshalling failed: %v, message: %v", err, message) + proc.errorKernel.errSend(proc, message, er, logWarning) + cancel() + } + + // Send a message to the the destination node, to also start up a sub + // process there. 
+ + msg := Message{ + ToNode: pid.DestinationNode, + Method: DummyDst, + Data: cb, + ACKTimeout: message.ACKTimeout, + Retries: message.Retries, + ReplyMethod: Console, // TODO: Adding for debug output + ReplyACKTimeout: message.ReplyACKTimeout, + ReplyRetries: message.ReplyRetries, + } + + proc.newMessagesCh <- msg + + replyData := fmt.Sprintf("info: succesfully started dummy source process: procName=%v, srcNode=%v, sourcePort=%v, dstNode=%v, starting sub process=%v", dummySrcSubProc.processName, node, pid.SourcePort, pid.DestinationNode, srcSubProcessName) + + newReplyMessage(proc, message, []byte(replyData)) + + }() + + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil +} + +func methodDummyDst(proc process, message Message, node string) ([]byte, error) { + var subProcessName string + + proc.processes.wg.Add(1) + go func() { + defer proc.processes.wg.Done() + + // Get the status message sent from source. + var pid dummyInitialData + err := cbor.Unmarshal(message.Data, &pid) + if err != nil { + er := fmt.Errorf("error: methodDummyDst: failed to cbor Unmarshal data: %v, message=%v", err, message) + proc.errorKernel.errSend(proc, message, er, logWarning) + return + } + + fmt.Printf(" *** DEBUG: MaxSessionTimeout: %v\n", pid.MaxSessionTimeout) + + // Create a child context to use with the procFunc + var ctx context.Context + var cancel context.CancelFunc + func() { + ctx, cancel = context.WithTimeout(proc.ctx, time.Second*time.Duration(pid.MaxSessionTimeout)) + }() + + // Start preparing to start a sub process. + sub := newSubjectNoVerifyHandler(pid.DestinationSubMethod, node) + + // But first... + // Check if we already got a sub process registered and started with + // the processName. If true, return here and don't start up another + // process. + // + // This check is put in here if a message for some reason are + // received more than once. 
The reason that this might happen can be + // that a message for the same copy request was received earlier, but + // was unable to start up within the timeout specified. The Sender of + // that request will then resend the message, but at the time that + // second message is received the subscriber process started for the + // previous message is then fully up and running, so we just discard + // that second message in those cases. + + pn := processNameGet(sub.name()) + // fmt.Printf("\n\n *** DEBUG: processNameGet: %v\n\n", pn) + + proc.processes.active.mu.Lock() + _, ok := proc.processes.active.procNames[pn] + proc.processes.active.mu.Unlock() + + if ok { + proc.errorKernel.logDebug("methodCopyDst: subprocesses already existed, will not start another subscriber for", "processName", pn) + + // If the process name already existed we return here before any + // new information is registered in the process map and we avoid + // having to clean that up later. + return + } + + // Create a new sub process. + dummyDstSubProc := newSubProcess(ctx, proc.server, sub) + + // Give the sub process a procFunc that will do the actual networking within a procFunc, + dummyDstSubProc.procFunc = dummyDstSubProcFunc(pid, message, cancel) + + // assign a handler to the sub process + dummyDstSubProc.handler = dummyDstSubHandler() + + // The process will be killed when the context expires. 
+ go dummyDstSubProc.start() + + replyData := fmt.Sprintf("info: succesfully initiated dummy destination process: procName=%v, srcNode=%v, starting sub process=%v for the actual copying", dummyDstSubProc.processName, node, subProcessName) + + newReplyMessage(proc, message, []byte(replyData)) + + }() + + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil +} + +func dummySrcSubHandler() func(process, Message, string) ([]byte, error) { + h := func(proc process, message Message, node string) ([]byte, error) { + + select { + case <-proc.ctx.Done(): + proc.errorKernel.logDebug("copySrcHandler: process ended", "processName", proc.processName) + case proc.procFuncCh <- message: + proc.errorKernel.logDebug("copySrcHandler: passing message over to procFunc", "processName", proc.processName) + } + + return nil, nil + } + + return h +} + +func dummyDstSubHandler() func(process, Message, string) ([]byte, error) { + h := func(proc process, message Message, node string) ([]byte, error) { + + select { + case <-proc.ctx.Done(): + proc.errorKernel.logDebug("copyDstHandler: process ended", "processName", proc.processName) + case proc.procFuncCh <- message: + proc.errorKernel.logDebug("copyDstHandler: passing message over to procFunc", "processName", proc.processName) + + } + + return nil, nil + } + + return h +} + +func dummySrcSubProcFunc(pid dummyInitialData, initialMessage Message, cancel context.CancelFunc) func(context.Context, process, chan Message) error { + pf := func(ctx context.Context, proc process, procFuncCh chan Message) error { + fmt.Printf("STARTED PROCFUNC: %v\n", proc.subject.name()) + defer fmt.Println("dummySrcProcFunc: canceled procFunc", "processName", proc.processName) + + for { + select { + case <-ctx.Done(): + fmt.Println("dummySrcProcFunc: canceling procFunc", "processName", proc.processName) + return nil + + // Pick up the message recived by the copySrcSubHandler. 
+ case message := <-procFuncCh: + _ = message + + for i := 0; i < 20; i++ { + fmt.Printf("dst: %v\n", i) + time.Sleep(time.Second * 1) + } + } + } + + } + + return pf +} + +func dummyDstSubProcFunc(pid dummyInitialData, message Message, cancel context.CancelFunc) func(context.Context, process, chan Message) error { + + pf := func(ctx context.Context, proc process, procFuncCh chan Message) error { + fmt.Printf("STARTED PROCFUNC: %v\n", proc.subject.name()) + defer fmt.Println("dummyDstProcFunc: canceled procFunc", "processName", proc.processName) + + for { + select { + case <-ctx.Done(): + fmt.Println("dummyDstProcFunc: got <-ctx.Done() cancelling procFunc", "processName", proc.processName) + return nil + + // Pick up the message recived by the copySrcSubHandler. + case message := <-procFuncCh: + _ = message + + for i := 0; i < 20; i++ { + fmt.Printf("dst: %v\n", i) + time.Sleep(time.Second * 1) + } + } + } + + } + + return pf +} diff --git a/doc/src/SUMMARY.md b/doc/src/SUMMARY.md index 02f3d47..afc9e72 100644 --- a/doc/src/SUMMARY.md +++ b/doc/src/SUMMARY.md @@ -40,3 +40,4 @@ - [ctrl as github action runner](usecase-ctrl-as-github-action-runner.md) - [ctrl as prometheus collector](usecase-ctrl-as-prometheus-collector.md) +- [ctrl as tcp forwarder for ssh](usecase-ctrl-as-tcp-forwarder-for-ssh.md) diff --git a/doc/src/usecase-ctrl-as-tcp-forwarder-for-ssh.md b/doc/src/usecase-ctrl-as-tcp-forwarder-for-ssh.md new file mode 100644 index 0000000..a840e39 --- /dev/null +++ b/doc/src/usecase-ctrl-as-tcp-forwarder-for-ssh.md @@ -0,0 +1,44 @@ +# ctrl as tcp forwarder for ssh + +ctrl can be used to forward a TCP stream between two nodes. The Individual TCP packet will be read from a TCP listener on a node, put into a standard ctrl message, the message are then sent to the destination node using NATS PUB/SUB where the TCP packet is extracted, and written to the TCP connection. 
+ +In the following example we have two nodes, where we can think of **node1** as the local node running on your computer, and node2 is the remote node running on a server somewhere. + +We want to connect with ssh to node2, but it is not directly available from the network where the local node1 resides, so we use ctrl as a proxy server and forward the ssh tcp stream. + + + +

+ + +## Steps + +1. Create the message with forwarding details, and copy it into **node1's readfolder**. + +```yaml +--- +- toNodes: + - node1 # The source node where we start the tcp listener + method: portSrc # The ctrl tcp forward method + methodArgs: + - node2 # The destination node that connects to the actual endpoint + - 192.168.99.1:22 # The ip address and port we want to connect to from the endpoint. + - :10022 # The local port to start the listener on node1 + - 30 # How many seconds the tcp forwarder should be active + methodTimeout: 30 # Same as above, but at the method level. Set them to the same. + replyMethod: console # Do logging to console on node1 + ACKTimeout: 0 # No ACK'ing of messages. Fire and forget. +``` + +1. On **node1** a process with a TCP listener will be started at **0.0.0.0:10022**. + +1. The process on **node1** will then send a message to **node2** to start up a process and connect to the endpoint defined in the **methodArgs**. + +1. The forwarding is now up and running, and we can use an ssh client to connect to the endpoint, which is now forwarded to **node1** on port **10022**. + +```bash +ssh -p 10022 user@localhost +``` + +The forwarding will automatically end after the time period specified, which in this example is 30 seconds. 
diff --git a/message_readers.go b/message_readers.go index 6925485..8d6bd10 100644 --- a/message_readers.go +++ b/message_readers.go @@ -345,7 +345,7 @@ func (s *server) readFolder() { fh, err := os.Open(event.Name) if err != nil { er := fmt.Errorf("error: readFolder: failed to open readFile from readFolder: %v", err) - s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) + s.errorKernel.errSend(s.processInitial, Message{}, er, logDebug) return } diff --git a/processes.go b/processes.go index ad263d1..5394341 100644 --- a/processes.go +++ b/processes.go @@ -180,6 +180,10 @@ func (p *processes) Start(proc process) { } proc.startup.startProcess(proc, PublicKey, nil) + + // TODO: Create configuration flags for enabling processes. + proc.startup.startProcess(proc, PortSrc, nil) + proc.startup.startProcess(proc, PortDst, nil) } // Stop all subscriber processes. diff --git a/requests.go b/requests.go index fbf2423..de9cb63 100644 --- a/requests.go +++ b/requests.go @@ -104,6 +104,17 @@ const ( SUBCopySrc Method = "subCopySrc" // Write the destination copied to some node. SUBCopyDst Method = "subCopyDst" + + PortSrc Method = "portSrc" + PortDst Method = "PortDst" + SUBPortSrc Method = "subPortSrc" + SUBPortDst Method = "subPortDst" + + DummySrc Method = "dummySrc" + DummyDst Method = "dummyDst" + SUBDummySrc Method = "subDummySrc" + SUBDummyDst Method = "subDummyDst" + // Hello I'm here message. 
Hello Method = "hello" HelloPublisher Method = "helloPublisher" @@ -185,6 +196,10 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { CopyDst: Handler(methodCopyDst), SUBCopySrc: Handler(methodSUB), SUBCopyDst: Handler(methodSUB), + PortSrc: Handler(methodPortSrc), + PortDst: Handler(methodPortDst), + SUBPortSrc: Handler(methodSUB), + SUBPortDst: Handler(methodSUB), Hello: Handler(methodHello), // The hello publisher will not subscribe for messages, it will // only start a procFunc, so we we don't need a handler with a method, @@ -258,7 +273,7 @@ func methodInitial(proc process, message Message, node string) ([]byte, error) { // ---- // place holder method used for sub processes. -// Methods used in sub processes are defined within the the requests +// Methods used in sub processes are defined within the requests // they are spawned in, so this type is primarily for us to use the // same logic with sub process requests as we do with normal requests. func methodSUB(proc process, message Message, node string) ([]byte, error) { diff --git a/requests_port.go b/requests_port.go new file mode 100644 index 0000000..067c9e1 --- /dev/null +++ b/requests_port.go @@ -0,0 +1,708 @@ +package ctrl + +import ( + "context" + "fmt" + "io" + "log" + "log/slog" + "net" + "sort" + "strconv" + "strings" + "sync" + "time" + + "github.com/fxamacker/cbor/v2" + "github.com/google/uuid" +) + +// portInitialData is the data that is sent to the source and destination nodes when a port forwarding is created. +type portInitialData struct { + UUID string + SourceNode Node + SourceSubMethod Method + SourceIPAndPort string + DestinationNode Node + DestinationSubMethod Method + DestinationIPAndPort string + MaxSessionTimeout int +} + +// NewPortInitialData creates a new portInitialData struct. 
+func NewPortInitialData(uuid string, sourceNode Node, sourceSubMethod Method, sourcePort string, destinationNode Node, destinationSubMethod Method, destinationPort string, maxSessionTimeout int) portInitialData { + pid := portInitialData{ + UUID: uuid, + SourceNode: sourceNode, + SourceSubMethod: sourceSubMethod, + SourceIPAndPort: sourcePort, + DestinationNode: destinationNode, + DestinationSubMethod: destinationSubMethod, + DestinationIPAndPort: destinationPort, + MaxSessionTimeout: maxSessionTimeout, + } + + return pid +} + +// The main method that will handle the initial message, and setup whats needed for an outbound connection. +// It will setup a sub process that will handle the individual session of the port forwarding on the +// source node, and also setup the local tcp listener on the source node that a client can connect to. +// +// NB: All logic for the forwarding are done in the subprocesses started. +func methodPortSrc(proc process, message Message, node string) ([]byte, error) { + + go func() { + defer proc.processes.wg.Done() + + // Message example to start an outbound connection + // --- + // - toNode: + // - node1 + // method: portSrc + // methodArgs: + // - node1 # destination node + // - localhost:8080 # destination node and port + // - localhost:8090 # source node and port + // - 100 # max session timeout + // replymethod: console + + const ( + arg0_DestinationNode = 0 + arg1_DestinationIPAndPort = 1 + arg2_SourceIPandPort = 2 + arg3_MaxSessionTimeout = 3 + ) + + wantMethodArgs := "want: (0)destination-node, (1)destination-ip-and-port, (2) source-ip-and-port, (3)max-session-timeout" + + uuid := uuid.New().String() + sourceNode := proc.node + + proc.processes.wg.Add(1) + if len(message.MethodArgs) < arg3_MaxSessionTimeout { + slog.Error("methodPortSrc: to few methodArgs defined in message", "want", wantMethodArgs) + return + } + + // Destination node + if message.MethodArgs[arg0_DestinationNode] == "" { + slog.Error("methodPortSrc: no 
destination node specified in method args", "want", wantMethodArgs) + return + } + destinationNode := Node(message.MethodArgs[arg0_DestinationNode]) + + // Destination port + if message.MethodArgs[arg1_DestinationIPAndPort] == "" { + slog.Error("methodPortSrc: no destination port specified in method args", "want", wantMethodArgs) + return + } + destinationIPAndPort := message.MethodArgs[arg1_DestinationIPAndPort] + + // Source port + if message.MethodArgs[arg2_SourceIPandPort] == "" { + slog.Error("methodPortSrc: no source port specified in method args", "want", wantMethodArgs) + return + } + sourceIPAndPort := message.MethodArgs[arg2_SourceIPandPort] + + // Max session timeout + if message.MethodArgs[arg3_MaxSessionTimeout] == "" { + slog.Error("methodPortSrc: no max session time specified in method args", "want", wantMethodArgs) + return + } + n, err := strconv.Atoi(message.MethodArgs[arg3_MaxSessionTimeout]) + if err != nil { + slog.Error("methodPortSrc: unable to convert max session timeout from string to int", "error", err) + return + } + maxSessionTimeout := n + + // Prepare the naming for the dst method here so we can have all the + // information in the pid from the beginning at both ends and not have + // to generate naming on the destination node. + dstSubProcessName := fmt.Sprintf("%v.%v", SUBPortDst, uuid) + destinationSubMethod := Method(dstSubProcessName) + + // ------- Create the source sub process ------- + + // Create a child context to use with the procFunc with timeout set to the max allowed total copy time + // specified in the message. + var ctx context.Context + var cancel context.CancelFunc + func() { + ctx, cancel = context.WithTimeout(proc.ctx, time.Second*time.Duration(maxSessionTimeout)) + }() + + srcSubProcessName := fmt.Sprintf("%v.%v", SUBPortSrc, uuid) + sourceSubMethod := Method(srcSubProcessName) + + // Create a new subprocess that will handle the network source side. 
+ subject := newSubjectNoVerifyHandler(sourceSubMethod, node) + portSrcSubProc := newSubProcess(ctx, proc.server, subject) + + // Create a new portInitialData with all the information we need for starting the sub processes. + // NB: Using the destination port as the source port on the source process for now. + pid := NewPortInitialData(uuid, sourceNode, sourceSubMethod, sourceIPAndPort, destinationNode, destinationSubMethod, destinationIPAndPort, maxSessionTimeout) + + // Attach a procfunc to the sub process that will do the actual logic with the network source port. + portSrcSubProc.procFunc = portSrcSubProcFunc(pid, message, cancel) + + // Assign a handler to the sub process for receiving messages for the subprocess. + portSrcSubProc.handler = portSrcSubHandler() + + // Start sub process. The process will be killed when the context expires. + go portSrcSubProc.start() + + fmt.Printf("DEBUG: methodPortSrc, pid: %+v\n", pid) + + // ------- Prepare the data payload to send to the dst to start the dst sub process ------- + cb, err := cbor.Marshal(pid) + if err != nil { + er := fmt.Errorf("error: methodPortSrc: cbor marshalling failed: %v, message: %v", err, message) + proc.errorKernel.errSend(proc, message, er, logWarning) + cancel() + } + + // Send a message to the the destination node, to also start up a sub + // process there. 
+ + msg := Message{ + ToNode: pid.DestinationNode, + Method: PortDst, + Data: cb, + ACKTimeout: message.ACKTimeout, + Retries: message.Retries, + ReplyMethod: Console, // TODO: Adding for debug output + ReplyACKTimeout: message.ReplyACKTimeout, + ReplyRetries: message.ReplyRetries, + } + + proc.newMessagesCh <- msg + + replyData := fmt.Sprintf("info: succesfully started port source process: procName=%v, srcNode=%v, sourcePort=%v, dstNode=%v, starting sub process=%v", portSrcSubProc.processName, node, pid.SourceIPAndPort, pid.DestinationNode, srcSubProcessName) + + newReplyMessage(proc, message, []byte(replyData)) + + }() + + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil +} + +// The main method that will handle the initial message sendt from the source process, and setup +// sub process that will handle the individual session of the port forwarding, and also +// setup the connection the final tcp endpoint on the destination node. +// +// NB: All logic for the forwarding are done in the subprocesses started. +func methodPortDst(proc process, message Message, node string) ([]byte, error) { + var subProcessName string + + proc.processes.wg.Add(1) + go func() { + defer proc.processes.wg.Done() + + // Get the status message sent from source. + var pid portInitialData + err := cbor.Unmarshal(message.Data, &pid) + if err != nil { + er := fmt.Errorf("error: methodPortDst: failed to cbor Unmarshal data: %v, message=%v", err, message) + proc.errorKernel.errSend(proc, message, er, logWarning) + return + } + + fmt.Printf(" *** DEBUG: ON DST: got pid: %+v\n", pid) + + // Create a child context to use with the procFunc + var ctx context.Context + var cancel context.CancelFunc + func() { + ctx, cancel = context.WithTimeout(proc.ctx, time.Second*time.Duration(pid.MaxSessionTimeout)) + }() + + // Start preparing to start a sub process. + sub := newSubjectNoVerifyHandler(pid.DestinationSubMethod, node) + + // But first... 
+ // Check if we already got a sub process registered and started with + // the processName. If true, return here and don't start up another + // process. + // + // This check is put in here if a message for some reason are + // received more than once. The reason that this might happen can be + // that a message for the same copy request was received earlier, but + // was unable to start up within the timeout specified. The Sender of + // that request will then resend the message, but at the time that + // second message is received the subscriber process started for the + // previous message is then fully up and running, so we just discard + // that second message in those cases. + + pn := processNameGet(sub.name()) + // fmt.Printf("\n\n *** DEBUG: processNameGet: %v\n\n", pn) + + proc.processes.active.mu.Lock() + _, ok := proc.processes.active.procNames[pn] + proc.processes.active.mu.Unlock() + + if ok { + proc.errorKernel.logDebug("methodCopyDst: subprocesses already existed, will not start another subscriber for", "processName", pn) + + // If the process name already existed we return here before any + // new information is registered in the process map and we avoid + // having to clean that up later. + return + } + + // Create a new sub process. + portDstSubProc := newSubProcess(ctx, proc.server, sub) + + // Give the sub process a procFunc that will do the actual networking within a procFunc, + portDstSubProc.procFunc = portDstSubProcFunc(pid, message, cancel) + + // assign a handler to the sub process + portDstSubProc.handler = portDstSubHandler() + + // The process will be killed when the context expires. 
+ go portDstSubProc.start() + + replyData := fmt.Sprintf("info: succesfully initiated port destination process: procName=%v, srcNode=%v, starting sub process=%v for the actual copying", portDstSubProc.processName, node, subProcessName) + + newReplyMessage(proc, message, []byte(replyData)) + + }() + + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil +} + +// portSrcSubHandler is the handler for messages received and destined for the port source sub process. +// It will pass the message to the procFunc of the port source sub process, which will do the actual +// handling of the messages. +func portSrcSubHandler() func(process, Message, string) ([]byte, error) { + h := func(proc process, message Message, node string) ([]byte, error) { + + select { + case <-proc.ctx.Done(): + proc.errorKernel.logDebug("copySrcHandler: process ended", "processName", proc.processName) + case proc.procFuncCh <- message: + proc.errorKernel.logDebug("copySrcHandler: passing message over to procFunc", "processName", proc.processName) + } + + return nil, nil + } + + return h +} + +// portDstSubHandler is the handler for messages received and destined for the port destination sub process. +// It will pass the message to the procFunc of the port destination sub process, which will do the actual +// handling of the messages. 
// portDstSubHandler returns the subscriber handler for the destination-side
// port-forward sub process. It does no work itself; it only forwards each
// received message to the process's procFunc via proc.procFuncCh, or drops
// out silently if the process context is already done.
// NOTE(review): the log messages say "copyDstHandler" — presumably copied
// from the copy handlers; confirm and align the names.
func portDstSubHandler() func(process, Message, string) ([]byte, error) {
	h := func(proc process, message Message, node string) ([]byte, error) {

		select {
		case <-proc.ctx.Done():
			proc.errorKernel.logDebug("copyDstHandler: process ended", "processName", proc.processName)
		case proc.procFuncCh <- message:
			proc.errorKernel.logDebug("copyDstHandler: passing message over to procFunc", "processName", proc.processName)

		}

		return nil, nil
	}

	return h
}

// portData is the payload exchanged (CBOR encoded inside Message.Data)
// between the src and dst port-forward sub processes.
type portData struct {
	// OK is false when the sender hit a read error (e.g. EOF) and the
	// receiving side should stop its sub process.
	OK bool
	// ErrorMsg describes the error when OK is false.
	ErrorMsg string
	// Data is the raw chunk read from the TCP connection.
	Data []byte
	// ID is a per-connection monotonically increasing sequence number,
	// used to re-order messages that arrive out of order.
	ID int
}

// portSrcSubProcFunc is the function that will be run by the portSrcSubProc process.
// It will listen on the source IP and port, and send messages to the destination node
// to write the data to the destination IP and port.
// It will also handle the incoming messages from the destination node and write the
// data in the message to the source IP and port.
func portSrcSubProcFunc(pid portInitialData, initialMessage Message, cancel context.CancelFunc) func(context.Context, process, chan Message) error {
	pf := func(ctx context.Context, proc process, procFuncCh chan Message) error {
		fmt.Printf("STARTED PROCFUNC: %v\n", proc.subject.name())
		defer cancel()
		defer fmt.Println("portSrcProcFunc: canceled procFunc", "processName", proc.processName)

		listener, err := net.Listen("tcp", pid.SourceIPAndPort)
		if err != nil {
			// TODO: Send a message to destination sub process that there was an error,
			// and that it should stop.
			fmt.Printf("error: portSrcSubProcFunc: net.Listen failed. err=%v\n", err)
			return err
		}

		// Since the data to write to the src network listener is coming from the dst node in
		// the form of a message, we need to write it to a channel that the listener can read from.
		srcWriteCh := make(chan []byte, 1)

		// Start a goroutine to handle the tcp listener.
		go func() {
			for {
				conn, err := listener.Accept()
				if err != nil {
					// TODO: Send a message to destination sub process that there was an error,
					// and that it should stop.
					fmt.Printf("error: portSrcSubProcFunc: listener.Accept failed. err=%v\n", err)
					return
				}
				// NOTE(review): defer inside this for loop only fires when the whole
				// goroutine returns, not per accepted connection. That is harmless only
				// under the single-connection assumption below — TODO confirm.
				defer func() {
					conn.Close()
					listener.Close()
					fmt.Println(" DEBUG: Canceled portSrcSubProcFunc connection and listener")
				}()

				// Read the data from the tcp connection, create messages from it, and
				// send it to the dst node. Each chunk gets a sequence number (id) so the
				// dst side can re-order messages delivered out of order.
				go func() {
					id := 0
					for {
						b := make([]byte, 65535)
						n, err := conn.Read(b)
						if err != nil {
							log.Printf("error: portSrcSubProcFunc: conn.Read failed. err=%v", err)
							return
						}

						pd := portData{
							ID:   id,
							Data: b[:n],
						}
						id++

						cb, err := cbor.Marshal(pd)
						if err != nil {
							// NOTE(review): log.Fatalf exits the whole node process, not just
							// this sub process — TODO consider propagating the error instead.
							log.Fatalf("error: portDstSubProcFunc: cbor marshalling failed. err=%v", err)
						}

						// ReplyMethod None: the data flow back from dst arrives as separate
						// messages to this node's sub process, not as replies.
						msg := Message{
							ToNode:      pid.DestinationNode,
							Method:      Method(pid.DestinationSubMethod),
							Data:        cb,
							ACKTimeout:  initialMessage.ACKTimeout,
							Retries:     initialMessage.Retries,
							ReplyMethod: None,
						}

						fmt.Printf(" ******* SRC: Created message to send with id : %v\n", pd.ID)

						select {
						case <-ctx.Done():
							fmt.Println("portSrcProcFunc: canceling procFunc", "processName", proc.processName)
							return
						case proc.newMessagesCh <- msg:
							fmt.Printf(" ---->: Sending message, id: %v, size: %v\n", pd.ID, len(pd.Data))
						}
					}
				}()

				// We only allow 1 connection to the listener, so we're not starting a new goroutine for each
				// connection. This loop drains srcWriteCh (data received from the dst
				// node) and writes it to the accepted connection until cancel/error.
				for {
					select {
					case <-ctx.Done():
						fmt.Println("portSrcProcFunc: canceling procFunc", "processName", proc.processName)
						return
					case b := <-srcWriteCh:
						n, err := conn.Write(b)
						if err != nil {
							log.Printf("error: portSrcSubProcFunc: conn.Write failed. err=%v", err)
							return
						}
						fmt.Printf("--------> conn: portSrcSubProcFunc: wrote %v bytes to connection\n", n)
					}
				}
			}
		}()

		// -----------------------------------------------------------------------------------
		// Read from messages from dst node and write to the source network connection.
		// -----------------------------------------------------------------------------------

		// expectedID tracks how many messages we have received; together with the
		// sorted buffer it lets us hold back out-of-order messages until the gap fills.
		expectedID := -1
		buffer := NewPortSortedBuffer()
		for {
			select {
			case <-ctx.Done():
				fmt.Println("portSrcProcFunc: canceling procFunc", "processName", proc.processName)
				return nil

			// Handle the messages received from the sub process on the src node.
			// The messages will contain the data to be sent to the dst node.
			case message := <-procFuncCh:
				expectedID++

				var pd portData
				err := cbor.Unmarshal(message.Data, &pd)
				if err != nil {
					// NOTE(review): log.Fatalf exits the whole node process — TODO confirm intended.
					log.Fatalf("error: portSrcSubProcFunc: cbor unmarshalling failed. err=%v", err)
				}

				fmt.Printf("<---- GOT DATA ON SRC, pdd.OK:%v, length of pddData ::: %v\n", pd.OK, len(pd.Data))

				buffer.Push(pd)

				err = func() error {
					nextID, _ := buffer.PeekNextID()

					// The lowest buffered ID is ahead of what we expect: a message is
					// still missing, so keep buffering and wait for the next one.
					if expectedID < nextID {
						log.Printf("------------WRONG ID, WILL WAIT FOR NEXT MESSAGE, expectedID %v < nextID: %v\n", expectedID, pd.ID)
						return nil
					}

					log.Printf("------------CORRECT ID, EXPECTED: %v, GOT: %v\n", expectedID, pd.ID)

					// Write the data to the channel that the listener is reading from.
					for {
						pdPopped, ok := buffer.Pop()
						if !ok {
							// Buffer is empty, break out, and wait for next message.
							break
						}

						// NOTE(review): this send has no ctx.Done() case; if the writer
						// goroutine above has exited, this blocks forever — TODO confirm.
						srcWriteCh <- pdPopped.Data

						// pd (the message just received, not pdPopped) carries the dst
						// side's error state; a false OK ends this procFunc with the error.
						if !pd.OK {
							log.Printf("error: portSrcSubProcFunc: pdd.OK is false. err=%v\n", pd.ErrorMsg)
							return fmt.Errorf("%v", pd.ErrorMsg)
						}
					}

					return nil
				}()

				if err != nil {
					return err
				}
			}
		}

	}

	return pf
}

// portDstSubProcFunc is the function that will be run by the portDstSubProc process.
// It will connect to the destination IP and port, and send messages to the source node
// to write the data to the source IP and port.
// It will also handle the incoming messages from the source node and write the
// data in the message to the destination IP and port.
func portDstSubProcFunc(pid portInitialData, message Message, cancel context.CancelFunc) func(context.Context, process, chan Message) error {

	pf := func(ctx context.Context, proc process, procFuncCh chan Message) error {
		defer cancel()

		fmt.Printf("STARTED PROCFUNC: %v\n", proc.subject.name())
		defer fmt.Println("portDstProcFunc: canceled procFunc", "processName", proc.processName)

		// TODO: Start the tcp connection for the dst node here.
		// ------------
		conn, err := net.Dial("tcp", pid.DestinationIPAndPort)
		if err != nil {
			// NOTE(review): log.Fatalf exits the whole node process on a dial
			// failure — TODO consider returning the error instead.
			log.Fatalf("error: portDstSubProcFunc: dial failed. err=%v", err)
		}
		defer conn.Close()

		// Read from destination network connection and send messages to src node of whats read.
		go func() {
			// Per-connection sequence number so the src side can re-order messages.
			id := 0

			for {
				var errorMsg string
				ok := true

				b := make([]byte, 65535)

				n, err := conn.Read(b)
				if err != nil {
					// If there was an error while reading, set the value of errorMsg and ok accordingly.
					// This information will then be used on the src node to stop sub processes etc.
					// NOTE(review): only the EOF branch fills errorMsg; the other branches
					// send an empty ErrorMsg — TODO confirm the src node copes with that.
					switch {
					case err == io.EOF:
						ok = false
						errorMsg = fmt.Sprintf("portDstSubProcFunc: conn.Read() returned EOF, n: %v", n)
						log.Printf("AT DST: %v\n", errorMsg)
					case strings.Contains(err.Error(), "use of closed network connection"):
						ok = false
						fmt.Printf("AT DST: portDstSubProcFunc: conn.Read(): closed network connection, err: %v, n: %v\n", err, n)
					default:
						ok = false
						fmt.Printf("AT DST: portDstSubProcFunc: conn.Read(): other error, err: %v, n: %v\n", err, n)
					}

				}

				fmt.Printf("portDstSubProcFunc: read from network conn: length: %v\n", n)

				// Even on a read error we still send one final message (with OK=false)
				// so the src side learns it should shut down.
				pdd := portData{
					OK:       ok,
					ErrorMsg: errorMsg,
					Data:     b[:n],
					ID:       id,
				}
				id++

				cb, err := cbor.Marshal(pdd)
				if err != nil {
					log.Fatalf("error: portDstSubProcFunc: cbor marshalling failed. err=%v", err)
				}

				msg := Message{
					ToNode:      pid.SourceNode,
					Method:      Method(pid.SourceSubMethod),
					Data:        cb,
					ACKTimeout:  message.ACKTimeout,
					Retries:     message.Retries,
					ReplyMethod: None,
				}

				// NOTE(review): unguarded send — no ctx.Done() case here, unlike the
				// src-side equivalent; could block on shutdown — TODO confirm.
				proc.newMessagesCh <- msg
				fmt.Printf(" ******* DST: Created message to send with id : %v\n", id)

				// If there was en error while reading, we exit the loop, so the connection is closed.
				if !ok {
					// TODO: Check out the cancelation!!!
					//cancel()
					return
				}

			}

		}()

		// -----------------------------------------------------------------------------------
		// Read from messages from src node and write to the destination network connection.
		// -----------------------------------------------------------------------------------

		// expectedID tracks how many messages we have received; together with the
		// sorted buffer it lets us hold back out-of-order messages until the gap fills.
		expectedID := -1
		buffer := NewPortSortedBuffer()

		for {
			select {
			case <-ctx.Done():
				fmt.Println("portDstProcFunc: got <-ctx.Done() cancelling procFunc", "processName", proc.processName)
				return nil

			// Pick up the message received by the sub handler (the messages contain
			// the data read on the src node that should be written to the dst conn).
			case message := <-procFuncCh:
				expectedID++

				var pd portData
				err := cbor.Unmarshal(message.Data, &pd)
				if err != nil {
					// NOTE(review): log.Fatalf exits the whole node process — TODO confirm intended.
					log.Fatalf("error: portDstSubProcFunc: cbor unmarshalling failed. err=%v", err)
				}

				fmt.Printf("<---- GOT DATA ON DST, id: %v, length: %v\n", pd.ID, len(pd.Data))

				buffer.Push(pd)

				err = func() error {
					nextID, _ := buffer.PeekNextID()
					// Messages might come out of order.
					// If the expected id is larger than the next id, we've lost packages, and end the process.
					// If the expected id is less than the next id, we return and wait for the next message.
					// if expectedID > nextID {
					// 	err := fmt.Errorf("portDstSubProcFunc: error: expectedID %v > nextID %v, we've probably lost packages", expectedID, nextID)
					// 	log.Println(err)
					//
					// 	log.Printf(" ---------- DEBUG portDstSubProcFunc: buffer now contains: %+v\n", buffer.buffer)
					//
					// 	return err
					// }
					if expectedID < nextID {
						log.Printf("------------WRONG ID, WILL WAIT FOR NEXT MESSAGE, expectedID %v < nextID: %v\n", expectedID, pd.ID)
						return nil
					}

					log.Printf("------------CORRECT ID, EXPECTED: %v, GOT: %v\n", expectedID, pd.ID)

					// Loop over eventual messages in the buffer and write them to the connection.
					for {
						pdPopped, ok := buffer.Pop()

						if !ok {
							// Buffer is empty, break out and wait for next message.
							break
						}

						n, err := conn.Write(pdPopped.Data)
						if err != nil {
							err := fmt.Errorf("error: portDstSubProcFunc: conn.Write failed. err=%v", err)
							log.Println(err)
							return err
						}
						fmt.Printf("--------> conn: portDstSubProcFunc: wrote %v bytes to connection\n", n)
					}

					return nil
				}()

				// Check if there was an error in the above function.
				if err != nil {
					return err
				}
			}
		}

	}

	return pf
}

// -----------------------------------------------------------

// portSortedBuffer is a thread-safe buffer that sorts the data by ID.
+type portSortedBuffer struct { + buffer []portData + mu sync.Mutex +} + +// NewPortSortedBuffer creates a new portSortedBuffer. +func NewPortSortedBuffer() *portSortedBuffer { + b := portSortedBuffer{} + return &b +} + +// Push adds a new portData to the buffer and sorts it by ID. +func (b *portSortedBuffer) Push(value portData) { + b.buffer = append(b.buffer, value) + + b.mu.Lock() + defer b.mu.Unlock() + sort.SliceStable(b.buffer, func(i, j int) bool { + return b.buffer[i].ID < b.buffer[j].ID + }) +} + +// Pop removes and returns the first portData from the buffer. +func (b *portSortedBuffer) Pop() (portData, bool) { + b.mu.Lock() + defer b.mu.Unlock() + if len(b.buffer) == 0 { + return portData{}, false + } + value := b.buffer[0] + b.buffer = b.buffer[1:] + return value, true +} + +// PeekNextID returns the ID of the next portData in the buffer without removing it. +func (b *portSortedBuffer) PeekNextID() (int, bool) { + b.mu.Lock() + defer b.mu.Unlock() + if len(b.buffer) == 0 { + return -1, false + } + id := b.buffer[0].ID + + return id, true +}