Skip to content

Commit

Permalink
Capture Go errors automatically
Browse files Browse the repository at this point in the history
  • Loading branch information
marctc committed Jul 11, 2024
1 parent 890ebbf commit 2513121
Show file tree
Hide file tree
Showing 65 changed files with 464 additions and 99 deletions.
27 changes: 27 additions & 0 deletions bpf/errors.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#ifndef __ERRORS_H_
#define __ERRORS_H_

// Size in bytes of the `comm` buffer stored in error_event.
#ifndef TASK_COMM_LEN
#define TASK_COMM_LEN 16
#endif

// Size in bytes of the buffer that receives the captured error message.
#ifndef ERR_MSG_LEN
#define ERR_MSG_LEN 32
#endif

// Maximum number of 64-bit entries kept in a captured user stack trace.
#ifndef MAX_STACK_DEPTH
#define MAX_STACK_DEPTH 32
#endif

// Fixed-size array of MAX_STACK_DEPTH 64-bit stack entries.
typedef __u64 stack_trace_t[MAX_STACK_DEPTH];

typedef struct error_event {
__u32 pid;
__u32 cpu_id;
char comm[TASK_COMM_LEN];
__s32 ustack_sz;
stack_trace_t ustack;
u8 err_msg[ERR_MSG_LEN];
} error_event;

#endif /* __ERRORS_H_ */
60 changes: 59 additions & 1 deletion bpf/go_nethttp.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "tracing.h"
#include "hpack.h"
#include "ringbuf.h"
#include "errors.h"

typedef struct http_func_invocation {
u64 start_monotime_ns;
Expand All @@ -46,6 +47,13 @@ struct {
__uint(max_entries, MAX_CONCURRENT_REQUESTS);
} ongoing_http_client_requests SEC(".maps");

// last_error keeps, per goroutine, the most recent error_event written by
// uprobe_error. uprobe_ServeHTTPReturns looks the entry up and deletes it
// when the HTTP handler returns, attaching it to the emitted request trace.
// NOTE(review): the reader dereferences the looked-up value after calling
// bpf_map_delete_elem on the same key; confirm the slot cannot be recycled
// by another CPU before the copy happens.
struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, void *); // key: pointer to the request goroutine
__type(value, struct error_event);
__uint(max_entries, MAX_CONCURRENT_REQUESTS);
} last_error SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, void *); // key: pointer to the request goroutine
Expand Down Expand Up @@ -198,6 +206,9 @@ int uprobe_ServeHTTPReturns(struct pt_regs *ctx) {
resp_ptr = deref_resp_ptr;
}

struct error_event *error = bpf_map_lookup_elem(&last_error, &goroutine_addr);
bpf_map_delete_elem(&last_error, &goroutine_addr);

