mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
Small fixes
1. Remove CO::STALE modifier since it's not relevant for now. 2. Propertly wire CallFromScript function to be called from redis.call. 3. Define 3rd party lua dependency as part of dragonfly project. 4. Add ARGV/KEYS arrays to lua scripts
This commit is contained in:
parent
ce721ced90
commit
ab5031472e
13 changed files with 127 additions and 63 deletions
|
@ -18,6 +18,17 @@ option(BUILD_SHARED_LIBS "Build shared libraries" OFF)
|
|||
include(third_party)
|
||||
include(internal)
|
||||
|
||||
add_third_party(
|
||||
lua
|
||||
URL https://github.com/lua/lua/archive/refs/tags/v5.4.4.tar.gz
|
||||
PATCH_COMMAND patch -p1 -i "${CMAKE_CURRENT_SOURCE_DIR}/patches/lua-v5.4.4.patch"
|
||||
CONFIGURE_COMMAND echo
|
||||
BUILD_IN_SOURCE 1
|
||||
INSTALL_COMMAND cp <SOURCE_DIR>/liblua.a ${THIRD_PARTY_LIB_DIR}/lua/lib/
|
||||
COMMAND cp <SOURCE_DIR>/lualib.h <SOURCE_DIR>/lua.h <SOURCE_DIR>/lauxlib.h
|
||||
<SOURCE_DIR>/luaconf.h ${THIRD_PARTY_LIB_DIR}/lua/include
|
||||
)
|
||||
|
||||
Message(STATUS "THIRD_PARTY_LIB_DIR ${THIRD_PARTY_LIB_DIR}")
|
||||
|
||||
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
|
||||
|
|
|
@ -41,16 +41,11 @@ class RedisTranslator : public ObjectExplorer {
|
|||
|
||||
private:
|
||||
void ArrayPre() {
|
||||
/*if (!array_index_.empty()) {
|
||||
lua_pushnumber(lua_, array_index_.back());
|
||||
array_index_.back()++;
|
||||
}*/
|
||||
}
|
||||
|
||||
void ArrayPost() {
|
||||
if (!array_index_.empty()) {
|
||||
lua_rawseti(lua_, -2, array_index_.back()++); /* set table at key `i' */
|
||||
// lua_settable(lua_, -3);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -137,6 +132,30 @@ void Require(lua_State* lua, const char* name, lua_CFunction openf) {
|
|||
lua_pop(lua, 1); /* remove lib */
|
||||
}
|
||||
|
||||
string_view TopSv(lua_State* lua) {
|
||||
return string_view{lua_tostring(lua, -1), lua_rawlen(lua, -1)};
|
||||
}
|
||||
|
||||
optional<int> FetchKey(lua_State* lua, const char* key) {
|
||||
lua_pushstring(lua, key);
|
||||
int type = lua_gettable(lua, -2);
|
||||
if (type == LUA_TNIL) {
|
||||
lua_pop(lua, 1);
|
||||
return nullopt;
|
||||
}
|
||||
return type;
|
||||
}
|
||||
|
||||
void SetGlobalArrayInternal(lua_State* lua, const char* name, Interpreter::MutSliceSpan args) {
|
||||
lua_newtable(lua);
|
||||
for (size_t j = 0; j < args.size(); j++) {
|
||||
lua_pushlstring(lua, args[j].data(), args[j].size());
|
||||
lua_rawseti(lua, -2, j + 1);
|
||||
}
|
||||
lua_setglobal(lua, name);
|
||||
}
|
||||
|
||||
#if 0
|
||||
/*
|
||||
* Save the give pointer on Lua registry, used to save the Lua context and
|
||||
* function context so we can retrieve them from lua_State.
|
||||
|
@ -169,6 +188,7 @@ void* GetFromRegistry(lua_State* lua, const char* name) {
|
|||
|
||||
return ptr;
|
||||
}
|
||||
#endif
|
||||
|
||||
/* This function is used in order to push an error on the Lua stack in the
|
||||
* format used by redis.pcall to return errors, which is a lua table
|
||||
|
@ -270,20 +290,6 @@ void ToHex(const uint8_t* src, char* dest) {
|
|||
dest[40] = '\0';
|
||||
}
|
||||
|
||||
string_view TopSv(lua_State* lua) {
|
||||
return string_view{lua_tostring(lua, -1), lua_rawlen(lua, -1)};
|
||||
}
|
||||
|
||||
optional<int> FetchKey(lua_State* lua, const char* key) {
|
||||
lua_pushstring(lua, key);
|
||||
int type = lua_gettable(lua, -2);
|
||||
if (type == LUA_TNIL) {
|
||||
lua_pop(lua, 1);
|
||||
return nullopt;
|
||||
}
|
||||
return type;
|
||||
}
|
||||
|
||||
int RedisSha1Command(lua_State* lua) {
|
||||
int argc = lua_gettop(lua);
|
||||
if (argc != 1) {
|
||||
|
@ -335,14 +341,16 @@ int RedisStatusReplyCommand(lua_State* lua) {
|
|||
return SingleFieldTable(lua, "ok");
|
||||
}
|
||||
|
||||
const char* kInstanceKey = "_INSTANCE";
|
||||
// const char* kInstanceKey = "_INSTANCE";
|
||||
|
||||
} // namespace
|
||||
|
||||
Interpreter::Interpreter() {
|
||||
lua_ = luaL_newstate();
|
||||
InitLua(lua_);
|
||||
SaveOnRegistry(lua_, kInstanceKey, this);
|
||||
void** ptr = static_cast<void**>(lua_getextraspace(lua_));
|
||||
*ptr = this;
|
||||
// SaveOnRegistry(lua_, kInstanceKey, this);
|
||||
|
||||
/* Register the redis commands table and fields */
|
||||
lua_newtable(lua_);
|
||||
|
@ -416,6 +424,10 @@ bool Interpreter::RunFunction(const char* f_id, std::string* error) {
|
|||
return err == 0;
|
||||
}
|
||||
|
||||
void Interpreter::SetGlobalArray(const char* name, MutSliceSpan args) {
|
||||
SetGlobalArrayInternal(lua_, name, args);
|
||||
}
|
||||
|
||||
bool Interpreter::Execute(string_view body, char f_id[43], string* error) {
|
||||
lua_getglobal(lua_, "__redis__err__handler");
|
||||
Fingerprint(body, f_id);
|
||||
|
@ -619,13 +631,13 @@ int Interpreter::RedisGenericCommand(bool raise_error) {
|
|||
}
|
||||
|
||||
int Interpreter::RedisCallCommand(lua_State* lua) {
|
||||
void* me = GetFromRegistry(lua, kInstanceKey);
|
||||
return reinterpret_cast<Interpreter*>(me)->RedisGenericCommand(true);
|
||||
void** ptr = static_cast<void**>(lua_getextraspace(lua));
|
||||
return reinterpret_cast<Interpreter*>(*ptr)->RedisGenericCommand(true);
|
||||
}
|
||||
|
||||
int Interpreter::RedisPCallCommand(lua_State* lua) {
|
||||
void* me = GetFromRegistry(lua, kInstanceKey);
|
||||
return reinterpret_cast<Interpreter*>(me)->RedisGenericCommand(false);
|
||||
void** ptr = static_cast<void**>(lua_getextraspace(lua));
|
||||
return reinterpret_cast<Interpreter*>(*ptr)->RedisGenericCommand(false);
|
||||
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -31,6 +31,10 @@ class ObjectExplorer {
|
|||
|
||||
class Interpreter {
|
||||
public:
|
||||
using MutableSlice = absl::Span<char>;
|
||||
using MutSliceSpan = absl::Span<MutableSlice>;
|
||||
using RedisFunc = std::function<void(MutSliceSpan, ObjectExplorer*)>;
|
||||
|
||||
Interpreter();
|
||||
~Interpreter();
|
||||
|
||||
|
@ -51,6 +55,8 @@ class Interpreter {
|
|||
// Returns: true if the call succeeded, otherwise fills error and returns false.
|
||||
bool RunFunction(const char* f_id, std::string* err);
|
||||
|
||||
void SetGlobalArray(const char* name, MutSliceSpan args);
|
||||
|
||||
bool Execute(std::string_view body, char f_id[43], std::string* err);
|
||||
bool Serialize(ObjectExplorer* serializer, std::string* err);
|
||||
|
||||
|
@ -58,11 +64,7 @@ class Interpreter {
|
|||
// fp[42] will be set to '\0'.
|
||||
static void Fingerprint(std::string_view body, char* fp);
|
||||
|
||||
using MutableSlice = absl::Span<char>;
|
||||
using MutSliceSpan = absl::Span<MutableSlice>;
|
||||
using RedisFunc = std::function<void(MutSliceSpan, ObjectExplorer*)>;
|
||||
|
||||
template<typename U> void SetRedisFunc(U&& u) {
|
||||
template <typename U> void SetRedisFunc(U&& u) {
|
||||
redis_func_ = std::forward<U>(u);
|
||||
}
|
||||
|
||||
|
|
|
@ -84,13 +84,24 @@ class InterpreterTest : public ::testing::Test {
|
|||
return res;
|
||||
}
|
||||
|
||||
void SetGlobalArray(const char* name, vector<string> vec);
|
||||
|
||||
bool Execute(string_view script);
|
||||
|
||||
|
||||
Interpreter intptr_;
|
||||
TestSerializer ser_;
|
||||
string error_;
|
||||
};
|
||||
|
||||
void InterpreterTest::SetGlobalArray(const char* name, vector<string> vec) {
|
||||
vector<Interpreter::MutableSlice> slices(vec.size());
|
||||
for (size_t i = 0; i < vec.size(); ++i) {
|
||||
slices[i] = Interpreter::MutableSlice{vec[i]};
|
||||
}
|
||||
intptr_.SetGlobalArray(name, Interpreter::MutSliceSpan{slices});
|
||||
}
|
||||
|
||||
bool InterpreterTest::Execute(string_view script) {
|
||||
char buf[48];
|
||||
|
||||
|
@ -236,4 +247,13 @@ TEST_F(InterpreterTest, CallArray) {
|
|||
EXPECT_EQ("[str(table) [[[bool(0) str(s2)]] i(42)]]", ser_.res);
|
||||
}
|
||||
|
||||
TEST_F(InterpreterTest, ArgKeys) {
|
||||
vector<string> vec_arr{};
|
||||
vector<Interpreter::MutableSlice> slices;
|
||||
SetGlobalArray("ARGV", {"foo", "bar"});
|
||||
SetGlobalArray("KEYS", {"key1", "key2"});
|
||||
EXPECT_TRUE(Execute("return {ARGV[1], KEYS[1], KEYS[2]}"));
|
||||
EXPECT_EQ("[str(foo) str(key1) str(key2)]", ser_.res);
|
||||
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
2
helio
2
helio
|
@ -1 +1 @@
|
|||
Subproject commit a300a704e193d115333f41a81438ba74d3df8c51
|
||||
Subproject commit 847c612488e152e8e12df98c3c26f222d6e7ac13
|
16
patches/lua-v5.4.4.patch
Normal file
16
patches/lua-v5.4.4.patch
Normal file
|
@ -0,0 +1,16 @@
|
|||
diff --git a/makefile b/makefile
|
||||
index d46e650c..85e6b637 100644
|
||||
--- a/makefile
|
||||
+++ b/makefile
|
||||
@@ -66,9 +66,9 @@ LOCAL = $(TESTS) $(CWARNS)
|
||||
|
||||
|
||||
# enable Linux goodies
|
||||
-MYCFLAGS= $(LOCAL) -std=c99 -DLUA_USE_LINUX -DLUA_USE_READLINE
|
||||
+MYCFLAGS= $(LOCAL) -std=c99 -DLUA_USE_LINUX
|
||||
MYLDFLAGS= $(LOCAL) -Wl,-E
|
||||
-MYLIBS= -ldl -lreadline
|
||||
+MYLIBS= -ldl
|
||||
|
||||
|
||||
CC= gcc
|
|
@ -30,7 +30,7 @@ uint32_t CommandId::OptCount(uint32_t mask) {
|
|||
}
|
||||
|
||||
CommandRegistry::CommandRegistry() {
|
||||
CommandId cd("COMMAND", CO::RANDOM | CO::LOADING | CO::STALE, 0, 0, 0, 0);
|
||||
CommandId cd("COMMAND", CO::RANDOM | CO::LOADING, 0, 0, 0, 0);
|
||||
|
||||
cd.SetHandler([this](const auto& args, auto* cntx) { return Command(args, cntx); });
|
||||
const char* nm = cd.name();
|
||||
|
@ -84,8 +84,6 @@ const char* OptName(CO::CommandOpt fl) {
|
|||
return "denyoom";
|
||||
case FAST:
|
||||
return "fast";
|
||||
case STALE:
|
||||
return "stale";
|
||||
case LOADING:
|
||||
return "loading";
|
||||
case RANDOM:
|
||||
|
|
|
@ -24,7 +24,6 @@ enum CommandOpt : uint32_t {
|
|||
WRITE = 4,
|
||||
LOADING = 8,
|
||||
DENYOOM = 0x10, // use-memory in redis.
|
||||
STALE = 0x20,
|
||||
RANDOM = 0x40,
|
||||
ADMIN = 0x80, // implies NOSCRIPT,
|
||||
NOSCRIPT = 0x100,
|
||||
|
|
|
@ -407,7 +407,6 @@ void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
(*cntx)->StartArray(2);
|
||||
string res("*2\r\n$");
|
||||
(*cntx)->SendSimpleString(absl::StrCat(cursor));
|
||||
(*cntx)->StartArray(keys.size());
|
||||
for (const auto& k : keys) {
|
||||
|
@ -547,10 +546,14 @@ using CI = CommandId;
|
|||
#define HFUNC(x) SetHandler(&GenericFamily::x)
|
||||
|
||||
void GenericFamily::Register(CommandRegistry* registry) {
|
||||
constexpr auto kSelectOpts = CO::LOADING | CO::FAST | CO::STALE;
|
||||
constexpr auto kSelectOpts = CO::LOADING | CO::FAST;
|
||||
*registry << CI{"DEL", CO::WRITE, -2, 1, -1, 1}.HFUNC(Del)
|
||||
<< CI{"PING", CO::STALE | CO::FAST, -1, 0, 0, 0}.HFUNC(Ping)
|
||||
<< CI{"ECHO", CO::READONLY | CO::FAST, 2, 0, 0, 0}.HFUNC(Echo)
|
||||
/* Redis compaitibility:
|
||||
* We don't allow PING during loading since in Redis PING is used as
|
||||
* failure detection, and a loading server is considered to be
|
||||
* not available. */
|
||||
<< CI{"PING", CO::FAST, -1, 0, 0, 0}.HFUNC(Ping)
|
||||
<< CI{"ECHO", CO::LOADING | CO::FAST, 2, 0, 0, 0}.HFUNC(Echo)
|
||||
<< CI{"EXISTS", CO::READONLY | CO::FAST, -2, 1, -1, 1}.HFUNC(Exists)
|
||||
<< CI{"EXPIRE", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(Expire)
|
||||
<< CI{"EXPIREAT", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(ExpireAt)
|
||||
|
|
|
@ -98,11 +98,6 @@ class EvalSerializer : public ObjectExplorer {
|
|||
RedisReplyBuilder* rb_;
|
||||
};
|
||||
|
||||
void CallFromScript(CmdArgList args, ObjectExplorer* reply, ConnectionContext* cntx,
|
||||
Service* service) {
|
||||
reply->OnInt(42);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
Service::Service(ProactorPool* pp) : pp_(*pp), shard_set_(pp), server_family_(this) {
|
||||
|
@ -343,6 +338,10 @@ void Service::Multi(CmdArgList args, ConnectionContext* cntx) {
|
|||
return (*cntx)->SendOk();
|
||||
}
|
||||
|
||||
void Service::CallFromScript(CmdArgList args, ObjectExplorer* reply, ConnectionContext* cntx) {
|
||||
reply->OnInt(42);
|
||||
}
|
||||
|
||||
void Service::Eval(CmdArgList args, ConnectionContext* cntx) {
|
||||
string_view body = ArgS(args, 1);
|
||||
string_view num_keys_str = ArgS(args, 2);
|
||||
|
@ -359,6 +358,12 @@ void Service::Eval(CmdArgList args, ConnectionContext* cntx) {
|
|||
ServerState* ss = ServerState::tlocal();
|
||||
lock_guard lk(ss->interpreter_mutex);
|
||||
Interpreter& script = ss->GetInterpreter();
|
||||
|
||||
CmdArgList eval_keys = args.subspan(3, num_keys);
|
||||
CmdArgList eval_args = args.subspan(3 + num_keys);
|
||||
|
||||
script.SetGlobalArray("KEYS", eval_keys);
|
||||
script.SetGlobalArray("ARGV", eval_args);
|
||||
string error;
|
||||
char f_id[48];
|
||||
bool success = script.Execute(body, f_id, &error);
|
||||
|
@ -366,8 +371,8 @@ void Service::Eval(CmdArgList args, ConnectionContext* cntx) {
|
|||
EvalSerializer ser{static_cast<RedisReplyBuilder*>(cntx->reply_builder())};
|
||||
string error;
|
||||
|
||||
script.SetRedisFunc([cntx](CmdArgList args, ObjectExplorer* reply) {
|
||||
CallFromScript(args, reply, cntx, nullptr);
|
||||
script.SetRedisFunc([cntx, this](CmdArgList args, ObjectExplorer* reply) {
|
||||
CallFromScript(args, reply, cntx);
|
||||
});
|
||||
|
||||
if (!script.Serialize(&ser, &error)) {
|
||||
|
@ -436,21 +441,19 @@ VarzValue::Map Service::GetVarzStats() {
|
|||
using ServiceFunc = void (Service::*)(CmdArgList, ConnectionContext* cntx);
|
||||
|
||||
#define HFUNC(x) SetHandler(&Service::x)
|
||||
#define MFUNC(x) SetHandler([this](CmdArgList sp, ConnectionContext* cntx) { \
|
||||
this->x(std::move(sp), cntx); })
|
||||
|
||||
void Service::RegisterCommands() {
|
||||
using CI = CommandId;
|
||||
|
||||
constexpr auto kExecMask = CO::LOADING | CO::NOSCRIPT | CO::GLOBAL_TRANS;
|
||||
|
||||
auto cb_exec = [this](CmdArgList sp, ConnectionContext* cntx) {
|
||||
this->Exec(std::move(sp), cntx);
|
||||
};
|
||||
|
||||
registry_ << CI{"QUIT", CO::READONLY | CO::FAST, 1, 0, 0, 0}.HFUNC(Quit)
|
||||
<< CI{"MULTI", CO::NOSCRIPT | CO::FAST | CO::LOADING | CO::STALE, 1, 0, 0, 0}.HFUNC(
|
||||
<< CI{"MULTI", CO::NOSCRIPT | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(
|
||||
Multi)
|
||||
<< CI{"EVAL", CO::NOSCRIPT, -3, 0, 0, 0}.HFUNC(Eval)
|
||||
<< CI{"EXEC", kExecMask, 1, 0, 0, 0}.SetHandler(cb_exec);
|
||||
<< CI{"EVAL", CO::NOSCRIPT, -3, 0, 0, 0}.MFUNC(Eval)
|
||||
<< CI{"EXEC", kExecMask, 1, 0, 0, 0}.MFUNC(Exec);
|
||||
|
||||
StringFamily::Register(®istry_);
|
||||
GenericFamily::Register(®istry_);
|
||||
|
|
|
@ -17,6 +17,8 @@ class AcceptServer;
|
|||
|
||||
namespace dfly {
|
||||
|
||||
class ObjectExplorer; // for Interpreter
|
||||
|
||||
class Service {
|
||||
public:
|
||||
using error_code = std::error_code;
|
||||
|
@ -64,9 +66,10 @@ class Service {
|
|||
private:
|
||||
static void Quit(CmdArgList args, ConnectionContext* cntx);
|
||||
static void Multi(CmdArgList args, ConnectionContext* cntx);
|
||||
static void Eval(CmdArgList args, ConnectionContext* cntx);
|
||||
|
||||
void Eval(CmdArgList args, ConnectionContext* cntx);
|
||||
void Exec(CmdArgList args, ConnectionContext* cntx);
|
||||
void CallFromScript(CmdArgList args, ObjectExplorer* reply, ConnectionContext* cntx);
|
||||
|
||||
void RegisterCommands();
|
||||
base::VarzValue::Map GetVarzStats();
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
// Copyright 2021, Roman Gershman. All rights reserved.
|
||||
// Copyright 2022, Roman Gershman. All rights reserved.
|
||||
// See LICENSE for licensing terms.
|
||||
//
|
||||
#include <optional>
|
||||
|
@ -6,7 +6,6 @@
|
|||
|
||||
#include "core/op_status.h"
|
||||
#include "io/sync_stream_interface.h"
|
||||
// #include "server/common_types.h"
|
||||
|
||||
namespace dfly {
|
||||
|
||||
|
|
|
@ -390,17 +390,15 @@ void ServerFamily::SyncGeneric(std::string_view repl_master_id, uint64_t offs,
|
|||
#define HFUNC(x) SetHandler(HandlerFunc(this, &ServerFamily::x))
|
||||
|
||||
void ServerFamily::Register(CommandRegistry* registry) {
|
||||
constexpr auto kReplicaOpts = CO::ADMIN | CO::STALE | CO::GLOBAL_TRANS;
|
||||
constexpr auto kReplicaOpts = CO::ADMIN | CO::GLOBAL_TRANS;
|
||||
*registry << CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize)
|
||||
<< CI{"DEBUG", CO::RANDOM | CO::READONLY, -2, 0, 0, 0}.HFUNC(Debug)
|
||||
<< CI{"DEBUG", CO::RANDOM | CO::ADMIN | CO::LOADING, -2, 0, 0, 0}.HFUNC(Debug)
|
||||
<< CI{"FLUSHDB", CO::WRITE | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(FlushDb)
|
||||
<< CI{"FLUSHALL", CO::WRITE | CO::GLOBAL_TRANS, -1, 0, 0, 0}.HFUNC(FlushAll)
|
||||
<< CI{"INFO", CO::LOADING | CO::STALE, -1, 0, 0, 0}.HFUNC(Info)
|
||||
<< CI{"LASTSAVE", CO::LOADING | CO::STALE | CO::RANDOM | CO::FAST, 1, 0, 0, 0}.HFUNC(
|
||||
LastSave)
|
||||
<< CI{"INFO", CO::LOADING, -1, 0, 0, 0}.HFUNC(Info)
|
||||
<< CI{"LASTSAVE", CO::LOADING | CO::RANDOM | CO::FAST, 1, 0, 0, 0}.HFUNC(LastSave)
|
||||
<< CI{"SAVE", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Save)
|
||||
<< CI{"SHUTDOWN", CO::ADMIN | CO::NOSCRIPT | CO::LOADING | CO::STALE, 1, 0, 0, 0}.HFUNC(
|
||||
_Shutdown)
|
||||
<< CI{"SHUTDOWN", CO::ADMIN | CO::NOSCRIPT | CO::LOADING, 1, 0, 0, 0}.HFUNC(_Shutdown)
|
||||
<< CI{"SLAVEOF", kReplicaOpts, 3, 0, 0, 0}.HFUNC(ReplicaOf)
|
||||
<< CI{"REPLICAOF", kReplicaOpts, 3, 0, 0, 0}.HFUNC(ReplicaOf)
|
||||
<< CI{"SYNC", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Sync)
|
||||
|
|
Loading…
Reference in a new issue