Skip to content

Commit

Permalink
Merge branch 'main' into add_error_tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
marctc committed Jul 16, 2024
2 parents ea1709f + 27dc018 commit 12a90af
Show file tree
Hide file tree
Showing 1,205 changed files with 32,977 additions and 437,578 deletions.
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,12 @@ checkfmt:

.PHONY: lint-dashboard
lint-dashboard: prereqs
@echo "### Linting dashboard"
$(DASHBOARD_LINTER) lint grafana/dashboard.json
@echo "### Linting dashboard";
@if [ "$(shell sh -c 'git ls-files --modified | grep grafana/dashboard.json ')" != "" ]; then \
$(DASHBOARD_LINTER) lint --strict grafana/dashboard.json; \
else \
echo '(no git changes detected. Skipping)'; \
fi

.PHONY: lint
lint: prereqs checkfmt
Expand Down
2 changes: 2 additions & 0 deletions bpf/flow.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ typedef struct flow_metrics_t {
u16 flags;
// direction of the flow EGRESS / INGRESS
u8 direction;
// who initiated of the connection: INITIATOR_SRC or INITIATOR_DST
u8 initiator;
// The positive errno of a failed map insertion that caused a flow
// to be sent via ringbuffer.
// 0 otherwise
Expand Down
3 changes: 3 additions & 0 deletions bpf/flows.c
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ static inline int flow_monitor(struct __sk_buff *skb) {
.end_mono_time_ns = current_time,
.flags = flags,
.direction = UNKNOWN,
.initiator = INITIATOR_UNKNOWN,
};

u8 *direction = (u8 *)bpf_map_lookup_elem(&flow_directions, &id);
Expand Down Expand Up @@ -225,6 +226,8 @@ static inline int flow_monitor(struct __sk_buff *skb) {
new_flow.direction = *direction;
}

new_flow.initiator = get_connection_initiator(&id, flags);

// even if we know that the entry is new, another CPU might be concurrently inserting a flow
// so we need to specify BPF_ANY
long ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
Expand Down
138 changes: 135 additions & 3 deletions bpf/flows_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@
#define FIN_ACK_FLAG 0x200
#define RST_ACK_FLAG 0x400

// In conn_initiator_key, which sorted ip:port inititated the connection
#define INITIATOR_LOW 1
#define INITIATOR_HIGH 2

// In flow_metrics, who initiated the connection
#define INITIATOR_SRC 1
#define INITIATOR_DST 2

#define INITIATOR_UNKNOWN 0

// Common Ringbuffer as a conduit for ingress/egress flows to userspace
struct {
__uint(type, BPF_MAP_TYPE_RINGBUF);
Expand All @@ -47,16 +57,138 @@ struct {

// Key: the flow identifier. Value: the flow direction.
struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, flow_id);
__type(value, u8);
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, flow_id);
__type(value, u8);
} flow_directions SEC(".maps");

// To know who initiated each connection, we store the src/dst ip:ports but ordered
// by numeric value of the IP (and port as secondary criteria), so the key is consistent
// for either client and server flows.
typedef struct conn_initiator_key_t {
struct in6_addr low_ip;
struct in6_addr high_ip;
u16 low_ip_port;
u16 high_ip_port;
} __attribute__((packed)) conn_initiator_key;

// Key: the flow identifier.
// Value: the connection initiator index (INITIATOR_LOW, INITIATOR_HIGH).
struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, conn_initiator_key);
__type(value, u8);
} conn_initiators SEC(".maps");

const u8 ip4in6[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff};

// Constant definitions, to be overridden by the invoker
volatile const u32 sampling = 0;
volatile const u8 trace_messages = 0;

// we can safely assume that the passed address is IPv6 as long as we encode IPv4
// as IPv6 during the creation of the flow_id.
static inline s32 compare_ipv6(flow_id *fid) {
for (int i = 0; i < 4; i++) {
s32 diff = fid->src_ip.in6_u.u6_addr32[i] - fid->dst_ip.in6_u.u6_addr32[i];
if (diff != 0) {
return diff;
}
}
return 0;
}

