1use std::collections::BTreeSet;
2use std::num::NonZeroU64;
3use std::time::Duration;
4
5use deepsize::DeepSizeOf;
6use fnv::FnvHashMap;
7use redis_module::key::RedisKeyWritable;
8use redis_module::{Context, NextArg, RedisError, RedisResult, RedisString, RedisValue};
9use redis_module_ext::data_type::{AofRewriteIo, RdbLoadIo, RdbSaveIo};
10use redis_module_ext::prelude::{IoLoggingExt, RedisKeyWritableExt};
11use redis_module_ext::{CommandArgs, redis_command, redis_data_type, redis_module};
12
13use crate::utils::{self, SetExt, Str, now};
14
15#[derive(Clone, bincode::Encode, bincode::Decode, deepsize::DeepSizeOf)]
16struct Lease {
17 id: Str,
18 qty: u64,
19 expires_at: Option<NonZeroU64>,
20 dims: Vec<Str>,
21}
22
23impl Lease {
24 fn restore_args(&self) -> impl Iterator<Item = String> {
25 [
26 self.id.to_string(),
27 self.qty.to_string(),
28 if let Some(expiry) = self.expires_at {
29 expiry.to_string()
30 } else {
31 "-1".to_string()
32 },
33 ]
34 .into_iter()
35 .chain(self.dims.iter().map(|dim| dim.to_string()))
36 }
37}
38
/// Encoding version registered for the quota data type; `Quota::rdb_load`
/// rejects any other version.
const QUOTA_TYPE_VERSION: i32 = 0;
40
41#[redis_data_type(
42 name = "scufquota",
43 version = QUOTA_TYPE_VERSION,
44 methods(
45 rdb_load,
46 rdb_save,
47 aof_rewrite,
48 mem_usage,
49 )
50)]
51#[derive(Default, Clone, deepsize::DeepSizeOf)]
52struct Quota {
53 leases: FnvHashMap<Str, Lease>,
54 dim_count: FnvHashMap<Str, u64>,
55 expiry_timeouts: BTreeSet<(NonZeroU64, Str)>,
56}
57
58impl bincode::Encode for Quota {
59 fn encode<E: bincode::enc::Encoder>(&self, encoder: &mut E) -> Result<(), bincode::error::EncodeError> {
60 let length = self.leases.len();
61
62 (length as u64).encode(encoder)?;
63
64 for lease in self.leases.values() {
65 lease.encode(encoder)?;
66 }
67
68 Ok(())
69 }
70}
71
72impl<C> bincode::Decode<C> for Quota {
73 fn decode<D: bincode::de::Decoder<Context = C>>(decoder: &mut D) -> Result<Self, bincode::error::DecodeError> {
74 let length = u64::decode(decoder)?;
75
76 let mut quota = Self::default();
77 let now = utils::now();
78
79 for _ in 0..length {
80 let lease = Lease::decode(decoder)?;
81 if lease.expires_at.is_some_and(|expiry_ts| expiry_ts < now) {
82 continue;
83 }
84
85 quota.insert_lease(lease);
86 }
87
88 Ok(quota)
89 }
90}
91
92const BINCODE_CONFIG: bincode::config::Configuration = bincode::config::standard();
93
94impl Quota {
95 fn rdb_load(rdb: &mut RdbLoadIo, version: i32) -> RedisResult<Self> {
96 if version != QUOTA_TYPE_VERSION {
97 return Err(RedisError::String(format!(
98 "unsupported quota type version expected: {QUOTA_TYPE_VERSION}, actual: {version}"
99 )));
100 }
101
102 let buf = rdb.load_string_buffer()?;
103 let quota: Quota = match bincode::decode_from_slice(buf.as_ref(), BINCODE_CONFIG) {
104 Ok((v, _)) => v,
105 Err(err) => return Err(RedisError::String(format!("failed to decode quota: {err}"))),
106 };
107
108 Ok(quota)
109 }
110
111 fn rdb_save(&mut self, rdb: &mut RdbSaveIo) {
112 self.gc(now());
113
114 match bincode::encode_to_vec(&*self, BINCODE_CONFIG) {
115 Ok(buf) => {
116 rdb.save_slice(buf);
117 }
118 Err(err) => {
119 rdb.log_warning(format!("failed to encode: {err}"));
120 }
121 }
122 }
123
124 fn aof_rewrite(&mut self, aof: &mut AofRewriteIo, key: RedisString) -> RedisResult<()> {
125 self.gc(utils::now());
126 if self.leases.is_empty() {
127 return Ok(());
128 }
129
130 for lease in self.leases.values() {
131 aof.emit_command(c"quota.restore")
132 .arg(&*key)
133 .args(lease.restore_args())
134 .dispatch()?;
135 }
136
137 Ok(())
138 }
139
140 fn mem_usage(&self) -> usize {
141 self.deep_size_of()
142 }
143}
144
145enum QuotaMutRef<'a> {
146 Owned(Quota),
147 Ref(&'a mut Quota),
148}
149
150impl std::ops::Deref for QuotaMutRef<'_> {
151 type Target = Quota;
152
153 fn deref(&self) -> &Self::Target {
154 match self {
155 Self::Owned(o) => o,
156 Self::Ref(r) => r,
157 }
158 }
159}
160
161impl std::ops::DerefMut for QuotaMutRef<'_> {
162 fn deref_mut(&mut self) -> &mut Self::Target {
163 match self {
164 Self::Owned(o) => o,
165 Self::Ref(r) => r,
166 }
167 }
168}
169
170fn quota_scope(
171 key: &RedisKeyWritable,
172 mut quota: QuotaMutRef<'_>,
173 op: impl FnOnce(&mut Quota, NonZeroU64) -> RedisResult,
174) -> RedisResult {
175 let now = utils::now();
176 quota.gc(now);
177 let ret = op(&mut quota, now);
178
179 if quota.leases.is_empty() || quota.expires_at().is_some_and(|expiry| expiry < now) {
180 if !key.is_empty() {
181 key.delete()?;
182 }
183 } else {
184 let expires = quota.expires_at();
185 if let QuotaMutRef::Owned(o) = quota {
186 key.set(o)?;
187 }
188
189 if let Some(seconds) = expires.map(|ex| ex.get() - now.get()) {
190 key.set_expire(Duration::from_secs(seconds))?;
191 } else {
192 key.remove_expire()?;
193 }
194 }
195
196 ret
197}
198
199#[redis_command(
200 name = "quota.lease",
201 summary = "aquire a new lease",
202 complexity = "O(log(N) + D): Where N is the number of pending leases and D is the number of dimentions.",
203 flags(Write, DenyOOM, Fast),
204 arity = -7,
205 key_spec(
206 flags(ReadWrite, Access, Update),
207 begin_search(index = 1),
208 find_keys(range(last_key = 1, steps = 1, limit = 1))
209 ),
210 arg(
211 name = "KEY",
212 kind = Key,
213 key_spec_index = 0,
214 ),
215 arg(
216 name = "ID",
217 kind = String,
218 ),
219 arg(
220 name = "QTY",
221 kind = Integer,
222 ),
223 arg(
224 name = "TTL",
225 kind = Integer,
226 ),
227 arg(
228 name = "dim_limit",
229 kind = Block,
230 flags = 2,
231 arg(
232 name = "DIM",
233 kind = String,
234 ),
235 arg(
236 name = "LIMIT",
237 kind = Integer,
238 ),
239 )
240)]
241fn quota_lease(ctx: &Context, args: CommandArgs) -> RedisResult {
242 let mut args = args.into_iter().skip(1).peekable();
243 let resource = args.next_arg()?;
244 let key = ctx.open_key_writable(&resource);
245
246 let quota = if let Some(value) = key.get::<Quota>()? {
247 QuotaMutRef::Ref(value)
248 } else {
249 QuotaMutRef::Owned(Quota::default())
250 };
251
252 quota_scope(&key, quota, |quota, now| {
253 let lease_id = Str::from(args.next_str()?);
254 if quota.leases.contains_key(&lease_id) {
255 return Err(RedisError::Str("LEASE_ALREADY_EXISTS"));
256 }
257
258 let qty = args.next_u64()?;
259 let expires_at = args.next_i64()?;
260 let expires_at = if expires_at < 0 {
261 None
262 } else if let Some(expiry_ts) = NonZeroU64::new(expires_at as u64) {
263 let Some(expires_at) = expiry_ts.checked_add(now.get()) else {
264 return Err(RedisError::Str("TTL_EXCEEDS_MAX_TIMESTAMP"));
265 };
266 Some(expires_at)
267 } else {
268 return Ok(RedisValue::Integer(0));
269 };
270
271 let mut dims = Vec::new();
272 while args.peek().is_some() {
273 let dim = args.next_str()?;
274 let limit = args.next_u64()?;
275 if quota.dim_count.get(dim).is_some_and(|count| *count + qty > limit) {
276 return Err(RedisError::String(format!("QUOTA_EXCEEDED:{dim}")));
277 }
278
279 dims.push(Str::from(dim));
280 }
281
282 let lease = Lease {
283 id: lease_id,
284 dims,
285 expires_at,
286 qty,
287 };
288
289 let args = lease.restore_args().map(|arg| ctx.create_string(arg)).collect::<Vec<_>>();
290 let args = std::iter::once(&resource).chain(args.iter()).collect::<Vec<_>>();
291
292 ctx.replicate("quota.restore", args.as_slice());
293 quota.insert_lease(lease);
294
295 Ok(RedisValue::Integer(1))
296 })
297}
298
299#[redis_command(
300 name = "quota.restore",
301 flags(Internal),
302 arity = -7,
303 key_spec(
304 flags(ReadWrite, Access, Update),
305 begin_search(index = 1),
306 find_keys(range(last_key = 1, steps = 1, limit = 1)),
307 )
308)]
309fn quota_restore(ctx: &Context, args: CommandArgs) -> RedisResult {
310 let mut args = args.into_iter().skip(1).peekable();
311 let resource = args.next_arg()?;
312 let key = ctx.open_key_writable(&resource);
313
314 let quota = if let Some(value) = key.get::<Quota>()? {
315 QuotaMutRef::Ref(value)
316 } else {
317 QuotaMutRef::Owned(Quota::default())
318 };
319
320 quota_scope(&key, quota, |quota, now| {
321 let lease_id = Str::from(args.next_str()?);
322 quota.remove_lease(&lease_id);
323
324 let qty = args.next_u64()?;
325 let expires_at = if let Some(expires_at) = NonZeroU64::new(args.next_u64()?) {
326 if expires_at < now {
327 return Ok(RedisValue::Integer(0));
328 }
329
330 Some(expires_at)
331 } else {
332 None
333 };
334
335 let mut dims = Vec::new();
336 while args.peek().is_some() {
337 dims.push(Str::from(args.next_str()?));
338 }
339
340 quota.insert_lease(Lease {
341 dims,
342 expires_at,
343 id: lease_id,
344 qty,
345 });
346
347 Ok(RedisValue::Integer(1))
348 })
349}
350
351#[redis_command(
352 name = "quota.renew",
353 summary = "renew a previously allocated lease",
354 complexity = "O(log(N)): Where N is the number of pending leases.",
355 flags(Write, DenyOOM),
356 arity = 4,
357 key_spec(
358 flags(ReadWrite, Access, Update),
359 begin_search(index = 1),
360 find_keys(range(last_key = 1, steps = 1, limit = 1)),
361 ),
362 arg(
363 name = "KEY",
364 kind = Key,
365 key_spec_index = 0,
366 ),
367 arg(
368 name = "ID",
369 kind = String,
370 ),
371 arg(
372 name = "TTL",
373 kind = Integer,
374 ),
375)]
376fn quota_renew(ctx: &Context, args: CommandArgs) -> RedisResult {
377 let mut args = args.into_iter().skip(1).peekable();
378 let resource = args.next_arg()?;
379 let key = ctx.open_key_writable(&resource);
380
381 let Some(quota) = key.get::<Quota>()? else {
382 return Ok(RedisValue::Integer(0));
383 };
384
385 quota_scope(&key, QuotaMutRef::Ref(quota), |quota, now| {
386 let lease_id = args.next_str()?;
387 match NonZeroU64::new(args.next_u64()?).map(|ts| ts.checked_add(now.get())) {
388 Some(Some(expire_ts)) => {
389 if quota.renew_lease(lease_id, now, Some(expire_ts)) {
390 ctx.replicate(
391 "quota.renew_at",
392 &[resource.as_slice(), lease_id.as_bytes(), expire_ts.to_string().as_bytes()],
393 );
394
395 Ok(RedisValue::Integer(1))
396 } else {
397 Ok(RedisValue::Integer(0))
398 }
399 }
400 Some(None) => Err(RedisError::Str("TTL_EXCEEDS_MAX_TIMESTAMP")),
401 None => {
402 if quota.remove_lease(lease_id) {
403 ctx.replicate("quota.free", &[resource.as_slice(), lease_id.as_bytes()]);
404
405 Ok(RedisValue::Integer(1))
406 } else {
407 Ok(RedisValue::Integer(0))
408 }
409 }
410 }
411 })
412}
413
414#[redis_command(
415 name = "quota.renew_at",
416 summary = "renew a previously allocated lease",
417 complexity = "O(log(N)): Where N is the number of pending leases.",
418 flags(Write, DenyOOM),
419 arity = 4,
420 key_spec(
421 flags(ReadWrite, Access, Update),
422 begin_search(index = 1),
423 find_keys(range(last_key = 1, steps = 1, limit = 1)),
424 ),
425 arg(
426 name = "KEY",
427 kind = Key,
428 key_spec_index = 0,
429 ),
430 arg(
431 name = "ID",
432 kind = String,
433 ),
434 arg(
435 name = "EXPIRY",
436 kind = Integer,
437 ),
438)]
439fn quota_renew_at(ctx: &Context, args: CommandArgs) -> RedisResult {
440 let mut args = args.into_iter().skip(1).peekable();
441 let resource = args.next_arg()?;
442 let key = ctx.open_key_writable(&resource);
443
444 let Some(quota) = key.get::<Quota>()? else {
445 return Ok(RedisValue::Integer(0));
446 };
447
448 quota_scope(&key, QuotaMutRef::Ref(quota), |quota, now| {
449 let lease_id = args.next_str()?;
450
451 match NonZeroU64::new(args.next_u64()?) {
452 Some(expires_at) if expires_at > now => {
453 if quota.renew_lease(lease_id, now, Some(expires_at)) {
454 ctx.replicate_verbatim();
455 Ok(RedisValue::Integer(1))
456 } else {
457 Ok(RedisValue::Integer(0))
458 }
459 }
460 _ => {
461 if quota.remove_lease(lease_id) {
462 ctx.replicate("quota.free", &[resource.as_slice(), lease_id.as_bytes()]);
463
464 Ok(RedisValue::Integer(1))
465 } else {
466 Ok(RedisValue::Integer(0))
467 }
468 }
469 }
470 })
471}
472
473#[redis_command(
474 name = "quota.commit",
475 summary = "commits a previously allocated lease",
476 complexity = "O(log(N)): Where N is the number of pending leases.",
477 flags(Write, DenyOOM),
478 arity = 3,
479 key_spec(
480 flags(ReadWrite, Access, Update),
481 begin_search(index = 1),
482 find_keys(range(last_key = 1, steps = 1, limit = 1)),
483 ),
484 arg(
485 name = "KEY",
486 kind = Key,
487 key_spec_index = 0,
488 ),
489 arg(
490 name = "ID",
491 kind = String,
492 ),
493)]
494fn quota_commit(ctx: &Context, args: CommandArgs) -> RedisResult {
495 let mut args = args.into_iter().skip(1).peekable();
496 let resource = args.next_arg()?;
497 let key = ctx.open_key_writable(&resource);
498
499 let Some(quota) = key.get::<Quota>()? else {
500 return Ok(RedisValue::Integer(0));
501 };
502
503 quota_scope(&key, QuotaMutRef::Ref(quota), |quota, now| {
504 let lease_id = args.next_str()?;
505
506 if quota.renew_lease(lease_id, now, None) {
507 ctx.replicate_verbatim();
508 Ok(RedisValue::Integer(1))
509 } else {
510 Ok(RedisValue::Integer(0))
511 }
512 })
513}
514
515#[redis_command(
516 name = "quota.free",
517 summary = "free a previouly allocated lease",
518 complexity = "O(log(N) + D): Where N is the number of pending leases and D is the number of dimentions.",
519 flags(Write, DenyOOM),
520 arity = 3,
521 key_spec(
522 flags(ReadWrite, Access, Update),
523 begin_search(index = 1),
524 find_keys(range(last_key = 1, steps = 1, limit = 1)),
525 ),
526 arg(
527 name = "KEY",
528 kind = Key,
529 key_spec_index = 0,
530 ),
531 arg(
532 name = "ID",
533 kind = String,
534 ),
535)]
536fn quota_free(ctx: &Context, args: CommandArgs) -> RedisResult {
537 let mut args = args.into_iter().skip(1).peekable();
538 let resource = args.next_arg()?;
539 let key = ctx.open_key_writable(&resource);
540
541 let Some(quota) = key.get::<Quota>()? else {
542 return Ok(RedisValue::Integer(0));
543 };
544
545 quota_scope(&key, QuotaMutRef::Ref(quota), |quota, _| {
546 let lease_id = args.next_str()?;
547 ctx.replicate_verbatim();
548
549 if quota.remove_lease(lease_id) {
550 Ok(RedisValue::Integer(1))
551 } else {
552 Ok(RedisValue::Integer(0))
553 }
554 })
555}
556
557impl Quota {
558 fn gc(&mut self, now_ts: NonZeroU64) {
559 while let Some((_, lease_id)) = self.expiry_timeouts.pop_first_if(|(timeout, _)| *timeout < now_ts) {
560 let Some(lease) = self.leases.remove(&lease_id) else {
561 continue;
562 };
563
564 for dim in lease.dims {
565 if let Some(count) = self.dim_count.get_mut(&dim) {
566 *count = count.saturating_sub(lease.qty);
567 if *count == 0 {
568 self.dim_count.remove(&dim);
569 }
570 }
571 }
572 }
573 }
574
575 fn expires_at(&self) -> Option<NonZeroU64> {
576 self.expiry_timeouts.last().map(|(ts, _)| *ts)
577 }
578
579 fn remove_lease(&mut self, id: &str) -> bool {
580 let Some(lease) = self.leases.remove(id) else {
581 return false;
582 };
583
584 for dim in lease.dims {
585 if let Some(count) = self.dim_count.get_mut(&dim) {
586 *count = count.saturating_sub(lease.qty);
587 if *count == 0 {
588 self.dim_count.remove(&dim);
589 }
590 }
591 }
592
593 if let Some(expiry_ts) = lease.expires_at {
594 self.expiry_timeouts.remove(&(expiry_ts, lease.id));
595 }
596
597 true
598 }
599
600 fn insert_lease(&mut self, lease: Lease) {
601 for dim in &lease.dims {
602 let count = self.dim_count.entry(dim.clone()).or_default();
603 *count += lease.qty;
604 }
605
606 if let Some(expires_ts) = lease.expires_at {
607 self.expiry_timeouts.insert((expires_ts, lease.id.clone()));
608 }
609
610 self.leases.insert(lease.id.clone(), lease);
611 }
612
613 fn renew_lease(&mut self, id: &str, now: NonZeroU64, expires_at: Option<NonZeroU64>) -> bool {
614 let Some(lease) = self.leases.get_mut(id) else {
615 return false;
616 };
617
618 if expires_at.is_some_and(|expires_at| expires_at < now) {
619 return self.remove_lease(id);
620 }
621
622 if lease.expires_at == expires_at {
623 return false;
624 }
625
626 if let Some(expires_at) = lease.expires_at {
627 self.expiry_timeouts.remove(&(expires_at, lease.id.clone()));
628 }
629
630 if let Some(expires_at) = expires_at {
631 self.expiry_timeouts.insert((expires_at, lease.id.clone()));
632 }
633
634 lease.expires_at = expires_at;
635
636 true
637 }
638}
639
640#[redis_module(
641 name = "quota",
642 version = 1,
643 types(Quota),
644 commands(quota_free, quota_commit, quota_renew_at, quota_renew, quota_restore, quota_lease,)
645)]
646pub struct QuotaModule;