1#![forbid(unsafe_code)]
9
10pub mod ascii;
11pub mod meta;
12pub mod protocol;
13
14#[cfg(not(test))]
15use byteorder::{BigEndian, ByteOrder};
16#[cfg(not(test))]
17use bytes::{Buf, BytesMut};
18#[cfg(not(test))]
19use protocol::{
20 CAS_ZERO, MAGIC_REQ, MAX_BODY_LEN, Opcode, ParseResult, Request, ResponseMeta, ST_ARGS, ST_IX,
21 ST_NF, ST_NOT_STORED, ST_OK, ST_UNK, try_parse_request, write_error_for_raw_opcode,
22 write_response, write_simple_response,
23};
24#[cfg(not(test))]
25use std::sync::atomic::{AtomicU64, Ordering};
26#[cfg(not(test))]
27use std::time::Instant;
28#[cfg(not(test))]
29use std::{
30 io::Read,
31 net::{TcpListener, TcpStream},
32 sync::Once,
33 thread,
34 time::Duration,
35};
36
37#[cfg(not(test))]
40use redis_module::{
41 Context, DetachedFromClient, RedisString, RedisValue, Status, ThreadSafeContext, redis_module,
42};
43
44#[cfg(not(test))]
51pub(crate) const KEY_PREFIX: &[u8] = b"rc:";
52
53#[cfg(not(test))]
55pub(crate) const CAS_COUNTER_KEY: &str = "redcouch:sys:cas_counter";
56
57#[cfg(not(test))]
62pub(crate) static STAT_CMD_GET: AtomicU64 = AtomicU64::new(0);
63#[cfg(not(test))]
64pub(crate) static STAT_CMD_SET: AtomicU64 = AtomicU64::new(0);
65#[cfg(not(test))]
66pub(crate) static STAT_CMD_FLUSH: AtomicU64 = AtomicU64::new(0);
67#[cfg(not(test))]
68pub(crate) static STAT_CMD_TOUCH: AtomicU64 = AtomicU64::new(0);
69#[cfg(not(test))]
70pub(crate) static STAT_GET_HITS: AtomicU64 = AtomicU64::new(0);
71#[cfg(not(test))]
72pub(crate) static STAT_GET_MISSES: AtomicU64 = AtomicU64::new(0);
73#[cfg(not(test))]
74pub(crate) static STAT_DELETE_HITS: AtomicU64 = AtomicU64::new(0);
75#[cfg(not(test))]
76pub(crate) static STAT_DELETE_MISSES: AtomicU64 = AtomicU64::new(0);
77#[cfg(not(test))]
78pub(crate) static STAT_INCR_HITS: AtomicU64 = AtomicU64::new(0);
79#[cfg(not(test))]
80pub(crate) static STAT_INCR_MISSES: AtomicU64 = AtomicU64::new(0);
81#[cfg(not(test))]
82pub(crate) static STAT_DECR_HITS: AtomicU64 = AtomicU64::new(0);
83#[cfg(not(test))]
84pub(crate) static STAT_DECR_MISSES: AtomicU64 = AtomicU64::new(0);
85#[cfg(not(test))]
86pub(crate) static STAT_CAS_HITS: AtomicU64 = AtomicU64::new(0);
87#[cfg(not(test))]
88pub(crate) static STAT_CAS_MISSES: AtomicU64 = AtomicU64::new(0);
89#[cfg(not(test))]
90pub(crate) static STAT_CAS_BADVAL: AtomicU64 = AtomicU64::new(0);
91#[cfg(not(test))]
92pub(crate) static STAT_CURR_CONNECTIONS: AtomicU64 = AtomicU64::new(0);
93#[cfg(not(test))]
94pub(crate) static STAT_TOTAL_CONNECTIONS: AtomicU64 = AtomicU64::new(0);
95#[cfg(not(test))]
96pub(crate) static STAT_AUTH_CMDS: AtomicU64 = AtomicU64::new(0);
97#[cfg(not(test))]
98pub(crate) static STAT_AUTH_ERRORS: AtomicU64 = AtomicU64::new(0);
99
100#[cfg(not(test))]
102pub(crate) static STARTUP_INSTANT: std::sync::OnceLock<Instant> = std::sync::OnceLock::new();
103
104#[cfg(not(test))]
106pub(crate) fn make_redis_key(client_key: &[u8]) -> Vec<u8> {
107 let mut rk = Vec::with_capacity(KEY_PREFIX.len() + client_key.len());
108 rk.extend_from_slice(KEY_PREFIX);
109 rk.extend_from_slice(client_key);
110 rk
111}
112
113#[cfg(not(test))]
117static HEX_DECODE_LUT: [u8; 256] = {
118 let mut t = [0xFF_u8; 256];
119 t[b'0' as usize] = 0;
120 t[b'1' as usize] = 1;
121 t[b'2' as usize] = 2;
122 t[b'3' as usize] = 3;
123 t[b'4' as usize] = 4;
124 t[b'5' as usize] = 5;
125 t[b'6' as usize] = 6;
126 t[b'7' as usize] = 7;
127 t[b'8' as usize] = 8;
128 t[b'9' as usize] = 9;
129 t[b'a' as usize] = 10;
130 t[b'b' as usize] = 11;
131 t[b'c' as usize] = 12;
132 t[b'd' as usize] = 13;
133 t[b'e' as usize] = 14;
134 t[b'f' as usize] = 15;
135 t[b'A' as usize] = 10;
136 t[b'B' as usize] = 11;
137 t[b'C' as usize] = 12;
138 t[b'D' as usize] = 13;
139 t[b'E' as usize] = 14;
140 t[b'F' as usize] = 15;
141 t
142};
143
144#[cfg(not(test))]
150pub(crate) fn hex_decode(hex: &str) -> Vec<u8> {
151 if hex.len() % 2 != 0 {
152 return Vec::new();
153 }
154 let mut out = Vec::with_capacity(hex.len() / 2);
155 let bytes = hex.as_bytes();
156 let mut i = 0;
157 while i < bytes.len() {
158 let hi = HEX_DECODE_LUT[bytes[i] as usize];
159 let lo = HEX_DECODE_LUT[bytes[i + 1] as usize];
160 if hi > 0x0F || lo > 0x0F {
161 return Vec::new();
162 }
163 out.push((hi << 4) | lo);
164 i += 2;
165 }
166 out
167}
168
169#[cfg(not(test))]
185pub(crate) const LUA_STORE: &str = r#"
186local op = ARGV[1]
187local exists = redis.call('EXISTS', KEYS[1])
188if op == 'add' and exists == 1 then return {-2, ''} end
189if op == 'replace' and exists == 0 then return {-1, ''} end
190local req_cas = ARGV[4]
191if req_cas ~= '0' then
192 if exists == 0 then return {-1, ''} end
193 local stored_cas = redis.call('HGET', KEYS[1], 'c')
194 if stored_cas ~= req_cas then return {-2, ''} end
195end
196local new_cas = redis.call('INCR', KEYS[2])
197redis.call('HSET', KEYS[1], 'v', ARGV[2], 'f', ARGV[3], 'c', tostring(new_cas))
198local exp = tonumber(ARGV[5])
199if exp ~= nil and exp > 0 then
200 if exp <= 2592000 then redis.call('EXPIRE', KEYS[1], exp)
201 else redis.call('EXPIREAT', KEYS[1], exp) end
202elseif exp == 0 and exists == 1 then
203 redis.call('PERSIST', KEYS[1])
204end
205return {0, tostring(new_cas)}
206"#;
207
208#[cfg(not(test))]
220pub(crate) const LUA_GET: &str = r#"
221if redis.call('EXISTS', KEYS[1]) == 0 then return {-1, '', '', ''} end
222local v = redis.call('HGET', KEYS[1], 'v')
223local f = redis.call('HGET', KEYS[1], 'f')
224local c = redis.call('HGET', KEYS[1], 'c')
225if v == false then v = '' end
226if f == false then f = '0' end
227if c == false then c = '0' end
228local ht = {}
229for i = 0, 255 do ht[i] = string.format('%02x', i) end
230local p = {}
231for i = 1, #v do p[i] = ht[string.byte(v, i)] end
232return {0, table.concat(p), f, c}
233"#;
234
235#[cfg(not(test))]
244pub(crate) const LUA_META_GET: &str = r#"
245if redis.call('EXISTS', KEYS[1]) == 0 then return {-1, '', '', '', -2, 0} end
246local v = redis.call('HGET', KEYS[1], 'v')
247local f = redis.call('HGET', KEYS[1], 'f')
248local c = redis.call('HGET', KEYS[1], 'c')
249if v == false then v = '' end
250if f == false then f = '0' end
251if c == false then c = '0' end
252local ttl = redis.call('TTL', KEYS[1])
253local ht = {}
254for i = 0, 255 do ht[i] = string.format('%02x', i) end
255local p = {}
256for i = 1, #v do p[i] = ht[string.byte(v, i)] end
257return {0, table.concat(p), f, c, ttl, #v}
258"#;
259
260#[cfg(not(test))]
267pub(crate) const LUA_DELETE: &str = r#"
268if redis.call('EXISTS', KEYS[1]) == 0 then return -1 end
269local req_cas = ARGV[1]
270if req_cas ~= '0' then
271 local stored_cas = redis.call('HGET', KEYS[1], 'c')
272 if stored_cas ~= req_cas then return -2 end
273end
274redis.call('DEL', KEYS[1])
275return 0
276"#;
277
278#[cfg(not(test))]
297pub(crate) const LUA_COUNTER: &str = r#"
298local exists = redis.call('EXISTS', KEYS[1])
299if exists == 0 then
300 local exp = tonumber(ARGV[4])
301 if exp == 4294967295 then return {-1, '', ''} end
302 local new_cas = redis.call('INCR', KEYS[2])
303 local init = ARGV[3]
304 redis.call('HSET', KEYS[1], 'v', init, 'f', '0', 'c', tostring(new_cas))
305 if exp ~= nil and exp > 0 then
306 if exp <= 2592000 then redis.call('EXPIRE', KEYS[1], exp)
307 else redis.call('EXPIREAT', KEYS[1], exp) end
308 end
309 return {0, init, tostring(new_cas)}
310end
311local val = redis.call('HGET', KEYS[1], 'v')
312local num = tonumber(val)
313if num == nil then return {-3, '', ''} end
314local delta = tonumber(ARGV[1])
315if ARGV[2] == '1' then
316 if num < delta then num = 0 else num = num - delta end
317else
318 num = num + delta
319end
320if num < 0 then num = 0 end
321local str_val = string.format('%.0f', num)
322local new_cas = redis.call('INCR', KEYS[2])
323redis.call('HSET', KEYS[1], 'v', str_val, 'c', tostring(new_cas))
324return {0, str_val, tostring(new_cas)}
325"#;
326
327#[cfg(not(test))]
335pub(crate) const LUA_TOUCH: &str = r#"
336if redis.call('EXISTS', KEYS[1]) == 0 then return {-1, ''} end
337local exp = tonumber(ARGV[1])
338if exp ~= nil and exp > 0 then
339 if exp <= 2592000 then redis.call('EXPIRE', KEYS[1], exp)
340 else redis.call('EXPIREAT', KEYS[1], exp) end
341elseif exp == 0 then
342 redis.call('PERSIST', KEYS[1])
343end
344local new_cas = redis.call('INCR', KEYS[2])
345redis.call('HSET', KEYS[1], 'c', tostring(new_cas))
346return {0, tostring(new_cas)}
347"#;
348
349#[cfg(not(test))]
357pub(crate) const LUA_GAT: &str = r#"
358if redis.call('EXISTS', KEYS[1]) == 0 then return {-1, '', '', ''} end
359local exp = tonumber(ARGV[1])
360if exp ~= nil and exp > 0 then
361 if exp <= 2592000 then redis.call('EXPIRE', KEYS[1], exp)
362 else redis.call('EXPIREAT', KEYS[1], exp) end
363elseif exp == 0 then
364 redis.call('PERSIST', KEYS[1])
365end
366local new_cas = redis.call('INCR', KEYS[2])
367redis.call('HSET', KEYS[1], 'c', tostring(new_cas))
368local v = redis.call('HGET', KEYS[1], 'v')
369local f = redis.call('HGET', KEYS[1], 'f')
370if v == false then v = '' end
371if f == false then f = '0' end
372local ht = {}
373for i = 0, 255 do ht[i] = string.format('%02x', i) end
374local p = {}
375for i = 1, #v do p[i] = ht[string.byte(v, i)] end
376return {0, table.concat(p), f, tostring(new_cas)}
377"#;
378
379#[cfg(not(test))]
388pub(crate) const LUA_APPEND: &str = r#"
389if redis.call('EXISTS', KEYS[1]) == 0 then return {-5, ''} end
390local req_cas = ARGV[2]
391if req_cas ~= '0' then
392 local stored_cas = redis.call('HGET', KEYS[1], 'c')
393 if stored_cas ~= req_cas then return {-2, ''} end
394end
395local old_v = redis.call('HGET', KEYS[1], 'v')
396if old_v == false then old_v = '' end
397local new_v = old_v .. ARGV[1]
398local new_cas = redis.call('INCR', KEYS[2])
399redis.call('HSET', KEYS[1], 'v', new_v, 'c', tostring(new_cas))
400return {0, tostring(new_cas)}
401"#;
402
403#[cfg(not(test))]
412pub(crate) const LUA_PREPEND: &str = r#"
413if redis.call('EXISTS', KEYS[1]) == 0 then return {-5, ''} end
414local req_cas = ARGV[2]
415if req_cas ~= '0' then
416 local stored_cas = redis.call('HGET', KEYS[1], 'c')
417 if stored_cas ~= req_cas then return {-2, ''} end
418end
419local old_v = redis.call('HGET', KEYS[1], 'v')
420if old_v == false then old_v = '' end
421local new_v = ARGV[1] .. old_v
422local new_cas = redis.call('INCR', KEYS[2])
423redis.call('HSET', KEYS[1], 'v', new_v, 'c', tostring(new_cas))
424return {0, tostring(new_cas)}
425"#;
426
427#[cfg(not(test))]
431pub(crate) const LUA_FLUSH: &str = r#"
432local cursor = '0'
433local count = 0
434repeat
435 local result = redis.call('SCAN', cursor, 'MATCH', 'rc:*', 'COUNT', 100)
436 cursor = result[1]
437 local keys = result[2]
438 if #keys > 0 then
439 for i, key in ipairs(keys) do
440 redis.call('DEL', key)
441 count = count + 1
442 end
443 end
444until cursor == '0'
445return count
446"#;
447
448#[cfg(not(test))]
455use std::sync::OnceLock;
456
457#[cfg(not(test))]
459pub(crate) struct LuaScript {
460 pub source: &'static str,
461 sha1: OnceLock<String>,
462}
463
464#[cfg(not(test))]
465impl LuaScript {
466 pub const fn new(source: &'static str) -> Self {
467 Self {
468 source,
469 sha1: OnceLock::new(),
470 }
471 }
472
473 pub fn sha1(&self) -> &str {
475 self.sha1.get_or_init(|| {
476 use sha1::Digest;
477 let hash = sha1::Sha1::digest(self.source.as_bytes());
478 let mut hex = String::with_capacity(40);
480 for byte in hash.iter() {
481 use std::fmt::Write;
482 let _ = write!(hex, "{:02x}", byte);
483 }
484 hex
485 })
486 }
487}
488
489#[cfg(not(test))]
491pub(crate) static SCRIPT_STORE: LuaScript = LuaScript::new(LUA_STORE);
492#[cfg(not(test))]
493pub(crate) static SCRIPT_GET: LuaScript = LuaScript::new(LUA_GET);
494#[cfg(not(test))]
495pub(crate) static SCRIPT_META_GET: LuaScript = LuaScript::new(LUA_META_GET);
496#[cfg(not(test))]
497pub(crate) static SCRIPT_DELETE: LuaScript = LuaScript::new(LUA_DELETE);
498#[cfg(not(test))]
499pub(crate) static SCRIPT_COUNTER: LuaScript = LuaScript::new(LUA_COUNTER);
500#[cfg(not(test))]
501pub(crate) static SCRIPT_TOUCH: LuaScript = LuaScript::new(LUA_TOUCH);
502#[cfg(not(test))]
503pub(crate) static SCRIPT_GAT: LuaScript = LuaScript::new(LUA_GAT);
504#[cfg(not(test))]
505pub(crate) static SCRIPT_APPEND: LuaScript = LuaScript::new(LUA_APPEND);
506#[cfg(not(test))]
507pub(crate) static SCRIPT_PREPEND: LuaScript = LuaScript::new(LUA_PREPEND);
508#[cfg(not(test))]
509pub(crate) static SCRIPT_FLUSH: LuaScript = LuaScript::new(LUA_FLUSH);
510#[cfg(not(test))]
511pub(crate) static SCRIPT_COUNT_ITEMS: LuaScript = LuaScript::new(LUA_COUNT_ITEMS);
512
513#[cfg(not(test))]
520pub(crate) fn eval_lua(
521 ctx: &redis_module::Context,
522 script: &LuaScript,
523 keys_and_args: &[&[u8]],
524) -> Result<RedisValue, redis_module::RedisError> {
525 let sha1 = script.sha1();
526 let sha1_bytes = sha1.as_bytes();
527
528 let mut args: Vec<&[u8]> = Vec::with_capacity(1 + keys_and_args.len());
530 args.push(sha1_bytes);
531 args.extend_from_slice(keys_and_args);
532
533 let result = ctx.call("EVALSHA", args.as_slice());
534
535 match &result {
537 Ok(RedisValue::StaticError(e)) if e.contains("NOSCRIPT") => {}
538 Err(e) => {
539 let msg = e.to_string();
540 if !msg.contains("NOSCRIPT") {
541 return result;
542 }
543 }
544 _ => return result,
545 }
546
547 let mut eval_args: Vec<&[u8]> = Vec::with_capacity(1 + keys_and_args.len());
549 eval_args.push(script.source.as_bytes());
550 eval_args.extend_from_slice(keys_and_args);
551 ctx.call("EVAL", eval_args.as_slice())
552}
553
554#[cfg(not(test))]
561const DEFAULT_BIND_ADDR: &str = "127.0.0.1:11210";
562
563#[cfg(not(test))]
566pub(crate) const MAX_CONNECTIONS: u64 = 1024;
567
568#[cfg(not(test))]
571const SOCKET_READ_TIMEOUT: Duration = Duration::from_secs(30);
572
573#[cfg(not(test))]
575const SOCKET_WRITE_TIMEOUT: Duration = Duration::from_secs(10);
576
577#[cfg(not(test))]
581const MAX_READ_BUF: usize = (MAX_BODY_LEN as usize) + protocol::HEADER_LEN + 4096;
582
583#[cfg(not(test))]
585pub(crate) static STAT_REJECTED_CONNECTIONS: AtomicU64 = AtomicU64::new(0);
586
587#[cfg(not(test))]
588static LISTENER: Once = Once::new();
589
590#[cfg(not(test))]
591fn spawn_listener() {
592 thread::spawn(|| {
593 let listener = match TcpListener::bind(DEFAULT_BIND_ADDR) {
594 Ok(l) => l,
595 Err(e) => {
596 eprintln!(
597 "[redcouch] FATAL: cannot bind {DEFAULT_BIND_ADDR}: {e} \
598 (is another instance already running?)"
599 );
600 return;
601 }
602 };
603 listener.set_nonblocking(false).ok();
605 eprintln!(
606 "[redcouch] listening on {DEFAULT_BIND_ADDR} (max_connections={MAX_CONNECTIONS})"
607 );
608
609 for stream in listener.incoming() {
610 match stream {
611 Ok(mut sock) => {
612 let current = STAT_CURR_CONNECTIONS.load(Ordering::Relaxed);
614 if current >= MAX_CONNECTIONS {
615 STAT_REJECTED_CONNECTIONS.fetch_add(1, Ordering::Relaxed);
616 eprintln!(
617 "[redcouch] connection limit reached ({MAX_CONNECTIONS}), rejecting"
618 );
619 drop(sock);
621 continue;
622 }
623
624 sock.set_nodelay(true).ok();
625 sock.set_read_timeout(Some(SOCKET_READ_TIMEOUT)).ok();
626 sock.set_write_timeout(Some(SOCKET_WRITE_TIMEOUT)).ok();
627 STAT_TOTAL_CONNECTIONS.fetch_add(1, Ordering::Relaxed);
628 STAT_CURR_CONNECTIONS.fetch_add(1, Ordering::Relaxed);
629 thread::spawn(move || {
630 if let Err(e) = handle_conn(&mut sock) {
631 if let BridgeErr::Io(ref io_err) = e {
634 if io_err.kind() == std::io::ErrorKind::WouldBlock
635 || io_err.kind() == std::io::ErrorKind::TimedOut
636 {
637 } else {
639 eprintln!("[redcouch] connection error: {e}");
640 }
641 } else {
642 eprintln!("[redcouch] connection error: {e}");
643 }
644 }
645 STAT_CURR_CONNECTIONS.fetch_sub(1, Ordering::Relaxed);
646 });
647 }
648 Err(e) => {
649 eprintln!("[redcouch] accept error: {e}");
650 thread::sleep(Duration::from_millis(100));
653 }
654 }
655 }
656 });
657}
658
659#[cfg(not(test))]
664#[derive(thiserror::Error, Debug)]
665pub(crate) enum BridgeErr {
666 #[error("io {0}")]
667 Io(#[from] std::io::Error),
668 #[error("redis {0}")]
669 Redis(String),
670}
671#[cfg(not(test))]
672pub(crate) type Br<T> = Result<T, BridgeErr>;
673
674#[cfg(not(test))]
675fn handle_conn(sock: &mut TcpStream) -> Br<()> {
676 let mut buf = BytesMut::with_capacity(16384);
677
678 loop {
681 if buf.is_empty() {
682 let mut tmp = [0u8; 16384];
683 match sock.read(&mut tmp) {
684 Ok(0) => return Ok(()),
685 Ok(n) => buf.extend_from_slice(&tmp[..n]),
686 Err(ref e)
687 if e.kind() == std::io::ErrorKind::WouldBlock
688 || e.kind() == std::io::ErrorKind::TimedOut =>
689 {
690 return Err(std::io::Error::from(std::io::ErrorKind::TimedOut).into());
691 }
692 Err(e) => return Err(e.into()),
693 }
694 }
695
696 while !buf.is_empty() && (buf[0] == b'\r' || buf[0] == b'\n') {
698 buf.advance(1);
699 }
700 if buf.is_empty() {
701 continue; }
703
704 let first = buf[0];
705 if first == MAGIC_REQ {
706 return handle_binary_conn(sock, &mut buf);
708 } else if first.is_ascii_graphic() || first == b' ' {
709 return ascii::handle_ascii_conn(sock, &mut buf);
711 } else {
712 eprintln!("[redcouch] unknown protocol byte 0x{first:02x}, closing");
714 return Ok(());
715 }
716 }
717}
718
719#[cfg(not(test))]
721fn handle_binary_conn(sock: &mut TcpStream, buf: &mut BytesMut) -> Br<()> {
722 let mut out = Vec::with_capacity(8192);
726
727 loop {
728 if buf.len() > MAX_READ_BUF {
730 eprintln!("[redcouch] read buffer exceeded {MAX_READ_BUF} bytes, closing connection");
731 return Ok(());
732 }
733
734 if buf.is_empty() {
736 let mut tmp = [0u8; 16384];
737 match sock.read(&mut tmp) {
738 Ok(0) => return Ok(()),
739 Ok(n) => buf.extend_from_slice(&tmp[..n]),
740 Err(ref e)
741 if e.kind() == std::io::ErrorKind::WouldBlock
742 || e.kind() == std::io::ErrorKind::TimedOut =>
743 {
744 return Err(std::io::Error::from(std::io::ErrorKind::TimedOut).into());
745 }
746 Err(e) => return Err(e.into()),
747 }
748 }
749
750 loop {
751 match try_parse_request(buf) {
752 ParseResult::Ok((req, used)) => {
753 handle(req, &mut out)?;
754 buf.advance(used);
755 }
756 ParseResult::Incomplete => break,
757 ParseResult::BadMagic => {
758 eprintln!("[redcouch] bad magic byte, closing connection");
759 return Ok(());
760 }
761 ParseResult::MalformedFrame {
762 opaque,
763 opcode_byte,
764 bytes_to_skip,
765 } => {
766 eprintln!(
767 "[redcouch] malformed frame (opcode 0x{opcode_byte:02x}), skipping {bytes_to_skip} bytes"
768 );
769 write_error_for_raw_opcode(
770 &mut out,
771 opcode_byte,
772 ST_ARGS,
773 opaque,
774 b"Malformed frame",
775 )?;
776 buf.advance(bytes_to_skip);
777 }
778 ParseResult::OversizedFrame {
779 opaque,
780 opcode_byte,
781 } => {
782 eprintln!(
783 "[redcouch] oversized frame (opcode 0x{opcode_byte:02x}), closing connection"
784 );
785 write_error_for_raw_opcode(
786 &mut out,
787 opcode_byte,
788 ST_ARGS,
789 opaque,
790 b"Frame too large",
791 )?;
792 if !out.is_empty() {
794 use std::io::Write;
795 sock.write_all(&out)?;
796 out.clear();
797 }
798 return Ok(());
799 }
800 }
801 }
802
803 if !out.is_empty() {
805 use std::io::Write;
806 sock.write_all(&out)?;
807 out.clear();
808 }
809
810 let mut tmp = [0u8; 16384];
812 match sock.read(&mut tmp) {
813 Ok(0) => return Ok(()),
814 Ok(n) => buf.extend_from_slice(&tmp[..n]),
815 Err(ref e)
816 if e.kind() == std::io::ErrorKind::WouldBlock
817 || e.kind() == std::io::ErrorKind::TimedOut =>
818 {
819 return Err(std::io::Error::from(std::io::ErrorKind::TimedOut).into());
820 }
821 Err(e) => return Err(e.into()),
822 }
823 }
824}
825
826#[cfg(not(test))]
831fn handle(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
832 let opcode = match req.hdr.opcode {
833 Some(op) => op,
834 None => {
835 write_error_for_raw_opcode(
837 out,
838 req.hdr.opcode_byte,
839 ST_UNK,
840 req.hdr.opaque,
841 b"Unknown command",
842 )?;
843 return Ok(());
844 }
845 };
846
847 use Opcode::*;
848 match opcode.base() {
849 Get | GetK => op_get(req, out),
850 Set | Add | Replace => op_store(req, out),
851 Delete => op_delete(req, out),
852 Increment | Decrement => op_counter(req, out),
853 Touch => op_touch(req, out),
854 GAT => op_gat(req, out),
855 Append | Prepend => op_append_prepend(req, out),
856 Flush => op_flush(req, out),
857 Version => op_version(req, out),
858 Stat => op_stat(req, out),
859 Verbosity => op_verbosity(req, out),
860 SaslListMechs => op_sasl_list_mechs(req, out),
861 SaslAuth => op_sasl_auth(req, out),
862 SaslStep => op_sasl_step(req, out),
863 Noop => {
864 write_simple_response(out, opcode, ST_OK, req.hdr.opaque, CAS_ZERO, &[])?;
865 Ok(())
866 }
867 Quit => {
868 if !opcode.is_quiet() {
869 write_simple_response(out, opcode, ST_OK, req.hdr.opaque, CAS_ZERO, &[])?;
870 }
871 Err(std::io::Error::from(std::io::ErrorKind::ConnectionAborted).into())
872 }
873 _ => {
876 write_error_for_raw_opcode(
877 out,
878 req.hdr.opcode_byte,
879 ST_UNK,
880 req.hdr.opaque,
881 b"Unknown command",
882 )?;
883 Ok(())
884 }
885 }
886}
887
888#[cfg(not(test))]
890pub(crate) fn with_ctx<T>(f: impl Fn(&redis_module::Context) -> T) -> T {
891 let tsc = ThreadSafeContext::<DetachedFromClient>::new();
892 let guard = tsc.lock();
893 f(&guard)
894}
895
896#[cfg(not(test))]
900pub(crate) fn is_redis_error(v: &RedisValue) -> bool {
901 matches!(v, RedisValue::StaticError(_))
902}
903
904#[cfg(not(test))]
906pub(crate) fn eval_int(v: &RedisValue) -> i64 {
907 match v {
908 RedisValue::Integer(n) => *n,
909 _ => 0,
910 }
911}
912
913#[cfg(not(test))]
915pub(crate) fn eval_str(v: &RedisValue) -> String {
916 match v {
917 RedisValue::BulkString(s) => s.clone(),
918 RedisValue::BulkRedisString(s) => s.to_string_lossy(),
919 RedisValue::SimpleString(s) => s.clone(),
920 RedisValue::SimpleStringStatic(s) => (*s).to_string(),
921 _ => String::new(),
922 }
923}
924
925#[cfg(not(test))]
927fn op_get(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
928 let opcode = req.hdr.opcode.unwrap();
929 STAT_CMD_GET.fetch_add(1, Ordering::Relaxed);
930 let rk = make_redis_key(req.key);
931
932 let reply = with_ctx(|ctx| {
934 let keys_and_args: &[&[u8]] = &[b"1", rk.as_slice()];
935 eval_lua(ctx, &SCRIPT_GET, keys_and_args)
936 })
937 .map_err(|e| BridgeErr::Redis(e.to_string()))?;
938
939 if is_redis_error(&reply) {
940 return Err(BridgeErr::Redis(format!("EVALSHA get error: {reply:?}")));
941 }
942
943 let fields = match &reply {
945 RedisValue::Array(arr) if arr.len() >= 4 => arr,
946 _ => {
947 return Err(BridgeErr::Redis(format!("EVAL get unexpected: {reply:?}")));
948 }
949 };
950
951 let status_code = eval_int(&fields[0]);
952 if status_code == -1 {
953 STAT_GET_MISSES.fetch_add(1, Ordering::Relaxed);
955 if opcode.is_quiet() {
956 return Ok(());
957 }
958 write_simple_response(out, opcode, ST_NF, req.hdr.opaque, CAS_ZERO, &[])?;
959 return Ok(());
960 }
961
962 STAT_GET_HITS.fetch_add(1, Ordering::Relaxed);
963
964 let hex_str = eval_str(&fields[1]);
967 let value_bytes = hex_decode(&hex_str);
968
969 let flags: u32 = eval_str(&fields[2]).parse().unwrap_or(0);
970 let cas: u64 = eval_str(&fields[3]).parse().unwrap_or(0);
971
972 let extras = flags.to_be_bytes();
973 let key_part = if opcode.includes_key() { req.key } else { &[] };
974
975 write_response(
976 out,
977 opcode,
978 &ResponseMeta {
979 status: ST_OK,
980 opaque: req.hdr.opaque,
981 cas,
982 },
983 &extras,
984 key_part,
985 &value_bytes,
986 )?;
987 Ok(())
988}
989
990#[cfg(not(test))]
992fn op_store(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
993 let opcode = req.hdr.opcode.unwrap();
994 let base = opcode.base();
995 STAT_CMD_SET.fetch_add(1, Ordering::Relaxed);
996
997 if req.extras.len() != 8 {
998 write_simple_response(out, opcode, ST_ARGS, req.hdr.opaque, CAS_ZERO, &[])?;
999 return Ok(());
1000 }
1001
1002 let flags = BigEndian::read_u32(&req.extras[0..4]);
1003 let expiry = BigEndian::read_u32(&req.extras[4..8]);
1004 let rk = make_redis_key(req.key);
1005
1006 let op_name = match base {
1007 Opcode::Set => "set",
1008 Opcode::Add => "add",
1009 Opcode::Replace => "replace",
1010 _ => "set",
1011 };
1012
1013 let req_cas = req.hdr.cas.to_string();
1014 let flags_str = flags.to_string();
1015 let expiry_str = expiry.to_string();
1016
1017 let reply = with_ctx(|ctx| {
1019 let keys_and_args: &[&[u8]] = &[
1020 b"2",
1021 rk.as_slice(),
1022 CAS_COUNTER_KEY.as_bytes(),
1023 op_name.as_bytes(),
1024 req.value,
1025 flags_str.as_bytes(),
1026 req_cas.as_bytes(),
1027 expiry_str.as_bytes(),
1028 ];
1029 eval_lua(ctx, &SCRIPT_STORE, keys_and_args)
1030 })
1031 .map_err(|e| BridgeErr::Redis(e.to_string()))?;
1032
1033 if is_redis_error(&reply) {
1034 return Err(BridgeErr::Redis(format!("EVALSHA store error: {reply:?}")));
1035 }
1036
1037 let (status_code, new_cas) = match &reply {
1039 RedisValue::Array(arr) if arr.len() >= 2 => {
1040 let st = eval_int(&arr[0]);
1041 let cas_s = eval_str(&arr[1]);
1042 let cas_val: u64 = cas_s.parse().unwrap_or(0);
1043 (st, cas_val)
1044 }
1045 _ => {
1046 return Err(BridgeErr::Redis(format!(
1047 "EVAL store unexpected: {reply:?}"
1048 )));
1049 }
1050 };
1051
1052 let is_cas_op = req.hdr.cas != 0;
1054 match status_code {
1055 0 => {
1056 if is_cas_op {
1058 STAT_CAS_HITS.fetch_add(1, Ordering::Relaxed);
1059 }
1060 if !opcode.is_quiet() {
1061 write_response(
1062 out,
1063 opcode,
1064 &ResponseMeta {
1065 status: ST_OK,
1066 opaque: req.hdr.opaque,
1067 cas: new_cas,
1068 },
1069 &[],
1070 &[],
1071 &[],
1072 )?;
1073 }
1074 }
1075 -1 => {
1076 if is_cas_op {
1078 STAT_CAS_MISSES.fetch_add(1, Ordering::Relaxed);
1079 }
1080 write_simple_response(out, opcode, ST_NF, req.hdr.opaque, CAS_ZERO, &[])?;
1081 }
1082 -2 => {
1083 if is_cas_op {
1085 STAT_CAS_BADVAL.fetch_add(1, Ordering::Relaxed);
1086 }
1087 write_simple_response(out, opcode, ST_IX, req.hdr.opaque, CAS_ZERO, &[])?;
1088 }
1089 _ => {
1090 return Err(BridgeErr::Redis(format!(
1091 "EVAL store unknown status: {status_code}"
1092 )));
1093 }
1094 }
1095 Ok(())
1096}
1097
1098#[cfg(not(test))]
1100fn op_delete(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
1101 let opcode = req.hdr.opcode.unwrap();
1102 let rk = make_redis_key(req.key);
1103
1104 if req.hdr.cas == 0 {
1108 let reply = with_ctx(|ctx| ctx.call("DEL", &[rk.as_slice()]))
1109 .map_err(|e| BridgeErr::Redis(e.to_string()))?;
1110
1111 if is_redis_error(&reply) {
1112 return Err(BridgeErr::Redis(format!("DEL error: {reply:?}")));
1113 }
1114
1115 let deleted = match &reply {
1117 RedisValue::Integer(n) => *n > 0,
1118 _ => {
1119 return Err(BridgeErr::Redis(format!("DEL unexpected reply: {reply:?}")));
1120 }
1121 };
1122
1123 if deleted {
1124 STAT_DELETE_HITS.fetch_add(1, Ordering::Relaxed);
1125 if opcode.is_quiet() {
1126 return Ok(());
1127 }
1128 let new_cas = get_next_cas();
1129 write_simple_response(out, opcode, ST_OK, req.hdr.opaque, new_cas, &[])?;
1130 } else {
1131 STAT_DELETE_MISSES.fetch_add(1, Ordering::Relaxed);
1132 write_simple_response(out, opcode, ST_NF, req.hdr.opaque, CAS_ZERO, &[])?;
1133 }
1134 return Ok(());
1135 }
1136
1137 let req_cas = req.hdr.cas.to_string();
1139 let reply = with_ctx(|ctx| {
1140 let keys_and_args: &[&[u8]] = &[b"1", rk.as_slice(), req_cas.as_bytes()];
1141 eval_lua(ctx, &SCRIPT_DELETE, keys_and_args)
1142 })
1143 .map_err(|e| BridgeErr::Redis(e.to_string()))?;
1144
1145 if is_redis_error(&reply) {
1146 return Err(BridgeErr::Redis(format!("EVALSHA delete error: {reply:?}")));
1147 }
1148
1149 let status_code = match &reply {
1150 RedisValue::Integer(n) => *n,
1151 _ => {
1152 return Err(BridgeErr::Redis(format!(
1153 "EVALSHA delete unexpected: {reply:?}"
1154 )));
1155 }
1156 };
1157
1158 match status_code {
1159 0 => {
1160 STAT_DELETE_HITS.fetch_add(1, Ordering::Relaxed);
1161 if opcode.is_quiet() {
1162 return Ok(());
1163 }
1164 let new_cas = get_next_cas();
1165 write_simple_response(out, opcode, ST_OK, req.hdr.opaque, new_cas, &[])?;
1166 }
1167 -1 => {
1168 STAT_DELETE_MISSES.fetch_add(1, Ordering::Relaxed);
1169 write_simple_response(out, opcode, ST_NF, req.hdr.opaque, CAS_ZERO, &[])?;
1170 }
1171 -2 => {
1172 write_simple_response(out, opcode, ST_IX, req.hdr.opaque, CAS_ZERO, &[])?;
1174 }
1175 _ => {
1176 return Err(BridgeErr::Redis(format!(
1177 "EVALSHA delete unknown status: {status_code}"
1178 )));
1179 }
1180 }
1181 Ok(())
1182}
1183
1184#[cfg(not(test))]
1186pub(crate) fn get_next_cas() -> u64 {
1187 match with_ctx(|ctx| ctx.call("INCR", &[CAS_COUNTER_KEY])) {
1188 Ok(RedisValue::Integer(n)) => n as u64,
1189 _ => 1,
1190 }
1191}
1192
1193#[cfg(not(test))]
1195fn op_counter(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
1196 let opcode = req.hdr.opcode.unwrap();
1197 let base = opcode.base();
1198
1199 if req.extras.len() != 20 {
1200 write_simple_response(out, opcode, ST_ARGS, req.hdr.opaque, CAS_ZERO, &[])?;
1201 return Ok(());
1202 }
1203
1204 let delta = BigEndian::read_u64(&req.extras[0..8]);
1205 let initial = BigEndian::read_u64(&req.extras[8..16]);
1206 let expiry = BigEndian::read_u32(&req.extras[16..20]);
1207 let rk = make_redis_key(req.key);
1208
1209 let is_decr = if base == Opcode::Decrement { "1" } else { "0" };
1210 let delta_str = delta.to_string();
1211 let initial_str = initial.to_string();
1212 let expiry_str = expiry.to_string();
1213
1214 let reply = with_ctx(|ctx| {
1215 let keys_and_args: &[&[u8]] = &[
1216 b"2",
1217 rk.as_slice(),
1218 CAS_COUNTER_KEY.as_bytes(),
1219 delta_str.as_bytes(),
1220 is_decr.as_bytes(),
1221 initial_str.as_bytes(),
1222 expiry_str.as_bytes(),
1223 ];
1224 eval_lua(ctx, &SCRIPT_COUNTER, keys_and_args)
1225 })
1226 .map_err(|e| BridgeErr::Redis(e.to_string()))?;
1227
1228 if is_redis_error(&reply) {
1229 return Err(BridgeErr::Redis(format!(
1230 "EVALSHA counter error: {reply:?}"
1231 )));
1232 }
1233
1234 let (status_code, value_str, new_cas) = match &reply {
1236 RedisValue::Array(arr) if arr.len() >= 3 => {
1237 let st = eval_int(&arr[0]);
1238 let val = eval_str(&arr[1]);
1239 let cas_s = eval_str(&arr[2]);
1240 let cas_val: u64 = cas_s.parse().unwrap_or(0);
1241 (st, val, cas_val)
1242 }
1243 _ => {
1244 return Err(BridgeErr::Redis(format!(
1245 "EVAL counter unexpected: {reply:?}"
1246 )));
1247 }
1248 };
1249
1250 let is_incr = base == Opcode::Increment;
1251 match status_code {
1252 0 => {
1253 if is_incr {
1254 STAT_INCR_HITS.fetch_add(1, Ordering::Relaxed);
1255 } else {
1256 STAT_DECR_HITS.fetch_add(1, Ordering::Relaxed);
1257 }
1258 let counter_val: u64 = value_str.parse().unwrap_or(0);
1260 if !opcode.is_quiet() {
1261 write_response(
1262 out,
1263 opcode,
1264 &ResponseMeta {
1265 status: ST_OK,
1266 opaque: req.hdr.opaque,
1267 cas: new_cas,
1268 },
1269 &[],
1270 &[],
1271 &counter_val.to_be_bytes(),
1272 )?;
1273 }
1274 }
1275 -1 => {
1276 if is_incr {
1277 STAT_INCR_MISSES.fetch_add(1, Ordering::Relaxed);
1278 } else {
1279 STAT_DECR_MISSES.fetch_add(1, Ordering::Relaxed);
1280 }
1281 write_simple_response(out, opcode, ST_NF, req.hdr.opaque, CAS_ZERO, &[])?;
1283 }
1284 -3 => {
1285 write_simple_response(
1287 out,
1288 opcode,
1289 ST_ARGS,
1290 req.hdr.opaque,
1291 CAS_ZERO,
1292 b"Non-numeric value",
1293 )?;
1294 }
1295 _ => {
1296 return Err(BridgeErr::Redis(format!(
1297 "EVAL counter unknown status: {status_code}"
1298 )));
1299 }
1300 }
1301 Ok(())
1302}
1303
1304#[cfg(not(test))]
1306fn op_touch(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
1307 let opcode = req.hdr.opcode.unwrap();
1308 STAT_CMD_TOUCH.fetch_add(1, Ordering::Relaxed);
1309
1310 if req.extras.len() != 4 {
1311 write_simple_response(out, opcode, ST_ARGS, req.hdr.opaque, CAS_ZERO, &[])?;
1312 return Ok(());
1313 }
1314
1315 let expiry = BigEndian::read_u32(&req.extras[0..4]);
1316 let rk = make_redis_key(req.key);
1317 let expiry_str = expiry.to_string();
1318
1319 let reply = with_ctx(|ctx| {
1320 let keys_and_args: &[&[u8]] = &[
1321 b"2",
1322 rk.as_slice(),
1323 CAS_COUNTER_KEY.as_bytes(),
1324 expiry_str.as_bytes(),
1325 ];
1326 eval_lua(ctx, &SCRIPT_TOUCH, keys_and_args)
1327 })
1328 .map_err(|e| BridgeErr::Redis(e.to_string()))?;
1329
1330 if is_redis_error(&reply) {
1331 return Err(BridgeErr::Redis(format!("EVALSHA touch error: {reply:?}")));
1332 }
1333
1334 let (status_code, new_cas) = match &reply {
1335 RedisValue::Array(arr) if arr.len() >= 2 => {
1336 let st = eval_int(&arr[0]);
1337 let cas_s = eval_str(&arr[1]);
1338 let cas_val: u64 = cas_s.parse().unwrap_or(0);
1339 (st, cas_val)
1340 }
1341 _ => {
1342 return Err(BridgeErr::Redis(format!(
1343 "EVAL touch unexpected: {reply:?}"
1344 )));
1345 }
1346 };
1347
1348 match status_code {
1349 0 => {
1350 if !opcode.is_quiet() {
1351 write_simple_response(out, opcode, ST_OK, req.hdr.opaque, new_cas, &[])?;
1352 }
1353 }
1354 -1 => {
1355 write_simple_response(out, opcode, ST_NF, req.hdr.opaque, CAS_ZERO, &[])?;
1356 }
1357 _ => {
1358 return Err(BridgeErr::Redis(format!(
1359 "EVAL touch unknown status: {status_code}"
1360 )));
1361 }
1362 }
1363 Ok(())
1364}
1365
1366#[cfg(not(test))]
1368fn op_gat(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
1369 let opcode = req.hdr.opcode.unwrap();
1370 STAT_CMD_GET.fetch_add(1, Ordering::Relaxed);
1371 STAT_CMD_TOUCH.fetch_add(1, Ordering::Relaxed);
1372
1373 if req.extras.len() != 4 {
1374 write_simple_response(out, opcode, ST_ARGS, req.hdr.opaque, CAS_ZERO, &[])?;
1375 return Ok(());
1376 }
1377
1378 let expiry = BigEndian::read_u32(&req.extras[0..4]);
1379 let rk = make_redis_key(req.key);
1380 let expiry_str = expiry.to_string();
1381
1382 let reply = with_ctx(|ctx| {
1383 let keys_and_args: &[&[u8]] = &[
1384 b"2",
1385 rk.as_slice(),
1386 CAS_COUNTER_KEY.as_bytes(),
1387 expiry_str.as_bytes(),
1388 ];
1389 eval_lua(ctx, &SCRIPT_GAT, keys_and_args)
1390 })
1391 .map_err(|e| BridgeErr::Redis(e.to_string()))?;
1392
1393 if is_redis_error(&reply) {
1394 return Err(BridgeErr::Redis(format!("EVALSHA gat error: {reply:?}")));
1395 }
1396
1397 let fields = match &reply {
1398 RedisValue::Array(arr) if arr.len() >= 4 => arr,
1399 _ => {
1400 return Err(BridgeErr::Redis(format!("EVAL gat unexpected: {reply:?}")));
1401 }
1402 };
1403
1404 let status_code = eval_int(&fields[0]);
1405 if status_code == -1 {
1406 if opcode.is_quiet() {
1407 return Ok(());
1408 }
1409 write_simple_response(out, opcode, ST_NF, req.hdr.opaque, CAS_ZERO, &[])?;
1410 return Ok(());
1411 }
1412
1413 let hex_str = eval_str(&fields[1]);
1414 let value_bytes = hex_decode(&hex_str);
1415 let flags: u32 = eval_str(&fields[2]).parse().unwrap_or(0);
1416 let cas: u64 = eval_str(&fields[3]).parse().unwrap_or(0);
1417
1418 let extras = flags.to_be_bytes();
1419 let key_part = if opcode.includes_key() { req.key } else { &[] };
1420
1421 write_response(
1422 out,
1423 opcode,
1424 &ResponseMeta {
1425 status: ST_OK,
1426 opaque: req.hdr.opaque,
1427 cas,
1428 },
1429 &extras,
1430 key_part,
1431 &value_bytes,
1432 )?;
1433 Ok(())
1434}
1435
1436#[cfg(not(test))]
1438fn op_append_prepend(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
1439 let opcode = req.hdr.opcode.unwrap();
1440 let base = opcode.base();
1441 STAT_CMD_SET.fetch_add(1, Ordering::Relaxed);
1444
1445 if !req.extras.is_empty() {
1447 write_simple_response(out, opcode, ST_ARGS, req.hdr.opaque, CAS_ZERO, &[])?;
1448 return Ok(());
1449 }
1450
1451 let rk = make_redis_key(req.key);
1452 let req_cas = req.hdr.cas.to_string();
1453
1454 let script = if base == Opcode::Append {
1455 &SCRIPT_APPEND
1456 } else {
1457 &SCRIPT_PREPEND
1458 };
1459
1460 let reply = with_ctx(|ctx| {
1461 let keys_and_args: &[&[u8]] = &[
1462 b"2",
1463 rk.as_slice(),
1464 CAS_COUNTER_KEY.as_bytes(),
1465 req.value,
1466 req_cas.as_bytes(),
1467 ];
1468 eval_lua(ctx, script, keys_and_args)
1469 })
1470 .map_err(|e| BridgeErr::Redis(e.to_string()))?;
1471
1472 if is_redis_error(&reply) {
1473 return Err(BridgeErr::Redis(format!(
1474 "EVALSHA append/prepend error: {reply:?}"
1475 )));
1476 }
1477
1478 let (status_code, new_cas) = match &reply {
1479 RedisValue::Array(arr) if arr.len() >= 2 => {
1480 let st = eval_int(&arr[0]);
1481 let cas_s = eval_str(&arr[1]);
1482 let cas_val: u64 = cas_s.parse().unwrap_or(0);
1483 (st, cas_val)
1484 }
1485 _ => {
1486 return Err(BridgeErr::Redis(format!(
1487 "EVAL append/prepend unexpected: {reply:?}"
1488 )));
1489 }
1490 };
1491
1492 match status_code {
1493 0 => {
1494 if !opcode.is_quiet() {
1495 write_response(
1496 out,
1497 opcode,
1498 &ResponseMeta {
1499 status: ST_OK,
1500 opaque: req.hdr.opaque,
1501 cas: new_cas,
1502 },
1503 &[],
1504 &[],
1505 &[],
1506 )?;
1507 }
1508 }
1509 -5 => {
1510 write_simple_response(out, opcode, ST_NOT_STORED, req.hdr.opaque, CAS_ZERO, &[])?;
1512 }
1513 -2 => {
1514 write_simple_response(out, opcode, ST_IX, req.hdr.opaque, CAS_ZERO, &[])?;
1516 }
1517 _ => {
1518 return Err(BridgeErr::Redis(format!(
1519 "EVAL append/prepend unknown status: {status_code}"
1520 )));
1521 }
1522 }
1523 Ok(())
1524}
1525
1526#[cfg(not(test))]
1528fn op_flush(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
1529 let opcode = req.hdr.opcode.unwrap();
1530 STAT_CMD_FLUSH.fetch_add(1, Ordering::Relaxed);
1531
1532 with_ctx(|ctx| {
1534 let keys_and_args: &[&[u8]] = &[b"0"];
1535 eval_lua(ctx, &SCRIPT_FLUSH, keys_and_args)
1536 })
1537 .map_err(|e| BridgeErr::Redis(e.to_string()))?;
1538
1539 if !opcode.is_quiet() {
1540 write_simple_response(out, opcode, ST_OK, req.hdr.opaque, CAS_ZERO, &[])?;
1541 }
1542 Ok(())
1543}
1544
1545#[cfg(not(test))]
1547fn op_version(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
1548 let opcode = req.hdr.opcode.unwrap();
1549 let version = b"RedCouch 0.1.0";
1550 write_simple_response(out, opcode, ST_OK, req.hdr.opaque, CAS_ZERO, version)?;
1551 Ok(())
1552}
1553
1554#[cfg(not(test))]
1560fn op_sasl_list_mechs(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
1561 let opcode = req.hdr.opcode.unwrap();
1562 STAT_AUTH_CMDS.fetch_add(1, Ordering::Relaxed);
1563 write_simple_response(out, opcode, ST_OK, req.hdr.opaque, CAS_ZERO, b"PLAIN")?;
1564 Ok(())
1565}
1566
1567#[cfg(not(test))]
1573fn op_sasl_auth(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
1574 let opcode = req.hdr.opcode.unwrap();
1575 STAT_AUTH_CMDS.fetch_add(1, Ordering::Relaxed);
1576
1577 let mechanism = std::str::from_utf8(req.key).unwrap_or("");
1579 if mechanism != "PLAIN" {
1580 STAT_AUTH_ERRORS.fetch_add(1, Ordering::Relaxed);
1581 write_simple_response(
1582 out,
1583 opcode,
1584 protocol::ST_AUTH_ERROR,
1585 req.hdr.opaque,
1586 CAS_ZERO,
1587 b"Unsupported SASL mechanism",
1588 )?;
1589 return Ok(());
1590 }
1591
1592 write_simple_response(
1595 out,
1596 opcode,
1597 ST_OK,
1598 req.hdr.opaque,
1599 CAS_ZERO,
1600 b"Authenticated",
1601 )?;
1602 Ok(())
1603}
1604
1605#[cfg(not(test))]
1609fn op_sasl_step(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
1610 let opcode = req.hdr.opcode.unwrap();
1611 STAT_AUTH_CMDS.fetch_add(1, Ordering::Relaxed);
1612 STAT_AUTH_ERRORS.fetch_add(1, Ordering::Relaxed);
1613 write_simple_response(
1614 out,
1615 opcode,
1616 protocol::ST_AUTH_ERROR,
1617 req.hdr.opaque,
1618 CAS_ZERO,
1619 b"SASL step not expected for PLAIN mechanism",
1620 )?;
1621 Ok(())
1622}
1623
1624#[cfg(not(test))]
1628pub(crate) const LUA_COUNT_ITEMS: &str = r#"
1629local cursor = '0'
1630local count = 0
1631repeat
1632 local result = redis.call('SCAN', cursor, 'MATCH', 'rc:*', 'COUNT', 1000)
1633 cursor = result[1]
1634 count = count + #result[2]
1635until cursor == '0'
1636return count
1637"#;
1638
1639#[cfg(not(test))]
1649fn op_stat(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
1650 let opcode = req.hdr.opcode.unwrap();
1651 let stat_key = std::str::from_utf8(req.key).unwrap_or("");
1652
1653 if stat_key.is_empty() {
1654 let uptime = STARTUP_INSTANT
1656 .get()
1657 .map(|t| t.elapsed().as_secs())
1658 .unwrap_or(0);
1659 let pid = std::process::id();
1660
1661 let curr_items: u64 = match with_ctx(|ctx| {
1663 let keys_and_args: &[&[u8]] = &[b"0"];
1664 eval_lua(ctx, &SCRIPT_COUNT_ITEMS, keys_and_args)
1665 }) {
1666 Ok(RedisValue::Integer(n)) => n as u64,
1667 _ => 0,
1668 };
1669
1670 let stats: Vec<(&str, String)> = vec![
1671 ("pid", pid.to_string()),
1672 ("uptime", uptime.to_string()),
1673 (
1674 "time",
1675 std::time::SystemTime::now()
1676 .duration_since(std::time::UNIX_EPOCH)
1677 .map(|d| d.as_secs())
1678 .unwrap_or(0)
1679 .to_string(),
1680 ),
1681 ("version", "RedCouch 0.1.0".to_string()),
1682 ("curr_items", curr_items.to_string()),
1683 (
1684 "curr_connections",
1685 STAT_CURR_CONNECTIONS.load(Ordering::Relaxed).to_string(),
1686 ),
1687 (
1688 "total_connections",
1689 STAT_TOTAL_CONNECTIONS.load(Ordering::Relaxed).to_string(),
1690 ),
1691 ("cmd_get", STAT_CMD_GET.load(Ordering::Relaxed).to_string()),
1692 ("cmd_set", STAT_CMD_SET.load(Ordering::Relaxed).to_string()),
1693 (
1694 "cmd_flush",
1695 STAT_CMD_FLUSH.load(Ordering::Relaxed).to_string(),
1696 ),
1697 (
1698 "cmd_touch",
1699 STAT_CMD_TOUCH.load(Ordering::Relaxed).to_string(),
1700 ),
1701 (
1702 "get_hits",
1703 STAT_GET_HITS.load(Ordering::Relaxed).to_string(),
1704 ),
1705 (
1706 "get_misses",
1707 STAT_GET_MISSES.load(Ordering::Relaxed).to_string(),
1708 ),
1709 (
1710 "delete_hits",
1711 STAT_DELETE_HITS.load(Ordering::Relaxed).to_string(),
1712 ),
1713 (
1714 "delete_misses",
1715 STAT_DELETE_MISSES.load(Ordering::Relaxed).to_string(),
1716 ),
1717 (
1718 "incr_hits",
1719 STAT_INCR_HITS.load(Ordering::Relaxed).to_string(),
1720 ),
1721 (
1722 "incr_misses",
1723 STAT_INCR_MISSES.load(Ordering::Relaxed).to_string(),
1724 ),
1725 (
1726 "decr_hits",
1727 STAT_DECR_HITS.load(Ordering::Relaxed).to_string(),
1728 ),
1729 (
1730 "decr_misses",
1731 STAT_DECR_MISSES.load(Ordering::Relaxed).to_string(),
1732 ),
1733 (
1734 "cas_hits",
1735 STAT_CAS_HITS.load(Ordering::Relaxed).to_string(),
1736 ),
1737 (
1738 "cas_misses",
1739 STAT_CAS_MISSES.load(Ordering::Relaxed).to_string(),
1740 ),
1741 (
1742 "cas_badval",
1743 STAT_CAS_BADVAL.load(Ordering::Relaxed).to_string(),
1744 ),
1745 (
1746 "auth_cmds",
1747 STAT_AUTH_CMDS.load(Ordering::Relaxed).to_string(),
1748 ),
1749 (
1750 "auth_errors",
1751 STAT_AUTH_ERRORS.load(Ordering::Relaxed).to_string(),
1752 ),
1753 (
1754 "rejected_connections",
1755 STAT_REJECTED_CONNECTIONS
1756 .load(Ordering::Relaxed)
1757 .to_string(),
1758 ),
1759 ("max_connections", MAX_CONNECTIONS.to_string()),
1760 ];
1761
1762 let stat_meta = ResponseMeta {
1763 status: ST_OK,
1764 opaque: req.hdr.opaque,
1765 cas: CAS_ZERO,
1766 };
1767 for (name, value) in &stats {
1768 write_response(
1769 out,
1770 opcode,
1771 &stat_meta,
1772 &[],
1773 name.as_bytes(),
1774 value.as_bytes(),
1775 )?;
1776 }
1777 }
1778 write_response(
1784 out,
1785 opcode,
1786 &ResponseMeta {
1787 status: ST_OK,
1788 opaque: req.hdr.opaque,
1789 cas: CAS_ZERO,
1790 },
1791 &[],
1792 &[],
1793 &[],
1794 )?;
1795 Ok(())
1796}
1797
1798#[cfg(not(test))]
1805fn op_verbosity(req: Request<'_>, out: &mut Vec<u8>) -> Br<()> {
1806 let opcode = req.hdr.opcode.unwrap();
1807 write_simple_response(out, opcode, ST_OK, req.hdr.opaque, CAS_ZERO, &[])?;
1808 Ok(())
1809}
1810
1811#[cfg(not(test))]
1816fn module_init(ctx: &Context, _args: &[RedisString]) -> Status {
1817 let _ = STARTUP_INSTANT.set(Instant::now());
1818 LISTENER.call_once(spawn_listener);
1819 ctx.log_notice(&format!(
1820 "redcouch: listener started on {} (max_connections={}, read_timeout={}s, write_timeout={}s, max_body={})",
1821 DEFAULT_BIND_ADDR,
1822 MAX_CONNECTIONS,
1823 SOCKET_READ_TIMEOUT.as_secs(),
1824 SOCKET_WRITE_TIMEOUT.as_secs(),
1825 MAX_BODY_LEN,
1826 ));
1827 Status::Ok
1828}
1829
1830#[cfg(not(test))]
1831redis_module! {
1832 name: "redcouch",
1833 version: 1,
1834 allocator: (
1835 redis_module::alloc::RedisAlloc,
1836 redis_module::alloc::RedisAlloc
1837 ),
1838 data_types: [],
1839 init: module_init,
1840 commands: [],
1841}