mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-05 20:09:16 +00:00
renamed data to sam in ringbuffer
This commit is contained in:
parent
5865ad3701
commit
6f525162ea
2 changed files with 15 additions and 15 deletions
|
@ -28,8 +28,8 @@ import (
|
||||||
// struct type is used when storing and retreiving from
|
// struct type is used when storing and retreiving from
|
||||||
// db.
|
// db.
|
||||||
type samDBValue struct {
|
type samDBValue struct {
|
||||||
ID int
|
ID int
|
||||||
Data subjectAndMessage
|
SAM subjectAndMessage
|
||||||
}
|
}
|
||||||
|
|
||||||
// ringBuffer holds the data of the buffer,
|
// ringBuffer holds the data of the buffer,
|
||||||
|
@ -208,8 +208,8 @@ func (r *ringBuffer) fillBuffer(ctx context.Context, inCh chan subjectAndMessage
|
||||||
|
|
||||||
// Create a structure for JSON marshaling.
|
// Create a structure for JSON marshaling.
|
||||||
samV := samDBValue{
|
samV := samDBValue{
|
||||||
ID: dbID,
|
ID: dbID,
|
||||||
Data: v,
|
SAM: v,
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.configuration.RingBufferPersistStore {
|
if r.configuration.RingBufferPersistStore {
|
||||||
|
@ -254,9 +254,9 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB
|
||||||
// Range over the buffer of messages to pass on to processes.
|
// Range over the buffer of messages to pass on to processes.
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case v := <-r.bufData:
|
case samDBv := <-r.bufData:
|
||||||
r.metrics.promInMemoryBufferMessagesCurrent.Set(float64(len(r.bufData)))
|
r.metrics.promInMemoryBufferMessagesCurrent.Set(float64(len(r.bufData)))
|
||||||
v.Data.ID = v.ID
|
samDBv.SAM.ID = samDBv.ID
|
||||||
|
|
||||||
// Create a done channel per message. A process started by the
|
// Create a done channel per message. A process started by the
|
||||||
// spawnProcess function will handle incomming messages sequentaly.
|
// spawnProcess function will handle incomming messages sequentaly.
|
||||||
|
@ -278,11 +278,11 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB
|
||||||
// that might be in use in the handler.
|
// that might be in use in the handler.
|
||||||
|
|
||||||
msgForPermStore := Message{}
|
msgForPermStore := Message{}
|
||||||
copier.Copy(&msgForPermStore, v.Data.Message)
|
copier.Copy(&msgForPermStore, v.SAM.Message)
|
||||||
// Remove the content of the data field.
|
// Remove the content of the data field.
|
||||||
msgForPermStore.Data = nil
|
msgForPermStore.Data = nil
|
||||||
|
|
||||||
v.Data.Message.done = make(chan struct{})
|
v.SAM.Message.done = make(chan struct{})
|
||||||
delivredCh := make(chan struct{})
|
delivredCh := make(chan struct{})
|
||||||
|
|
||||||
// Prepare the structure with the data, and a function that can
|
// Prepare the structure with the data, and a function that can
|
||||||
|
@ -294,7 +294,7 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
ticker := time.NewTicker(time.Duration(v.Data.ACKTimeout) * time.Duration(v.Data.Retries) * 2 * time.Second)
|
ticker := time.NewTicker(time.Duration(v.SAM.ACKTimeout) * time.Duration(v.SAM.Retries) * 2 * time.Second)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
outCh <- sd
|
outCh <- sd
|
||||||
|
@ -323,10 +323,10 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB
|
||||||
//
|
//
|
||||||
// TODO: Figure out why what the reason for not receceving the done signals might be.
|
// TODO: Figure out why what the reason for not receceving the done signals might be.
|
||||||
select {
|
select {
|
||||||
case <-v.Data.done:
|
case <-v.SAM.done:
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
log.Printf("----------------------------------------------\n")
|
log.Printf("----------------------------------------------\n")
|
||||||
log.Printf("Error: ringBuffer message %v seems to be stuck, did not receive done signal from publishAMessage process, exited on ticker\n", v.Data.Subject)
|
log.Printf("Error: ringBuffer message %v seems to be stuck, did not receive done signal from publishAMessage process, exited on ticker\n", v.SAM.Subject)
|
||||||
log.Printf("----------------------------------------------\n")
|
log.Printf("----------------------------------------------\n")
|
||||||
}
|
}
|
||||||
// log.Printf("info: processBufferMessages: done with message, deleting key from bucket, %v\n", v.ID)
|
// log.Printf("info: processBufferMessages: done with message, deleting key from bucket, %v\n", v.ID)
|
||||||
|
@ -345,7 +345,7 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB
|
||||||
}
|
}
|
||||||
r.permStore <- time.Now().Format("Mon Jan _2 15:04:05 2006") + ", " + string(js) + "\n"
|
r.permStore <- time.Now().Format("Mon Jan _2 15:04:05 2006") + ", " + string(js) + "\n"
|
||||||
|
|
||||||
}(v)
|
}(samDBv)
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
//close(outCh)
|
//close(outCh)
|
||||||
return
|
return
|
||||||
|
@ -414,8 +414,8 @@ func (r *ringBuffer) dumpBucket(bucket string) ([]samDBValue, error) {
|
||||||
return samDBValues[i].ID > samDBValues[j].ID
|
return samDBValues[i].ID > samDBValues[j].ID
|
||||||
})
|
})
|
||||||
|
|
||||||
for _, v := range samDBValues {
|
for _, samDBv := range samDBValues {
|
||||||
log.Printf("info: ringBuffer.dumpBucket: k/v store, kvID: %v, message.ID: %v, subject: %v, len(data): %v\n", v.ID, v.Data.ID, v.Data.Subject, len(v.Data.Data))
|
log.Printf("info: ringBuffer.dumpBucket: k/v store, kvID: %v, message.ID: %v, subject: %v, len(data): %v\n", samDBv.ID, samDBv.SAM.ID, samDBv.SAM.Subject, len(samDBv.SAM.Data))
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -447,7 +447,7 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
|
||||||
// Signal back to the ringbuffer that message have been picked up.
|
// Signal back to the ringbuffer that message have been picked up.
|
||||||
samDBVal.delivered()
|
samDBVal.delivered()
|
||||||
|
|
||||||
sam := samDBVal.samDBValue.Data
|
sam := samDBVal.samDBValue.SAM
|
||||||
// Check if the format of the message is correct.
|
// Check if the format of the message is correct.
|
||||||
if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok {
|
if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok {
|
||||||
er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method)
|
er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method)
|
||||||
|
|
Loading…
Reference in a new issue