diff --git a/requests_port.go b/requests_port.go index 14e7a86..2defa8d 100644 --- a/requests_port.go +++ b/requests_port.go @@ -4,8 +4,6 @@ import ( "context" "fmt" "io" - "log" - "log/slog" "net" "sort" "strconv" @@ -81,39 +79,39 @@ func methodPortSrc(proc process, message Message, node string) ([]byte, error) { proc.processes.wg.Add(1) if len(message.MethodArgs) < arg3_MaxSessionTimeout { - slog.Error("methodPortSrc: to few methodArgs defined in message", "want", wantMethodArgs) + proc.errorKernel.logError("methodPortSrc: to few methodArgs defined in message", "want", wantMethodArgs) return } // Destination node if message.MethodArgs[arg0_DestinationNode] == "" { - slog.Error("methodPortSrc: no destination node specified in method args", "want", wantMethodArgs) + proc.errorKernel.logError("methodPortSrc: no destination node specified in method args", "want", wantMethodArgs) return } destinationNode := Node(message.MethodArgs[arg0_DestinationNode]) // Destination port if message.MethodArgs[arg1_DestinationIPAndPort] == "" { - slog.Error("methodPortSrc: no destination port specified in method args", "want", wantMethodArgs) + proc.errorKernel.logError("methodPortSrc: no destination port specified in method args", "want", wantMethodArgs) return } destinationIPAndPort := message.MethodArgs[arg1_DestinationIPAndPort] // Source port if message.MethodArgs[arg2_SourceIPandPort] == "" { - slog.Error("methodPortSrc: no source port specified in method args", "want", wantMethodArgs) + proc.errorKernel.logError("methodPortSrc: no source port specified in method args", "want", wantMethodArgs) return } sourceIPAndPort := message.MethodArgs[arg2_SourceIPandPort] // Max session timeout if message.MethodArgs[arg3_MaxSessionTimeout] == "" { - slog.Error("methodPortSrc: no max session time specified in method args", "want", wantMethodArgs) + proc.errorKernel.logError("methodPortSrc: no max session time specified in method args", "want", wantMethodArgs) return } n, err := strconv.Atoi(message.MethodArgs[arg3_MaxSessionTimeout]) if err != nil { - slog.Error("methodPortSrc: unable to convert max session timeout from string to int", "error", err) + proc.errorKernel.logError("methodPortSrc: unable to convert max session timeout from string to int", "error", err) return } maxSessionTimeout := n @@ -154,7 +152,7 @@ func methodPortSrc(proc process, message Message, node string) ([]byte, error) { // Start sub process. The process will be killed when the context expires. go portSrcSubProc.start() - fmt.Printf("DEBUG: methodPortSrc, pid: %+v\n", pid) + proc.errorKernel.logDebug("methodPortSrc, pid", "pid", pid) // ------- Prepare the data payload to send to the dst to start the dst sub process ------- cb, err := cbor.Marshal(pid) @@ -211,7 +209,7 @@ func methodPortDst(proc process, message Message, node string) ([]byte, error) { return } - fmt.Printf(" *** DEBUG: ON DST: got pid: %+v\n", pid) + proc.errorKernel.logDebug("methodPortDst: got pid", "pid", pid) // Create a child context to use with the procFunc var ctx context.Context @@ -238,7 +236,6 @@ func methodPortDst(proc process, message Message, node string) ([]byte, error) { // that second message in those cases. pn := processNameGet(sub.name()) - // fmt.Printf("\n\n *** DEBUG: processNameGet: %v\n\n", pn) proc.processes.active.mu.Lock() _, ok := proc.processes.active.procNames[pn] @@ -328,15 +325,15 @@ type portData struct { // data in the message to the source IP and port. func portSrcSubProcFunc(pid portInitialData, initialMessage Message, cancel context.CancelFunc) func(context.Context, process, chan Message) error { pf := func(ctx context.Context, proc process, procFuncCh chan Message) error { - fmt.Printf("STARTED PROCFUNC: %v\n", proc.subject.name()) + proc.errorKernel.logDebug("STARTED PROCFUNC", "processName", proc.subject.name()) defer cancel() - defer fmt.Println("portSrcProcFunc: canceled procFunc", "processName", proc.processName) + defer proc.errorKernel.logDebug("portSrcProcFunc: canceled procFunc", "processName", proc.processName) listener, err := net.Listen("tcp", pid.SourceIPAndPort) if err != nil { // TODO: Send a message to destination sub process that there was an error, // and that it should stop. - fmt.Printf("error: portSrcSubProcFunc: net.Listen failed. err=%v\n", err) + proc.errorKernel.logError("portSrcSubProcFunc: net.Listen failed", "err", err) return err } @@ -347,14 +344,14 @@ func portSrcSubProcFunc(pid portInitialData, initialMessage Message, cancel cont if err != nil { // TODO: Send a message to destination sub process that there was an error, // and that it should stop. - fmt.Printf("error: portSrcSubProcFunc: listener.Accept failed. err=%v\n", err) + proc.errorKernel.logError("portSrcSubProcFunc: listener.Accept failed", "err", err) return } defer func() { conn.Close() listener.Close() - fmt.Println(" DEBUG: Canceled portSrcSubProcFunc connection and listener") + proc.errorKernel.logDebug("portSrcSubProcFunc: closed connection and listener") }() // Read the data from the tcp connection, create messages from it, and @@ -365,7 +362,7 @@ func portSrcSubProcFunc(pid portInitialData, initialMessage Message, cancel cont b := make([]byte, 65535) n, err := conn.Read(b) if err != nil { - log.Printf("error: portSrcSubProcFunc: conn.Read failed. err=%v", err) + proc.errorKernel.logError("portSrcSubProcFunc: conn.Read failed", "err=", err) return } @@ -377,7 +374,8 @@ func portSrcSubProcFunc(pid portInitialData, initialMessage Message, cancel cont cb, err := cbor.Marshal(pd) if err != nil { - log.Fatalf("error: portDstSubProcFunc: cbor marshalling failed. err=%v", err) + proc.errorKernel.logError("portDstSubProcFunc: cbor marshalling failed", "err", err) + return } msg := Message{ @@ -389,14 +387,14 @@ func portSrcSubProcFunc(pid portInitialData, initialMessage Message, cancel cont ReplyMethod: None, } - fmt.Printf(" ******* SRC: Created message to send with id : %v\n", pd.ID) + proc.errorKernel.logDebug("portSrcSubProcFunc: Created message to send", "pd.ID", pd.ID) select { case <-ctx.Done(): - fmt.Println("portSrcProcFunc: canceling procFunc", "processName", proc.processName) + proc.errorKernel.logDebug("portSrcProcFunc: canceling procFunc", "processName", proc.processName) return case proc.newMessagesCh <- msg: - fmt.Printf(" ---->: Sending message, id: %v, size: %v\n", pd.ID, len(pd.Data)) + proc.errorKernel.logDebug(" ---->: Sending message", "pd.ID", pd.ID, "length", len(pd.Data)) } } }() @@ -410,7 +408,7 @@ func portSrcSubProcFunc(pid portInitialData, initialMessage Message, cancel cont for { select { case <-ctx.Done(): - fmt.Println("portSrcProcFunc: canceling procFunc", "processName", proc.processName) + proc.errorKernel.logDebug("portSrcProcFunc: canceling procFunc", "processName", proc.processName) return // Handle the messages reveived from the sub process on the src node. @@ -420,43 +418,45 @@ func portSrcSubProcFunc(pid portInitialData, initialMessage Message, cancel cont var pd portData err := cbor.Unmarshal(message.Data, &pd) if err != nil { - log.Fatalf("error: portSrcSubProcFunc: cbor unmarshalling failed. err=%v", err) + proc.errorKernel.logError("portSrcSubProcFunc: cbor unmarshalling failed", "err", err) + return } - fmt.Printf("<---- GOT MESSAGE ON SRC, pdd.OK:%v, id: %v, length of pddData ::: %v\n", pd.OK, pd.ID, len(pd.Data)) + proc.errorKernel.logDebug("<---- GOT MESSAGE ON SRC", "pd.OK", pd.OK, "pd.ID", pd.ID, "length", len(pd.Data)) buffer.Push(pd) err = func() error { - // Write the data to the channel that the listener is reading from. + // Write the data to the network connection. for { nextID, _ := buffer.PeekNextID() + if expectedID != nextID { - log.Printf("WRONG ID, WILL WAIT FOR NEXT MESSAGE, expectedID %v < nextID: %v\n", expectedID, pd.ID) + proc.errorKernel.logDebug("portSrcSubProcFunc: WRONG ID, WILL WAIT FOR NEXT MESSAGE", "expectedID", expectedID, "nextID", pd.ID) return nil } - log.Printf("------------CORRECT ID, EXPECTED: %v, GOT: %v\n", expectedID, pd.ID) + proc.errorKernel.logDebug("portSrcSubProcFunc correct id", "EXPECTED", expectedID, "GOT", pd.ID) pdPopped, ok := buffer.Pop() if !ok { - fmt.Println("src: Buffer is empty, break out, and wait for next message.") + proc.errorKernel.logDebug("portSrcSubProcFunc: Buffer is empty, break out, and wait for next message.") return nil } - fmt.Printf("popped, id: %v, size: %v\n", pdPopped.ID, len(pdPopped.Data)) + proc.errorKernel.logDebug("portSrcSubProcFunc: popped", "id", pdPopped.ID, "size", len(pdPopped.Data)) n, err := conn.Write(pdPopped.Data) if err != nil { - log.Printf("error: portSrcSubProcFunc: conn.Write failed. err=%v", err) + proc.errorKernel.logError("portSrcSubProcFunc: conn.Write failed", "err", err) return err } - fmt.Printf("--------> conn: portSrcSubProcFunc: wrote %v bytes with ID=%v to connection, exptedID was %v\n", n, pdPopped.ID, expectedID) + proc.errorKernel.logDebug("--------> conn: portSrcSubProcFunc: wrote bytes with ID to connection, exptedID was", "bytes", n, "popped id", pdPopped.ID, "expectedID", expectedID) expectedID++ if !pdPopped.OK { - log.Printf("error: portSrcSubProcFunc: pdd.OK is false. err=%v\n", pdPopped.ErrorMsg) + proc.errorKernel.logDebug("error: portSrcSubProcFunc: pdd.OK is false", "err", pdPopped.ErrorMsg) return fmt.Errorf("%v", pdPopped.ErrorMsg) } } @@ -473,7 +473,7 @@ func portSrcSubProcFunc(pid portInitialData, initialMessage Message, cancel cont }() <-ctx.Done() - fmt.Println("------------------------------------------------EXITING portSrcSubProcFunc---------------------------------------------------------------") + return nil } @@ -490,14 +490,15 @@ func portDstSubProcFunc(pid portInitialData, message Message, cancel context.Can pf := func(ctx context.Context, proc process, procFuncCh chan Message) error { defer cancel() - fmt.Printf("STARTED PROCFUNC: %v\n", proc.subject.name()) - defer fmt.Println("portDstProcFunc: canceled procFunc", "processName", proc.processName) + proc.errorKernel.logDebug("portDstSubProcFunc: STARTED PROCFUNC", "processName", proc.subject.name()) + defer proc.errorKernel.logDebug("portDstProcFunc: canceled procFunc", "processName", proc.processName) // TODO: Start the tcp connection for the dst node here. // ------------ conn, err := net.Dial("tcp", pid.DestinationIPAndPort) if err != nil { - log.Fatalf("error: portDstSubProcFunc: dial failed. err=%v", err) + proc.errorKernel.logError("portDstSubProcFunc: dial failed", "err", err) + return err } defer conn.Close() @@ -518,19 +519,18 @@ func portDstSubProcFunc(pid portInitialData, message Message, cancel context.Can switch { case err == io.EOF: ok = false - errorMsg = fmt.Sprintf("portDstSubProcFunc: conn.Read() returned EOF, n: %v", n) - log.Printf("AT DST: %v\n", errorMsg) + proc.errorKernel.logError("portDstSubProcFunc: conn.Read() returned EOF", "bytes", n) case strings.Contains(err.Error(), "use of closed network connection"): ok = false - fmt.Printf("AT DST: portDstSubProcFunc: conn.Read(): closed network connection, err: %v, n: %v\n", err, n) + proc.errorKernel.logError("portDstSubProcFunc: conn.Read(): closed network connection", "err", err, "bytes", n) default: ok = false - fmt.Printf("AT DST: portDstSubProcFunc: conn.Read(): other error, err: %v, n: %v\n", err, n) + proc.errorKernel.logError("portDstSubProcFunc: conn.Read(): other error", "err", err, "bytes", n) } } - fmt.Printf("portDstSubProcFunc: read from network conn: length: %v\n", n) + proc.errorKernel.logDebug("portDstSubProcFunc: read from network conn", "bytes", n) pdd := portData{ OK: ok, @@ -542,7 +542,8 @@ func portDstSubProcFunc(pid portInitialData, message Message, cancel context.Can cb, err := cbor.Marshal(pdd) if err != nil { - log.Fatalf("error: portDstSubProcFunc: cbor marshalling failed. err=%v", err) + proc.errorKernel.logError("portDstSubProcFunc: cbor marshalling failed", "err", err) + return } msg := Message{ @@ -555,7 +556,7 @@ func portDstSubProcFunc(pid portInitialData, message Message, cancel context.Can } proc.newMessagesCh <- msg - fmt.Printf(" ******* DST: Created message to send with id : %v\n", id) + proc.errorKernel.logDebug("portDstSubProcFunc: Created message to send", "id", id) // If there was en error while reading, we exit the loop, so the connection is closed. if !ok { @@ -572,68 +573,57 @@ func portDstSubProcFunc(pid portInitialData, message Message, cancel context.Can // Read from messages from src node and write to the destination network connection. // ----------------------------------------------------------------------------------- - expectedID := -1 + expectedID := 0 buffer := NewPortSortedBuffer() for { select { case <-ctx.Done(): - fmt.Println("portDstProcFunc: got <-ctx.Done() cancelling procFunc", "processName", proc.processName) + proc.errorKernel.logDebug("portDstProcFunc: got <-ctx.Done() cancelling procFunc", "processName", proc.processName) return nil // Pick up the message recived by the copySrcSubHandler. case message := <-procFuncCh: - expectedID++ var pd portData err := cbor.Unmarshal(message.Data, &pd) if err != nil { - log.Fatalf("error: portDstSubProcFunc: cbor unmarshalling failed. err=%v", err) + proc.errorKernel.logError("portDstSubProcFunc: cbor unmarshalling failed", "err", err) } - fmt.Printf("<---- GOT DATA ON DST, id: %v, length: %v\n", pd.ID, len(pd.Data)) + proc.errorKernel.logDebug("portdstSubProcFunc: <---- GOT DATA ON DST, id: %v, length: %v\n", pd.ID, len(pd.Data)) buffer.Push(pd) err = func() error { - nextID, _ := buffer.PeekNextID() - // Messages might come out of order. - // If the expected id is larger than the next id, we've lost packages, and end the process. - // If the expected id is less than the next id, we return and wait for the next message. - // if expectedID > nextID { - // err := fmt.Errorf("portDstSubProcFunc: error: expectedID %v > nextID %v, we've probably lost packages", expectedID, nextID) - // log.Println(err) - // - // log.Printf(" ---------- DEBUG portDstSubProcFunc: buffer now contains: %+v\n", buffer.buffer) - // - // return err - // } - if expectedID < nextID { - log.Printf("------------WRONG ID, WILL WAIT FOR NEXT MESSAGE, expectedID %v < nextID: %v\n", expectedID, pd.ID) - return nil - } - - log.Printf("------------CORRECT ID, EXPECTED: %v, GOT: %v\n", expectedID, pd.ID) - // Loop over eventual messages in the buffer and write them to the connection. for { + nextID, _ := buffer.PeekNextID() + + if expectedID != nextID { + proc.errorKernel.logDebug("portdstSubProcFunc: WRONG ID, WILL WAIT FOR NEXT MESSAGE", "expectedID", expectedID, "nextID", pd.ID) + return nil + } + + proc.errorKernel.logDebug("portDstSubProcFunc: CORRECT ID, EXPECTED: %v, GOT: %v\n", expectedID, pd.ID) + pdPopped, ok := buffer.Pop() if !ok { // Buffer is empty, break out and wait for next message. - break + return nil } n, err := conn.Write(pdPopped.Data) if err != nil { err := fmt.Errorf("error: portDstSubProcFunc: conn.Write failed. err=%v", err) - log.Println(err) + proc.errorKernel.logError(err.Error()) return err } - fmt.Printf("--------> conn: portDstSubProcFunc: wrote %v bytes to connection\n", n) - } + proc.errorKernel.logDebug("portDstSubProcFunc: --------> conn: wrote to connection", "bytes", n) - return nil + expectedID++ + } }() // Check if there was an error in the above function.