red_couch/
lib.rs

1//! RedCouch – Redis module that bridges Couchbase / Memcached
2//!            binary-protocol clients to Redis using a hash-per-item
3//!            data model.
4//!
5//! Build :  cargo build --release
6//! Run   :  redis-server --loadmodule ./target/release/libred_couch.dylib
7
8#![forbid(unsafe_code)]
9
10pub mod ascii;
11pub mod meta;
12pub mod protocol;
13
14#[cfg(not(test))]
15use byteorder::{BigEndian, ByteOrder};
16#[cfg(not(test))]
17use bytes::{Buf, BytesMut};
18#[cfg(not(test))]
19use protocol::{
20    CAS_ZERO, MAGIC_REQ, MAX_BODY_LEN, Opcode, ParseResult, Request, ResponseMeta, ST_ARGS, ST_IX,
21    ST_NF, ST_NOT_STORED, ST_OK, ST_UNK, try_parse_request, write_error_for_raw_opcode,
22    write_response, write_simple_response,
23};
24#[cfg(not(test))]
25use std::sync::atomic::{AtomicU64, Ordering};
26#[cfg(not(test))]
27use std::time::Instant;
28#[cfg(not(test))]
29use std::{
30    io::Read,
31    net::{TcpListener, TcpStream},
32    sync::Once,
33    thread,
34    time::Duration,
35};
36
37// Redis-module imports are only needed when building the actual module,
38// not during `cargo test`.
39#[cfg(not(test))]
40use redis_module::{
41    Context, DetachedFromClient, RedisString, RedisValue, Status, ThreadSafeContext, redis_module,
42};
43
44/* ============================================================
45Key namespace and system keys
46========================================================= */
47
/// Prefix for user item keys in Redis.  Client key `foo` maps to
/// Redis key `rc:foo`.
///
/// `make_redis_key` prepends this prefix.  The `SCAN ... MATCH 'rc:*'`
/// pattern hard-coded inside `LUA_FLUSH` spells the same prefix out as a
/// literal, so the two must be changed together.
#[cfg(not(test))]
pub(crate) const KEY_PREFIX: &[u8] = b"rc:";

/// Redis key for the monotonic CAS counter.
///
/// The mutating Lua scripts `INCR` the key they receive as `KEYS[2]` to
/// mint a new CAS value (presumably this key — the call sites are outside
/// this view).  The name does not start with `rc:`, so it cannot collide
/// with an item key and is not matched by the `rc:*` pattern in `LUA_FLUSH`.
#[cfg(not(test))]
pub(crate) const CAS_COUNTER_KEY: &str = "redcouch:sys:cas_counter";
56
57/* ============================================================
58Runtime stats counters
59========================================================= */
60
// Process-wide counters, all starting at zero.  Only the connection
// counters are updated in the visible part of this file (by
// `spawn_listener`); the command counters are presumably bumped by the
// individual command handlers — TODO confirm against their call sites.

// --- per-command request counters ---
#[cfg(not(test))]
pub(crate) static STAT_CMD_GET: AtomicU64 = AtomicU64::new(0);
#[cfg(not(test))]
pub(crate) static STAT_CMD_SET: AtomicU64 = AtomicU64::new(0);
#[cfg(not(test))]
pub(crate) static STAT_CMD_FLUSH: AtomicU64 = AtomicU64::new(0);
#[cfg(not(test))]
pub(crate) static STAT_CMD_TOUCH: AtomicU64 = AtomicU64::new(0);
// --- hit / miss counters per operation family ---
#[cfg(not(test))]
pub(crate) static STAT_GET_HITS: AtomicU64 = AtomicU64::new(0);
#[cfg(not(test))]
pub(crate) static STAT_GET_MISSES: AtomicU64 = AtomicU64::new(0);
#[cfg(not(test))]
pub(crate) static STAT_DELETE_HITS: AtomicU64 = AtomicU64::new(0);
#[cfg(not(test))]
pub(crate) static STAT_DELETE_MISSES: AtomicU64 = AtomicU64::new(0);
#[cfg(not(test))]
pub(crate) static STAT_INCR_HITS: AtomicU64 = AtomicU64::new(0);
#[cfg(not(test))]
pub(crate) static STAT_INCR_MISSES: AtomicU64 = AtomicU64::new(0);
#[cfg(not(test))]
pub(crate) static STAT_DECR_HITS: AtomicU64 = AtomicU64::new(0);
#[cfg(not(test))]
pub(crate) static STAT_DECR_MISSES: AtomicU64 = AtomicU64::new(0);
// --- CAS outcome counters ---
#[cfg(not(test))]
pub(crate) static STAT_CAS_HITS: AtomicU64 = AtomicU64::new(0);
#[cfg(not(test))]
pub(crate) static STAT_CAS_MISSES: AtomicU64 = AtomicU64::new(0);
#[cfg(not(test))]
pub(crate) static STAT_CAS_BADVAL: AtomicU64 = AtomicU64::new(0);
// --- connection counters ---
// `STAT_CURR_CONNECTIONS` is incremented by the accept loop when a
// connection is admitted and decremented when its handler thread finishes;
// the accept loop compares it against `MAX_CONNECTIONS`.
#[cfg(not(test))]
pub(crate) static STAT_CURR_CONNECTIONS: AtomicU64 = AtomicU64::new(0);
// Incremented once per admitted connection; never decremented in this view.
#[cfg(not(test))]
pub(crate) static STAT_TOTAL_CONNECTIONS: AtomicU64 = AtomicU64::new(0);
// --- authentication counters ---
#[cfg(not(test))]
pub(crate) static STAT_AUTH_CMDS: AtomicU64 = AtomicU64::new(0);
#[cfg(not(test))]
pub(crate) static STAT_AUTH_ERRORS: AtomicU64 = AtomicU64::new(0);

/// Module startup time — set in `module_init`.
///
/// A `OnceLock`, so it can be written at most once; readers get `None`
/// until it has been set.
#[cfg(not(test))]
pub(crate) static STARTUP_INSTANT: std::sync::OnceLock<Instant> = std::sync::OnceLock::new();
103
104/// Build the namespaced Redis key for a client key.
105#[cfg(not(test))]
106pub(crate) fn make_redis_key(client_key: &[u8]) -> Vec<u8> {
107    let mut rk = Vec::with_capacity(KEY_PREFIX.len() + client_key.len());
108    rk.extend_from_slice(KEY_PREFIX);
109    rk.extend_from_slice(client_key);
110    rk
111}
112
113/// Lookup table for hex digit → nibble value (0xFF = invalid).
114/// Using a static LUT eliminates per-nibble match branching on the
115/// hot GET/GAT decode path.
116#[cfg(not(test))]
static HEX_DECODE_LUT: [u8; 256] = {
    // Every byte starts out as "not a hex digit".
    let mut table = [0xFF_u8; 256];

    // '0'..='9' map to 0..=9.
    let mut digit = 0u8;
    while digit < 10 {
        table[(b'0' + digit) as usize] = digit;
        digit += 1;
    }

    // 'a'..='f' and 'A'..='F' both map to 10..=15.
    let mut letter = 0u8;
    while letter < 6 {
        table[(b'a' + letter) as usize] = 10 + letter;
        table[(b'A' + letter) as usize] = 10 + letter;
        letter += 1;
    }

    table
};
143
144/// Decode a hex string (pairs of hex digits) into raw bytes.
145/// Returns an empty Vec if the input is not valid hex.
146///
147/// Uses a static 256-byte lookup table instead of per-nibble match
148/// chains for better throughput on the hot GET/GAT response path.
149#[cfg(not(test))]
150pub(crate) fn hex_decode(hex: &str) -> Vec<u8> {
151    if hex.len() % 2 != 0 {
152        return Vec::new();
153    }
154    let mut out = Vec::with_capacity(hex.len() / 2);
155    let bytes = hex.as_bytes();
156    let mut i = 0;
157    while i < bytes.len() {
158        let hi = HEX_DECODE_LUT[bytes[i] as usize];
159        let lo = HEX_DECODE_LUT[bytes[i + 1] as usize];
160        if hi > 0x0F || lo > 0x0F {
161            return Vec::new();
162        }
163        out.push((hi << 4) | lo);
164        i += 2;
165    }
166    out
167}
168
169/* ============================================================
170Lua scripts for atomic operations
171========================================================= */
172
/// Lua script for SET/ADD/REPLACE with atomic CAS check.
///
/// KEYS[1] = item key, KEYS[2] = CAS counter key
/// ARGV[1] = op ("set"|"add"|"replace")
/// ARGV[2] = value bytes
/// ARGV[3] = flags (decimal string)
/// ARGV[4] = request CAS (decimal string, "0" = skip check)
/// ARGV[5] = expiry (decimal string)
///
/// The item is a Redis hash with fields `v` (value), `f` (flags) and
/// `c` (CAS, as a decimal string produced by `INCR` on KEYS[2]).
///
/// Expiry handling: a positive value up to 2592000 (30 days in seconds) is
/// applied as a relative TTL via `EXPIRE`; larger values are passed to
/// `EXPIREAT` as an absolute Unix timestamp.  An expiry of 0 on a key that
/// already existed removes any TTL via `PERSIST`.
///
/// Returns: {status, new_cas_string}
///   status: 0=OK, -1=NOT_FOUND, -2=KEY_EXISTS
///   -1 is returned for `replace` on a missing key, and for any op with a
///      non-zero request CAS on a missing key.
///   -2 is returned for `add` on an existing key, and on CAS mismatch.
#[cfg(not(test))]
pub(crate) const LUA_STORE: &str = r#"
local op = ARGV[1]
local exists = redis.call('EXISTS', KEYS[1])
if op == 'add' and exists == 1 then return {-2, ''} end
if op == 'replace' and exists == 0 then return {-1, ''} end
local req_cas = ARGV[4]
if req_cas ~= '0' then
  if exists == 0 then return {-1, ''} end
  local stored_cas = redis.call('HGET', KEYS[1], 'c')
  if stored_cas ~= req_cas then return {-2, ''} end