// creates a key that is consistent for both requests and responses, by
// ordering endpoints (ip:port) numerically into a lower and a higher endpoint.
// returns true if the lower address corresponds to the source address
// (false if the lower address corresponds to the destination address)
static inline u8 fill_conn_initiator_key(flow_id *id, conn_initiator_key *key) {
s32 cmp = compare_ipv6(id);
if (cmp < 0) {
__builtin_memcpy(&key->low_ip, &id->src_ip, sizeof(struct in6_addr));
key->low_ip_port = id->src_port;
__builtin_memcpy(&key->high_ip, &id->dst_ip, sizeof(struct in6_addr));
key->high_ip_port = id->dst_port;
return 1;
}
// if the IPs are equal (cmp == 0) we will use the ports as secondary order criteria
__builtin_memcpy(&key->high_ip, &id->src_ip, sizeof(struct in6_addr));
__builtin_memcpy(&key->low_ip, &id->dst_ip, sizeof(struct in6_addr));
if (cmp > 0 || id->src_port > id->dst_port) {
key->high_ip_port = id->src_port;
key->low_ip_port = id->dst_port;
return 0;
}
key->low_ip_port = id->src_port;
key->high_ip_port = id->dst_port;
return 1;
}

// returns INITIATOR_SRC or INITIATOR_DST, but might return INITIATOR_UNKNOWN
// if the connection initiator couldn't be found. The user-space Beyla pipeline
// will handle this last case heuristically
static inline u8 get_connection_initiator(flow_id *id, u16 flags) {
conn_initiator_key initiator_key;
// from the initiator_key with sorted ip/ports, know the index of the
// endpoint that that initiated the connection, which might be the low or the high address
u8 low_is_src = fill_conn_initiator_key(id, &initiator_key);
u8 *initiator = (u8 *)bpf_map_lookup_elem(&conn_initiators, &initiator_key);
u8 initiator_index = INITIATOR_UNKNOWN;
if (initiator == NULL) {
// SYN and ACK is sent from the server to the client
// The initiator is the destination address
if ((flags & (SYN_FLAG | ACK_FLAG)) == (SYN_FLAG | ACK_FLAG)) {
if (low_is_src) {
initiator_index = INITIATOR_HIGH;
} else {
initiator_index = INITIATOR_LOW;
}
}
// SYN is sent from the client to the server.
// The initiator is the source address
else if (flags & SYN_FLAG) {
if (low_is_src) {
initiator_index = INITIATOR_LOW;
} else {
initiator_index = INITIATOR_HIGH;
}
}

if (initiator_index != INITIATOR_UNKNOWN) {
bpf_map_update_elem(&conn_initiators, &initiator_key, &initiator_index, BPF_NOEXIST);
}
} else {
initiator_index = *initiator;
}

// when flow receives FIN or RST, clean flow_directions
if (flags & FIN_FLAG || flags & RST_FLAG || flags & FIN_ACK_FLAG || flags & RST_ACK_FLAG) {
bpf_map_delete_elem(&conn_initiators, &initiator_key);
}

u8 flow_initiator = INITIATOR_UNKNOWN;
// at this point, we should know the index of the endpoint that initiated the connection.
// Then we accordingly set whether the initiator is the source or the destination address.
// If not, we forward the unknown status and the userspace will take
// heuristic actions to guess who is
switch (initiator_index) {
case INITIATOR_LOW:
if (low_is_src) {
flow_initiator = INITIATOR_SRC;
} else {
flow_initiator = INITIATOR_DST;
}
break;
case INITIATOR_HIGH:
if (low_is_src) {
flow_initiator = INITIATOR_DST;
} else {
flow_initiator = INITIATOR_SRC;
}
break;
}

return flow_initiator;
}

#endif //__FLOW_HELPERS_H__
2 changes: 2 additions & 0 deletions bpf/flows_sock.c
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ int socket__http_filter(struct __sk_buff *skb) {
new_flow.direction = *direction;
}

new_flow.initiator = get_connection_initiator(&id, flags);

// even if we know that the entry is new, another CPU might be concurrently inserting a flow
// so we need to specify BPF_ANY
long ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
Expand Down
12 changes: 9 additions & 3 deletions bpf/go_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ static __always_inline void read_ip_and_port(u8 *dst_ip, u16 *dst_port, void *sr
}
}