http_request_trace *trace = bpf_ringbuf_reserve(&events, sizeof(http_request_trace), 0);
if (!trace) {
bpf_dbg_printk("can't reserve space in the ringbuffer");
Expand All @@ -208,6 +219,8 @@ int uprobe_ServeHTTPReturns(struct pt_regs *ctx) {
trace->type = EVENT_HTTP_REQUEST;
trace->start_monotime_ns = invocation->start_monotime_ns;
trace->end_monotime_ns = bpf_ktime_get_ns();
if (error)
trace->error = *error;

goroutine_metadata *g_metadata = bpf_map_lookup_elem(&ongoing_goroutines, &goroutine_addr);
if (g_metadata) {
Expand Down Expand Up @@ -436,6 +449,51 @@ int uprobe_roundTripReturn(struct pt_regs *ctx) {
return 0;
}


// uprobe_error records the latest error raised on the current goroutine.
//
// It reads the error message from the first two Go register arguments
// (string pointer and length), captures the process identity and the user
// stack, and stores the resulting error_event in `last_error`, keyed by the
// goroutine pointer. Any previous entry for the same goroutine is replaced.
//
// Always returns 0. If the message cannot be read, nothing is stored.
SEC("uprobe/error")
int uprobe_error(struct pt_regs *ctx) {
    bpf_dbg_printk("=== uprobe/proc error return === ");

    void *goroutine_addr = GOROUTINE_PTR(ctx);
    bpf_dbg_printk("goroutine_addr %lx", goroutine_addr);

    // Numeric value of the kernel's BPF_F_USER_STACK flag for bpf_get_stack.
    // Kept under a distinct local name so it cannot shadow or clash with the
    // UAPI constant when the kernel headers already provide it.
    const u64 user_stack_flag = (1ULL << 8);

    // Members not listed here (comm, ustack_sz, ustack, err_msg) start zeroed.
    struct error_event event = {
        .pid = bpf_get_current_pid_tgid() >> 32,
        .cpu_id = bpf_get_smp_processor_id(),
    };

    // Read the error message
    void *msg_ptr = GO_PARAM1(ctx);
    if (!read_go_str_n("error", msg_ptr, (u64)GO_PARAM2(ctx), &event.err_msg, sizeof(event.err_msg))) {
        // Use the debug logger like every other message in this probe.
        bpf_dbg_printk("can't read error message");
        return 0;
    }
    bpf_dbg_printk("error msg %llx, %llx", msg_ptr, GO_PARAM2(ctx));

    if (bpf_get_current_comm(event.comm, sizeof(event.comm)))
        event.comm[0] = 0;

    // Read the stack trace. The raw helper result is stored as-is, so a
    // negative ustack_sz tells the consumer that the capture failed.
    event.ustack_sz = bpf_get_stack(ctx, event.ustack, sizeof(event.ustack), user_stack_flag);

    // Get the caller of the error function and store it in the first slot of the stack.
    // NOTE(review): this assumes the return address sits at the stack pointer
    // on probe entry; confirm that this holds for every supported architecture.
    void *sp_caller = STACK_PTR(ctx);
    u64 caller = 0;
    if (bpf_probe_read(&caller, sizeof(caller), sp_caller) == 0) {
        bpf_dbg_printk("sp_caller %lx caller %lx", sp_caller, caller);
        event.ustack[0] = caller;
    } else {
        // Keep whatever bpf_get_stack captured instead of clobbering slot 0 with 0.
        bpf_dbg_printk("can't read caller address at %lx", sp_caller);
    }

    // Write event
    if (bpf_map_update_elem(&last_error, &goroutine_addr, &event, BPF_ANY)) {
        bpf_dbg_printk("can't update event error map element");
    }

    return 0;
}
#ifndef NO_HEADER_PROPAGATION
// Context propagation through HTTP headers
SEC("uprobe/header_writeSubset")
Expand Down Expand Up @@ -936,4 +994,4 @@ int uprobe_queryReturn(struct pt_regs *ctx) {
bpf_dbg_printk("can't reserve space in the ringbuffer");
}
return 0;
}
}
3 changes: 2 additions & 1 deletion bpf/headers/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
// In x86, current goroutine is pointed by r14, according to
// https://go.googlesource.com/go/+/refs/heads/dev.regabi/src/cmd/compile/internal-abi.md#amd64-architecture
#define GOROUTINE_PTR(x) ((void*)(x)->r14)

// Stack pointer of the probed context, taken from the `sp` field of the registers.
#define STACK_PTR(x) ((void*)(x)->sp)
#elif defined(__TARGET_ARCH_arm64)

#define GO_PARAM1(x) ((void*)((PT_REGS_ARM64 *)(x))->regs[0])
Expand All @@ -50,6 +50,7 @@
// In arm64, current goroutine is pointed by R28 according to
// https://github.com/golang/go/blob/master/src/cmd/compile/abi-internal.md#arm64-architecture
#define GOROUTINE_PTR(x) ((void*)((PT_REGS_ARM64 *)(x))->regs[28])
// Value used as the stack pointer of the probed context on arm64.
// NOTE(review): regs[13] is general-purpose register x13; the AArch64 stack
// pointer is normally exposed through a separate `sp` field of the register
// set. Confirm against the definition of PT_REGS_ARM64.
#define STACK_PTR(x) ((void*)((PT_REGS_ARM64 *)(x))->regs[13])

#endif /*defined(__TARGET_ARCH_arm64)*/

Expand Down
2 changes: 2 additions & 0 deletions bpf/http_trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "pid_types.h"
#include "utils.h"
#include "errors.h"
#include "http_types.h"

#define PATH_MAX_LEN 100
Expand All @@ -39,6 +40,7 @@ typedef struct http_request_trace_t {
u16 status;
connection_info_t conn __attribute__ ((aligned (8)));
s64 content_length;
error_event error;
tp_info_t tp;

pid_info pid;
Expand Down
11 changes: 6 additions & 5 deletions pkg/beyla/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,12 @@ var DefaultConfig = Config{
TTL: defaultMetricsTTL,
},
Traces: otel.TracesConfig{
Protocol: otel.ProtocolUnset,
TracesProtocol: otel.ProtocolUnset,
MaxQueueSize: 4096,
MaxExportBatchSize: 4096,
ReportersCacheLen: ReporterLRUSize,
Protocol: otel.ProtocolUnset,
TracesProtocol: otel.ProtocolUnset,
MaxQueueSize: 4096,
MaxExportBatchSize: 4096,
ReportersCacheLen: ReporterLRUSize,
ReportExceptionEvents: false,
Instrumentations: []string{
instrumentations.InstrumentationALL,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/discover/attacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (ta *TraceAttacher) getTracer(ie *Instrumentable) (*ebpf.ProcessTracer, boo
}
} else {
tracerType = ebpf.Go
programs = filterNotFoundPrograms(newGoTracersGroup(ta.Cfg, ta.Metrics), ie.Offsets)
programs = filterNotFoundPrograms(newGoTracersGroup(ta.Cfg, ta.Metrics, ie.Offsets.SymTab), ie.Offsets)
}
case svc.InstrumentableNodejs:
programs = ta.genericTracers()
Expand Down
5 changes: 3 additions & 2 deletions pkg/internal/discover/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package discover

import (
"context"
"debug/gosym"
"fmt"

"github.com/mariomac/pipes/pipe"
Expand Down Expand Up @@ -96,10 +97,10 @@ func (pf *ProcessFinder) Start() (<-chan *ebpf.ProcessTracer, <-chan *Instrument
// auxiliary functions to instantiate the go and non-go tracers on diverse steps of the
// discovery pipeline

func newGoTracersGroup(cfg *beyla.Config, metrics imetrics.Reporter) []ebpf.Tracer {
func newGoTracersGroup(cfg *beyla.Config, metrics imetrics.Reporter, symTab *gosym.Table) []ebpf.Tracer {
// Each program is an eBPF source: net/http, grpc...
return []ebpf.Tracer{
nethttp.New(cfg, metrics),
nethttp.New(cfg, metrics, symTab),
grpc.New(cfg, metrics),
goruntime.New(cfg, metrics),
sarama.New(cfg, metrics),
Expand Down
4 changes: 2 additions & 2 deletions pkg/internal/discover/typer.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (t *typer) inspectOffsets(execElf *exec.FileInfo) (*goexec.Offsets, bool, e
t.log.Debug("skipping inspection for Go functions", "pid", execElf.Pid, "comm", execElf.CmdExePath)
} else {
t.log.Debug("inspecting", "pid", execElf.Pid, "comm", execElf.CmdExePath)
offsets, err := goexec.InspectOffsets(execElf, t.allGoFunctions)
offsets, err := goexec.InspectOffsets(&t.cfg.Traces, execElf, t.allGoFunctions)
if err != nil {
t.log.Debug("couldn't find go specific tracers", "error", err)
return nil, false, err
Expand All @@ -191,7 +191,7 @@ func isGoProxy(offsets *goexec.Offsets) bool {
func (t *typer) loadAllGoFunctionNames() {
uniqueFunctions := map[string]struct{}{}
t.allGoFunctions = nil
for _, p := range newGoTracersGroup(t.cfg, t.metrics) {
for _, p := range newGoTracersGroup(t.cfg, t.metrics, nil) {
for funcName := range p.GoProbes() {
// avoid duplicating function names
if _, ok := uniqueFunctions[funcName]; !ok {
Expand Down
Binary file added pkg/internal/ebpf/common/bpf_bpf.o
Binary file not shown.
11 changes: 10 additions & 1 deletion pkg/internal/ebpf/common/bpf_bpfel_arm64.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/internal/ebpf/common/bpf_bpfel_arm64.o
Binary file not shown.
11 changes: 10 additions & 1 deletion pkg/internal/ebpf/common/bpf_bpfel_x86.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/internal/ebpf/common/bpf_bpfel_x86.o
Binary file not shown.
5 changes: 3 additions & 2 deletions pkg/internal/ebpf/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ebpfcommon
import (
"bufio"
"bytes"
"debug/gosym"
"encoding/binary"
"io"
"log/slog"
Expand Down Expand Up @@ -103,7 +104,7 @@ var MisclassifiedEvents = make(chan MisclassifiedEvent)

func ptlog() *slog.Logger { return slog.With("component", "ebpf.ProcessTracer") }

func ReadBPFTraceAsSpan(record *ringbuf.Record, filter ServiceFilter) (request.Span, bool, error) {
func ReadBPFTraceAsSpan(record *ringbuf.Record, filter ServiceFilter, symTab *gosym.Table) (request.Span, bool, error) {
var eventType uint8

// we read the type first, depending on the type we decide what kind of record we have
Expand Down Expand Up @@ -136,7 +137,7 @@ func ReadBPFTraceAsSpan(record *ringbuf.Record, filter ServiceFilter) (request.S
return request.Span{}, true, err
}

return HTTPRequestTraceToSpan(&event), false, nil
return HTTPRequestTraceToSpan(&event, symTab), false, nil
}

func ReadSQLRequestTraceAsSpan(record *ringbuf.Record) (request.Span, bool, error) {
Expand Down
11 changes: 7 additions & 4 deletions pkg/internal/ebpf/common/ringbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ebpfcommon

import (
"context"
"debug/gosym"
"errors"
"io"
"log/slog"
Expand Down Expand Up @@ -37,11 +38,12 @@ type ringBufForwarder struct {
spansLen int
access sync.Mutex
ticker *time.Ticker
reader func(*ringbuf.Record, ServiceFilter) (request.Span, bool, error)
reader func(*ringbuf.Record, ServiceFilter, *gosym.Table) (request.Span, bool, error)
// filter the input spans, eliminating these from processes whose PID
// belong to a process that does not match the discovery policies
filter ServiceFilter
metrics imetrics.Reporter
symTab *gosym.Table
}

var singleRbf *ringBufForwarder
Expand All @@ -55,6 +57,7 @@ func SharedRingbuf(
filter ServiceFilter,
ringbuffer *ebpf.Map,
metrics imetrics.Reporter,
symTab *gosym.Table,
) func(context.Context, []io.Closer, chan<- []request.Span) {
singleRbfLock.Lock()
defer singleRbfLock.Unlock()
Expand All @@ -67,7 +70,7 @@ func SharedRingbuf(
rbf := ringBufForwarder{
cfg: cfg, logger: log, ringbuffer: ringbuffer,
closers: nil, reader: ReadBPFTraceAsSpan,
filter: filter, metrics: metrics,
filter: filter, metrics: metrics, symTab: symTab,
}
singleRbf = &rbf
return singleRbf.sharedReadAndForward
Expand All @@ -77,7 +80,7 @@ func ForwardRingbuf(
cfg *TracerConfig,
ringbuffer *ebpf.Map,
filter ServiceFilter,
reader func(*ringbuf.Record, ServiceFilter) (request.Span, bool, error),
reader func(*ringbuf.Record, ServiceFilter, *gosym.Table) (request.Span, bool, error),
logger *slog.Logger,
metrics imetrics.Reporter,
closers ...io.Closer,
Expand Down Expand Up @@ -170,7 +173,7 @@ func (rbf *ringBufForwarder) alreadyForwarded(ctx context.Context, _ []io.Closer
func (rbf *ringBufForwarder) processAndForward(record ringbuf.Record, spansChan chan<- []request.Span) {
rbf.access.Lock()
defer rbf.access.Unlock()
s, ignore, err := rbf.reader(&record, rbf.filter)
s, ignore, err := rbf.reader(&record, rbf.filter, rbf.symTab)
if err != nil {
rbf.logger.Error("error parsing perf event", err)
return
Expand Down
Loading

0 comments on commit 2513121

Please sign in to comment.