scufflecloud_redis_module/
quota.rs

1use std::collections::BTreeSet;
2use std::num::NonZeroU64;
3use std::time::Duration;
4
5use deepsize::DeepSizeOf;
6use fnv::FnvHashMap;
7use redis_module::key::RedisKeyWritable;
8use redis_module::{Context, NextArg, RedisError, RedisResult, RedisString, RedisValue};
9use redis_module_ext::data_type::{AofRewriteIo, RdbLoadIo, RdbSaveIo};
10use redis_module_ext::prelude::{IoLoggingExt, RedisKeyWritableExt};
11use redis_module_ext::{CommandArgs, redis_command, redis_data_type, redis_module};
12
13use crate::utils::{self, SetExt, Str, now};
14
15#[derive(Clone, bincode::Encode, bincode::Decode, deepsize::DeepSizeOf)]
16struct Lease {
17    id: Str,
18    qty: u64,
19    expires_at: Option<NonZeroU64>,
20    dims: Vec<Str>,
21}
22
23impl Lease {
24    fn restore_args(&self) -> impl Iterator<Item = String> {
25        [
26            self.id.to_string(),
27            self.qty.to_string(),
28            if let Some(expiry) = self.expires_at {
29                expiry.to_string()
30            } else {
31                "-1".to_string()
32            },
33        ]
34        .into_iter()
35        .chain(self.dims.iter().map(|dim| dim.to_string()))
36    }
37}
38
39const QUOTA_TYPE_VERSION: i32 = 0;
40
41#[redis_data_type(
42    name = "scufquota",
43    version = QUOTA_TYPE_VERSION,
44    methods(
45        rdb_load,
46        rdb_save,
47        aof_rewrite,
48        mem_usage,
49    )
50)]
51#[derive(Default, Clone, deepsize::DeepSizeOf)]
52struct Quota {
53    leases: FnvHashMap<Str, Lease>,
54    dim_count: FnvHashMap<Str, u64>,
55    expiry_timeouts: BTreeSet<(NonZeroU64, Str)>,
56}
57
58impl bincode::Encode for Quota {
59    fn encode<E: bincode::enc::Encoder>(&self, encoder: &mut E) -> Result<(), bincode::error::EncodeError> {
60        let length = self.leases.len();
61
62        (length as u64).encode(encoder)?;
63
64        for lease in self.leases.values() {
65            lease.encode(encoder)?;
66        }
67
68        Ok(())
69    }
70}
71
72impl<C> bincode::Decode<C> for Quota {
73    fn decode<D: bincode::de::Decoder<Context = C>>(decoder: &mut D) -> Result<Self, bincode::error::DecodeError> {
74        let length = u64::decode(decoder)?;
75
76        let mut quota = Self::default();
77        let now = utils::now();
78
79        for _ in 0..length {
80            let lease = Lease::decode(decoder)?;
81            if lease.expires_at.is_some_and(|expiry_ts| expiry_ts < now) {
82                continue;
83            }
84
85            quota.insert_lease(lease);
86        }
87
88        Ok(quota)
89    }
90}
91
92const BINCODE_CONFIG: bincode::config::Configuration = bincode::config::standard();
93
94impl Quota {
95    fn rdb_load(rdb: &mut RdbLoadIo, version: i32) -> RedisResult<Self> {
96        if version != QUOTA_TYPE_VERSION {
97            return Err(RedisError::String(format!(
98                "unsupported quota type version expected: {QUOTA_TYPE_VERSION}, actual: {version}"
99            )));
100        }
101
102        let buf = rdb.load_string_buffer()?;
103        let quota: Quota = match bincode::decode_from_slice(buf.as_ref(), BINCODE_CONFIG) {
104            Ok((v, _)) => v,
105            Err(err) => return Err(RedisError::String(format!("failed to decode quota: {err}"))),
106        };
107
108        Ok(quota)
109    }
110
111    fn rdb_save(&mut self, rdb: &mut RdbSaveIo) {
112        self.gc(now());
113
114        match bincode::encode_to_vec(&*self, BINCODE_CONFIG) {
115            Ok(buf) => {
116                rdb.save_slice(buf);
117            }
118            Err(err) => {
119                rdb.log_warning(format!("failed to encode: {err}"));
120            }
121        }
122    }
123
124    fn aof_rewrite(&mut self, aof: &mut AofRewriteIo, key: RedisString) -> RedisResult<()> {
125        self.gc(utils::now());
126        if self.leases.is_empty() {
127            return Ok(());
128        }
129
130        for lease in self.leases.values() {
131            aof.emit_command(c"quota.restore")
132                .arg(&*key)
133                .args(lease.restore_args())
134                .dispatch()?;
135        }
136
137        Ok(())
138    }
139
140    fn mem_usage(&self) -> usize {
141        self.deep_size_of()
142    }
143}
144
145enum QuotaMutRef<'a> {
146    Owned(Quota),
147    Ref(&'a mut Quota),
148}
149
150impl std::ops::Deref for QuotaMutRef<'_> {
151    type Target = Quota;
152
153    fn deref(&self) -> &Self::Target {
154        match self {
155            Self::Owned(o) => o,
156            Self::Ref(r) => r,
157        }
158    }
159}
160
161impl std::ops::DerefMut for QuotaMutRef<'_> {
162    fn deref_mut(&mut self) -> &mut Self::Target {
163        match self {
164            Self::Owned(o) => o,
165            Self::Ref(r) => r,
166        }
167    }
168}
169
170fn quota_scope(
171    key: &RedisKeyWritable,
172    mut quota: QuotaMutRef<'_>,
173    op: impl FnOnce(&mut Quota, NonZeroU64) -> RedisResult,
174) -> RedisResult {
175    let now = utils::now();
176    quota.gc(now);
177    let ret = op(&mut quota, now);
178
179    if quota.leases.is_empty() || quota.expires_at().is_some_and(|expiry| expiry < now) {
180        if !key.is_empty() {
181            key.delete()?;
182        }
183    } else {
184        let expires = quota.expires_at();
185        if let QuotaMutRef::Owned(o) = quota {
186            key.set(o)?;
187        }
188
189        if let Some(seconds) = expires.map(|ex| ex.get() - now.get()) {
190            key.set_expire(Duration::from_secs(seconds))?;
191        } else {
192            key.remove_expire()?;
193        }
194    }
195
196    ret
197}
198
199#[redis_command(
200    name = "quota.lease",
201    summary = "aquire a new lease",
202    complexity = "O(log(N) + D): Where N is the number of pending leases and D is the number of dimentions.",
203    flags(Write, DenyOOM, Fast),
204    arity = -7,
205    key_spec(
206        flags(ReadWrite, Access, Update),
207        begin_search(index = 1),
208        find_keys(range(last_key = 1, steps = 1, limit = 1))
209    ),
210    arg(
211        name = "KEY",
212        kind = Key,
213        key_spec_index = 0,
214    ),
215    arg(
216        name = "ID",
217        kind = String,
218    ),
219    arg(
220        name = "QTY",
221        kind = Integer,
222    ),
223    arg(
224        name = "TTL",
225        kind = Integer,
226    ),
227    arg(
228        name = "dim_limit",
229        kind = Block,
230        flags = 2,
231        arg(
232            name = "DIM",
233            kind = String,
234        ),
235        arg(
236            name = "LIMIT",
237            kind = Integer,
238        ),
239    )
240)]
241fn quota_lease(ctx: &Context, args: CommandArgs) -> RedisResult {
242    let mut args = args.into_iter().skip(1).peekable();
243    let resource = args.next_arg()?;
244    let key = ctx.open_key_writable(&resource);
245
246    let quota = if let Some(value) = key.get::<Quota>()? {
247        QuotaMutRef::Ref(value)
248    } else {
249        QuotaMutRef::Owned(Quota::default())
250    };
251
252    quota_scope(&key, quota, |quota, now| {
253        let lease_id = Str::from(args.next_str()?);
254        if quota.leases.contains_key(&lease_id) {
255            return Err(RedisError::Str("LEASE_ALREADY_EXISTS"));
256        }
257
258        let qty = args.next_u64()?;
259        let expires_at = args.next_i64()?;
260        let expires_at = if expires_at < 0 {
261            None
262        } else if let Some(expiry_ts) = NonZeroU64::new(expires_at as u64) {
263            let Some(expires_at) = expiry_ts.checked_add(now.get()) else {
264                return Err(RedisError::Str("TTL_EXCEEDS_MAX_TIMESTAMP"));
265            };
266            Some(expires_at)
267        } else {
268            return Ok(RedisValue::Integer(0));
269        };
270
271        let mut dims = Vec::new();
272        while args.peek().is_some() {
273            let dim = args.next_str()?;
274            let limit = args.next_u64()?;
275            if quota.dim_count.get(dim).is_some_and(|count| *count + qty > limit) {
276                return Err(RedisError::String(format!("QUOTA_EXCEEDED:{dim}")));
277            }
278
279            dims.push(Str::from(dim));
280        }
281
282        let lease = Lease {
283            id: lease_id,
284            dims,
285            expires_at,
286            qty,
287        };
288
289        let args = lease.restore_args().map(|arg| ctx.create_string(arg)).collect::<Vec<_>>();
290        let args = std::iter::once(&resource).chain(args.iter()).collect::<Vec<_>>();
291
292        ctx.replicate("quota.restore", args.as_slice());
293        quota.insert_lease(lease);
294
295        Ok(RedisValue::Integer(1))
296    })
297}
298
299#[redis_command(
300    name = "quota.restore",
301    flags(Internal),
302    arity = -7,
303    key_spec(
304        flags(ReadWrite, Access, Update),
305        begin_search(index = 1),
306        find_keys(range(last_key = 1, steps = 1, limit = 1)),
307    )
308)]
309fn quota_restore(ctx: &Context, args: CommandArgs) -> RedisResult {
310    let mut args = args.into_iter().skip(1).peekable();
311    let resource = args.next_arg()?;
312    let key = ctx.open_key_writable(&resource);
313
314    let quota = if let Some(value) = key.get::<Quota>()? {
315        QuotaMutRef::Ref(value)
316    } else {
317        QuotaMutRef::Owned(Quota::default())
318    };
319
320    quota_scope(&key, quota, |quota, now| {
321        let lease_id = Str::from(args.next_str()?);
322        quota.remove_lease(&lease_id);
323
324        let qty = args.next_u64()?;
325        let expires_at = if let Some(expires_at) = NonZeroU64::new(args.next_u64()?) {
326            if expires_at < now {
327                return Ok(RedisValue::Integer(0));
328            }
329
330            Some(expires_at)
331        } else {
332            None
333        };
334
335        let mut dims = Vec::new();
336        while args.peek().is_some() {
337            dims.push(Str::from(args.next_str()?));
338        }
339
340        quota.insert_lease(Lease {
341            dims,
342            expires_at,
343            id: lease_id,
344            qty,
345        });
346
347        Ok(RedisValue::Integer(1))
348    })
349}
350
351#[redis_command(
352    name = "quota.renew",
353    summary = "renew a previously allocated lease",
354    complexity = "O(log(N)): Where N is the number of pending leases.",
355    flags(Write, DenyOOM),
356    arity = 4,
357    key_spec(
358        flags(ReadWrite, Access, Update),
359        begin_search(index = 1),
360        find_keys(range(last_key = 1, steps = 1, limit = 1)),
361    ),
362    arg(
363        name = "KEY",
364        kind = Key,
365        key_spec_index = 0,
366    ),
367    arg(
368        name = "ID",
369        kind = String,
370    ),
371    arg(
372        name = "TTL",
373        kind = Integer,
374    ),
375)]
376fn quota_renew(ctx: &Context, args: CommandArgs) -> RedisResult {
377    let mut args = args.into_iter().skip(1).peekable();
378    let resource = args.next_arg()?;
379    let key = ctx.open_key_writable(&resource);
380
381    let Some(quota) = key.get::<Quota>()? else {
382        return Ok(RedisValue::Integer(0));
383    };
384
385    quota_scope(&key, QuotaMutRef::Ref(quota), |quota, now| {
386        let lease_id = args.next_str()?;
387        match NonZeroU64::new(args.next_u64()?).map(|ts| ts.checked_add(now.get())) {
388            Some(Some(expire_ts)) => {
389                if quota.renew_lease(lease_id, now, Some(expire_ts)) {
390                    ctx.replicate(
391                        "quota.renew_at",
392                        &[resource.as_slice(), lease_id.as_bytes(), expire_ts.to_string().as_bytes()],
393                    );
394
395                    Ok(RedisValue::Integer(1))
396                } else {
397                    Ok(RedisValue::Integer(0))
398                }
399            }
400            Some(None) => Err(RedisError::Str("TTL_EXCEEDS_MAX_TIMESTAMP")),
401            None => {
402                if quota.remove_lease(lease_id) {
403                    ctx.replicate("quota.free", &[resource.as_slice(), lease_id.as_bytes()]);
404
405                    Ok(RedisValue::Integer(1))
406                } else {
407                    Ok(RedisValue::Integer(0))
408                }
409            }
410        }
411    })
412}
413
414#[redis_command(
415    name = "quota.renew_at",
416    summary = "renew a previously allocated lease",
417    complexity = "O(log(N)): Where N is the number of pending leases.",
418    flags(Write, DenyOOM),
419    arity = 4,
420    key_spec(
421        flags(ReadWrite, Access, Update),
422        begin_search(index = 1),
423        find_keys(range(last_key = 1, steps = 1, limit = 1)),
424    ),
425    arg(
426        name = "KEY",
427        kind = Key,
428        key_spec_index = 0,
429    ),
430    arg(
431        name = "ID",
432        kind = String,
433    ),
434    arg(
435        name = "EXPIRY",
436        kind = Integer,
437    ),
438)]
439fn quota_renew_at(ctx: &Context, args: CommandArgs) -> RedisResult {
440    let mut args = args.into_iter().skip(1).peekable();
441    let resource = args.next_arg()?;
442    let key = ctx.open_key_writable(&resource);
443
444    let Some(quota) = key.get::<Quota>()? else {
445        return Ok(RedisValue::Integer(0));
446    };
447
448    quota_scope(&key, QuotaMutRef::Ref(quota), |quota, now| {
449        let lease_id = args.next_str()?;
450
451        match NonZeroU64::new(args.next_u64()?) {
452            Some(expires_at) if expires_at > now => {
453                if quota.renew_lease(lease_id, now, Some(expires_at)) {
454                    ctx.replicate_verbatim();
455                    Ok(RedisValue::Integer(1))
456                } else {
457                    Ok(RedisValue::Integer(0))
458                }
459            }
460            _ => {
461                if quota.remove_lease(lease_id) {
462                    ctx.replicate("quota.free", &[resource.as_slice(), lease_id.as_bytes()]);
463
464                    Ok(RedisValue::Integer(1))
465                } else {
466                    Ok(RedisValue::Integer(0))
467                }
468            }
469        }
470    })
471}
472
473#[redis_command(
474    name = "quota.commit",
475    summary = "commits a previously allocated lease",
476    complexity = "O(log(N)): Where N is the number of pending leases.",
477    flags(Write, DenyOOM),
478    arity = 3,
479    key_spec(
480        flags(ReadWrite, Access, Update),
481        begin_search(index = 1),
482        find_keys(range(last_key = 1, steps = 1, limit = 1)),
483    ),
484    arg(
485        name = "KEY",
486        kind = Key,
487        key_spec_index = 0,
488    ),
489    arg(
490        name = "ID",
491        kind = String,
492    ),
493)]
494fn quota_commit(ctx: &Context, args: CommandArgs) -> RedisResult {
495    let mut args = args.into_iter().skip(1).peekable();
496    let resource = args.next_arg()?;
497    let key = ctx.open_key_writable(&resource);
498
499    let Some(quota) = key.get::<Quota>()? else {
500        return Ok(RedisValue::Integer(0));
501    };
502
503    quota_scope(&key, QuotaMutRef::Ref(quota), |quota, now| {
504        let lease_id = args.next_str()?;
505
506        if quota.renew_lease(lease_id, now, None) {
507            ctx.replicate_verbatim();
508            Ok(RedisValue::Integer(1))
509        } else {
510            Ok(RedisValue::Integer(0))
511        }
512    })
513}
514
515#[redis_command(
516    name = "quota.free",
517    summary = "free a previouly allocated lease",
518    complexity = "O(log(N) + D): Where N is the number of pending leases and D is the number of dimentions.",
519    flags(Write, DenyOOM),
520    arity = 3,
521    key_spec(
522        flags(ReadWrite, Access, Update),
523        begin_search(index = 1),
524        find_keys(range(last_key = 1, steps = 1, limit = 1)),
525    ),
526    arg(
527        name = "KEY",
528        kind = Key,
529        key_spec_index = 0,
530    ),
531    arg(
532        name = "ID",
533        kind = String,
534    ),
535)]
536fn quota_free(ctx: &Context, args: CommandArgs) -> RedisResult {
537    let mut args = args.into_iter().skip(1).peekable();
538    let resource = args.next_arg()?;
539    let key = ctx.open_key_writable(&resource);
540
541    let Some(quota) = key.get::<Quota>()? else {
542        return Ok(RedisValue::Integer(0));
543    };
544
545    quota_scope(&key, QuotaMutRef::Ref(quota), |quota, _| {
546        let lease_id = args.next_str()?;
547        ctx.replicate_verbatim();
548
549        if quota.remove_lease(lease_id) {
550            Ok(RedisValue::Integer(1))
551        } else {
552            Ok(RedisValue::Integer(0))
553        }
554    })
555}
556
557impl Quota {
558    fn gc(&mut self, now_ts: NonZeroU64) {
559        while let Some((_, lease_id)) = self.expiry_timeouts.pop_first_if(|(timeout, _)| *timeout < now_ts) {
560            let Some(lease) = self.leases.remove(&lease_id) else {
561                continue;
562            };
563
564            for dim in lease.dims {
565                if let Some(count) = self.dim_count.get_mut(&dim) {
566                    *count = count.saturating_sub(lease.qty);
567                    if *count == 0 {
568                        self.dim_count.remove(&dim);
569                    }
570                }
571            }
572        }
573    }
574
575    fn expires_at(&self) -> Option<NonZeroU64> {
576        self.expiry_timeouts.last().map(|(ts, _)| *ts)
577    }
578
579    fn remove_lease(&mut self, id: &str) -> bool {
580        let Some(lease) = self.leases.remove(id) else {
581            return false;
582        };
583
584        for dim in lease.dims {
585            if let Some(count) = self.dim_count.get_mut(&dim) {
586                *count = count.saturating_sub(lease.qty);
587                if *count == 0 {
588                    self.dim_count.remove(&dim);
589                }
590            }
591        }
592
593        if let Some(expiry_ts) = lease.expires_at {
594            self.expiry_timeouts.remove(&(expiry_ts, lease.id));
595        }
596
597        true
598    }
599
600    fn insert_lease(&mut self, lease: Lease) {
601        for dim in &lease.dims {
602            let count = self.dim_count.entry(dim.clone()).or_default();
603            *count += lease.qty;
604        }
605
606        if let Some(expires_ts) = lease.expires_at {
607            self.expiry_timeouts.insert((expires_ts, lease.id.clone()));
608        }
609
610        self.leases.insert(lease.id.clone(), lease);
611    }
612
613    fn renew_lease(&mut self, id: &str, now: NonZeroU64, expires_at: Option<NonZeroU64>) -> bool {
614        let Some(lease) = self.leases.get_mut(id) else {
615            return false;
616        };
617
618        if expires_at.is_some_and(|expires_at| expires_at < now) {
619            return self.remove_lease(id);
620        }
621
622        if lease.expires_at == expires_at {
623            return false;
624        }
625
626        if let Some(expires_at) = lease.expires_at {
627            self.expiry_timeouts.remove(&(expires_at, lease.id.clone()));
628        }
629
630        if let Some(expires_at) = expires_at {
631            self.expiry_timeouts.insert((expires_at, lease.id.clone()));
632        }
633
634        lease.expires_at = expires_at;
635
636        true
637    }
638}
639
640#[redis_module(
641    name = "quota",
642    version = 1,
643    types(Quota),
644    commands(quota_free, quota_commit, quota_renew_at, quota_renew, quota_restore, quota_lease,)
645)]
646pub struct QuotaModule;