1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-31 01:24:31 +00:00

replaced struct{} with callback func for testing

This commit is contained in:
postmannen 2021-07-05 07:43:33 +02:00
parent dd6d52f427
commit 1523ae84c3
2 changed files with 13 additions and 5 deletions

View file

@ -211,10 +211,15 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sam
// error with an individual message occurs.
go func(v samDBValue) {
v.Data.Message.done = make(chan struct{})
delivredCh := make(chan struct{})
// Prepare the structure with the data, and a function that can
// be called when the data is received for signaling back.
sd := samDBValueAndDelivered{
samDBValue: v,
delivered: make(chan struct{}),
delivered: func() {
delivredCh <- struct{}{}
},
}
outCh <- sd
@ -222,7 +227,7 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sam
// the read process have stalled or not.
// For now it will not do anything,
select {
case <-sd.delivered:
case <-delivredCh:
// OK.
case <-time.After(time.Second * 5):
// Testing with a timeout here to figure out if messages are stuck

View file

@ -288,7 +288,7 @@ func (s *server) Start() {
// Adding a safety function here so we can make sure that all processes
// are stopped after a given time if the context cancelation below hangs.
defer func() {
func() {
time.Sleep(time.Second * 0)
log.Printf("error: doing a non graceful shutdown of all processes..\n")
os.Exit(1)
@ -354,9 +354,12 @@ func createErrorMsgContent(FromNode Node, theError error) subjectAndMessage {
return sam
}
// Contains the sam value as it is used in the state DB, and also a
// delivered function to be called when this message is picked up, so
// we can control if messages gets stale at some point.
type samDBValueAndDelivered struct {
samDBValue samDBValue
delivered chan struct{}
delivered func()
}
// routeMessagesToProcess takes a database name and an input channel as
@ -401,7 +404,7 @@ func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subject
go func() {
for samTmp := range ringBufferOutCh {
samTmp.delivered <- struct{}{}
samTmp.delivered()
sam := samTmp.samDBValue.Data
// Check if the format of the message is correct.