1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-31 01:24:31 +00:00

fixed bug, out of order packages with port

This commit is contained in:
postmannen 2025-01-25 08:04:40 +01:00
parent 5127ab7099
commit f3626dae88
2 changed files with 63 additions and 73 deletions

View file

@ -9,7 +9,7 @@ We want to connect with ssh to node2, but it is not directly available from the
</style>
</head>
<body>
<p align="center"><img src="https://github.com/postmannen/ctrl/blob/main/doc/usecase-portforward-ssh.svg?raw=true" /></p>
<p align="center"><img src="https://github.com/postmannen/ctrl/blob/main/doc/portforward-ssh.svg?raw=true" /></p>
</body>
## Steps

View file

@ -340,10 +340,6 @@ func portSrcSubProcFunc(pid portInitialData, initialMessage Message, cancel cont
return err
}
// Since the data to write to the src network listener is coming from the dst node in
// the form of a message, we need to write it to a channel that the listener can read from.
srcWriteCh := make(chan []byte, 1)
// Start a goroutine to handle the tcp listener.
go func() {
for {
@ -354,6 +350,7 @@ func portSrcSubProcFunc(pid portInitialData, initialMessage Message, cancel cont
fmt.Printf("error: portSrcSubProcFunc: listener.Accept failed. err=%v\n", err)
return
}
defer func() {
conn.Close()
listener.Close()
@ -404,87 +401,80 @@ func portSrcSubProcFunc(pid portInitialData, initialMessage Message, cancel cont
}
}()
// We only allow 1 connection to the listener, so we're not starting a new goroutine for each
// connection.
// -----------------------------------------------------------------------------------
// Read from messages from dst node and write to the source network connection.
// -----------------------------------------------------------------------------------
expectedID := 0
buffer := NewPortSortedBuffer()
for {
select {
case <-ctx.Done():
fmt.Println("portSrcProcFunc: canceling procFunc", "processName", proc.processName)
return
case b := <-srcWriteCh:
n, err := conn.Write(b)
// Handle the messages reveived from the sub process on the src node.
// The messages will contain the data to be sent to the dst node.
case message := <-procFuncCh:
var pd portData
err := cbor.Unmarshal(message.Data, &pd)
if err != nil {
log.Fatalf("error: portSrcSubProcFunc: cbor unmarshalling failed. err=%v", err)
}
fmt.Printf("<---- GOT MESSAGE ON SRC, pdd.OK:%v, id: %v, length of pddData ::: %v\n", pd.OK, pd.ID, len(pd.Data))
buffer.Push(pd)
err = func() error {
// Write the data to the channel that the listener is reading from.
for {
nextID, _ := buffer.PeekNextID()
if expectedID != nextID {
log.Printf("WRONG ID, WILL WAIT FOR NEXT MESSAGE, expectedID %v < nextID: %v\n", expectedID, pd.ID)
return nil
}
log.Printf("------------CORRECT ID, EXPECTED: %v, GOT: %v\n", expectedID, pd.ID)
pdPopped, ok := buffer.Pop()
if !ok {
fmt.Println("src: Buffer is empty, break out, and wait for next message.")
return nil
}
fmt.Printf("popped, id: %v, size: %v\n", pdPopped.ID, len(pdPopped.Data))
n, err := conn.Write(pdPopped.Data)
if err != nil {
log.Printf("error: portSrcSubProcFunc: conn.Write failed. err=%v", err)
return err
}
fmt.Printf("--------> conn: portSrcSubProcFunc: wrote %v bytes with ID=%v to connection, exptedID was %v\n", n, pdPopped.ID, expectedID)
expectedID++
if !pdPopped.OK {
log.Printf("error: portSrcSubProcFunc: pdd.OK is false. err=%v\n", pdPopped.ErrorMsg)
return fmt.Errorf("%v", pdPopped.ErrorMsg)
}
}
}()
if err != nil {
log.Printf("error: portSrcSubProcFunc: conn.Write failed. err=%v", err)
return
}
fmt.Printf("--------> conn: portSrcSubProcFunc: wrote %v bytes to connection\n", n)
}
}
}
}()
// -----------------------------------------------------------------------------------
// Read from messages from dst node and write to the source network connection.
// -----------------------------------------------------------------------------------
expectedID := -1
buffer := NewPortSortedBuffer()
for {
select {
case <-ctx.Done():
fmt.Println("portSrcProcFunc: canceling procFunc", "processName", proc.processName)
return nil
// Handle the messages reveived from the sub process on the src node.
// The messages will contain the data to be sent to the dst node.
case message := <-procFuncCh:
expectedID++
var pd portData
err := cbor.Unmarshal(message.Data, &pd)
if err != nil {
log.Fatalf("error: portSrcSubProcFunc: cbor unmarshalling failed. err=%v", err)
}
fmt.Printf("<---- GOT DATA ON SRC, pdd.OK:%v, length of pddData ::: %v\n", pd.OK, len(pd.Data))
buffer.Push(pd)
err = func() error {
nextID, _ := buffer.PeekNextID()
if expectedID < nextID {
log.Printf("------------WRONG ID, WILL WAIT FOR NEXT MESSAGE, expectedID %v < nextID: %v\n", expectedID, pd.ID)
return nil
}
log.Printf("------------CORRECT ID, EXPECTED: %v, GOT: %v\n", expectedID, pd.ID)
// Write the data to the channel that the listener is reading from.
for {
pdPopped, ok := buffer.Pop()
if !ok {
// Buffer is empty, break out, and wait for next message.
break
}
srcWriteCh <- pdPopped.Data
if !pd.OK {
log.Printf("error: portSrcSubProcFunc: pdd.OK is false. err=%v\n", pd.ErrorMsg)
return fmt.Errorf("%v", pd.ErrorMsg)
}
}
return nil
}()
if err != nil {
return err
}
}
}
<-ctx.Done()
fmt.Println("------------------------------------------------EXITING portSrcSubProcFunc---------------------------------------------------------------")
return nil
}
return pf