1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: bring more clarity when replayer fails (#2933)

This commit is contained in:
Roman Gershman 2024-04-19 13:49:32 +03:00 committed by GitHub
parent e08cf3a4ee
commit c42b3dc02f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 19 additions and 1 deletions

View file

@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"log"
"math"
"os"
"sync"
@ -86,7 +87,7 @@ func NewClient(w *FileWorker) *ClientWorker {
func (w *FileWorker) Run(file string, wg *sync.WaitGroup) {
clients := make(map[uint32]*ClientWorker, 0)
parseRecords(file, func(r Record) bool {
err := parseRecords(file, func(r Record) bool {
client, ok := clients[r.Client]
if !ok {
client = NewClient(w)
@ -98,6 +99,10 @@ func (w *FileWorker) Run(file string, wg *sync.WaitGroup) {
return true
})
if err != nil {
log.Fatalf("Could not parse records!")
}
for _, client := range clients {
close(client.incoming)
}

View file

@ -3,7 +3,9 @@ package main
import (
"bufio"
"encoding/binary"
"errors"
"io"
"log"
"os"
)
@ -33,6 +35,14 @@ func parseStrings(file io.Reader) (out []interface{}, err error) {
if err != nil {
return nil, err
}
if strLen > 100000000 {
log.Printf("Bad string length %v, index %v out of %v", strLen, i, num)
for j := 0; j < i; j++ {
log.Printf("Str %v %v", j, out[j])
}
return nil, errors.New("failed to parse a string len ")
}
out[i] = kBigEmptyBytes[:strLen]
continue
}
@ -56,6 +66,7 @@ func parseRecords(filename string, cb func(Record) bool) error {
defer file.Close()
reader := bufio.NewReader(file)
recordNum := 0
for {
var rec Record
err := binary.Read(reader, binary.LittleEndian, &rec.RecordHeader)
@ -68,12 +79,14 @@ func parseRecords(filename string, cb func(Record) bool) error {
rec.values, err = parseStrings(reader)
if err != nil {
log.Printf("Could not parse %vth record", recordNum)
return err
}
if !cb(rec) {
return nil
}
recordNum++
}
return nil