tracing: Use temp buffer when filtering events

Filtering of events requires the data to be written to the ring buffer
before it can be decided to filter or not. This is because the parameters of
the filter are based on the result that is written to the ring buffer and
not on the parameters that are passed into the trace functions.

The ftrace ring buffer is optimized for writing into the ring buffer and
committing. The discard procedure used when filtering decides the event
should be discarded is much more heavy weight. Thus, using a temporary
filter when filtering events can speed things up drastically.

Without a temp buffer we have:

 # trace-cmd start -p nop
 # perf stat -r 10 hackbench 50
       0.790706626 seconds time elapsed ( +-  0.71% )

 # trace-cmd start -e all
 # perf stat -r 10 hackbench 50
       1.566904059 seconds time elapsed ( +-  0.27% )

 # trace-cmd start -e all -f 'common_preempt_count==20'
 # perf stat -r 10 hackbench 50
       1.690598511 seconds time elapsed ( +-  0.19% )

 # trace-cmd start -e all -f 'common_preempt_count!=20'
 # perf stat -r 10 hackbench 50
       1.707486364 seconds time elapsed ( +-  0.30% )

The first run above is without any tracing, just to get a based figure.
hackbench takes ~0.79 seconds to run on the system.

The second run enables tracing all events where nothing is filtered. This
increases the time by 100% and hackbench takes 1.57 seconds to run.

The third run filters all events where the preempt count will equal "20"
(this should never happen) thus all events are discarded. This takes 1.69
seconds to run. This is 10% slower than just committing the events!

The last run enables all events and filters where the filter will commit all
events, and this takes 1.70 seconds to run. The filtering overhead is
approximately 10%. Thus, the discard and commit of an event from the ring
buffer may be about the same time.

With this patch, the numbers change:

 # trace-cmd start -p nop
 # perf stat -r 10 hackbench 50
       0.778233033 seconds time elapsed ( +-  0.38% )

 # trace-cmd start -e all
 # perf stat -r 10 hackbench 50
       1.582102692 seconds time elapsed ( +-  0.28% )

 # trace-cmd start -e all -f 'common_preempt_count==20'
 # perf stat -r 10 hackbench 50
       1.309230710 seconds time elapsed ( +-  0.22% )

 # trace-cmd start -e all -f 'common_preempt_count!=20'
 # perf stat -r 10 hackbench 50
       1.786001924 seconds time elapsed ( +-  0.20% )

The first run is again the base with no tracing.

The second run is all tracing with no filtering. It is a little slower, but
that may be well within the noise.

The third run shows that discarding all events only took 1.3 seconds. This
is a speed up of 23%! The discard is much faster than even the commit.

The one downside is shown in the last run. Events that are not discarded by
the filter will take longer to add, this is due to the extra copy of the
event.

Cc: Alexei Starovoitov <ast@kernel.org>
Signed-off-by: Steven Rostedt <rostedt@goodmis.org>
This commit is contained in:
Steven Rostedt (Red Hat) 2016-05-03 17:15:43 -04:00 committed by Steven Rostedt
parent dcb0b5575d
commit 0fc1b09ff1
4 changed files with 186 additions and 9 deletions

View File

