1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: simple traffic logger (#2378)

* feat: simple traffic logger

Controls: 
```
DEBUG TRAFFIC <base_path> | [STOP]
```
---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Co-authored-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Vladislav 2024-01-10 15:56:56 +03:00 committed by GitHub
parent e9453e62e4
commit f4ea42f2f6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 563 additions and 2 deletions

View file

@ -1,3 +1,6 @@
go 1.20
use ./contrib/charts/dragonfly
use (
./contrib/charts/dragonfly
./tools/replay
)

View file

@ -13,8 +13,10 @@
#include "absl/strings/str_cat.h"
#include "base/flags.h"
#include "base/io_buf.h"
#include "base/logging.h"
#include "core/heap_size.h"
#include "core/uring.h"
#include "facade/conn_context.h"
#include "facade/dragonfly_listener.h"
#include "facade/memcache_parser.h"
@ -100,6 +102,129 @@ void UpdateIoBufCapacity(const base::IoBuf& io_buf, ConnectionStats* stats,
}
}
struct TrafficLogger {
// protects agains closing the file while writing or data races when opening the file.
// Also, makes sure that LogTraffic are executed atomically.
fb2::Mutex mutex;
unique_ptr<io::WriteFile> log_file;
void ResetLocked();
// Returns true if Write succeeded, false if it failed and the recording should be aborted.
bool Write(string_view blob);
bool Write(iovec* blobs, size_t len);
};
void TrafficLogger::ResetLocked() {
if (log_file) {
log_file->Close();
log_file.reset();
}
}
// Returns true if Write succeeded, false if it failed and the recording should be aborted.
bool TrafficLogger::Write(string_view blob) {
auto ec = log_file->Write(io::Buffer(blob));
if (ec) {
LOG(ERROR) << "Error writing to traffic log: " << ec;
ResetLocked();
return false;
}
return true;
}
bool TrafficLogger::Write(iovec* blobs, size_t len) {
auto ec = log_file->Write(blobs, len);
if (ec) {
LOG(ERROR) << "Error writing to traffic log: " << ec;
ResetLocked();
return false;
}
return true;
}
thread_local TrafficLogger tl_traffic_logger{}; // nullopt while disabled
void OpenTrafficLogger(string_view base_path) {
unique_lock lk{tl_traffic_logger.mutex};
if (tl_traffic_logger.log_file)
return;
// Open file with append mode, without it concurrent fiber writes seem to conflict
string path = absl::StrCat(
base_path, "-", absl::Dec(ProactorBase::me()->GetPoolIndex(), absl::kZeroPad3), ".bin");
auto file = util::fb2::OpenWrite(path, io::WriteFile::Options{/*.append = */ false});
if (!file) {
LOG(ERROR) << "Error opening a file " << path << " for traffic logging: " << file.error();
return;
}
tl_traffic_logger.log_file = unique_ptr<io::WriteFile>{file.value()};
}
void LogTraffic(uint32_t id, bool has_more, absl::Span<RespExpr> resp) {
string_view cmd = resp.front().GetView();
if (absl::EqualsIgnoreCase(cmd, "debug"sv))
return;
DVLOG(2) << "Recording " << cmd;
char stack_buf[1024];
char* next = stack_buf;
// We write id, timestamp, has_more, num_parts, part_len, part_len, part_len, ...
// And then all the part blobs concatenated together.
auto write_u32 = [&next](uint32_t i) {
absl::little_endian::Store32(next, i);
next += 4;
};
write_u32(id);
absl::little_endian::Store64(next, absl::GetCurrentTimeNanos());
next += 8;
write_u32(has_more ? 1 : 0);
write_u32(uint32_t(resp.size()));
// Grab the lock and check if the file is still open.
lock_guard lk{tl_traffic_logger.mutex};
if (!tl_traffic_logger.log_file)
return;
// Proceed with writing the blob lengths.
for (auto part : resp) {
if (size_t(next - stack_buf + 4) > sizeof(stack_buf)) {
if (!tl_traffic_logger.Write(string_view{stack_buf, size_t(next - stack_buf)})) {
return;
}
next = stack_buf;
}
write_u32(part.GetView().size());
}
// Write the data itself.
std::array<iovec, 16> blobs;
unsigned index = 0;
if (next != stack_buf) {
blobs[index++] = iovec{.iov_base = stack_buf, .iov_len = size_t(next - stack_buf)};
}
for (auto part : resp) {
blobs[index++] = iovec{.iov_base = const_cast<char*>(part.GetView().data()),
.iov_len = part.GetView().size()};
if (index >= blobs.size()) {
if (!tl_traffic_logger.Write(blobs.data(), blobs.size())) {
return;
}
index = 0;
}
}
if (index) {
tl_traffic_logger.Write(blobs.data(), index);
}
}
constexpr size_t kMinReadSize = 256;
thread_local uint32_t free_req_release_weight = 0;
@ -691,6 +816,11 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
void Connection::DispatchCommand(uint32_t consumed, mi_heap_t* heap) {
bool can_dispatch_sync = (consumed >= io_buf_.InputLen());
if (tl_traffic_logger.log_file) {
// Log command as soon as we receive it
LogTraffic(id_, !can_dispatch_sync, absl::MakeSpan(tmp_parse_args_));
}
// Avoid sync dispatch if an async dispatch is already in progress, or else they'll interleave.
if (cc_->async_dispatch)
can_dispatch_sync = false;
@ -1397,6 +1527,15 @@ bool Connection::IsTrackingOn() const {
return tracking_enabled_;
}
void Connection::StartTrafficLogging(string_view path) {
OpenTrafficLogger(path);
}
void Connection::StopTrafficLogging() {
lock_guard lk(tl_traffic_logger.mutex);
tl_traffic_logger.ResetLocked();
}
Connection::MemoryUsage Connection::GetMemoryUsage() const {
size_t mem = sizeof(*this) + dfly::HeapSize(dispatch_q_) + dfly::HeapSize(name_) +
dfly::HeapSize(tmp_parse_args_) + dfly::HeapSize(tmp_cmd_vec_) +

View file

@ -276,6 +276,14 @@ class Connection : public util::Connection {
bool IsTrackingOn() const;
// Starts traffic logging in the calling thread. Must be a proactor thread.
// Each thread creates its own log file combining requests from all the connections in
// that thread. A noop if the thread is already logging.
static void StartTrafficLogging(std::string_view base_path);
// Stops traffic logging in this thread. A noop if the thread is not logging.
static void StopTrafficLogging();
protected:
void OnShutdown() override;
void OnPreMigrateThread() override;

View file

@ -35,11 +35,15 @@ class RespExpr {
return Buffer{reinterpret_cast<uint8_t*>(s->data()), s->size()};
}
std::string GetString() const {
std::string_view GetView() const {
Buffer buffer = GetBuf();
return {reinterpret_cast<const char*>(buffer.data()), buffer.size()};
}
std::string GetString() const {
return std::string(GetView());
}
Buffer GetBuf() const {
return std::get<Buffer>(u);
}

View file

@ -255,6 +255,9 @@ void DebugCmd::Run(CmdArgList args) {
" Prints memory usage and key stats per shard, as well as min/max indicators.",
"TX",
" Performs transaction analysis per shard.",
"TRAFFIC <path> | [STOP]"
" Starts traffic logging to the specified path. If path is not specified,"
" traffic logging is stopped.",
"HELP",
" Prints this help.",
};
@ -308,10 +311,20 @@ void DebugCmd::Run(CmdArgList args) {
if (subcmd == "EXEC") {
return Exec();
}
if (subcmd == "TRAFFIC") {
return LogTraffic(args.subspan(1));
}
string reply = UnknownSubCmd(subcmd, "DEBUG");
return cntx_->SendError(reply, kSyntaxErrType);
}
void DebugCmd::Shutdown() {
// disable traffic logging
shard_set->pool()->AwaitFiberOnAll([](auto*) { facade::Connection::StopTrafficLogging(); });
}
void DebugCmd::Reload(CmdArgList args) {
bool save = true;
@ -590,6 +603,24 @@ void DebugCmd::Exec() {
rb->SendVerbatimString(res);
}
void DebugCmd::LogTraffic(CmdArgList args) {
optional<string> path;
if (args.size() == 1 && absl::AsciiStrToUpper(facade::ToSV(args.front())) != "STOP"sv) {
path = ArgS(args, 0);
LOG(INFO) << "Logging to traffic to " << *path << "*.bin";
} else {
LOG(INFO) << "Traffic logging stopped";
}
shard_set->pool()->AwaitFiberOnAll([path](auto*) {
if (path)
facade::Connection::StartTrafficLogging(*path);
else
facade::Connection::StopTrafficLogging();
});
cntx_->SendOk();
}
void DebugCmd::Inspect(string_view key) {
EngineShardSet& ess = *shard_set;
ShardId sid = Shard(key, ess.size());

View file

@ -28,6 +28,8 @@ class DebugCmd {
void Run(CmdArgList args);
static void Shutdown();
private:
void Populate(CmdArgList args);
std::optional<PopulateOptions> ParsePopulateArgs(CmdArgList args);
@ -43,6 +45,7 @@ class DebugCmd {
void ObjHist();
void Stacktrace();
void Shards();
void LogTraffic(CmdArgList);
ServerFamily& sf_;
ConnectionContext* cntx_;

View file

@ -565,6 +565,7 @@ void ServerFamily::Shutdown() {
}
dfly_cmd_->Shutdown();
DebugCmd::Shutdown();
});
}

22
tools/replay/go.mod Normal file
View file

@ -0,0 +1,22 @@
module dragonfydb.io/traffic-replay
go 1.20
require (
atomicgo.dev/cursor v0.2.0 // indirect
atomicgo.dev/keyboard v0.2.9 // indirect
atomicgo.dev/schedule v0.1.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/console v1.0.3 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/gookit/color v1.5.4 // indirect
github.com/lithammer/fuzzysearch v1.1.8 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/pterm/pterm v0.12.74 // indirect
github.com/redis/go-redis/v9 v9.4.0 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
)

109
tools/replay/go.sum Normal file
View file

@ -0,0 +1,109 @@
atomicgo.dev/cursor v0.2.0 h1:H6XN5alUJ52FZZUkI7AlJbUc1aW38GWZalpYRPpoPOw=
atomicgo.dev/cursor v0.2.0/go.mod h1:Lr4ZJB3U7DfPPOkbH7/6TOtJ4vFGHlgj1nc+n900IpU=
atomicgo.dev/keyboard v0.2.9 h1:tOsIid3nlPLZ3lwgG8KZMp/SFmr7P0ssEN5JUsm78K8=
atomicgo.dev/keyboard v0.2.9/go.mod h1:BC4w9g00XkxH/f1HXhW2sXmJFOCWbKn9xrOunSFtExQ=
atomicgo.dev/schedule v0.1.0 h1:nTthAbhZS5YZmgYbb2+DH8uQIZcTlIrd4eYr3UQxEjs=
atomicgo.dev/schedule v0.1.0/go.mod h1:xeUa3oAkiuHYh8bKiQBRojqAMq3PXXbJujjb0hw8pEU=
github.com/MarvinJWendt/testza v0.1.0/go.mod h1:7AxNvlfeHP7Z/hDQ5JtE3OKYT3XFUeLCDE2DQninSqs=
github.com/MarvinJWendt/testza v0.2.1/go.mod h1:God7bhG8n6uQxwdScay+gjm9/LnO4D3kkcZX4hv9Rp8=
github.com/MarvinJWendt/testza v0.2.8/go.mod h1:nwIcjmr0Zz+Rcwfh3/4UhBp7ePKVhuBExvZqnKYWlII=
github.com/MarvinJWendt/testza v0.2.10/go.mod h1:pd+VWsoGUiFtq+hRKSU1Bktnn+DMCSrDrXDpX2bG66k=
github.com/MarvinJWendt/testza v0.2.12/go.mod h1:JOIegYyV7rX+7VZ9r77L/eH6CfJHHzXjB69adAhzZkI=
github.com/MarvinJWendt/testza v0.3.0/go.mod h1:eFcL4I0idjtIx8P9C6KkAuLgATNKpX4/2oUqKc6bF2c=
github.com/MarvinJWendt/testza v0.4.2/go.mod h1:mSdhXiKH8sg/gQehJ63bINcCKp7RtYewEjXsvsVUPbE=
github.com/atomicgo/cursor v0.0.1/go.mod h1:cBON2QmmrysudxNBFthvMtN32r3jxVRIvzkUiF/RuIk=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/containerd/console v1.0.3 h1:lIr7SlA5PxZyMV30bDW0MGbiOPXwc63yRuCP0ARubLw=
github.com/containerd/console v1.0.3/go.mod h1:7LqA/THxQ86k76b8c/EMSiaJ3h1eZkMkXar0TQ1gf3U=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/gookit/color v1.4.2/go.mod h1:fqRyamkC1W8uxl+lxCQxOT09l/vYfZ+QeiX3rKQHCoQ=
github.com/gookit/color v1.5.0/go.mod h1:43aQb+Zerm/BWh2GnrgOQm7ffz7tvQXEKV6BFMl7wAo=
github.com/gookit/color v1.5.4 h1:FZmqs7XOyGgCAxmWyPslpiok1k05wmY3SJTytgvYFs0=
github.com/gookit/color v1.5.4/go.mod h1:pZJOeOS8DM43rXbp4AZo1n9zCU2qjpcRko0b6/QJi9w=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.10/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c=
github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lithammer/fuzzysearch v1.1.8 h1:/HIuJnjHuXS8bKaiTMeeDlW2/AyIWk2brx1V8LFgLN4=
github.com/lithammer/fuzzysearch v1.1.8/go.mod h1:IdqeyBClc3FFqSzYq/MXESsS4S0FsZ5ajtkr5xPLts4=
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U=
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pterm/pterm v0.12.27/go.mod h1:PhQ89w4i95rhgE+xedAoqous6K9X+r6aSOI2eFF7DZI=
github.com/pterm/pterm v0.12.29/go.mod h1:WI3qxgvoQFFGKGjGnJR849gU0TsEOvKn5Q8LlY1U7lg=
github.com/pterm/pterm v0.12.30/go.mod h1:MOqLIyMOgmTDz9yorcYbcw+HsgoZo3BQfg2wtl3HEFE=
github.com/pterm/pterm v0.12.31/go.mod h1:32ZAWZVXD7ZfG0s8qqHXePte42kdz8ECtRyEejaWgXU=
github.com/pterm/pterm v0.12.33/go.mod h1:x+h2uL+n7CP/rel9+bImHD5lF3nM9vJj80k9ybiiTTE=
github.com/pterm/pterm v0.12.36/go.mod h1:NjiL09hFhT/vWjQHSj1athJpx6H8cjpHXNAK5bUw8T8=
github.com/pterm/pterm v0.12.40/go.mod h1:ffwPLwlbXxP+rxT0GsgDTzS3y3rmpAO1NMjUkGTYf8s=
github.com/pterm/pterm v0.12.74 h1:fPsds9KisCyJh4NyY6bv8QJt3FLHceb5DxI6W0An9cc=
github.com/pterm/pterm v0.12.74/go.mod h1:+M33aZWQVpmLmLbvjykyGZ4gAfeebznRo8JMbabaxQU=
github.com/redis/go-redis/v9 v9.4.0 h1:Yzoz33UZw9I/mFhx4MNrB6Fk+XHO1VukNcCa1+lwyKk=
github.com/redis/go-redis/v9 v9.4.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778/go.mod h1:2MuV+tbUrU1zIOPMxZ5EncGwgmMJsa+9ucAQZXxsObs=
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no=
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211013075003-97ac67df715c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4=
golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

161
tools/replay/main.go Normal file
View file

@ -0,0 +1,161 @@
package main
import (
"context"
"flag"
"fmt"
"math"
"os"
"sync"
"sync/atomic"
"time"
"github.com/pterm/pterm"
"github.com/redis/go-redis/v9"
)
var fHost = flag.String("host", "127.0.0.1:6379", "Redis host")
var fClientBuffer = flag.Int("buffer", 100, "How many records to buffer per client")
type RecordHeader struct {
Client uint32
Time uint64
HasMore uint32
}
type Record struct {
RecordHeader
values []interface{} // instead of []string to unwrap into variadic
}
// Determine earliest time
func DetermineBaseTime(files []string) time.Time {
var minTime uint64 = math.MaxUint64
for _, file := range files {
parseRecords(file, func(r Record) bool {
if r.Time < minTime {
minTime = r.Time
}
return false
})
}
return time.Unix(0, int64(minTime))
}
// Handles a single connection/client
type ClientWorker struct {
redis *redis.Client
incoming chan Record
}
// Handles a single file and distributes messages to clients
type FileWorker struct {
clientGroup sync.WaitGroup
timeOffset time.Duration
// stats for output, updated by clients, read by rendering goroutine
processed atomic.Uint64
delayed atomic.Uint64
parsed atomic.Uint64
clients atomic.Uint64
}
func (c ClientWorker) Run(worker *FileWorker) {
for msg := range c.incoming {
lag := time.Until(worker.HappensAt(time.Unix(0, int64(msg.Time))))
if lag < 0 {
worker.delayed.Add(1)
}
time.Sleep(lag)
c.redis.Do(context.Background(), msg.values...).Result()
worker.processed.Add(1)
}
worker.clientGroup.Done()
}
func NewClient(w *FileWorker) *ClientWorker {
client := &ClientWorker{
redis: redis.NewClient(&redis.Options{Addr: *fHost, PoolSize: 1, DisableIndentity: true}),
incoming: make(chan Record, *fClientBuffer),
}
w.clients.Add(1)
w.clientGroup.Add(1)
go client.Run(w)
return client
}
func (w *FileWorker) Run(file string, wg *sync.WaitGroup) {
clients := make(map[uint32]*ClientWorker, 0)
parseRecords(file, func(r Record) bool {
client, ok := clients[r.Client]
if !ok {
client = NewClient(w)
clients[r.Client] = client
}
w.parsed.Add(1)
client.incoming <- r
return true
})
for _, client := range clients {
close(client.incoming)
}
w.clientGroup.Wait()
wg.Done()
}
func (w *FileWorker) HappensAt(recordTime time.Time) time.Time {
return recordTime.Add(w.timeOffset)
}
func RenderTable(area *pterm.AreaPrinter, files []string, workers []FileWorker) {
tableData := pterm.TableData{{"file", "parsed", "processed", "delayed", "clients"}}
for i := range workers {
tableData = append(tableData, []string{
files[i],
fmt.Sprint(workers[i].parsed.Load()),
fmt.Sprint(workers[i].processed.Load()),
fmt.Sprint(workers[i].delayed.Load()),
fmt.Sprint(workers[i].clients.Load()),
})
}
content, _ := pterm.DefaultTable.WithHasHeader().WithBoxed().WithData(tableData).Srender()
area.Update(content)
}
func main() {
flag.Parse()
files := os.Args[1:]
timeOffset := time.Now().Add(500 * time.Millisecond).Sub(DetermineBaseTime(files))
fmt.Println("Offset -> ", timeOffset)
// Start a worker for every file. They take care of spawning client workers.
var wg sync.WaitGroup
workers := make([]FileWorker, len(files))
for i := range workers {
workers[i] = FileWorker{timeOffset: timeOffset}
wg.Add(1)
go workers[i].Run(files[i], &wg)
}
wgDone := make(chan bool)
go func() {
wg.Wait()
wgDone <- true
}()
// Render table while running
area, _ := pterm.DefaultArea.WithCenter().Start()
for running := true; running; {
select {
case <-wgDone:
running = false
case <-time.After(100 * time.Millisecond):
RenderTable(area, files, workers)
}
}
RenderTable(area, files, workers) // to show last stats
}

80
tools/replay/parsing.go Normal file
View file

@ -0,0 +1,80 @@
package main
import (
"bufio"
"encoding/binary"
"io"
"os"
)
var kBigEmptyBytes = make([]byte, 100_000)
func parseStrings(file io.Reader) (out []interface{}, err error) {
var num, strLen uint32
err = binary.Read(file, binary.LittleEndian, &num)
if err != nil {
return nil, err
}
out = make([]interface{}, num)
for i := range out {
err = binary.Read(file, binary.LittleEndian, &strLen)
if err != nil {
return nil, err
}
out[i] = strLen
}
for i := range out {
strLen = out[i].(uint32)
if strLen == 0 {
err = binary.Read(file, binary.LittleEndian, &strLen)
if err != nil {
return nil, err
}
out[i] = kBigEmptyBytes[:strLen]
continue
}
buf := make([]byte, strLen)
_, err := io.ReadFull(file, buf)
if err != nil {
return nil, err
}
out[i] = string(buf)
}
return
}
func parseRecords(filename string, cb func(Record) bool) error {
file, err := os.Open(filename)
if err != nil {
return err
}
defer file.Close()
reader := bufio.NewReader(file)
for {
var rec Record
err := binary.Read(reader, binary.LittleEndian, &rec.RecordHeader)
if err != nil {
if err == io.EOF {
break
}
return err
}
rec.values, err = parseStrings(reader)
if err != nil {
return err
}
if !cb(rec) {
return nil
}
}
return nil
}