From 3cb0c9f5c9e471b2a05f8b90b0c83ca4c4ba5e1f Mon Sep 17 00:00:00 2001 From: postmannen Date: Tue, 16 Feb 2021 04:57:54 +0100 Subject: [PATCH] kv store for index and messages pu in --- commandOrEventType.go | 6 +-- getmessagefromfile.go | 2 +- incommmingBuffer.db | Bin 32768 -> 65536 bytes methodType.go | 4 +- publisher.go | 3 -- ringbuffer.go | 107 +++++++++++++++++++++--------------------- 6 files changed, 58 insertions(+), 64 deletions(-) diff --git a/commandOrEventType.go b/commandOrEventType.go index f77f832..1ec6540 100644 --- a/commandOrEventType.go +++ b/commandOrEventType.go @@ -6,8 +6,6 @@ package steward -import "fmt" - // CommandOrEvent describes on the message level if this is // an event or command kind of message in the Subject name. // This field is mainly used to be able to spawn up different @@ -54,10 +52,10 @@ type CommandOrEventAvailable struct { func (co CommandOrEventAvailable) CheckIfExists(c CommandOrEvent) bool { _, ok := co.topics[c] if ok { - fmt.Printf("******THE TOPIC EXISTS: %v******\n", c) + // fmt.Printf("******THE TOPIC EXISTS: %v******\n", c) return true } else { - fmt.Printf("******THE TOPIC DO NOT EXIST: %v******\n", c) + // fmt.Printf("******THE TOPIC DO NOT EXIST: %v******\n", c) return false } } diff --git a/getmessagefromfile.go b/getmessagefromfile.go index 1f31c8a..9675b85 100644 --- a/getmessagefromfile.go +++ b/getmessagefromfile.go @@ -136,7 +136,7 @@ func fileWatcherStart(directoryToCheck string, fileUpdated chan bool) { select { case event := <-watcher.Events: if event.Op&fsnotify.Write == fsnotify.Write { - log.Println("info: inmsg.txt file updated, processing input: ", event.Name) + // log.Println("info: inmsg.txt file updated, processing input: ", event.Name) //testing with an update chan to get updates fileUpdated <- true } diff --git a/incommmingBuffer.db b/incommmingBuffer.db index f35d5f1ca55a3efe75ff99c4d7c7aa9c5f79a1c5..39fdd07d2dced91344c70ef0ce99d7f4015d7cd1 100644 GIT binary patch literal 65536 zcmeI)O=}%h7{Kv!CbvmiL#Z33s1RpmAxSf7+BBgHtx?FL3My`D7B_c>Bzj*Yb0ftP z&{$t8xDfmXf@>GzQe3(43kZG!1+#SHW_+GG=d_cdK~QXyJO6>1GiT1co!?EN&zxt* z7&oFMwcV*AfB*srAb& z<}VZ2Cr@Q&|YVFdkx4Ah4ablu}C zd1#$DVLK}>#xZ}1l|9SvelT2@-U%VpOS|P)%x$X`cLw5LACvBNj9j~KR9!^ zzW&(#IwSmy@E@bpiia}7pUmHm*H+_|WPT>r1eSQ^cI@+3bhR&;C-F*Lrgy|MvCr@D zNYdapNK>!OLn-&YpZXE+X+xR&qGubHsVJ56ZKv^yTsSQ+d^6VU5INOsM;7-oMB?>- zaow3XB(LaAx>S;QZ*MP2e0P&Zba%n^cA*iSz2rK3sS%xBbe&ynL}xF%&R%XrXRo-< zUTH*Umt1F;8qwLSuCrGg(b;9!+2uxb)(0yT2sB6_jQ2-If@nW|E2YNUA^tyKi~sM` z;{T4s|JC^X`C9z{OfCNZt5*M|7XN=&i~ql@#sB?E{7<}3yidG8((yh4u)cvbicfL~ zfJgP0B>*1PUzPxPRDap<-+xL35NL=%$n$@+@Bb&^4S)9$fsm9CeuL1;`~S5M74ZIl z_$b4P{P04``~TseI3)rIAb6sr#GKO9;)#J0u1pLpU_~Ip0f<40Dxn2d}!1~&3cGT>ro0!y=OmMnPBKgnT$+!4Sb TD8K-534~-|VBV})@LV4N*z_|< diff --git a/methodType.go b/methodType.go index 1571a99..748b49e 100644 --- a/methodType.go +++ b/methodType.go @@ -46,10 +46,10 @@ type MethodsAvailable struct { func (ma MethodsAvailable) CheckIfExists(m Method) (methodHandler, bool) { mFunc, ok := ma.topics[m] if ok { - fmt.Printf("******THE TOPIC EXISTS: %v******\n", m) + // fmt.Printf("******THE TOPIC EXISTS: %v******\n", m) return mFunc, true } else { - fmt.Printf("******THE TOPIC DO NOT EXIST: %v******\n", m) + // fmt.Printf("******THE TOPIC DO NOT EXIST: %v******\n", m) return nil, false } } diff --git a/publisher.go b/publisher.go index 97b9613..582551b 100644 --- a/publisher.go +++ b/publisher.go @@ -147,11 +147,8 @@ func (s *server) handleMessagesToPublish() { // pipe requested by operator, and fill them into the buffer. go func() { for samSlice := range s.inputFromFileCh { - fmt.Println("***** DEBUG ranging samSlice") for _, sam := range samSlice { - fmt.Println("***** DEBUG putting on channel") inCh <- sam - fmt.Println("***** DEBUG done putting on channel") } } close(inCh) diff --git a/ringbuffer.go b/ringbuffer.go index f036dcd..202ba48 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -10,8 +10,7 @@ package steward import ( - "bytes" - "encoding/gob" + "encoding/json" "fmt" "log" "strconv" @@ -29,15 +28,15 @@ type samDBValue struct { // ringBuffer holds the data of the buffer, type ringBuffer struct { - bufData chan subjectAndMessage - db *bolt.DB + bufData chan subjectAndMessage + db *bolt.DB + totalMessagesIndex int } // newringBuffer is a push/pop storage for values. func newringBuffer(size int) *ringBuffer { db, err := bolt.Open("./incommmingBuffer.db", 0600, nil) if err != nil { - // TODO: error handling log.Printf("error: failed to open db: %v\n", err) } return &ringBuffer{ @@ -53,49 +52,44 @@ func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan subjectAndMes // Starting both writing and reading in separate go routines so we // can write and read concurrently. - const samValueBucket string = "samValues" + const samValueBucket string = "samValueBucket" + const indexValueBucket string = "indexValueBucket" - i := 0 + r.totalMessagesIndex = r.getIndexValue(indexValueBucket) // Fill the buffer when new data arrives go func() { + // Check for incomming messages. These are typically comming from + // the go routine who reads inmsg.txt. for v := range inCh { - r.bufData <- v - fmt.Printf("**BUFFER** DEBUG PUSHED ON BUFFER: value = %v\n\n", v) + // --- Store the incomming message in the k/v store --- - iv := strconv.Itoa(i) + // Create a structure for JSON marshaling. samV := samDBValue{ - ID: i, + ID: r.totalMessagesIndex, Data: v, } - svGob, err := gobEncodeSamValue(samV) + js, err := json.Marshal(samV) if err != nil { log.Printf("error: gob encoding samValue: %v\n", err) } - // Also store the incomming message in key/value store - err = r.dbUpdate(r.db, samValueBucket, iv, svGob) + // Store the incomming message in key/value store + err = r.dbUpdate(r.db, samValueBucket, strconv.Itoa(r.totalMessagesIndex), js) if err != nil { // TODO: Handle error log.Printf("error: dbUpdate samValue failed: %v\n", err) } - retreivedGob, err := r.dbView(r.db, samValueBucket, iv) - if err != nil { - // TODO: Handle error - log.Printf("error: dbView retreival samValue failed: %v\n", err) - } + // Send the message to some process to consume it. + r.bufData <- v - retreived, err := gobDecodeSamValue(retreivedGob) - if err != nil { - // TODO: Handle error - log.Printf("error: dbView gobDecode retreival samValue failed: %v\n", err) - } - - fmt.Printf("*************** INFO: dbView, key: %v, got value: %v\n ", iv, retreived) - - i++ + // Increment index, and store the new value to the database. + r.totalMessagesIndex++ + fmt.Printf("*** NEXT INDEX NUMBER INCREMENTED: %v\n", r.totalMessagesIndex) + fmt.Println("---------------------------------------------------------") + r.dbUpdate(r.db, indexValueBucket, "index", []byte(strconv.Itoa(r.totalMessagesIndex))) } // When done close the buffer channel @@ -106,22 +100,52 @@ func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan subjectAndMes go func() { for v := range r.bufData { outCh <- v + + // TODO: Delete the messages here. The SAM handled here, do + // not contain the totalMessageID, so we might need to change + // the struct we pass around. + // IDEA: Add a go routine for each message handled here, and include + // a done channel in the structure, so a go routine handling the + // message will be able to signal back here that the message have + // been processed, and that we then can delete it out of the K/V Store. } close(outCh) }() } +func (r *ringBuffer) getIndexValue(indexBucket string) int { + const indexKey string = "index" + indexB, err := r.dbView(r.db, indexBucket, indexKey) + if err != nil { + log.Printf("error: getIndexValue: dbView: %v\n", err) + } + + index, err := strconv.Atoi(string(indexB)) + if err != nil { + log.Printf("error: getIndexValue: strconv.Atoi : %v\n", err) + } + + fmt.Printf("**** RETURNING INDEX, WITH VALUE = %v\n", index) + + return index +} + func (r *ringBuffer) dbView(db *bolt.DB, bucket string, key string) ([]byte, error) { var value []byte //View is a help function to get values out of the database. err := db.View(func(tx *bolt.Tx) error { //Open a bucket to get key's and values from. bu := tx.Bucket([]byte(bucket)) + if bu == nil { + log.Printf("info: no such bucket exist: %v\n", bucket) + return nil + } v := bu.Get([]byte(key)) if len(v) == 0 { - return fmt.Errorf("info: view: key not found") + log.Printf("info: view: key not found\n") + return nil } value = v @@ -153,28 +177,3 @@ func (r *ringBuffer) dbUpdate(db *bolt.DB, bucket string, key string, value []by }) return err } - -func gobEncodeSamValue(samValue samDBValue) ([]byte, error) { - var buf bytes.Buffer - gobEnc := gob.NewEncoder(&buf) - err := gobEnc.Encode(samValue) - if err != nil { - return nil, fmt.Errorf("error: gob.Encode failed: %v", err) - } - - return buf.Bytes(), nil -} - -func gobDecodeSamValue(b []byte) (samDBValue, error) { - sv := samDBValue{} - - buf := bytes.NewBuffer(b) - gobDec := gob.NewDecoder(buf) - err := gobDec.Decode(&sv) - if err != nil { - log.Printf("error: gob decoding failed: %v\n", err) - return samDBValue{}, err - } - - return sv, nil -}