mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
84a697dd75
Add run, print, analyze commands to traffic logger; add support for pipelines
178 lines
3.7 KiB
Go
178 lines
3.7 KiB
Go
package main
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/pterm/pterm"
|
|
)
|
|
|
|
// Command-line flags. They are registered on the default flag set and are
// populated once flag.Parse has been called.
var (
	// fHost is the value of the -host flag ("Redis host"); defaults to 127.0.0.1:6379.
	fHost = flag.String("host", "127.0.0.1:6379", "Redis host")

	// fClientBuffer is the value of the -buffer flag: how many records to
	// buffer per client; defaults to 100.
	fClientBuffer = flag.Int("buffer", 100, "How many records to buffer per client")
)
|
|
|
|
func RenderTable(area *pterm.AreaPrinter, files []string, workers []FileWorker) {
|
|
tableData := pterm.TableData{{"file", "parsed", "processed", "delayed", "clients"}}
|
|
for i := range workers {
|
|
tableData = append(tableData, []string{
|
|
files[i],
|
|
fmt.Sprint(atomic.LoadUint64(&workers[i].parsed)),
|
|
fmt.Sprint(atomic.LoadUint64(&workers[i].processed)),
|
|
fmt.Sprint(atomic.LoadUint64(&workers[i].delayed)),
|
|
fmt.Sprint(atomic.LoadUint64(&workers[i].clients)),
|
|
})
|
|
}
|
|
content, _ := pterm.DefaultTable.WithHasHeader().WithBoxed().WithData(tableData).Srender()
|
|
area.Update(content)
|
|
}
|
|
|
|
func Run(files []string) {
|
|
timeOffset := time.Now().Add(500 * time.Millisecond).Sub(DetermineBaseTime(files))
|
|
fmt.Println("Offset -> ", timeOffset)
|
|
|
|
// Start a worker for every file. They take care of spawning client workers.
|
|
var wg sync.WaitGroup
|
|
workers := make([]FileWorker, len(files))
|
|
for i := range workers {
|
|
workers[i] = FileWorker{timeOffset: timeOffset}
|
|
wg.Add(1)
|
|
go workers[i].Run(files[i], &wg)
|
|
}
|
|
|
|
wgDone := make(chan bool)
|
|
go func() {
|
|
wg.Wait()
|
|
wgDone <- true
|
|
}()
|
|
|
|
// Render table while running
|
|
area, _ := pterm.DefaultArea.WithCenter().Start()
|
|
for running := true; running; {
|
|
select {
|
|
case <-wgDone:
|
|
running = false
|
|
case <-time.After(100 * time.Millisecond):
|
|
RenderTable(area, files, workers)
|
|
}
|
|
}
|
|
|
|
RenderTable(area, files, workers) // to show last stats
|
|
}
|
|
|
|
func Print(files []string) {
|
|
type StreamTop struct {
|
|
record Record
|
|
ch chan Record
|
|
}
|
|
|
|
// Start file reader goroutines
|
|
var wg sync.WaitGroup
|
|
wg.Add(len(files))
|
|
|
|
tops := make([]StreamTop, len(files))
|
|
for i, file := range files {
|
|
tops[i].ch = make(chan Record, 100)
|
|
go func(ch chan Record, file string) {
|
|
parseRecords(file, func(r Record) bool {
|
|
ch <- r
|
|
return true
|
|
})
|
|
close(ch)
|
|
wg.Done()
|
|
}(tops[i].ch, file)
|
|
}
|
|
|
|
// Pick record with minimum time from each channel
|
|
for {
|
|
minTime := ^uint64(0)
|
|
minIndex := -1
|
|
for i := range tops {
|
|
if tops[i].record.Time == 0 {
|
|
if r, ok := <-tops[i].ch; ok {
|
|
tops[i].record = r
|
|
}
|
|
}
|
|
|
|
if rt := tops[i].record.Time; rt > 0 && rt < minTime {
|
|
minTime = rt
|
|
minIndex = i
|
|
}
|
|
}
|
|
|
|
if minIndex == -1 {
|
|
break
|
|
}
|
|
|
|
fmt.Println(tops[minIndex].record.values...)
|
|
tops[minIndex].record = Record{}
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
func Analyze(files []string) {
|
|
total := 0
|
|
chained := 0
|
|
clients := 0
|
|
cmdCounts := make(map[string]uint)
|
|
|
|
// count stats
|
|
for _, file := range files {
|
|
fileClients := make(map[uint32]bool)
|
|
|
|
parseRecords(file, func(r Record) bool {
|
|
total += 1
|
|
if r.HasMore > 0 {
|
|
chained += 1
|
|
}
|
|
|
|
fileClients[r.Client] = true
|
|
cmdCounts[r.values[0].(string)] += 1
|
|
|
|
return true
|
|
})
|
|
|
|
clients += len(fileClients)
|
|
}
|
|
|
|
// sort commands by frequencies
|
|
type Freq struct {
|
|
cmd string
|
|
count uint
|
|
}
|
|
var sortedCmds []Freq
|
|
for cmd, count := range cmdCounts {
|
|
sortedCmds = append(sortedCmds, Freq{cmd, count})
|
|
}
|
|
sort.Slice(sortedCmds, func(i, j int) bool {
|
|
return sortedCmds[i].count > sortedCmds[j].count
|
|
})
|
|
|
|
// Print all the info
|
|
fmt.Println("Total commands", total)
|
|
fmt.Println("Has more%", 100*float32(chained)/float32(total))
|
|
fmt.Println("Total clients", clients)
|
|
|
|
for _, freq := range sortedCmds {
|
|
fmt.Printf("%8d | %v \n", freq.count, freq.cmd)
|
|
}
|
|
}
|
|
|
|
func main() {
|
|
flag.Parse()
|
|
cmd := flag.Arg(0)
|
|
files := flag.Args()[1:]
|
|
|
|
switch strings.ToLower(cmd) {
|
|
case "run":
|
|
Run(files)
|
|
case "print":
|
|
Print(files)
|
|
case "analyze":
|
|
Analyze(files)
|
|
}
|
|
}
|