static __always_inline void get_conn_info_from_fd(void *fd_ptr, connection_info_t *info) {
static __always_inline u8 get_conn_info_from_fd(void *fd_ptr, connection_info_t *info) {
if (fd_ptr) {
void *laddr_ptr = 0;
void *raddr_ptr = 0;
Expand All @@ -251,20 +251,26 @@ static __always_inline void get_conn_info_from_fd(void *fd_ptr, connection_info_
// in Go we keep the original connection info order, since we only need it
// sorted when we make server requests or when we populate the trace_map for
// black box context propagation.

return 1;
}
}

return 0;
}

// HTTP black-box context propagation
static __always_inline void get_conn_info(void *conn_ptr, connection_info_t *info) {
static __always_inline u8 get_conn_info(void *conn_ptr, connection_info_t *info) {
if (conn_ptr) {
void *fd_ptr = 0;
bpf_probe_read(&fd_ptr, sizeof(fd_ptr), (void *)(conn_ptr + conn_fd_pos)); // find fd

bpf_dbg_printk("Found fd ptr %llx", fd_ptr);

get_conn_info_from_fd(fd_ptr, info);
return get_conn_info_from_fd(fd_ptr, info);
}

return 0;
}

#endif // GO_COMMON_H
9 changes: 5 additions & 4 deletions bpf/go_grpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,7 @@ int uprobe_server_handleStream_return(struct pt_regs *ctx) {
bpf_probe_read(&conn_conn_ptr, sizeof(conn_conn_ptr), conn_ptr + 8);
bpf_dbg_printk("conn_conn_ptr %llx", conn_conn_ptr);
if (conn_conn_ptr) {
get_conn_info(conn_conn_ptr, &trace->conn);
found_conn = 1;
found_conn = get_conn_info(conn_conn_ptr, &trace->conn);
}
}
}
Expand Down Expand Up @@ -431,8 +430,10 @@ int uprobe_transport_http2Client_NewStream(struct pt_regs *ctx) {
bpf_dbg_printk("conn_conn_ptr %llx", conn_conn_ptr);
if (conn_conn_ptr) {
connection_info_t conn = {0};
get_conn_info(conn_conn_ptr, &conn);
bpf_map_update_elem(&ongoing_client_connections, &goroutine_addr, &conn, BPF_ANY);
u8 ok = get_conn_info(conn_conn_ptr, &conn);
if (ok) {
bpf_map_update_elem(&ongoing_client_connections, &goroutine_addr, &conn, BPF_ANY);
}
}
}

Expand Down
10 changes: 8 additions & 2 deletions bpf/go_kafka_go.c
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,10 @@ int uprobe_protocol_roundtrip_ret(struct pt_regs *ctx) {
bpf_probe_read(&conn_ptr, sizeof(conn_ptr), (void *)(p_ptr->conn_ptr + 8)); // find conn
bpf_dbg_printk("conn ptr %llx", conn_ptr);
if (conn_ptr) {
get_conn_info(conn_ptr, &trace->conn);
u8 ok = get_conn_info(conn_ptr, &trace->conn);
if (!ok) {
__builtin_memset(&trace->conn, 0, sizeof(connection_info_t));
}
}

__builtin_memcpy(trace->topic, topic_ptr->name, MAX_TOPIC_NAME_LEN);
Expand Down Expand Up @@ -237,7 +240,10 @@ int uprobe_reader_read(struct pt_regs *ctx) {
bpf_probe_read(&conn_ptr, sizeof(conn_ptr), (void *)(conn + 8)); // find conn
bpf_dbg_printk("conn ptr %llx", conn_ptr);
if (conn_ptr) {
get_conn_info(conn_ptr, &r.conn);
u8 ok = get_conn_info(conn_ptr, &r.conn);
if (!ok) {
__builtin_memset(&r.conn, 0, sizeof(connection_info_t));
}
}
}

Expand Down
24 changes: 13 additions & 11 deletions bpf/go_nethttp.c
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,7 @@ int uprobe_ServeHTTPReturns(struct pt_regs *ctx) {
bpf_probe_read(&conn_conn_conn_ptr, sizeof(conn_conn_conn_ptr), conn_conn_ptr + 8);
bpf_dbg_printk("conn_conn_conn_ptr %llx", conn_conn_conn_ptr);

get_conn_info(conn_conn_conn_ptr, &trace->conn);
found_conn = 1;
found_conn = get_conn_info(conn_conn_conn_ptr, &trace->conn);
}
}

Expand Down Expand Up @@ -276,9 +275,10 @@ int uprobe_ServeHTTPReturns(struct pt_regs *ctx) {
void *conn_ptr = 0;
bpf_probe_read(&conn_ptr, sizeof(conn_ptr), (void *)(rwc_ptr + rwc_conn_pos)); // find conn
if (conn_ptr) {
get_conn_info(conn_ptr, &trace->conn);
found = 1;
bpf_dbg_printk("found backup connection info");
found = get_conn_info(conn_ptr, &trace->conn);
if (found) {
bpf_dbg_printk("found backup connection info");
}
//dbg_print_http_connection_info(&conn);
}
}
Expand Down Expand Up @@ -644,12 +644,14 @@ int uprobe_http2RoundTrip(struct pt_regs *ctx) {
bpf_dbg_printk("tconn_conn %llx", tconn_conn);

connection_info_t conn = {0};
get_conn_info(tconn_conn, &conn);
u8 ok = get_conn_info(tconn_conn, &conn);

void *goroutine_addr = GOROUTINE_PTR(ctx);
bpf_dbg_printk("goroutine_addr %lx", goroutine_addr);
if (ok) {
void *goroutine_addr = GOROUTINE_PTR(ctx);
bpf_dbg_printk("goroutine_addr %lx", goroutine_addr);

bpf_map_update_elem(&ongoing_client_connections, &goroutine_addr, &conn, BPF_ANY);
bpf_map_update_elem(&ongoing_client_connections, &goroutine_addr, &conn, BPF_ANY);
}
}

#ifndef NO_HEADER_PROPAGATION
Expand Down Expand Up @@ -857,7 +859,7 @@ int uprobe_netFdRead(struct pt_regs *ctx) {
bpf_dbg_printk("Found existing server connection, parsing FD information for socket tuples, %llx", goroutine_addr);

void *fd_ptr = GO_PARAM1(ctx);
get_conn_info_from_fd(fd_ptr, conn);
get_conn_info_from_fd(fd_ptr, conn); // ok to not check the result, we leave it as 0

//dbg_print_http_connection_info(conn);
}
Expand Down Expand Up @@ -895,7 +897,7 @@ int uprobe_persistConnRoundTrip(struct pt_regs *ctx) {
bpf_probe_read(&conn_ptr, sizeof(conn_ptr), (void *)(conn_conn_ptr + rwc_conn_pos)); // find conn
if (conn_ptr) {
connection_info_t conn = {0};
get_conn_info(conn_ptr, &conn);
get_conn_info(conn_ptr, &conn); // initialized to 0, no need to check the result if we succeeded
u64 pid_tid = bpf_get_current_pid_tgid();
u32 pid = pid_from_pid_tgid(pid_tid);
tp_info_pid_t tp_p = {
Expand Down
5 changes: 4 additions & 1 deletion bpf/go_redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ int uprobe_redis_with_writer(struct pt_regs *ctx) {
bpf_probe_read(&conn_ptr, sizeof(conn_ptr), (void *)(tcp_conn_ptr + 8)); // find conn
bpf_dbg_printk("conn ptr %llx", conn_ptr);
if (conn_ptr) {
get_conn_info(conn_ptr, &req->conn);
u8 ok = get_conn_info(conn_ptr, &req->conn);
if (!ok) {
__builtin_memset(&req->conn, 0, sizeof(connection_info_t));
}
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion bpf/go_sarama.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ int uprobe_sarama_broker_write(struct pt_regs *ctx) {
bpf_probe_read(&conn_ptr, sizeof(conn_ptr), (void *)(tcp_conn_ptr + 8)); // find conn
bpf_dbg_printk("conn ptr %llx", conn_ptr);
if (conn_ptr) {
get_conn_info(conn_ptr, &req.conn);
u8 ok = get_conn_info(conn_ptr, &req.conn);
if (!ok) {
__builtin_memset(&req.conn, 0, sizeof(connection_info_t));
}
}
}
}
Expand Down
Binary file added examples/quickstart/golang/golang
Binary file not shown.
Loading

0 comments on commit 12a90af

Please sign in to comment.