Preempt-RCU: CPU Hotplug handling

This patch allows preemptible RCU to tolerate CPU-hotplug operations.
It accomplishes this by maintaining a local copy of a map of online
CPUs, which it accesses under its own lock.

Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
Reviewed-by: Steven Rostedt <srostedt@redhat.com>
Signed-off-by: Ingo Molnar <mingo@elte.hu>
This commit is contained in:
Paul E. McKenney 2008-01-25 21:08:25 +01:00 committed by Ingo Molnar
parent e260be673a
commit eaf649e9fe
1 changed files with 142 additions and 5 deletions

View File

@ -147,6 +147,8 @@ static char *rcu_try_flip_state_names[] =
{ "idle", "waitack", "waitzero", "waitmb" };
#endif /* #ifdef CONFIG_RCU_TRACE */
static cpumask_t rcu_cpu_online_map __read_mostly = CPU_MASK_NONE;
/*
* Enum and per-CPU flag to determine when each CPU has seen
* the most recent counter flip.
@ -445,7 +447,7 @@ rcu_try_flip_idle(void)
/* Now ask each CPU for acknowledgement of the flip. */
for_each_possible_cpu(cpu)
for_each_cpu_mask(cpu, rcu_cpu_online_map)
per_cpu(rcu_flip_flag, cpu) = rcu_flipped;
return 1;
@ -461,7 +463,7 @@ rcu_try_flip_waitack(void)
int cpu;
RCU_TRACE_ME(rcupreempt_trace_try_flip_a1);
for_each_possible_cpu(cpu)
for_each_cpu_mask(cpu, rcu_cpu_online_map)
if (per_cpu(rcu_flip_flag, cpu) != rcu_flip_seen) {
RCU_TRACE_ME(rcupreempt_trace_try_flip_ae1);
return 0;
@ -492,7 +494,7 @@ rcu_try_flip_waitzero(void)
/* Check to see if the sum of the "last" counters is zero. */
RCU_TRACE_ME(rcupreempt_trace_try_flip_z1);
for_each_possible_cpu(cpu)
for_each_cpu_mask(cpu, rcu_cpu_online_map)
sum += RCU_DATA_CPU(cpu)->rcu_flipctr[lastidx];
if (sum != 0) {
RCU_TRACE_ME(rcupreempt_trace_try_flip_ze1);
@ -507,7 +509,7 @@ rcu_try_flip_waitzero(void)
smp_mb(); /* ^^^^^^^^^^^^ */
/* Call for a memory barrier from each CPU. */
for_each_possible_cpu(cpu)
for_each_cpu_mask(cpu, rcu_cpu_online_map)
per_cpu(rcu_mb_flag, cpu) = rcu_mb_needed;
RCU_TRACE_ME(rcupreempt_trace_try_flip_z2);
@ -525,7 +527,7 @@ rcu_try_flip_waitmb(void)
int cpu;
RCU_TRACE_ME(rcupreempt_trace_try_flip_m1);
for_each_possible_cpu(cpu)
for_each_cpu_mask(cpu, rcu_cpu_online_map)
if (per_cpu(rcu_mb_flag, cpu) != rcu_mb_done) {
RCU_TRACE_ME(rcupreempt_trace_try_flip_me1);
return 0;
@ -637,6 +639,98 @@ void rcu_advance_callbacks(int cpu, int user)
spin_unlock_irqrestore(&rdp->lock, flags);
}
#ifdef CONFIG_HOTPLUG_CPU
#define rcu_offline_cpu_enqueue(srclist, srctail, dstlist, dsttail) do { \
*dsttail = srclist; \
if (srclist != NULL) { \
dsttail = srctail; \
srclist = NULL; \
srctail = &srclist;\
} \
} while (0)
void rcu_offline_cpu(int cpu)
{
int i;
struct rcu_head *list = NULL;
unsigned long flags;
struct rcu_data *rdp = RCU_DATA_CPU(cpu);
struct rcu_head **tail = &list;
/*
* Remove all callbacks from the newly dead CPU, retaining order.
* Otherwise rcu_barrier() will fail
*/
spin_lock_irqsave(&rdp->lock, flags);
rcu_offline_cpu_enqueue(rdp->donelist, rdp->donetail, list, tail);
for (i = GP_STAGES - 1; i >= 0; i--)
rcu_offline_cpu_enqueue(rdp->waitlist[i], rdp->waittail[i],
list, tail);
rcu_offline_cpu_enqueue(rdp->nextlist, rdp->nexttail, list, tail);
spin_unlock_irqrestore(&rdp->lock, flags);
rdp->waitlistcount = 0;
/* Disengage the newly dead CPU from the grace-period computation. */
spin_lock_irqsave(&rcu_ctrlblk.fliplock, flags);
rcu_check_mb(cpu);
if (per_cpu(rcu_flip_flag, cpu) == rcu_flipped) {
smp_mb(); /* Subsequent counter accesses must see new value */
per_cpu(rcu_flip_flag, cpu) = rcu_flip_seen;
smp_mb(); /* Subsequent RCU read-side critical sections */
/* seen -after- acknowledgement. */
}
RCU_DATA_ME()->rcu_flipctr[0] += RCU_DATA_CPU(cpu)->rcu_flipctr[0];
RCU_DATA_ME()->rcu_flipctr[1] += RCU_DATA_CPU(cpu)->rcu_flipctr[1];
RCU_DATA_CPU(cpu)->rcu_flipctr[0] = 0;
RCU_DATA_CPU(cpu)->rcu_flipctr[1] = 0;
cpu_clear(cpu, rcu_cpu_online_map);
spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, flags);
/*
* Place the removed callbacks on the current CPU's queue.
* Make them all start a new grace period: simple approach,
* in theory could starve a given set of callbacks, but
* you would need to be doing some serious CPU hotplugging
* to make this happen. If this becomes a problem, adding
* a synchronize_rcu() to the hotplug path would be a simple
* fix.
*/
rdp = RCU_DATA_ME();
spin_lock_irqsave(&rdp->lock, flags);
*rdp->nexttail = list;
if (list)
rdp->nexttail = tail;
spin_unlock_irqrestore(&rdp->lock, flags);
}
void __devinit rcu_online_cpu(int cpu)
{
unsigned long flags;
spin_lock_irqsave(&rcu_ctrlblk.fliplock, flags);
cpu_set(cpu, rcu_cpu_online_map);
spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, flags);
}
#else /* #ifdef CONFIG_HOTPLUG_CPU */
void rcu_offline_cpu(int cpu)
{
}
void __devinit rcu_online_cpu(int cpu)
{
}
#endif /* #else #ifdef CONFIG_HOTPLUG_CPU */
static void rcu_process_callbacks(struct softirq_action *unused)
{
unsigned long flags;
@ -746,6 +840,32 @@ int rcu_pending(int cpu)
return 0;
}
static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
unsigned long action, void *hcpu)
{
long cpu = (long)hcpu;
switch (action) {
case CPU_UP_PREPARE:
case CPU_UP_PREPARE_FROZEN:
rcu_online_cpu(cpu);
break;
case CPU_UP_CANCELED:
case CPU_UP_CANCELED_FROZEN:
case CPU_DEAD:
case CPU_DEAD_FROZEN:
rcu_offline_cpu(cpu);
break;
default:
break;
}
return NOTIFY_OK;
}
static struct notifier_block __cpuinitdata rcu_nb = {
.notifier_call = rcu_cpu_notify,
};
void __init __rcu_init(void)
{
int cpu;
@ -769,6 +889,23 @@ void __init __rcu_init(void)
rdp->rcu_flipctr[0] = 0;
rdp->rcu_flipctr[1] = 0;
}
register_cpu_notifier(&rcu_nb);
/*
* We don't need protection against CPU-Hotplug here
* since
* a) If a CPU comes online while we are iterating over the
* cpu_online_map below, we would only end up making a
* duplicate call to rcu_online_cpu() which sets the corresponding
* CPU's mask in the rcu_cpu_online_map.
*
* b) A CPU cannot go offline at this point in time since the user
* does not have access to the sysfs interface, nor do we
* suspend the system.
*/
for_each_online_cpu(cpu)
rcu_cpu_notify(&rcu_nb, CPU_UP_PREPARE, (void *)(long) cpu);
open_softirq(RCU_SOFTIRQ, rcu_process_callbacks, NULL);
}