[PATCHv3] lib/ratelimit: Lockless ratelimiting
Dmitry Safonov
dima at arista.com
Mon Aug 20 23:14:35 UTC 2018
Hi Petr, thanks for the review,
On Wed, 2018-08-15 at 17:10 +0200, Petr Mladek wrote:
> On Tue 2018-07-03 23:56:28, Dmitry Safonov wrote:
> > Currently ratelimit_state is protected with spin_lock. If the .lock is
> > taken at the moment of ___ratelimit() call, the message is suppressed
> > to make ratelimiting robust.
> >
> > That results in the following issue:
> >       CPU0                          CPU1
> > ------------------            ------------------
> > printk_ratelimit()            printk_ratelimit()
> >         |                             |
> >   try_spin_lock()               try_spin_lock()
> >         |                             |
> > time_is_before_jiffies()      return 0; // suppress
> >
> > So, a concurrent call of ___ratelimit() results in silently suppressing
> > one of the messages, regardless of whether the limit is reached or not.
> > And rs->missed is not increased in such a case, so the issue is hidden
> > from the user.
> >
> > Convert ratelimiting to use atomic counters and drop spin_lock.
> >
> > Note: That might be unexpected, but within the first interval of a
> > message storm one can print up to burst*2 messages. So, it doesn't
> > guarantee that in *any* interval the number of messages stays within
> > burst. But that differs only slightly from the previous behavior, where
> > one can start a burst=5 interval and print 4 messages in the last
> > milliseconds of the interval + 5 new messages from the new interval
> > (9 messages in total in less than the interval value):
>
> I am still confused by this paragraph. Does this patch change the
> behavior? What is the original and what is the new one, please?
Yeah, I could be a bit clearer about the change.
Originally more than `burst` messages could be printed if the previous
period hasn't ended:
>
> >    msg0             msg1-msg4 msg0-msg4
> >     |                     |       |
> >     |                     |       |
> >  |--o---------------------o-|-----o--------------------|--> (t)
> >                           <------->
> >                       Lesser than burst
But now, because I check:
+	if (atomic_add_unless(&rs->printed, 1, rs->burst))
+		return 1;
*before* checking the end of interval, the maximum number of messages
in the peak will be nearly the same: burst*2 (it was burst*2 - 1 in the
original).
Why do I check it before the end of interval?
I thought it would be a nice optimization, making atomic_add_unless()
the only called function on the fast-path (when we haven't hit the
limit yet). If you want, I can move it into a separate patch.
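To make the burst*2 case concrete, here is a rough single-threaded userspace model of the v3 logic, written with C11 atomics. It is only a sketch: `struct rl`, `rl_v3()` and `add_unless()` are made-up names, `now` stands in for jiffies, the burst == 0 branch and the "callbacks suppressed" printout are left out, and it says nothing about the concurrent races discussed in this thread.

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical userspace stand-in for struct ratelimit_state. */
struct rl {
	unsigned long interval;
	int burst;
	unsigned long begin;
	atomic_int printed;
	atomic_int missed;
};

/*
 * Stand-in for the kernel's atomic_add_unless(): add @a to *v unless
 * *v == @u. Returns true if the addition was performed.
 */
static bool add_unless(atomic_int *v, int a, int u)
{
	int cur = atomic_load(v);

	while (cur != u) {
		/* On failure, cur is refreshed with the current value. */
		if (atomic_compare_exchange_weak(v, &cur, cur + a))
			return true;
	}
	return false;
}

/* Model of the v3 ___ratelimit(); @now replaces jiffies. */
static int rl_v3(struct rl *rs, unsigned long now)
{
	int old;

	if (!rs->interval)
		return 1;

	/* Fast path: below the limit, nothing else is touched. */
	if (add_unless(&rs->printed, 1, rs->burst))
		return 1;

	/* Same comparison as time_is_before_jiffies(begin + interval). */
	if ((long)(now - (rs->begin + rs->interval)) > 0) {
		old = rs->burst;
		/* old holds the previous value of printed afterwards. */
		atomic_compare_exchange_strong(&rs->printed, &old, 0);
		if (old) {
			/* Reduced ratelimit_end_interval(): no printout. */
			rs->begin = now;
			atomic_exchange(&rs->missed, 0);
		}
	}

	if (add_unless(&rs->printed, 1, rs->burst))
		return 1;

	add_unless(&rs->missed, 1, -1);
	return 0;
}
```

With interval = 100 and burst = 5, calls at ticks 96..100 all pass through the fast path, the call at tick 101 resets the counter and passes, and so do the calls at 102..105. That is 10 messages within 10 ticks, and only the call at tick 106 is counted as missed.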
> >
> > Dropped dev/random patch since v1 version:
> > lkml.kernel.org/r/<20180510125211.12583-1-dima at arista.com>
> >
> > Dropped `name' as it's unused in RATELIMIT_STATE_INIT()
> >
> > diff --git a/lib/ratelimit.c b/lib/ratelimit.c
> > index d01f47135239..d9b749d40108 100644
> > --- a/lib/ratelimit.c
> > +++ b/lib/ratelimit.c
> > @@ -13,6 +13,18 @@
> > #include <linux/jiffies.h>
> > #include <linux/export.h>
> >
> > +static void ratelimit_end_interval(struct ratelimit_state *rs, const char *func)
> > +{
> > +	rs->begin = jiffies;
> > +
> > +	if (!(rs->flags & RATELIMIT_MSG_ON_RELEASE)) {
> > +		unsigned int missed = atomic_xchg(&rs->missed, 0);
> > +
> > +		if (missed)
> > +			pr_warn("%s: %u callbacks suppressed\n", func, missed);
> > +	}
> > +}
> > +
> > /*
> > * __ratelimit - rate limiting
> > * @rs: ratelimit_state data
> > @@ -27,45 +39,30 @@
> > */
> > int ___ratelimit(struct ratelimit_state *rs, const char *func)
> > {
> > -	unsigned long flags;
> > -	int ret;
> > -
> >  	if (!rs->interval)
> >  		return 1;
> >
> > -	/*
> > -	 * If we contend on this state's lock then almost
> > -	 * by definition we are too busy to print a message,
> > -	 * in addition to the one that will be printed by
> > -	 * the entity that is holding the lock already:
> > -	 */
> > -	if (!raw_spin_trylock_irqsave(&rs->lock, flags))
> > +	if (unlikely(!rs->burst)) {
> > +		atomic_add_unless(&rs->missed, 1, -1);
> > +		if (time_is_before_jiffies(rs->begin + rs->interval))
> > +			ratelimit_end_interval(rs, func);
>
> This is racy. time_is_before_jiffies() might be valid on two
> CPUs in parallel. They would both call ratelimit_end_interval().
> This is no longer atomic context. Therefore one might get scheduled
> and set rs->begin = jiffies seconds later. I am sure that there might
> be more crazy scenarios.
That's the case with rs->burst == 0.
So, in this situation all the messages will be suppressed.
And the only reason to call ratelimit_end_interval() is to update the
start time and the number of messages not printed.
I didn't want to add any complexity for this case - the worst that can
happen is that the count of suppressed messages will be imprecise for
the rs->burst == 0 case. Not a big deal, huh?
>
> > +
> >  		return 0;
> > +	}
> >
> > -	if (!rs->begin)
> > -		rs->begin = jiffies;
> > +	if (atomic_add_unless(&rs->printed, 1, rs->burst))
> > +		return 1;
> >
> >  	if (time_is_before_jiffies(rs->begin + rs->interval)) {
> > -		if (rs->missed) {
> > -			if (!(rs->flags & RATELIMIT_MSG_ON_RELEASE)) {
> > -				printk_deferred(KERN_WARNING
> > -						"%s: %d callbacks suppressed\n",
> > -						func, rs->missed);
> > -				rs->missed = 0;
> > -			}
> > -		}
> > -		rs->begin = jiffies;
> > -		rs->printed = 0;
> > -	}
> > -	if (rs->burst && rs->burst > rs->printed) {
> > -		rs->printed++;
> > -		ret = 1;
> > -	} else {
> > -		rs->missed++;
> > -		ret = 0;
> > +		if (atomic_cmpxchg(&rs->printed, rs->burst, 0))
> > +			ratelimit_end_interval(rs, func);
> >  	}
> > -	raw_spin_unlock_irqrestore(&rs->lock, flags);
> >
> > -	return ret;
> > +	if (atomic_add_unless(&rs->printed, 1, rs->burst))
> > +		return 1;
>
> The entire logic is complicated and hard to understand. Especially
> calling ratelimit_end_interval() and atomic_add_unless(&rs->printed)
> twice.
I think I've described the reasons above. I can add a comment or drop
the first atomic_add_unless(), but I would prefer to keep it there, so
that the fast-path has only one check.
>
> > +	atomic_add_unless(&rs->missed, 1, -1);
> > +
> > +	return 0;
> >  }
>
>
> I wonder if the following code would do the job (not even compile
> tested!):
>
> static void ratelimit_end_interval(struct ratelimit_state *rs, const char *func)
> {
> 	rs->begin = jiffies;
>
> 	if (!(rs->flags & RATELIMIT_MSG_ON_RELEASE)) {
> 		unsigned int missed = atomic_xchg(&rs->missed, 0);
>
> 		if (missed)
> 			pr_warn("%s: %u callbacks suppressed\n", func, missed);
> 	}
>
> 	atomic_xchg(&rs->printed, 0);
> }
>
> /*
> * __ratelimit - rate limiting
> * @rs: ratelimit_state data
> * @func: name of calling function
> *
> * This enforces a rate limit: not more than @rs->burst callbacks
> * in every @rs->interval
> *
> * RETURNS:
> * 0 means callbacks will be suppressed.
> * 1 means go ahead and do it.
> */
> int ___ratelimit(struct ratelimit_state *rs, const char *func)
> {
> 	unsigned long begin, old_begin = rs->begin;
>
> 	if (!rs->interval)
> 		return 1;
>
> 	if (time_is_before_jiffies(rs->begin + rs->interval) &&
> 	    cmpxchg(&rs->begin, begin, begin + rs->interval) == begin)
Probably, cmpxchg(&rs->begin, begin, jiffies)?
Otherwise hitting it later will be buggy.
Also `begin` might be uninitialized? (if cmpxchg() fails)
> 	{
> 		ratelimit_end_interval(rs, func);
> 	}
>
> 	if (atomic_add_unless(&rs->printed, 1, rs->burst))
> 		return 1;
>
> 	atomic_add_unless(&rs->missed, 1, -1);
> 	return 0;
> }
> EXPORT_SYMBOL(___ratelimit);
>
>
> The main logic is the same as in the original code. Only one CPU is
> able to reset the interval and counters (thanks to cmpxchg).
> Every caller increases either "printed" or "missed" counter.
Do we care about the fast-path argument I made above?
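For comparison, the variant suggested above could look roughly like this in userspace C11, with the two fixes I mentioned folded in (`begin` is read before use, and the compare-and-exchange stores the current time rather than begin + interval). Again just a sketch under assumptions: `struct rl_state`, `rl_check()` and `add_unless()` are made-up names, `now` replaces jiffies, the printout is dropped, and it has only been thought through for a single caller.

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical userspace stand-in for struct ratelimit_state. */
struct rl_state {
	unsigned long interval;
	int burst;
	atomic_ulong begin;
	atomic_int printed;
	atomic_int missed;
};

/* Stand-in for atomic_add_unless(): add @a to *v unless *v == @u. */
static bool add_unless(atomic_int *v, int a, int u)
{
	int cur = atomic_load(v);

	while (cur != u) {
		/* On failure, cur is refreshed with the current value. */
		if (atomic_compare_exchange_weak(v, &cur, cur + a))
			return true;
	}
	return false;
}

static int rl_check(struct rl_state *rs, unsigned long now)
{
	unsigned long begin = atomic_load(&rs->begin);

	if (!rs->interval)
		return 1;

	/*
	 * Interval elapsed: only the caller that wins the exchange on
	 * rs->begin resets the counters.
	 */
	if ((long)(now - (begin + rs->interval)) > 0 &&
	    atomic_compare_exchange_strong(&rs->begin, &begin, now)) {
		atomic_exchange(&rs->missed, 0);
		atomic_store(&rs->printed, 0);
	}

	/* Every caller bumps either "printed" or "missed". */
	if (add_unless(&rs->printed, 1, rs->burst))
		return 1;

	add_unless(&rs->missed, 1, -1);
	return 0;
}
```

Here the interval check runs on every call, so the path below the limit costs one load and one compare more than in v3; in exchange there is only one place where `printed` is incremented.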
--
Thanks,
Dmitry