@ -312,7 +312,7 @@ int call_filter_check_discard(struct trace_event_call *call, void *rec,
{
if (unlikely(call->flags & TRACE_EVENT_FL_FILTERED) &&
!filter_match_preds(call->filter, rec)) {
ring_buffer_discard_commit(buffer, event);
__trace_event_discard_commit(buffer, event);
return 1;
}
@ -1660,6 +1660,16 @@ tracing_generic_entry_update(struct trace_entry *entry, unsigned long flags,
}
EXPORT_SYMBOL_GPL(tracing_generic_entry_update);
static __always_inline void
trace_event_setup(struct ring_buffer_event *event,
int type, unsigned long flags, int pc)
{
struct trace_entry *ent = ring_buffer_event_data(event);
tracing_generic_entry_update(ent, flags, pc);
ent->type = type;
}
struct ring_buffer_event *
trace_buffer_lock_reserve(struct ring_buffer *buffer,
int type,
@ -1669,21 +1679,136 @@ trace_buffer_lock_reserve(struct ring_buffer *buffer,
struct ring_buffer_event *event;
event = ring_buffer_lock_reserve(buffer, len);
if (event != NULL) {
struct trace_entry *ent = ring_buffer_event_data(event);
tracing_generic_entry_update(ent, flags, pc);
ent->type = type;
}
if (event != NULL)
trace_event_setup(event, type, flags, pc);
return event;
}
DEFINE_PER_CPU(struct ring_buffer_event *, trace_buffered_event);
DEFINE_PER_CPU(int, trace_buffered_event_cnt);
static int trace_buffered_event_ref;
/**
* trace_buffered_event_enable - enable buffering events
*
* When events are being filtered, it is quicker to use a temporary
* buffer to write the event data into if there's a likely chance
* that it will not be committed. The discard of the ring buffer
* is not as fast as committing, and is much slower than copying
* a commit.
*
* When an event is to be filtered, allocate per cpu buffers to
* write the event data into, and if the event is filtered and discarded
* it is simply dropped, otherwise, the entire data is to be committed
* in one shot.
*/
void trace_buffered_event_enable(void)
{
struct ring_buffer_event *event;
struct page *page;
int cpu;
WARN_ON_ONCE(!mutex_is_locked(&event_mutex));
if (trace_buffered_event_ref++)
return;
for_each_tracing_cpu(cpu) {
page = alloc_pages_node(cpu_to_node(cpu),
GFP_KERNEL | __GFP_NORETRY, 0);
if (!page)
goto failed;
event = page_address(page);
memset(event, 0, sizeof(*event));
per_cpu(trace_buffered_event, cpu) = event;
preempt_disable();
if (cpu == smp_processor_id() &&
this_cpu_read(trace_buffered_event) !=
per_cpu(trace_buffered_event, cpu))
WARN_ON_ONCE(1);
preempt_enable();
}
return;
failed:
trace_buffered_event_disable();
}
static void enable_trace_buffered_event(void *data)
{
/* Probably not needed, but do it anyway */
smp_rmb();
this_cpu_dec(trace_buffered_event_cnt);
}
static void disable_trace_buffered_event(void *data)
{
this_cpu_inc(trace_buffered_event_cnt);
}
/**
* trace_buffered_event_disable - disable buffering events
*
* When a filter is removed, it is faster to not use the buffered
* events, and to commit directly into the ring buffer. Free up
* the temp buffers when there are no more users. This requires
* special synchronization with current events.
*/
void trace_buffered_event_disable(void)
{
int cpu;
WARN_ON_ONCE(!mutex_is_locked(&event_mutex));
if (WARN_ON_ONCE(!trace_buffered_event_ref))
return;
if (--trace_buffered_event_ref)
return;
preempt_disable();
/* For each CPU, set the buffer as used. */
smp_call_function_many(tracing_buffer_mask,
disable_trace_buffered_event, NULL, 1);
preempt_enable();
/* Wait for all current users to finish */
synchronize_sched();
for_each_tracing_cpu(cpu) {
free_page((unsigned long)per_cpu(trace_buffered_event, cpu));
per_cpu(trace_buffered_event, cpu) = NULL;
}
/*
* Make sure trace_buffered_event is NULL before clearing
* trace_buffered_event_cnt.
*/
smp_wmb();
preempt_disable();
/* Do the work on each cpu */
smp_call_function_many(tracing_buffer_mask,
enable_trace_buffered_event, NULL, 1);
preempt_enable();
}
void
__buffer_unlock_commit(struct ring_buffer *buffer, struct ring_buffer_event *event)
{
__this_cpu_write(trace_cmdline_save, true);
ring_buffer_unlock_commit(buffer, event);
/* If this is the temp buffer, we need to commit fully */
if (this_cpu_read(trace_buffered_event) == event) {
/* Length is in event->array[0] */
ring_buffer_write(buffer, event->array[0], &event->array[1]);
/* Release the temp buffer */
this_cpu_dec(trace_buffered_event_cnt);
} else
ring_buffer_unlock_commit(buffer, event);
}
static struct ring_buffer *temp_buffer;
@ -1695,8 +1820,23 @@ trace_event_buffer_lock_reserve(struct ring_buffer **current_rb,
unsigned long flags, int pc)
{
struct ring_buffer_event *entry;
int val;
*current_rb = trace_file->tr->trace_buffer.buffer;
if ((trace_file->flags &
(EVENT_FILE_FL_SOFT_DISABLED | EVENT_FILE_FL_FILTERED)) &&
(entry = this_cpu_read(trace_buffered_event))) {
/* Try to use the per cpu buffer first */
val = this_cpu_inc_return(trace_buffered_event_cnt);
if (val == 1) {
trace_event_setup(entry, type, flags, pc);
entry->array[0] = len;
return entry;
}
this_cpu_dec(trace_buffered_event_cnt);
}
entry = trace_buffer_lock_reserve(*current_rb,
type, len, flags, pc);
/*

View File

@ -1083,6 +1083,23 @@ static inline void trace_buffer_unlock_commit(struct trace_array *tr,
trace_buffer_unlock_commit_regs(tr, buffer, event, flags, pc, NULL);
}
DECLARE_PER_CPU(struct ring_buffer_event *, trace_buffered_event);
DECLARE_PER_CPU(int, trace_buffered_event_cnt);
void trace_buffered_event_disable(void);
void trace_buffered_event_enable(void);
static inline void
__trace_event_discard_commit(struct ring_buffer *buffer,
struct ring_buffer_event *event)
{
if (this_cpu_read(trace_buffered_event) == event) {
/* Simply release the temp buffer */
this_cpu_dec(trace_buffered_event_cnt);
return;
}
ring_buffer_discard_commit(buffer, event);
}
/*
* Helper function for event_trigger_unlock_commit{_regs}().
* If there are event triggers attached to this event that requires
@ -1111,7 +1128,7 @@ __event_trigger_test_discard(struct trace_event_file *file,
if (test_bit(EVENT_FILE_FL_SOFT_DISABLED_BIT, &file->flags) ||
(unlikely(file->flags & EVENT_FILE_FL_FILTERED) &&
!filter_match_preds(file->filter, entry))) {
ring_buffer_discard_commit(buffer, event);
__trace_event_discard_commit(buffer, event);
return true;
}

View File

@ -363,6 +363,7 @@ static int __ftrace_event_enable_disable(struct trace_event_file *file,
{
struct trace_event_call *call = file->event_call;
struct trace_array *tr = file->tr;
unsigned long file_flags = file->flags;
int ret = 0;
int disable;
@ -445,6 +446,15 @@ static int __ftrace_event_enable_disable(struct trace_event_file *file,
break;
}
/* Enable or disable use of trace_buffered_event */
if ((file_flags & EVENT_FILE_FL_SOFT_DISABLED) !=
(file->flags & EVENT_FILE_FL_SOFT_DISABLED)) {
if (file->flags & EVENT_FILE_FL_SOFT_DISABLED)
trace_buffered_event_enable();
else
trace_buffered_event_disable();
}
return ret;
}

View File

@ -823,7 +823,12 @@ static void __free_preds(struct event_filter *filter)
static void filter_disable(struct trace_event_file *file)
{
unsigned long old_flags = file->flags;
file->flags &= ~EVENT_FILE_FL_FILTERED;
if (old_flags != file->flags)
trace_buffered_event_disable();
}
static void __free_filter(struct event_filter *filter)
@ -1698,7 +1703,12 @@ fail:
static inline void event_set_filtered_flag(struct trace_event_file *file)
{
unsigned long old_flags = file->flags;
file->flags |= EVENT_FILE_FL_FILTERED;
if (old_flags != file->flags)
trace_buffered_event_enable();
}
static inline void event_set_filter(struct trace_event_file *file,