1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-31 01:24:31 +00:00

implemented general store and get to kv store

This commit is contained in:
postmannen 2021-02-15 11:28:27 +01:00
parent fb02962231
commit 993f89e2c6
9 changed files with 155 additions and 79 deletions

View file

@ -57,7 +57,9 @@ func (e *errorKernel) startErrorKernel(errorCh chan errProcess) {
// the operator....or other ?
//
// Just print the error, and tell the process to continue
log.Printf("*** error_kernel: %#v, type=%T\n", er, er)
// log.Printf("*** error_kernel: %#v, type=%T\n", er, er)
log.Printf("TESTING, we received and error from the process, but we're telling the process back to continue\n")
er.errorActionCh <- errActionContinue
}()
}

View file

@ -6,13 +6,5 @@
"commandOrEvent":"command",
"method":"shellCommand"
},
{
"toNode": "ship1",
"data": ["bash","-c","ls -l ../"],
"commandOrEvent":"command",
"method":"shellCommand"
}
]

View file

@ -14,7 +14,7 @@ import (
// getMessagesFromFile will start a file watcher for the given directory
// and filename. It will take a channel of []byte as input, and it is
// in this channel the content of a file that has changed is returned.
func (s *server) getMessagesFromFile(directoryToCheck string, fileName string, fileContentCh chan []subjectAndMessage) {
func (s *server) getMessagesFromFile(directoryToCheck string, fileName string, inputFromFileCh chan []subjectAndMessage) {
fileUpdated := make(chan bool)
go fileWatcherStart(directoryToCheck, fileUpdated)
@ -47,7 +47,7 @@ func (s *server) getMessagesFromFile(directoryToCheck string, fileName string, f
}
// Send the data back to be consumed
fileContentCh <- js
inputFromFileCh <- js
}
}

1
go.mod
View file

@ -7,5 +7,6 @@ require (
github.com/golang/protobuf v1.4.3 // indirect
github.com/nats-io/nats-server/v2 v2.1.9 // indirect
github.com/nats-io/nats.go v1.10.0
go.etcd.io/bbolt v1.3.5
google.golang.org/protobuf v1.25.0 // indirect
)

4
go.sum
View file

@ -39,6 +39,8 @@ github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1t
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM=
@ -63,6 +65,8 @@ golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e h1:D5TXcfTk7xF7hvieo4QErS3qq
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9 h1:L2auWcuQIvxz9xSEqzESnV/QN/gNRXNApHi3fYwl2w0=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 h1:LfCXLvNmTYH9kEmVgqbnsWfruoXZIrh4YBgqVHtDvw0=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=

BIN
incommmingBuffer.db Normal file

Binary file not shown.

View file

@ -41,7 +41,7 @@ type server struct {
mu sync.Mutex
// The channel where we receive new messages from the outside to
// insert into the system for being processed
newMessagesCh chan []subjectAndMessage
inputFromFileCh chan []subjectAndMessage
// errorCh is used to report errors from a process
// NB: Implementing this as an int to report for testing
errorCh chan errProcess
@ -69,7 +69,7 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) {
nodeName: nodeName,
natsConn: conn,
processes: make(map[subjectName]process),
newMessagesCh: make(chan []subjectAndMessage),
inputFromFileCh: make(chan []subjectAndMessage),
errorCh: make(chan errProcess, 2),
logCh: make(chan []byte),
methodsAvailable: m.GetMethodsAvailable(),
@ -91,7 +91,7 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) {
// checking is also started here in Start by calling handleMessagesToPublish.
func (s *server) Start() {
// Start the checking the input file for new messages from operator.
go s.getMessagesFromFile("./", "inmsg.txt", s.newMessagesCh)
go s.getMessagesFromFile("./", "inmsg.txt", s.inputFromFileCh)
// Start the textLogging service that will run on the subscribers
// TODO: Figure out how to structure event services like these
@ -146,7 +146,7 @@ func (s *server) handleMessagesToPublish() {
// Start reading new messages received on the incomming message
// pipe requested by operator, and fill them into the buffer.
go func() {
for samSlice := range s.newMessagesCh {
for samSlice := range s.inputFromFileCh {
fmt.Println("***** DEBUG ranging samSlice")
for _, sam := range samSlice {
fmt.Println("***** DEBUG putting on channel")
@ -355,7 +355,7 @@ func (s *server) processSpawnWorker(proc process) {
func messageDeliver(proc process, message Message, natsConn *nats.Conn) {
for {
dataPayload, err := gobEncodePayload(message)
dataPayload, err := gobEncodeMessage(message)
if err != nil {
log.Printf("error: createDataPayload: %v\n", err)
}
@ -404,12 +404,12 @@ func messageDeliver(proc process, message Message, natsConn *nats.Conn) {
// gobEncodePayload will encode the message structure along with its
// valued in gob binary format.
// TODO: Check if it adds value to compress with gzip.
func gobEncodePayload(m Message) ([]byte, error) {
func gobEncodeMessage(m Message) ([]byte, error) {
var buf bytes.Buffer
gobEnc := gob.NewEncoder(&buf)
err := gobEnc.Encode(m)
if err != nil {
return nil, fmt.Errorf("error: gob.Enode failed: %v", err)
return nil, fmt.Errorf("error: gob.Encode failed: %v", err)
}
return buf.Bytes(), nil

View file

@ -9,41 +9,172 @@
// set to true.
package steward
import "fmt"
import (
"bytes"
"encoding/gob"
"fmt"
"log"
"strconv"
bolt "go.etcd.io/bbolt"
)
// samValue represents one message with a subject. This
// struct type is used when storing and retreiving from
// db.
type samDBValue struct {
ID int
Data subjectAndMessage
}
// ringBuffer holds the data of the buffer,
type ringBuffer struct {
buf chan subjectAndMessage
bufData chan subjectAndMessage
db *bolt.DB
}
// newringBuffer is a push/pop storage for values.
func newringBuffer(size int) *ringBuffer {
db, err := bolt.Open("./incommmingBuffer.db", 0600, nil)
if err != nil {
// TODO: error handling
log.Printf("error: failed to open db: %v\n", err)
}
return &ringBuffer{
buf: make(chan subjectAndMessage, size),
bufData: make(chan subjectAndMessage, size),
db: db,
}
}
// start will process incomming messages through the inCh,
// put the messages on a buffered channel
// and deliver messages out when requested on the outCh.
func (s *ringBuffer) start(inCh chan subjectAndMessage, outCh chan subjectAndMessage) {
func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan subjectAndMessage) {
// Starting both writing and reading in separate go routines so we
// can write and read concurrently.
const samValueBucket string = "samValues"
i := 0
// Fill the buffer when new data arrives
go func() {
for v := range inCh {
s.buf <- v
r.bufData <- v
fmt.Printf("**BUFFER** DEBUG PUSHED ON BUFFER: value = %v\n\n", v)
iv := strconv.Itoa(i)
samV := samDBValue{
ID: i,
Data: v,
}
svGob, err := gobEncodeSamValue(samV)
if err != nil {
log.Printf("error: gob encoding samValue: %v\n", err)
}
// Also store the incomming message in key/value store
err = r.dbUpdate(r.db, samValueBucket, iv, svGob)
if err != nil {
// TODO: Handle error
log.Printf("error: dbUpdate samValue failed: %v\n", err)
}
retreivedGob, err := r.dbView(r.db, samValueBucket, iv)
if err != nil {
// TODO: Handle error
log.Printf("error: dbView retreival samValue failed: %v\n", err)
}
retreived, err := gobDecodeSamValue(retreivedGob)
if err != nil {
// TODO: Handle error
log.Printf("error: dbView gobDecode retreival samValue failed: %v\n", err)
}
fmt.Printf("*************** INFO: dbView, key: %v, got value: %v\n ", iv, retreived)
i++
}
close(s.buf)
// When done close the buffer channel
close(r.bufData)
}()
// Empty the buffer when data asked for
go func() {
for v := range s.buf {
for v := range r.bufData {
outCh <- v
}
close(outCh)
}()
}
func (r *ringBuffer) dbView(db *bolt.DB, bucket string, key string) ([]byte, error) {
var value []byte
//View is a help function to get values out of the database.
err := db.View(func(tx *bolt.Tx) error {
//Open a bucket to get key's and values from.
bu := tx.Bucket([]byte(bucket))
v := bu.Get([]byte(key))
if len(v) == 0 {
return fmt.Errorf("info: view: key not found")
}
value = v
return nil
})
return value, err
}
//dbUpdate will update the specified bucket with a key and value.
func (r *ringBuffer) dbUpdate(db *bolt.DB, bucket string, key string, value []byte) error {
err := db.Update(func(tx *bolt.Tx) error {
//Create a bucket
bu, err := tx.CreateBucketIfNotExists([]byte(bucket))
if err != nil {
return fmt.Errorf("error: CreateBuckerIfNotExists failed: %v", err)
}
//Put a value into the bucket.
if err := bu.Put([]byte(key), []byte(value)); err != nil {
return err
}
//If all was ok, we should return a nil for a commit to happen. Any error
// returned will do a rollback.
return nil
})
return err
}
func gobEncodeSamValue(samValue samDBValue) ([]byte, error) {
var buf bytes.Buffer
gobEnc := gob.NewEncoder(&buf)
err := gobEnc.Encode(samValue)
if err != nil {
return nil, fmt.Errorf("error: gob.Encode failed: %v", err)
}
return buf.Bytes(), nil
}
func gobDecodeSamValue(b []byte) (samDBValue, error) {
sv := samDBValue{}
buf := bytes.NewBuffer(b)
gobDec := gob.NewDecoder(buf)
err := gobDec.Decode(&sv)
if err != nil {
log.Printf("error: gob decoding failed: %v\n", err)
return samDBValue{}, err
}
return sv, nil
}

View file

@ -1,54 +0,0 @@
package main
import "fmt"
// ringBuffer holds the data of the buffer,
type ringBuffer struct {
data chan string
}
// newringBuffer is a push/pop storage for values.
func newringBuffer() *ringBuffer {
return &ringBuffer{
data: make(chan string, 10),
}
}
// startWithChannels
func (s *ringBuffer) startWithChannels(inCh chan string, outCh chan string) {
// Fill the buffer when new data arrives
go func() {
for v := range inCh {
s.data <- v
fmt.Printf("**BUFFER** DEBUG PUSHED ON BUFFER: value = %v\n\n", v)
}
close(s.data)
}()
go func() {
for v := range s.data {
outCh <- v
}
close(outCh)
}()
}
func main() {
rb := newringBuffer()
inCh := make(chan string)
outCh := make(chan string)
rb.startWithChannels(inCh, outCh)
inCh <- "apekatt"
inCh <- "hest"
inCh <- "ku"
close(inCh)
for v := range outCh {
fmt.Printf("got: %v\n", v)
}
}