Skip to content

Commit

Permalink
feat(subscriber) expose server parts
Browse files Browse the repository at this point in the history
The `ConsoleLayer` builder provides the user with a console layer and
a server, which is used to start the gRPC server.

However, it may be desireable to expose the instrumentation server together
with other services on the same Tonic router. This was requested
explicitly in #428.

Additionally, to add tests which make use of the instrumentation server
(as part of improving test coverage for #450), more flexibility is
needed than what is provided by the current API. Specifically we would
like to connect a client and server via an in memory channel, rather
than a TCP connection.

This change adds an additional method to `console_subscriber::Server`
called `into_parts` which allows the user to access the
`InstrumentServer` directly. A handle which controls the lifetime of the
`Aggregator` is also provided, as the user must ensure that the
aggregator lives at least as long as the instrument server.

To facilitate the addition of functionality which would result in more
"parts" in the future, `into_parts` returns a non-exhaustive struct,
rather than a tuple of parts.

Closes: #428
  • Loading branch information
hds committed Jul 18, 2023
1 parent 7c8e80a commit 93eb450
Showing 1 changed file with 113 additions and 12 deletions.
125 changes: 113 additions & 12 deletions console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![doc = include_str!("../README.md")]
use console_api as proto;
use proto::resources::resource;
use proto::{instrument::instrument_server::InstrumentServer, resources::resource};
use serde::Serialize;
use std::{
cell::RefCell,
Expand All @@ -15,7 +15,10 @@ use std::{
use thread_local::ThreadLocal;
#[cfg(unix)]
use tokio::net::UnixListener;
use tokio::sync::{mpsc, oneshot};
use tokio::{
sync::{mpsc, oneshot},
task::JoinHandle,
};
#[cfg(unix)]
use tokio_stream::wrappers::UnixListenerStream;
use tracing_core::{
Expand Down Expand Up @@ -933,18 +936,15 @@ impl Server {
///
/// [`tonic`]: https://docs.rs/tonic/
pub async fn serve_with(
mut self,
self,
mut builder: tonic::transport::Server,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let aggregate = self
.aggregator
.take()
.expect("cannot start server multiple times");
let aggregate = spawn_named(aggregate.run(), "console::aggregate");
let addr = self.addr.clone();
let router = builder.add_service(
proto::instrument::instrument_server::InstrumentServer::new(self),
);
let ServerParts {
instrument_server: service,
aggregator_handle: aggregate,
} = self.into_parts();
let router = builder.add_service(service);
let res = match addr {
ServerAddr::Tcp(addr) => {
let serve = router.serve(addr);
Expand All @@ -957,9 +957,110 @@ impl Server {
spawn_named(serve, "console::serve").await
}
};
aggregate.abort();
drop(aggregate);
res?.map_err(Into::into)
}

/// Returns the parts needed to spawn a gRPC server and keep the aggregation
/// worker running.
///
/// Note that a server spawned in this way will overwrite any value set by
/// [`Builder::server_addr`] as the user becomes responsible for defining
/// the address when calling [`Router::serve`].
///
/// # Examples
///
/// The parts can be used to serve the instrument server together with
/// other endpoints from the same gRPC server.
///
/// ```
/// use console_subscriber::{ConsoleLayer, ServerParts};
///
/// # let runtime = tokio::runtime::Builder::new_current_thread()
/// # .enable_all()
/// # .build()
/// # .unwrap();
/// # runtime.block_on(async {
/// let (console_layer, server) = ConsoleLayer::builder().build();
/// let ServerParts {
/// instrument_server,
/// aggregator_handle,
/// ..
/// } = server.into_parts();
///
/// let router = tonic::transport::Server::builder()
/// //.add_service(some_other_service)
/// .add_service(instrument_server);
/// let serve = router.serve(std::net::SocketAddr::new(
/// std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
/// 6669,
/// ));
///
/// // Finally, spawn the server.
/// tokio::spawn(serve);
/// # // Avoid a warning that `console_layer` and `aggregator_handle` are unused.
/// # drop(console_layer);
/// # drop(aggregator_handle);
/// # });
/// ```
///
/// [`Router::serve`]: fn@tonic::transport::server::Router::serve
pub fn into_parts(mut self) -> ServerParts {
let aggregate = self
.aggregator
.take()
.expect("cannot start server multiple times");
let aggregate = spawn_named(aggregate.run(), "console::aggregate");

let service = proto::instrument::instrument_server::InstrumentServer::new(self);

ServerParts {
instrument_server: service,
aggregator_handle: AggregatorHandle {
join_handle: aggregate,
},
}
}
}

/// Server Parts
///
/// This struct contains the parts returned by [`Server::into_parts`]. It may contain
/// further parts in the future, an as such is marked as [`non_exhaustive`].
///
/// The `InstrumentServer<Server>` can be used to construct a router which
/// can be added to a [`tonic`] gRPC server.
///
/// The [`AggregatorHandle`] must be kept until after the server has been
/// shut down.
///
/// See the [`Server::into_parts`] documentation for usage.
#[non_exhaustive]
pub struct ServerParts {
/// The instrument server.
///
/// See the documentation for [`InstrumentServer`] for details.
pub instrument_server: InstrumentServer<Server>,

/// The aggregate handle.
///
/// See the documentation for [`AggregatorHandle`] for details.
pub aggregator_handle: AggregatorHandle,
}

/// Aggregator handle.
///
/// This object is returned from [`Server::into_parts`] and must be
/// kept as long as the `InstrumentServer<Server>` - which is also
/// returned - is in use.
pub struct AggregatorHandle {
join_handle: JoinHandle<()>,
}

impl Drop for AggregatorHandle {
fn drop(&mut self) {
self.join_handle.abort();
}
}

#[tonic::async_trait]
Expand Down

0 comments on commit 93eb450

Please sign in to comment.