end
local new_cas = redis.call('INCR', KEYS[2])
redis.call('HSET', KEYS[1], 'v', ARGV[2], 'f', ARGV[3], 'c', tostring(new_cas))
local exp = tonumber(ARGV[5])
if exp ~= nil and exp > 0 then
  if exp <= 2592000 then redis.call('EXPIRE', KEYS[1], exp)
  else redis.call('EXPIREAT', KEYS[1], exp) end
elseif exp == 0 and exists == 1 then
  redis.call('PERSIST', KEYS[1])
end
return {0, tostring(new_cas)}
"#;
207
/// Lua script for GET — returns value as hex-encoded string to avoid
/// redis-module UTF-8 conversion panics on binary payloads.
///
/// KEYS[1] = item key
///
/// Returns: {status, hex_value, flags_string, cas_string}
///   status: 0=OK, -1=NOT_FOUND
///   hex_value: value bytes encoded as lowercase hex pairs
///
/// Missing hash fields are defaulted: `v` to the empty string, `f` and `c`
/// to `'0'`.
///
/// Performance: uses a 256-entry byte → hex-pair lookup table and
/// table.concat instead of gsub + string.format per byte.  The table is
/// rebuilt at the start of every script invocation (256 `string.format`
/// calls), so the saving only applies to values longer than that.
#[cfg(not(test))]
pub(crate) const LUA_GET: &str = r#"
if redis.call('EXISTS', KEYS[1]) == 0 then return {-1, '', '', ''} end
local v = redis.call('HGET', KEYS[1], 'v')
local f = redis.call('HGET', KEYS[1], 'f')
local c = redis.call('HGET', KEYS[1], 'c')
if v == false then v = '' end
if f == false then f = '0' end
if c == false then c = '0' end
local ht = {}
for i = 0, 255 do ht[i] = string.format('%02x', i) end
local p = {}
for i = 1, #v do p[i] = ht[string.byte(v, i)] end
return {0, table.concat(p), f, c}
"#;
234
/// Lua script for meta GET — like LUA_GET but also returns TTL and value size.
///
/// KEYS[1] = item key
///
/// Returns: {status, hex_value, flags_string, cas_string, ttl, size}
///   status: 0=OK, -1=NOT_FOUND
///   ttl: remaining TTL in seconds (-1 = no expiry, -2 = not found)
///   size: byte length of raw value
///
/// `ttl` is whatever the Redis `TTL` command reports for the key; in the
/// NOT_FOUND case the script itself returns -2 without calling `TTL`.
/// `size` is the length of the raw value, i.e. half the length of
/// `hex_value`.
#[cfg(not(test))]
pub(crate) const LUA_META_GET: &str = r#"
if redis.call('EXISTS', KEYS[1]) == 0 then return {-1, '', '', '', -2, 0} end
local v = redis.call('HGET', KEYS[1], 'v')
local f = redis.call('HGET', KEYS[1], 'f')
local c = redis.call('HGET', KEYS[1], 'c')
if v == false then v = '' end
if f == false then f = '0' end
if c == false then c = '0' end
local ttl = redis.call('TTL', KEYS[1])
local ht = {}
for i = 0, 255 do ht[i] = string.format('%02x', i) end
local p = {}
for i = 1, #v do p[i] = ht[string.byte(v, i)] end
return {0, table.concat(p), f, c, ttl, #v}
"#;
259
/// Lua script for DELETE with CAS check.
///
/// KEYS[1] = item key
/// ARGV[1] = request CAS (decimal string, "0" = skip check)
///
/// The existence check runs first, so a missing key yields NOT_FOUND even
/// when a non-zero CAS was supplied.  The CAS comparison is a string
/// comparison against the stored `c` field.
///
/// Returns: 0=OK, -1=NOT_FOUND, -2=KEY_EXISTS (CAS mismatch)
#[cfg(not(test))]
pub(crate) const LUA_DELETE: &str = r#"
if redis.call('EXISTS', KEYS[1]) == 0 then return -1 end
local req_cas = ARGV[1]
if req_cas ~= '0' then
  local stored_cas = redis.call('HGET', KEYS[1], 'c')
  if stored_cas ~= req_cas then return -2 end
end
redis.call('DEL', KEYS[1])
return 0
"#;
277
/// Lua script for INCR/DECR with u64 semantics.
///
/// KEYS[1] = item key, KEYS[2] = CAS counter key
/// ARGV[1] = delta (decimal string)
/// ARGV[2] = is_decrement ("0"|"1")
/// ARGV[3] = initial value (decimal string)
/// ARGV[4] = expiry (decimal string)
///
/// Missing key: if the expiry is 4294967295 (0xFFFFFFFF) the script returns
/// NOT_FOUND without creating anything; otherwise the item is created with
/// the initial value (stored verbatim, delta not applied), flags `'0'`, and
/// the requested expiry (relative up to 2592000 seconds, absolute beyond).
///
/// Existing key: the delta is applied and only `v` and `c` are rewritten;
/// the expiry argument is ignored and the key's TTL is left untouched.
///
/// Precision note: Lua 5.1 (embedded in Redis) uses IEEE 754 double-precision
/// floats, so integers above 2^53 (9007199254740992) lose precision.  Counter
/// values within [0, 2^53) are exact.  Values at or above 2^53 may round;
/// the module does NOT attempt string-based big-integer math in Lua because
/// the performance and complexity tradeoffs are not justified for GA.
/// Underflow on DECR clamps to 0.
///
/// NOTE(review): the script contains no explicit wrap-around for increments;
/// the sum is simply formatted with `%.0f`, so a result at or above 2^64
/// would be emitted as a decimal string that no longer fits in a u64 —
/// confirm how the Rust caller parses that case.
///
/// NOTE(review): the numeric check relies on Lua `tonumber`, which accepts
/// more than plain decimal digits (e.g. hex or exponent notation) — confirm
/// whether such stored values should be rejected as NON_NUMERIC.
///
/// Returns: {status, value_string, cas_string}
///   status: 0=OK, -1=NOT_FOUND, -3=NON_NUMERIC
#[cfg(not(test))]
pub(crate) const LUA_COUNTER: &str = r#"
local exists = redis.call('EXISTS', KEYS[1])
if exists == 0 then
  local exp = tonumber(ARGV[4])
  if exp == 4294967295 then return {-1, '', ''} end
  local new_cas = redis.call('INCR', KEYS[2])
  local init = ARGV[3]
  redis.call('HSET', KEYS[1], 'v', init, 'f', '0', 'c', tostring(new_cas))
  if exp ~= nil and exp > 0 then
    if exp <= 2592000 then redis.call('EXPIRE', KEYS[1], exp)
    else redis.call('EXPIREAT', KEYS[1], exp) end
  end
  return {0, init, tostring(new_cas)}
end
local val = redis.call('HGET', KEYS[1], 'v')
local num = tonumber(val)
if num == nil then return {-3, '', ''} end
local delta = tonumber(ARGV[1])
if ARGV[2] == '1' then
  if num < delta then num = 0 else num = num - delta end
else
  num = num + delta
end
if num < 0 then num = 0 end
local str_val = string.format('%.0f', num)
local new_cas = redis.call('INCR', KEYS[2])
redis.call('HSET', KEYS[1], 'v', str_val, 'c', tostring(new_cas))
return {0, str_val, tostring(new_cas)}
"#;
326
/// Lua script for TOUCH — update expiry on an existing key.
///
/// KEYS[1] = item key, KEYS[2] = CAS counter key
/// ARGV[1] = expiry (decimal string)
///
/// Expiry handling: a positive value up to 2592000 seconds is a relative
/// TTL (`EXPIRE`), a larger value is an absolute Unix timestamp
/// (`EXPIREAT`), and 0 removes the TTL (`PERSIST`).
///
/// A successful touch also assigns the item a new CAS value, so a CAS
/// obtained before the touch no longer matches afterwards.
///
/// Returns: {status, cas_string}
///   status: 0=OK, -1=NOT_FOUND
#[cfg(not(test))]
pub(crate) const LUA_TOUCH: &str = r#"
if redis.call('EXISTS', KEYS[1]) == 0 then return {-1, ''} end
local exp = tonumber(ARGV[1])
if exp ~= nil and exp > 0 then
  if exp <= 2592000 then redis.call('EXPIRE', KEYS[1], exp)
  else redis.call('EXPIREAT', KEYS[1], exp) end
elseif exp == 0 then
  redis.call('PERSIST', KEYS[1])
end
local new_cas = redis.call('INCR', KEYS[2])
redis.call('HSET', KEYS[1], 'c', tostring(new_cas))
return {0, tostring(new_cas)}
"#;
348
/// Lua script for GAT (Get And Touch) — fetch value and update expiry atomically.
///
/// KEYS[1] = item key, KEYS[2] = CAS counter key
/// ARGV[1] = expiry (decimal string)
///
/// Expiry handling matches the TOUCH script: positive values up to 2592000
/// seconds are relative (`EXPIRE`), larger values are absolute
/// (`EXPIREAT`), and 0 removes the TTL (`PERSIST`).
///
/// The item receives a new CAS value before it is read, and that new CAS is
/// what the script returns.  The value is hex-encoded the same way as in
/// the GET script.
///
/// Returns: {status, hex_value, flags_string, cas_string}
///   status: 0=OK, -1=NOT_FOUND
#[cfg(not(test))]
pub(crate) const LUA_GAT: &str = r#"
if redis.call('EXISTS', KEYS[1]) == 0 then return {-1, '', '', ''} end
local exp = tonumber(ARGV[1])
if exp ~= nil and exp > 0 then
  if exp <= 2592000 then redis.call('EXPIRE', KEYS[1], exp)
  else redis.call('EXPIREAT', KEYS[1], exp) end
elseif exp == 0 then
  redis.call('PERSIST', KEYS[1])
end
local new_cas = redis.call('INCR', KEYS[2])
redis.call('HSET', KEYS[1], 'c', tostring(new_cas))
local v = redis.call('HGET', KEYS[1], 'v')
local f = redis.call('HGET', KEYS[1], 'f')
if v == false then v = '' end
if f == false then f = '0' end
local ht = {}
for i = 0, 255 do ht[i] = string.format('%02x', i) end
local p = {}
for i = 1, #v do p[i] = ht[string.byte(v, i)] end
return {0, table.concat(p), f, tostring(new_cas)}
"#;
378
/// Lua script for APPEND — append data to existing value.
///
/// KEYS[1] = item key, KEYS[2] = CAS counter key
/// ARGV[1] = data to append
/// ARGV[2] = request CAS (decimal string, "0" = skip check)
///
/// Only the `v` and `c` hash fields are rewritten; flags and TTL are left
/// as they were.
///
/// Returns: {status, cas_string}
///   status: 0=OK, -5=key does not exist, -2=KEY_EXISTS (CAS mismatch)
///
/// Note that a missing key is reported as -5, not as the -1 used by the
/// other scripts (presumably so the caller can answer NOT_STORED rather
/// than NOT_FOUND — confirm against the status mapping in the handler).
#[cfg(not(test))]
pub(crate) const LUA_APPEND: &str = r#"
if redis.call('EXISTS', KEYS[1]) == 0 then return {-5, ''} end
local req_cas = ARGV[2]
if req_cas ~= '0' then
  local stored_cas = redis.call('HGET', KEYS[1], 'c')
  if stored_cas ~= req_cas then return {-2, ''} end
end
local old_v = redis.call('HGET', KEYS[1], 'v')
if old_v == false then old_v = '' end
local new_v = old_v .. ARGV[1]
local new_cas = redis.call('INCR', KEYS[2])
redis.call('HSET', KEYS[1], 'v', new_v, 'c', tostring(new_cas))
return {0, tostring(new_cas)}
"#;
402
/// Lua script for PREPEND — prepend data to existing value.
///
/// KEYS[1] = item key, KEYS[2] = CAS counter key
/// ARGV[1] = data to prepend
/// ARGV[2] = request CAS (decimal string, "0" = skip check)
///
/// Only the `v` and `c` hash fields are rewritten; flags and TTL are left
/// as they were.
///
/// Returns: {status, cas_string}
///   status: 0=OK, -5=key does not exist, -2=KEY_EXISTS (CAS mismatch)
///
/// Note that a missing key is reported as -5, not as the -1 used by the
/// other scripts (presumably so the caller can answer NOT_STORED rather
/// than NOT_FOUND — confirm against the status mapping in the handler).
#[cfg(not(test))]
pub(crate) const LUA_PREPEND: &str = r#"
if redis.call('EXISTS', KEYS[1]) == 0 then return {-5, ''} end
local req_cas = ARGV[2]
if req_cas ~= '0' then
  local stored_cas = redis.call('HGET', KEYS[1], 'c')
  if stored_cas ~= req_cas then return {-2, ''} end
end
local old_v = redis.call('HGET', KEYS[1], 'v')
if old_v == false then old_v = '' end
local new_v = ARGV[1] .. old_v
local new_cas = redis.call('INCR', KEYS[2])
redis.call('HSET', KEYS[1], 'v', new_v, 'c', tostring(new_cas))
return {0, tostring(new_cas)}
"#;
426
/// Lua script for FLUSH — atomically scans and deletes all rc:* keys.
///
/// Takes no KEYS or ARGV.  The script walks the keyspace with `SCAN` in
/// batches (COUNT hint of 100) until the cursor returns to `'0'`, issuing
/// `DEL` for every key matching `rc:*`.  The pattern is a literal copy of
/// `KEY_PREFIX`; keys outside that prefix (including the CAS counter key
/// `redcouch:sys:cas_counter`) are not touched.
///
/// NOTE(review): the whole scan runs inside one script invocation, so its
/// running time grows with the total number of keys in the database, not
/// just the number of `rc:*` keys — confirm this is acceptable for large
/// keyspaces.
///
/// Returns: count of deleted keys
#[cfg(not(test))]
pub(crate) const LUA_FLUSH: &str = r#"
local cursor = '0'
local count = 0
repeat
  local result = redis.call('SCAN', cursor, 'MATCH', 'rc:*', 'COUNT', 100)
  cursor = result[1]
  local keys = result[2]
  if #keys > 0 then
    for i, key in ipairs(keys) do
      redis.call('DEL', key)
      count = count + 1
    end
  end
until cursor == '0'
return count
"#;
447
448/* ============================================================
449EVALSHA infrastructure — precompute SHA1 of each Lua script
450and use EVALSHA with NOSCRIPT fallback for reduced network
451overhead on every Redis call.
452========================================================= */
453
454#[cfg(not(test))]
455use std::sync::OnceLock;
456
457/// Holds a Lua script source and caches its SHA1 hex digest.
458#[cfg(not(test))]
459pub(crate) struct LuaScript {
460    pub source: &'static str,
461    sha1: OnceLock<String>,
462}
463
464#[cfg(not(test))]
465impl LuaScript {
466    pub const fn new(source: &'static str) -> Self {
467        Self {
468            source,
469            sha1: OnceLock::new(),
470        }
471    }
472
473    /// Return the SHA1 hex digest, computing it lazily on first call.
474    pub fn sha1(&self) -> &str {
475        self.sha1.get_or_init(|| {
476            use sha1::Digest;
477            let hash = sha1::Sha1::digest(self.source.as_bytes());
478            // Format as 40-char lowercase hex.
479            let mut hex = String::with_capacity(40);
480            for byte in hash.iter() {
481                use std::fmt::Write;
482                let _ = write!(hex, "{:02x}", byte);
483            }
484            hex
485        })
486    }
487}
488
/// Static script instances for EVALSHA.
///
/// One `LuaScript` per Lua source constant, so each script's SHA1 is
/// computed at most once and then reused by every call to `eval_lua`.
#[cfg(not(test))]
pub(crate) static SCRIPT_STORE: LuaScript = LuaScript::new(LUA_STORE);
/// Cached wrapper around `LUA_GET`.
#[cfg(not(test))]
pub(crate) static SCRIPT_GET: LuaScript = LuaScript::new(LUA_GET);
/// Cached wrapper around `LUA_META_GET`.
#[cfg(not(test))]
pub(crate) static SCRIPT_META_GET: LuaScript = LuaScript::new(LUA_META_GET);
/// Cached wrapper around `LUA_DELETE`.
#[cfg(not(test))]
pub(crate) static SCRIPT_DELETE: LuaScript = LuaScript::new(LUA_DELETE);
/// Cached wrapper around `LUA_COUNTER`.
#[cfg(not(test))]
pub(crate) static SCRIPT_COUNTER: LuaScript = LuaScript::new(LUA_COUNTER);
/// Cached wrapper around `LUA_TOUCH`.
#[cfg(not(test))]
pub(crate) static SCRIPT_TOUCH: LuaScript = LuaScript::new(LUA_TOUCH);
/// Cached wrapper around `LUA_GAT`.
#[cfg(not(test))]
pub(crate) static SCRIPT_GAT: LuaScript = LuaScript::new(LUA_GAT);
/// Cached wrapper around `LUA_APPEND`.
#[cfg(not(test))]
pub(crate) static SCRIPT_APPEND: LuaScript = LuaScript::new(LUA_APPEND);
/// Cached wrapper around `LUA_PREPEND`.
#[cfg(not(test))]
pub(crate) static SCRIPT_PREPEND: LuaScript = LuaScript::new(LUA_PREPEND);
/// Cached wrapper around `LUA_FLUSH`.
#[cfg(not(test))]
pub(crate) static SCRIPT_FLUSH: LuaScript = LuaScript::new(LUA_FLUSH);
/// Cached wrapper around `LUA_COUNT_ITEMS` (that source constant is
/// defined outside the portion of the file shown here).
#[cfg(not(test))]
pub(crate) static SCRIPT_COUNT_ITEMS: LuaScript = LuaScript::new(LUA_COUNT_ITEMS);
512
513/// Execute a Lua script via EVALSHA with automatic NOSCRIPT fallback.
514///
515/// `keys_and_args` should contain everything after the script text/hash
516/// in the EVAL argument list: `[numkeys, key1, ..., arg1, ...]`.
517///
518/// On NOSCRIPT error from EVALSHA, transparently retries with full EVAL.
519#[cfg(not(test))]
520pub(crate) fn eval_lua(
521    ctx: &redis_module::Context,
522    script: &LuaScript,
523    keys_and_args: &[&[u8]],
524) -> Result<RedisValue, redis_module::RedisError> {
525    let sha1 = script.sha1();
526    let sha1_bytes = sha1.as_bytes();
527
528    // Build EVALSHA argument list: [sha1, numkeys, key1, ..., arg1, ...]
529    let mut args: Vec<&[u8]> = Vec::with_capacity(1 + keys_and_args.len());
530    args.push(sha1_bytes);
531    args.extend_from_slice(keys_and_args);
532
533    let result = ctx.call("EVALSHA", args.as_slice());
534
535    // Check for NOSCRIPT — script not in cache, fall back to EVAL.
536    match &result {
537        Ok(RedisValue::StaticError(e)) if e.contains("NOSCRIPT") => {}
538        Err(e) => {
539            let msg = e.to_string();
540            if !msg.contains("NOSCRIPT") {
541                return result;
542            }
543        }
544        _ => return result,
545    }
546
547    // Fallback: full EVAL with script source.
548    let mut eval_args: Vec<&[u8]> = Vec::with_capacity(1 + keys_and_args.len());
549    eval_args.push(script.source.as_bytes());
550    eval_args.extend_from_slice(keys_and_args);
551    ctx.call("EVAL", eval_args.as_slice())
552}
553
554/* ============================================================
555TCP listener (started once from module_init)
556========================================================= */
557
/// Default bind address — loopback only to avoid accidental public
/// exposure.  Override via module args if needed in future.
///
/// Currently the only address `spawn_listener` binds to.
#[cfg(not(test))]
const DEFAULT_BIND_ADDR: &str = "127.0.0.1:11210";

/// Maximum number of concurrent client connections.  Beyond this limit
/// new connections are accepted and immediately closed with an error log.
///
/// Compared against `STAT_CURR_CONNECTIONS` by the accept loop.
#[cfg(not(test))]
pub(crate) const MAX_CONNECTIONS: u64 = 1024;

/// Socket read timeout — how long a connection can be idle before being
/// closed.  30 seconds is generous for interactive memcached workloads.
///
/// A read that hits this timeout ends the connection with a `TimedOut`
/// error, which the accept loop deliberately does not log.
#[cfg(not(test))]
const SOCKET_READ_TIMEOUT: Duration = Duration::from_secs(30);

/// Socket write timeout — prevents a stuck client from blocking a thread.
#[cfg(not(test))]
const SOCKET_WRITE_TIMEOUT: Duration = Duration::from_secs(10);

/// Maximum read buffer size per connection.  If the buffer grows beyond
/// this without producing a complete frame, the connection is closed.
/// This prevents a slow-drip attack from consuming unbounded memory.
///
/// Sized as the largest allowed body plus one header plus 4096 bytes of
/// slack.
#[cfg(not(test))]
const MAX_READ_BUF: usize = (MAX_BODY_LEN as usize) + protocol::HEADER_LEN + 4096;

/// Connection rejection counter.
///
/// Incremented by the accept loop each time a connection is dropped
/// because `MAX_CONNECTIONS` was reached.
#[cfg(not(test))]
pub(crate) static STAT_REJECTED_CONNECTIONS: AtomicU64 = AtomicU64::new(0);

// One-shot guard; presumably wraps the `spawn_listener` call so the TCP
// listener is started only once — the call site is outside this view.
#[cfg(not(test))]
static LISTENER: Once = Once::new();
589
590#[cfg(not(test))]
591fn spawn_listener() {
592    thread::spawn(|| {
593        let listener = match TcpListener::bind(DEFAULT_BIND_ADDR) {
594            Ok(l) => l,
595            Err(e) => {
596                eprintln!(
597                    "[redcouch] FATAL: cannot bind {DEFAULT_BIND_ADDR}: {e}  \
598                     (is another instance already running?)"
599                );
600                return;
601            }
602        };
603        // Use blocking accept — avoids busy-wait polling.
604        listener.set_nonblocking(false).ok();
605        eprintln!(
606            "[redcouch] listening on {DEFAULT_BIND_ADDR} (max_connections={MAX_CONNECTIONS})"
607        );
608
609        for stream in listener.incoming() {
610            match stream {
611                Ok(mut sock) => {
612                    // Enforce connection limit.
613                    let current = STAT_CURR_CONNECTIONS.load(Ordering::Relaxed);
614                    if current >= MAX_CONNECTIONS {
615                        STAT_REJECTED_CONNECTIONS.fetch_add(1, Ordering::Relaxed);
616                        eprintln!(
617                            "[redcouch] connection limit reached ({MAX_CONNECTIONS}), rejecting"
618                        );
619                        // Drop the socket immediately — client sees connection reset.
620                        drop(sock);
621                        continue;
622                    }
623
624                    sock.set_nodelay(true).ok();
625                    sock.set_read_timeout(Some(SOCKET_READ_TIMEOUT)).ok();
626                    sock.set_write_timeout(Some(SOCKET_WRITE_TIMEOUT)).ok();
627                    STAT_TOTAL_CONNECTIONS.fetch_add(1, Ordering::Relaxed);
628                    STAT_CURR_CONNECTIONS.fetch_add(1, Ordering::Relaxed);
629                    thread::spawn(move || {
630                        if let Err(e) = handle_conn(&mut sock) {
631                            // Don't log read timeouts as errors — they are expected
632                            // for idle connections.
633                            if let BridgeErr::Io(ref io_err) = e {
634                                if io_err.kind() == std::io::ErrorKind::WouldBlock
635                                    || io_err.kind() == std::io::ErrorKind::TimedOut
636                                {
637                                    // Normal idle timeout, suppress log.
638                                } else {
639                                    eprintln!("[redcouch] connection error: {e}");
640                                }
641                            } else {
642                                eprintln!("[redcouch] connection error: {e}");
643                            }
644                        }
645                        STAT_CURR_CONNECTIONS.fetch_sub(1, Ordering::Relaxed);
646                    });
647                }
648                Err(e) => {
649                    eprintln!("[redcouch] accept error: {e}");
650                    // Transient accept errors (e.g. fd exhaustion) — sleep
651                    // briefly and retry rather than killing the listener.
652                    thread::sleep(Duration::from_millis(100));
653                }
654            }
655        }
656    });
657}
658
659/* ============================================================
660Connection state machine
661========================================================= */
662
/// Error type returned by the connection and command handling code.
#[cfg(not(test))]
#[derive(thiserror::Error, Debug)]
pub(crate) enum BridgeErr {
    /// Socket-level failure.  `#[from]` provides the `From<std::io::Error>`
    /// conversion that `?` and `.into()` rely on.  The handlers also build
    /// this variant by hand for idle timeouts (`TimedOut`) and for QUIT
    /// (`ConnectionAborted`).
    #[error("io {0}")]
    Io(#[from] std::io::Error),
    /// Failure described by a message string, rendered as `redis <message>`
    /// (presumably produced by the Redis call helpers — not visible here).
    #[error("redis {0}")]
    Redis(String),
}
/// Shorthand for `Result<T, BridgeErr>`.
#[cfg(not(test))]
pub(crate) type Br<T> = Result<T, BridgeErr>;
673
674#[cfg(not(test))]
675fn handle_conn(sock: &mut TcpStream) -> Br<()> {
676    let mut buf = BytesMut::with_capacity(16384);
677
678    // Read the first byte(s) to detect the protocol.
679    // Loop to skip leading \r\n (empty lines before a real command).
680    loop {
681        if buf.is_empty() {
682            let mut tmp = [0u8; 16384];
683            match sock.read(&mut tmp) {
684                Ok(0) => return Ok(()),
685                Ok(n) => buf.extend_from_slice(&tmp[..n]),
686                Err(ref e)
687                    if e.kind() == std::io::ErrorKind::WouldBlock
688                        || e.kind() == std::io::ErrorKind::TimedOut =>
689                {
690                    return Err(std::io::Error::from(std::io::ErrorKind::TimedOut).into());
691                }
692                Err(e) => return Err(e.into()),
693            }
694        }
695
696        // Skip leading \r and \n bytes (empty lines before protocol detection).
697        while !buf.is_empty() && (buf[0] == b'\r' || buf[0] == b'\n') {
698            buf.advance(1);
699        }
700        if buf.is_empty() {
701            continue; // Read more data
702        }
703
704        let first = buf[0];
705        if first == MAGIC_REQ {
706            // Binary protocol
707            return handle_binary_conn(sock, &mut buf);
708        } else if first.is_ascii_graphic() || first == b' ' {
709            // Text protocol (ASCII or future meta)
710            return ascii::handle_ascii_conn(sock, &mut buf);
711        } else {
712            // Unknown protocol byte — close.
713            eprintln!("[redcouch] unknown protocol byte 0x{first:02x}, closing");
714            return Ok(());
715        }
716    }
717}
718
719/// Binary protocol connection handler — the original request/response loop.
720#[cfg(not(test))]
721fn handle_binary_conn(sock: &mut TcpStream, buf: &mut BytesMut) -> Br<()> {
722    // Response write buffer — all responses for a batch of parsed requests
723    // are collected here and flushed in a single write_all() call, reducing
724    // the number of syscalls from O(responses * 4) to O(1) per read cycle.
725    let mut out = Vec::with_capacity(8192);
726
727    loop {
728        // Guard against unbounded buffer growth.
729        if buf.len() > MAX_READ_BUF {
730            eprintln!("[redcouch] read buffer exceeded {MAX_READ_BUF} bytes, closing connection");
731            return Ok(());
732        }
733
734        // If we don't have enough data from the initial read, read more.
735        if buf.is_empty() {
736            let mut tmp = [0u8; 16384];
737            match sock.read(&mut tmp) {
738                Ok(0) => return Ok(()),
739                Ok(n) => buf.extend_from_slice(&tmp[..n]),
740                Err(ref e)
741                    if e.kind() == std::io::ErrorKind::WouldBlock
742                        || e.kind() == std::io::ErrorKind::TimedOut =>
743                {
744                    return Err(std::io::Error::from(std::io::ErrorKind::TimedOut).into());
745                }
746                Err(e) => return Err(e.into()),
747            }
748        }
749
750        loop {
751            match try_parse_request(buf) {
752                ParseResult::Ok((req, used)) => {
753                    handle(req, &mut out)?;
754                    buf.advance(used);
755                }
756                ParseResult::Incomplete => break,
757                ParseResult::BadMagic => {
758                    eprintln!("[redcouch] bad magic byte, closing connection");
759                    return Ok(());
760                }
761                ParseResult::MalformedFrame {
762                    opaque,
763                    opcode_byte,
764                    bytes_to_skip,
765                } => {
766                    eprintln!(
767                        "[redcouch] malformed frame (opcode 0x{opcode_byte:02x}), skipping {bytes_to_skip} bytes"
768                    );
769                    write_error_for_raw_opcode(
770                        &mut out,
771                        opcode_byte,
772                        ST_ARGS,
773                        opaque,
774                        b"Malformed frame",
775                    )?;
776                    buf.advance(bytes_to_skip);
777                }
778                ParseResult::OversizedFrame {
779                    opaque,
780                    opcode_byte,
781                } => {
782                    eprintln!(
783                        "[redcouch] oversized frame (opcode 0x{opcode_byte:02x}), closing connection"
784                    );
785                    write_error_for_raw_opcode(
786                        &mut out,
787                        opcode_byte,
788                        ST_ARGS,
789                        opaque,
790                        b"Frame too large",
791                    )?;
792                    // Flush the error response before closing.
793                    if !out.is_empty() {
794                        use std::io::Write;
795                        sock.write_all(&out)?;
796                        out.clear();
797                    }
798                    return Ok(());
799                }
800            }
801        }
802
803        // Flush all batched responses in a single write_all() call.
804        if !out.is_empty() {
805            use std::io::Write;
806            sock.write_all(&out)?;
807            out.clear();
808        }
809
810        // Read more data for the next iteration.
811        let mut tmp = [0u8; 16384];
812        match sock.read(&mut tmp) {
813            Ok(0) => return Ok(()),
814            Ok(n) => buf.extend_from_slice(&tmp[..n]),
815            Err(ref e)
816                if e.kind() == std::io::ErrorKind::WouldBlock
817                    || e.kind() == std::io::ErrorKind::TimedOut =>
818            {
819                return Err(std::io::Error::from(std::io::ErrorKind::TimedOut).into());
820            }
821            Err(e) => return Err(e.into()),
822        }
823    }
824}
825
826/* ============================================================
827Command handlers
828========================================================= */
829
830#[cfg(not(test))]
831fn handle(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
832    let opcode = match req.hdr.opcode {
833        Some(op) => op,
834        None => {
835            // Unknown opcode — respond with ST_UNK and continue.
836            write_error_for_raw_opcode(
837                out,
838                req.hdr.opcode_byte,
839                ST_UNK,
840                req.hdr.opaque,
841                b"Unknown command",
842            )?;
843            return Ok(());
844        }
845    };
846
847    use Opcode::*;
848    match opcode.base() {
849        Get | GetK => op_get(req, out),
850        Set | Add | Replace => op_store(req, out),
851        Delete => op_delete(req, out),
852        Increment | Decrement => op_counter(req, out),
853        Touch => op_touch(req, out),
854        GAT => op_gat(req, out),
855        Append | Prepend => op_append_prepend(req, out),
856        Flush => op_flush(req, out),
857        Version => op_version(req, out),
858        Stat => op_stat(req, out),
859        Verbosity => op_verbosity(req, out),
860        SaslListMechs => op_sasl_list_mechs(req, out),
861        SaslAuth => op_sasl_auth(req, out),
862        SaslStep => op_sasl_step(req, out),
863        Noop => {
864            write_simple_response(out, opcode, ST_OK, req.hdr.opaque, CAS_ZERO, &[])?;
865            Ok(())
866        }
867        Quit => {
868            if !opcode.is_quiet() {
869                write_simple_response(out, opcode, ST_OK, req.hdr.opaque, CAS_ZERO, &[])?;
870            }
871            Err(std::io::Error::from(std::io::ErrorKind::ConnectionAborted).into())
872        }
873        // base() maps all quiet variants to their loud base, so the
874        // remaining arms are unreachable.
875        _ => {
876            write_error_for_raw_opcode(
877                out,
878                req.hdr.opcode_byte,
879                ST_UNK,
880                req.hdr.opaque,
881                b"Unknown command",
882            )?;
883            Ok(())
884        }
885    }
886}
887
888/* ------------ helpers to run a Redis command -------------- */
889#[cfg(not(test))]
890pub(crate) fn with_ctx<T>(f: impl Fn(&redis_module::Context) -> T) -> T {
891    let tsc = ThreadSafeContext::<DetachedFromClient>::new();
892    let guard = tsc.lock();
893    f(&guard)
894}
895
896/// Helper to check if a RedisValue represents an error (including the
897/// `Ok(RedisValue::StaticError(...))` pattern from redis-module 2.0.7
898/// when `RedisModule_Call` returns NULL).
899#[cfg(not(test))]
900pub(crate) fn is_redis_error(v: &RedisValue) -> bool {
901    matches!(v, RedisValue::StaticError(_))
902}
903
904/// Extract an integer from a Redis EVAL array result element.
905#[cfg(not(test))]
906pub(crate) fn eval_int(v: &RedisValue) -> i64 {
907    match v {
908        RedisValue::Integer(n) => *n,
909        _ => 0,
910    }
911}
912
913/// Extract a bulk string from a Redis EVAL array result element.
914#[cfg(not(test))]
915pub(crate) fn eval_str(v: &RedisValue) -> String {
916    match v {
917        RedisValue::BulkString(s) => s.clone(),
918        RedisValue::BulkRedisString(s) => s.to_string_lossy(),
919        RedisValue::SimpleString(s) => s.clone(),
920        RedisValue::SimpleStringStatic(s) => (*s).to_string(),
921        _ => String::new(),
922    }
923}
924
925/* ------------ GET / GETQ / GETK / GETKQ ------------ */
926#[cfg(not(test))]
927fn op_get(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
928    let opcode = req.hdr.opcode.unwrap();
929    STAT_CMD_GET.fetch_add(1, Ordering::Relaxed);
930    let rk = make_redis_key(req.key);
931
932    // Use EVALSHA to atomically fetch value, flags, CAS (NOSCRIPT fallback to EVAL).
933    let reply = with_ctx(|ctx| {
934        let keys_and_args: &[&[u8]] = &[b"1", rk.as_slice()];
935        eval_lua(ctx, &SCRIPT_GET, keys_and_args)
936    })
937    .map_err(|e| BridgeErr::Redis(e.to_string()))?;
938
939    if is_redis_error(&reply) {
940        return Err(BridgeErr::Redis(format!("EVALSHA get error: {reply:?}")));
941    }
942
943    // Parse result: {status, value, flags_string, cas_string}
944    let fields = match &reply {
945        RedisValue::Array(arr) if arr.len() >= 4 => arr,
946        _ => {
947            return Err(BridgeErr::Redis(format!("EVAL get unexpected: {reply:?}")));
948        }
949    };
950
951    let status_code = eval_int(&fields[0]);
952    if status_code == -1 {
953        // NOT_FOUND
954        STAT_GET_MISSES.fetch_add(1, Ordering::Relaxed);
955        if opcode.is_quiet() {
956            return Ok(());
957        }
958        write_simple_response(out, opcode, ST_NF, req.hdr.opaque, CAS_ZERO, &[])?;
959        return Ok(());
960    }
961
962    STAT_GET_HITS.fetch_add(1, Ordering::Relaxed);
963
964    // Value is hex-encoded by the Lua script to avoid redis-module
965    // UTF-8 conversion panics on binary payloads.  Decode it here.
966    let hex_str = eval_str(&fields[1]);
967    let value_bytes = hex_decode(&hex_str);
968
969    let flags: u32 = eval_str(&fields[2]).parse().unwrap_or(0);
970    let cas: u64 = eval_str(&fields[3]).parse().unwrap_or(0);
971
972    let extras = flags.to_be_bytes();
973    let key_part = if opcode.includes_key() { req.key } else { &[] };
974
975    write_response(
976        out,
977        opcode,
978        &ResponseMeta {
979            status: ST_OK,
980            opaque: req.hdr.opaque,
981            cas,
982        },
983        &extras,
984        key_part,
985        &value_bytes,
986    )?;
987    Ok(())
988}
989
990/* ------ SET / ADD / REPLACE (and quiet variants) ------ */
991#[cfg(not(test))]
992fn op_store(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
993    let opcode = req.hdr.opcode.unwrap();
994    let base = opcode.base();
995    STAT_CMD_SET.fetch_add(1, Ordering::Relaxed);
996
997    if req.extras.len() != 8 {
998        write_simple_response(out, opcode, ST_ARGS, req.hdr.opaque, CAS_ZERO, &[])?;
999        return Ok(());
1000    }
1001
1002    let flags = BigEndian::read_u32(&req.extras[0..4]);
1003    let expiry = BigEndian::read_u32(&req.extras[4..8]);
1004    let rk = make_redis_key(req.key);
1005
1006    let op_name = match base {
1007        Opcode::Set => "set",
1008        Opcode::Add => "add",
1009        Opcode::Replace => "replace",
1010        _ => "set",
1011    };
1012
1013    let req_cas = req.hdr.cas.to_string();
1014    let flags_str = flags.to_string();
1015    let expiry_str = expiry.to_string();
1016
1017    // EVALSHA LUA_STORE 2 <key> <cas_counter> <op> <value> <flags> <cas> <expiry>
1018    let reply = with_ctx(|ctx| {
1019        let keys_and_args: &[&[u8]] = &[
1020            b"2",
1021            rk.as_slice(),
1022            CAS_COUNTER_KEY.as_bytes(),
1023            op_name.as_bytes(),
1024            req.value,
1025            flags_str.as_bytes(),
1026            req_cas.as_bytes(),
1027            expiry_str.as_bytes(),
1028        ];
1029        eval_lua(ctx, &SCRIPT_STORE, keys_and_args)
1030    })
1031    .map_err(|e| BridgeErr::Redis(e.to_string()))?;
1032
1033    if is_redis_error(&reply) {
1034        return Err(BridgeErr::Redis(format!("EVALSHA store error: {reply:?}")));
1035    }
1036
1037    // Parse the Lua result: {status, new_cas_string}
1038    let (status_code, new_cas) = match &reply {
1039        RedisValue::Array(arr) if arr.len() >= 2 => {
1040            let st = eval_int(&arr[0]);
1041            let cas_s = eval_str(&arr[1]);
1042            let cas_val: u64 = cas_s.parse().unwrap_or(0);
1043            (st, cas_val)
1044        }
1045        _ => {
1046            return Err(BridgeErr::Redis(format!(
1047                "EVAL store unexpected: {reply:?}"
1048            )));
1049        }
1050    };
1051
1052    // Track CAS stats for CAS-conditional stores.
1053    let is_cas_op = req.hdr.cas != 0;
1054    match status_code {
1055        0 => {
1056            // Success
1057            if is_cas_op {
1058                STAT_CAS_HITS.fetch_add(1, Ordering::Relaxed);
1059            }
1060            if !opcode.is_quiet() {
1061                write_response(
1062                    out,
1063                    opcode,
1064                    &ResponseMeta {
1065                        status: ST_OK,
1066                        opaque: req.hdr.opaque,
1067                        cas: new_cas,
1068                    },
1069                    &[],
1070                    &[],
1071                    &[],
1072                )?;
1073            }
1074        }
1075        -1 => {
1076            // NOT_FOUND (REPLACE on missing key, or CAS on missing key)
1077            if is_cas_op {
1078                STAT_CAS_MISSES.fetch_add(1, Ordering::Relaxed);
1079            }
1080            write_simple_response(out, opcode, ST_NF, req.hdr.opaque, CAS_ZERO, &[])?;
1081        }
1082        -2 => {
1083            // KEY_EXISTS (ADD on existing key, or CAS mismatch)
1084            if is_cas_op {
1085                STAT_CAS_BADVAL.fetch_add(1, Ordering::Relaxed);
1086            }
1087            write_simple_response(out, opcode, ST_IX, req.hdr.opaque, CAS_ZERO, &[])?;
1088        }
1089        _ => {
1090            return Err(BridgeErr::Redis(format!(
1091                "EVAL store unknown status: {status_code}"
1092            )));
1093        }
1094    }
1095    Ok(())
1096}
1097
1098/* ------------- DELETE / DELETEQ ------------- */
1099#[cfg(not(test))]
1100fn op_delete(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
1101    let opcode = req.hdr.opcode.unwrap();
1102    let rk = make_redis_key(req.key);
1103
1104    // Non-CAS DELETE bypass: when request CAS is 0 (no CAS check), use
1105    // a direct DEL command instead of the Lua script.  This is the common
1106    // path — most clients don't set CAS on delete.
1107    if req.hdr.cas == 0 {
1108        let reply = with_ctx(|ctx| ctx.call("DEL", &[rk.as_slice()]))
1109            .map_err(|e| BridgeErr::Redis(e.to_string()))?;
1110
1111        if is_redis_error(&reply) {
1112            return Err(BridgeErr::Redis(format!("DEL error: {reply:?}")));
1113        }
1114
1115        // DEL returns the count of keys deleted: 1 = found+deleted, 0 = not found.
1116        let deleted = match &reply {
1117            RedisValue::Integer(n) => *n > 0,
1118            _ => {
1119                return Err(BridgeErr::Redis(format!("DEL unexpected reply: {reply:?}")));
1120            }
1121        };
1122
1123        if deleted {
1124            STAT_DELETE_HITS.fetch_add(1, Ordering::Relaxed);
1125            if opcode.is_quiet() {
1126                return Ok(());
1127            }
1128            let new_cas = get_next_cas();
1129            write_simple_response(out, opcode, ST_OK, req.hdr.opaque, new_cas, &[])?;
1130        } else {
1131            STAT_DELETE_MISSES.fetch_add(1, Ordering::Relaxed);
1132            write_simple_response(out, opcode, ST_NF, req.hdr.opaque, CAS_ZERO, &[])?;
1133        }
1134        return Ok(());
1135    }
1136
1137    // CAS-checked DELETE: use Lua script for atomic CAS comparison.
1138    let req_cas = req.hdr.cas.to_string();
1139    let reply = with_ctx(|ctx| {
1140        let keys_and_args: &[&[u8]] = &[b"1", rk.as_slice(), req_cas.as_bytes()];
1141        eval_lua(ctx, &SCRIPT_DELETE, keys_and_args)
1142    })
1143    .map_err(|e| BridgeErr::Redis(e.to_string()))?;
1144
1145    if is_redis_error(&reply) {
1146        return Err(BridgeErr::Redis(format!("EVALSHA delete error: {reply:?}")));
1147    }
1148
1149    let status_code = match &reply {
1150        RedisValue::Integer(n) => *n,
1151        _ => {
1152            return Err(BridgeErr::Redis(format!(
1153                "EVALSHA delete unexpected: {reply:?}"
1154            )));
1155        }
1156    };
1157
1158    match status_code {
1159        0 => {
1160            STAT_DELETE_HITS.fetch_add(1, Ordering::Relaxed);
1161            if opcode.is_quiet() {
1162                return Ok(());
1163            }
1164            let new_cas = get_next_cas();
1165            write_simple_response(out, opcode, ST_OK, req.hdr.opaque, new_cas, &[])?;
1166        }
1167        -1 => {
1168            STAT_DELETE_MISSES.fetch_add(1, Ordering::Relaxed);
1169            write_simple_response(out, opcode, ST_NF, req.hdr.opaque, CAS_ZERO, &[])?;
1170        }
1171        -2 => {
1172            // KEY_EXISTS (CAS mismatch)
1173            write_simple_response(out, opcode, ST_IX, req.hdr.opaque, CAS_ZERO, &[])?;
1174        }
1175        _ => {
1176            return Err(BridgeErr::Redis(format!(
1177                "EVALSHA delete unknown status: {status_code}"
1178            )));
1179        }
1180    }
1181    Ok(())
1182}
1183
1184/// Get the next CAS value from the Redis counter.
1185#[cfg(not(test))]
1186pub(crate) fn get_next_cas() -> u64 {
1187    match with_ctx(|ctx| ctx.call("INCR", &[CAS_COUNTER_KEY])) {
1188        Ok(RedisValue::Integer(n)) => n as u64,
1189        _ => 1,
1190    }
1191}
1192
1193/* --------- INCR / DECR (and quiet variants) --------- */
1194#[cfg(not(test))]
1195fn op_counter(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
1196    let opcode = req.hdr.opcode.unwrap();
1197    let base = opcode.base();
1198
1199    if req.extras.len() != 20 {
1200        write_simple_response(out, opcode, ST_ARGS, req.hdr.opaque, CAS_ZERO, &[])?;
1201        return Ok(());
1202    }
1203
1204    let delta = BigEndian::read_u64(&req.extras[0..8]);
1205    let initial = BigEndian::read_u64(&req.extras[8..16]);
1206    let expiry = BigEndian::read_u32(&req.extras[16..20]);
1207    let rk = make_redis_key(req.key);
1208
1209    let is_decr = if base == Opcode::Decrement { "1" } else { "0" };
1210    let delta_str = delta.to_string();
1211    let initial_str = initial.to_string();
1212    let expiry_str = expiry.to_string();
1213
1214    let reply = with_ctx(|ctx| {
1215        let keys_and_args: &[&[u8]] = &[
1216            b"2",
1217            rk.as_slice(),
1218            CAS_COUNTER_KEY.as_bytes(),
1219            delta_str.as_bytes(),
1220            is_decr.as_bytes(),
1221            initial_str.as_bytes(),
1222            expiry_str.as_bytes(),
1223        ];
1224        eval_lua(ctx, &SCRIPT_COUNTER, keys_and_args)
1225    })
1226    .map_err(|e| BridgeErr::Redis(e.to_string()))?;
1227
1228    if is_redis_error(&reply) {
1229        return Err(BridgeErr::Redis(format!(
1230            "EVALSHA counter error: {reply:?}"
1231        )));
1232    }
1233
1234    // Parse result: {status, value_string, cas_string}
1235    let (status_code, value_str, new_cas) = match &reply {
1236        RedisValue::Array(arr) if arr.len() >= 3 => {
1237            let st = eval_int(&arr[0]);
1238            let val = eval_str(&arr[1]);
1239            let cas_s = eval_str(&arr[2]);
1240            let cas_val: u64 = cas_s.parse().unwrap_or(0);
1241            (st, val, cas_val)
1242        }
1243        _ => {
1244            return Err(BridgeErr::Redis(format!(
1245                "EVAL counter unexpected: {reply:?}"
1246            )));
1247        }
1248    };
1249
1250    let is_incr = base == Opcode::Increment;
1251    match status_code {
1252        0 => {
1253            if is_incr {
1254                STAT_INCR_HITS.fetch_add(1, Ordering::Relaxed);
1255            } else {
1256                STAT_DECR_HITS.fetch_add(1, Ordering::Relaxed);
1257            }
1258            // Parse the counter value as u64.
1259            let counter_val: u64 = value_str.parse().unwrap_or(0);
1260            if !opcode.is_quiet() {
1261                write_response(
1262                    out,
1263                    opcode,
1264                    &ResponseMeta {
1265                        status: ST_OK,
1266                        opaque: req.hdr.opaque,
1267                        cas: new_cas,
1268                    },
1269                    &[],
1270                    &[],
1271                    &counter_val.to_be_bytes(),
1272                )?;
1273            }
1274        }
1275        -1 => {
1276            if is_incr {
1277                STAT_INCR_MISSES.fetch_add(1, Ordering::Relaxed);
1278            } else {
1279                STAT_DECR_MISSES.fetch_add(1, Ordering::Relaxed);
1280            }
1281            // NOT_FOUND (0xFFFFFFFF expiry)
1282            write_simple_response(out, opcode, ST_NF, req.hdr.opaque, CAS_ZERO, &[])?;
1283        }
1284        -3 => {
1285            // Non-numeric value
1286            write_simple_response(
1287                out,
1288                opcode,
1289                ST_ARGS,
1290                req.hdr.opaque,
1291                CAS_ZERO,
1292                b"Non-numeric value",
1293            )?;
1294        }
1295        _ => {
1296            return Err(BridgeErr::Redis(format!(
1297                "EVAL counter unknown status: {status_code}"
1298            )));
1299        }
1300    }
1301    Ok(())
1302}
1303
1304/* ------------- TOUCH ------------- */
1305#[cfg(not(test))]
1306fn op_touch(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
1307    let opcode = req.hdr.opcode.unwrap();
1308    STAT_CMD_TOUCH.fetch_add(1, Ordering::Relaxed);
1309
1310    if req.extras.len() != 4 {
1311        write_simple_response(out, opcode, ST_ARGS, req.hdr.opaque, CAS_ZERO, &[])?;
1312        return Ok(());
1313    }
1314
1315    let expiry = BigEndian::read_u32(&req.extras[0..4]);
1316    let rk = make_redis_key(req.key);
1317    let expiry_str = expiry.to_string();
1318
1319    let reply = with_ctx(|ctx| {
1320        let keys_and_args: &[&[u8]] = &[
1321            b"2",
1322            rk.as_slice(),
1323            CAS_COUNTER_KEY.as_bytes(),
1324            expiry_str.as_bytes(),
1325        ];
1326        eval_lua(ctx, &SCRIPT_TOUCH, keys_and_args)
1327    })
1328    .map_err(|e| BridgeErr::Redis(e.to_string()))?;
1329
1330    if is_redis_error(&reply) {
1331        return Err(BridgeErr::Redis(format!("EVALSHA touch error: {reply:?}")));
1332    }
1333
1334    let (status_code, new_cas) = match &reply {
1335        RedisValue::Array(arr) if arr.len() >= 2 => {
1336            let st = eval_int(&arr[0]);
1337            let cas_s = eval_str(&arr[1]);
1338            let cas_val: u64 = cas_s.parse().unwrap_or(0);
1339            (st, cas_val)
1340        }
1341        _ => {
1342            return Err(BridgeErr::Redis(format!(
1343                "EVAL touch unexpected: {reply:?}"
1344            )));
1345        }
1346    };
1347
1348    match status_code {
1349        0 => {
1350            if !opcode.is_quiet() {
1351                write_simple_response(out, opcode, ST_OK, req.hdr.opaque, new_cas, &[])?;
1352            }
1353        }
1354        -1 => {
1355            write_simple_response(out, opcode, ST_NF, req.hdr.opaque, CAS_ZERO, &[])?;
1356        }
1357        _ => {
1358            return Err(BridgeErr::Redis(format!(
1359                "EVAL touch unknown status: {status_code}"
1360            )));
1361        }
1362    }
1363    Ok(())
1364}
1365
1366/* ------------- GAT (Get And Touch) ------------- */
1367#[cfg(not(test))]
1368fn op_gat(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
1369    let opcode = req.hdr.opcode.unwrap();
1370    STAT_CMD_GET.fetch_add(1, Ordering::Relaxed);
1371    STAT_CMD_TOUCH.fetch_add(1, Ordering::Relaxed);
1372
1373    if req.extras.len() != 4 {
1374        write_simple_response(out, opcode, ST_ARGS, req.hdr.opaque, CAS_ZERO, &[])?;
1375        return Ok(());
1376    }
1377
1378    let expiry = BigEndian::read_u32(&req.extras[0..4]);
1379    let rk = make_redis_key(req.key);
1380    let expiry_str = expiry.to_string();
1381
1382    let reply = with_ctx(|ctx| {
1383        let keys_and_args: &[&[u8]] = &[
1384            b"2",
1385            rk.as_slice(),
1386            CAS_COUNTER_KEY.as_bytes(),
1387            expiry_str.as_bytes(),
1388        ];
1389        eval_lua(ctx, &SCRIPT_GAT, keys_and_args)
1390    })
1391    .map_err(|e| BridgeErr::Redis(e.to_string()))?;
1392
1393    if is_redis_error(&reply) {
1394        return Err(BridgeErr::Redis(format!("EVALSHA gat error: {reply:?}")));
1395    }
1396
1397    let fields = match &reply {
1398        RedisValue::Array(arr) if arr.len() >= 4 => arr,
1399        _ => {
1400            return Err(BridgeErr::Redis(format!("EVAL gat unexpected: {reply:?}")));
1401        }
1402    };
1403
1404    let status_code = eval_int(&fields[0]);
1405    if status_code == -1 {
1406        if opcode.is_quiet() {
1407            return Ok(());
1408        }
1409        write_simple_response(out, opcode, ST_NF, req.hdr.opaque, CAS_ZERO, &[])?;
1410        return Ok(());
1411    }
1412
1413    let hex_str = eval_str(&fields[1]);
1414    let value_bytes = hex_decode(&hex_str);
1415    let flags: u32 = eval_str(&fields[2]).parse().unwrap_or(0);
1416    let cas: u64 = eval_str(&fields[3]).parse().unwrap_or(0);
1417
1418    let extras = flags.to_be_bytes();
1419    let key_part = if opcode.includes_key() { req.key } else { &[] };
1420
1421    write_response(
1422        out,
1423        opcode,
1424        &ResponseMeta {
1425            status: ST_OK,
1426            opaque: req.hdr.opaque,
1427            cas,
1428        },
1429        &extras,
1430        key_part,
1431        &value_bytes,
1432    )?;
1433    Ok(())
1434}
1435
1436/* ------------- APPEND / PREPEND ------------- */
1437#[cfg(not(test))]
1438fn op_append_prepend(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
1439    let opcode = req.hdr.opcode.unwrap();
1440    let base = opcode.base();
1441    // Append/Prepend are mutation commands; count under cmd_set
1442    // (matches standard memcached stat semantics).
1443    STAT_CMD_SET.fetch_add(1, Ordering::Relaxed);
1444
1445    // Append/Prepend: no extras expected, key + value in body.
1446    if !req.extras.is_empty() {
1447        write_simple_response(out, opcode, ST_ARGS, req.hdr.opaque, CAS_ZERO, &[])?;
1448        return Ok(());
1449    }
1450
1451    let rk = make_redis_key(req.key);
1452    let req_cas = req.hdr.cas.to_string();
1453
1454    let script = if base == Opcode::Append {
1455        &SCRIPT_APPEND
1456    } else {
1457        &SCRIPT_PREPEND
1458    };
1459
1460    let reply = with_ctx(|ctx| {
1461        let keys_and_args: &[&[u8]] = &[
1462            b"2",
1463            rk.as_slice(),
1464            CAS_COUNTER_KEY.as_bytes(),
1465            req.value,
1466            req_cas.as_bytes(),
1467        ];
1468        eval_lua(ctx, script, keys_and_args)
1469    })
1470    .map_err(|e| BridgeErr::Redis(e.to_string()))?;
1471
1472    if is_redis_error(&reply) {
1473        return Err(BridgeErr::Redis(format!(
1474            "EVALSHA append/prepend error: {reply:?}"
1475        )));
1476    }
1477
1478    let (status_code, new_cas) = match &reply {
1479        RedisValue::Array(arr) if arr.len() >= 2 => {
1480            let st = eval_int(&arr[0]);
1481            let cas_s = eval_str(&arr[1]);
1482            let cas_val: u64 = cas_s.parse().unwrap_or(0);
1483            (st, cas_val)
1484        }
1485        _ => {
1486            return Err(BridgeErr::Redis(format!(
1487                "EVAL append/prepend unexpected: {reply:?}"
1488            )));
1489        }
1490    };
1491
1492    match status_code {
1493        0 => {
1494            if !opcode.is_quiet() {
1495                write_response(
1496                    out,
1497                    opcode,
1498                    &ResponseMeta {
1499                        status: ST_OK,
1500                        opaque: req.hdr.opaque,
1501                        cas: new_cas,
1502                    },
1503                    &[],
1504                    &[],
1505                    &[],
1506                )?;
1507            }
1508        }
1509        -5 => {
1510            // NOT_STORED — key does not exist
1511            write_simple_response(out, opcode, ST_NOT_STORED, req.hdr.opaque, CAS_ZERO, &[])?;
1512        }
1513        -2 => {
1514            // CAS mismatch
1515            write_simple_response(out, opcode, ST_IX, req.hdr.opaque, CAS_ZERO, &[])?;
1516        }
1517        _ => {
1518            return Err(BridgeErr::Redis(format!(
1519                "EVAL append/prepend unknown status: {status_code}"
1520            )));
1521        }
1522    }
1523    Ok(())
1524}
1525
1526/* ------------- FLUSH ------------- */
1527#[cfg(not(test))]
1528fn op_flush(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
1529    let opcode = req.hdr.opcode.unwrap();
1530    STAT_CMD_FLUSH.fetch_add(1, Ordering::Relaxed);
1531
1532    // Flush deletes all rc: prefixed keys via EVALSHA (NOSCRIPT fallback).
1533    with_ctx(|ctx| {
1534        let keys_and_args: &[&[u8]] = &[b"0"];
1535        eval_lua(ctx, &SCRIPT_FLUSH, keys_and_args)
1536    })
1537    .map_err(|e| BridgeErr::Redis(e.to_string()))?;
1538
1539    if !opcode.is_quiet() {
1540        write_simple_response(out, opcode, ST_OK, req.hdr.opaque, CAS_ZERO, &[])?;
1541    }
1542    Ok(())
1543}
1544
1545/* ------------- VERSION ------------- */
1546#[cfg(not(test))]
1547fn op_version(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
1548    let opcode = req.hdr.opcode.unwrap();
1549    let version = b"RedCouch 0.1.0";
1550    write_simple_response(out, opcode, ST_OK, req.hdr.opaque, CAS_ZERO, version)?;
1551    Ok(())
1552}
1553
1554/* ------------- SASL auth (GA: permissive, no auth enforcement) ------------- */
1555
1556/// SASL_LIST_MECHS — return the list of supported SASL mechanisms.
1557/// GA behavior: returns "PLAIN" so clients know the mechanism is available.
1558/// No actual authentication is enforced in this GA release.
1559#[cfg(not(test))]
1560fn op_sasl_list_mechs(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
1561    let opcode = req.hdr.opcode.unwrap();
1562    STAT_AUTH_CMDS.fetch_add(1, Ordering::Relaxed);
1563    write_simple_response(out, opcode, ST_OK, req.hdr.opaque, CAS_ZERO, b"PLAIN")?;
1564    Ok(())
1565}
1566
1567/// SASL_AUTH — accept PLAIN authentication.
1568/// GA behavior: accepts any credentials and returns ST_OK.  This lets
1569/// SASL-requiring clients (e.g. Couchbase SDKs) connect without error.
1570/// When real auth enforcement is needed post-GA, this handler should
1571/// validate credentials against a configured source.
1572#[cfg(not(test))]
1573fn op_sasl_auth(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
1574    let opcode = req.hdr.opcode.unwrap();
1575    STAT_AUTH_CMDS.fetch_add(1, Ordering::Relaxed);
1576
1577    // The mechanism name is in the key field.
1578    let mechanism = std::str::from_utf8(req.key).unwrap_or("");
1579    if mechanism != "PLAIN" {
1580        STAT_AUTH_ERRORS.fetch_add(1, Ordering::Relaxed);
1581        write_simple_response(
1582            out,
1583            opcode,
1584            protocol::ST_AUTH_ERROR,
1585            req.hdr.opaque,
1586            CAS_ZERO,
1587            b"Unsupported SASL mechanism",
1588        )?;
1589        return Ok(());
1590    }
1591
1592    // PLAIN auth: accept any credentials for GA.
1593    // The value field contains the PLAIN payload: \0<username>\0<password>
1594    write_simple_response(
1595        out,
1596        opcode,
1597        ST_OK,
1598        req.hdr.opaque,
1599        CAS_ZERO,
1600        b"Authenticated",
1601    )?;
1602    Ok(())
1603}
1604
1605/// SASL_STEP — continue a multi-step SASL handshake.
1606/// GA behavior: PLAIN is single-step, so any SASL_STEP is unexpected.
1607/// Return ST_AUTH_ERROR to signal the handshake is already complete.
1608#[cfg(not(test))]
1609fn op_sasl_step(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
1610    let opcode = req.hdr.opcode.unwrap();
1611    STAT_AUTH_CMDS.fetch_add(1, Ordering::Relaxed);
1612    STAT_AUTH_ERRORS.fetch_add(1, Ordering::Relaxed);
1613    write_simple_response(
1614        out,
1615        opcode,
1616        protocol::ST_AUTH_ERROR,
1617        req.hdr.opaque,
1618        CAS_ZERO,
1619        b"SASL step not expected for PLAIN mechanism",
1620    )?;
1621    Ok(())
1622}
1623
1624/* ------------- STAT ------------- */
1625
/// Lua script that counts the current items, i.e. keys matching `rc:*`.
///
/// The script walks the keyspace with `SCAN ... MATCH 'rc:*' COUNT 1000`,
/// starting at cursor `'0'` and adding the number of keys in each batch to a
/// running total, until SCAN hands back cursor `'0'` again.  It takes no
/// keys or arguments and returns the total as an integer.
///
/// The `rc:` pattern is written out literally in the script text; it has to
/// stay in step with the item key prefix used elsewhere in this file.
#[cfg(not(test))]
pub(crate) const LUA_COUNT_ITEMS: &str = r#"
local cursor = '0'
local count = 0
repeat
  local result = redis.call('SCAN', cursor, 'MATCH', 'rc:*', 'COUNT', 1000)
  cursor = result[1]
  count = count + #result[2]
until cursor == '0'
return count
"#;
1638
1639/// STAT — return server statistics.
1640///
1641/// Binary STAT protocol:
1642/// - Each stat is a response with opcode=STAT, the stat name in the key
1643///   field, and the stat value in the value field.
1644/// - The sequence ends with a response that has empty key and empty value.
1645/// - If the request key is empty, return general stats.
1646/// - If the request key names a group we don't support, return empty
1647///   (just the terminator).
1648#[cfg(not(test))]
1649fn op_stat(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
1650    let opcode = req.hdr.opcode.unwrap();
1651    let stat_key = std::str::from_utf8(req.key).unwrap_or("");
1652
1653    if stat_key.is_empty() {
1654        // General stats.
1655        let uptime = STARTUP_INSTANT
1656            .get()
1657            .map(|t| t.elapsed().as_secs())
1658            .unwrap_or(0);
1659        let pid = std::process::id();
1660
1661        // Count current items via EVALSHA SCAN (NOSCRIPT fallback).
1662        let curr_items: u64 = match with_ctx(|ctx| {
1663            let keys_and_args: &[&[u8]] = &[b"0"];
1664            eval_lua(ctx, &SCRIPT_COUNT_ITEMS, keys_and_args)
1665        }) {
1666            Ok(RedisValue::Integer(n)) => n as u64,
1667            _ => 0,
1668        };
1669
1670        let stats: Vec<(&str, String)> = vec![
1671            ("pid", pid.to_string()),
1672            ("uptime", uptime.to_string()),
1673            (
1674                "time",
1675                std::time::SystemTime::now()
1676                    .duration_since(std::time::UNIX_EPOCH)
1677                    .map(|d| d.as_secs())
1678                    .unwrap_or(0)
1679                    .to_string(),
1680            ),
1681            ("version", "RedCouch 0.1.0".to_string()),
1682            ("curr_items", curr_items.to_string()),
1683            (
1684                "curr_connections",
1685                STAT_CURR_CONNECTIONS.load(Ordering::Relaxed).to_string(),
1686            ),
1687            (
1688                "total_connections",
1689                STAT_TOTAL_CONNECTIONS.load(Ordering::Relaxed).to_string(),
1690            ),
1691            ("cmd_get", STAT_CMD_GET.load(Ordering::Relaxed).to_string()),
1692            ("cmd_set", STAT_CMD_SET.load(Ordering::Relaxed).to_string()),
1693            (
1694                "cmd_flush",
1695                STAT_CMD_FLUSH.load(Ordering::Relaxed).to_string(),
1696            ),
1697            (
1698                "cmd_touch",
1699                STAT_CMD_TOUCH.load(Ordering::Relaxed).to_string(),
1700            ),
1701            (
1702                "get_hits",
1703                STAT_GET_HITS.load(Ordering::Relaxed).to_string(),
1704            ),
1705            (
1706                "get_misses",
1707                STAT_GET_MISSES.load(Ordering::Relaxed).to_string(),
1708            ),
1709            (
1710                "delete_hits",
1711                STAT_DELETE_HITS.load(Ordering::Relaxed).to_string(),
1712            ),
1713            (
1714                "delete_misses",
1715                STAT_DELETE_MISSES.load(Ordering::Relaxed).to_string(),
1716            ),
1717            (
1718                "incr_hits",
1719                STAT_INCR_HITS.load(Ordering::Relaxed).to_string(),
1720            ),
1721            (
1722                "incr_misses",
1723                STAT_INCR_MISSES.load(Ordering::Relaxed).to_string(),
1724            ),
1725            (
1726                "decr_hits",
1727                STAT_DECR_HITS.load(Ordering::Relaxed).to_string(),
1728            ),
1729            (
1730                "decr_misses",
1731                STAT_DECR_MISSES.load(Ordering::Relaxed).to_string(),
1732            ),
1733            (
1734                "cas_hits",
1735                STAT_CAS_HITS.load(Ordering::Relaxed).to_string(),
1736            ),
1737            (
1738                "cas_misses",
1739                STAT_CAS_MISSES.load(Ordering::Relaxed).to_string(),
1740            ),
1741            (
1742                "cas_badval",
1743                STAT_CAS_BADVAL.load(Ordering::Relaxed).to_string(),
1744            ),
1745            (
1746                "auth_cmds",
1747                STAT_AUTH_CMDS.load(Ordering::Relaxed).to_string(),
1748            ),
1749            (
1750                "auth_errors",
1751                STAT_AUTH_ERRORS.load(Ordering::Relaxed).to_string(),
1752            ),
1753            (
1754                "rejected_connections",
1755                STAT_REJECTED_CONNECTIONS
1756                    .load(Ordering::Relaxed)
1757                    .to_string(),
1758            ),
1759            ("max_connections", MAX_CONNECTIONS.to_string()),
1760        ];
1761
1762        let stat_meta = ResponseMeta {
1763            status: ST_OK,
1764            opaque: req.hdr.opaque,
1765            cas: CAS_ZERO,
1766        };
1767        for (name, value) in &stats {
1768            write_response(
1769                out,
1770                opcode,
1771                &stat_meta,
1772                &[],
1773                name.as_bytes(),
1774                value.as_bytes(),
1775            )?;
1776        }
1777    }
1778    // Unsupported stat groups — return just the terminator.
1779    // This includes "settings", "items", "slabs", "conns", etc.
1780    // These are intentionally unsupported in the GA release.
1781
1782    // Terminator: empty key + empty value.
1783    write_response(
1784        out,
1785        opcode,
1786        &ResponseMeta {
1787            status: ST_OK,
1788            opaque: req.hdr.opaque,
1789            cas: CAS_ZERO,
1790        },
1791        &[],
1792        &[],
1793        &[],
1794    )?;
1795    Ok(())
1796}
1797
1798/* ------------- VERBOSITY ------------- */
1799
1800/// VERBOSITY — set server verbosity level.
1801/// GA behavior: accept and return ST_OK as a no-op.  RedCouch does not
1802/// currently support dynamic verbosity levels; logging is controlled
1803/// by Redis module logging.
1804#[cfg(not(test))]
1805fn op_verbosity(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
1806    let opcode = req.hdr.opcode.unwrap();
1807    write_simple_response(out, opcode, ST_OK, req.hdr.opaque, CAS_ZERO, &[])?;
1808    Ok(())
1809}
1810
1811/* ============================================================
1812Module declaration  (allocator + init)
1813========================================================= */
1814
1815#[cfg(not(test))]
1816fn module_init(ctx: &Context, _args: &[RedisString]) -> Status {
1817    let _ = STARTUP_INSTANT.set(Instant::now());
1818    LISTENER.call_once(spawn_listener);
1819    ctx.log_notice(&format!(
1820        "redcouch: listener started on {} (max_connections={}, read_timeout={}s, write_timeout={}s, max_body={})",
1821        DEFAULT_BIND_ADDR,
1822        MAX_CONNECTIONS,
1823        SOCKET_READ_TIMEOUT.as_secs(),
1824        SOCKET_WRITE_TIMEOUT.as_secs(),
1825        MAX_BODY_LEN,
1826    ));
1827    Status::Ok
1828}
1829
1830#[cfg(not(test))]
1831redis_module! {
1832    name: "redcouch",
1833    version: 1,
1834    allocator: (
1835        redis_module::alloc::RedisAlloc,
1836        redis_module::alloc::RedisAlloc
1837    ),
1838    data_types: [],
1839    init: module_init,
1840    commands: [],
1841}