// #Craft
kyoto is an asynchronous application which uses the tokio runtime. Rob has opted to use channels for the async data flow, which I like from my golang experience. The channel abstraction makes a lot of sense in my head and feels like it protects the developer from a lot of those crazy concurrency scenarios.
With that said, there are a handful of them in kyoto connecting different subsystems together and I need to walk through the map. Here are some open questions I have about all the message passing.
- Are there internal channels as well as external? Does the distinction matter?
- How many different types of channels are used?
- Does the caller care about the channels? Is it possible to hide them if they want to just use synchronous calls?
Channel Types
kyoto makes use of three types of tokio channel implementations, which have minor differences making them suited for different use cases.
- Bounded // Multi-producer, single-consumer messaging with backpressure.
- Unbounded // Multi-producer, single-consumer messaging with no backpressure.
- Oneshot // Single-producer, single-consumer request-response scenarios.
The bounded and unbounded types are very similar. Here is a bounded channel in action.
// mpsc is "Multiple Producer Single Consumer"
use tokio::sync::mpsc;

async fn bounded_channel_example() {
    // Create a bounded channel with capacity 5, returns a producer and consumer tuple.
    // "tx" for transmitter and "rx" for receiver.
    let (tx, mut rx) = mpsc::channel::<String>(5);

    // Multiple producers!
    let tx2 = tx.clone();

    tokio::spawn(async move {
        for i in 0..3 {
            tx.send(format!("Message {} from sender 1", i)).await.unwrap();
        }
    });

    tokio::spawn(async move {
        for i in 0..3 {
            tx2.send(format!("Message {} from sender 2", i)).await.unwrap();
        }
    });

    // Only one consumer, nicely enforced by the borrow checker.
    for _ in 0..6 {
        if let Some(msg) = rx.recv().await {
            println!("Received: {}", msg);
        }
    }
}
Flexing a bounded channel.
The oneshot type is a little more bespoke.
use tokio::sync::oneshot;

async fn oneshot_channel_example() {
    let (tx, rx) = oneshot::channel::<String>();

    tokio::spawn(async move {
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        // Send a single message, consumes the sender.
        tx.send("Only message this channel will ever carry".to_string()).unwrap();
    });

    // Can only receive once since rx is consumed, borrow checker enforced again.
    match rx.await {
        Ok(msg) => println!("Received response: {}", msg),
        Err(_) => println!("Sender dropped without sending"),
    }
}
Flexing a oneshot channel.
Data Flow
kyoto subsystems from 10,000 feet up.
Here is my understanding of the kyoto subsystems. Some of these line up 1:1 with Rust modules, others are implemented with a collection of them.
- Client // The end-user interface for requesting and receiving data from a kyoto instance.
- Node // The glue that ties the subsystems together. node sounds like a bitcoin node, but in this case, it is just a client of the peer-to-peer network. Also manages end-user-specific data, including addresses to monitor and transactions to broadcast.
- Chain // Holds all the blockchain state, performs block and filter header validations, and makes the call on what is the blockchain. The most critical part of a light client.
- Network // The interface with the bitcoin peer-to-peer network. There can be some extra complexity here for a light client since it has to trust the network a bit more than a normal peer.
- Database // Persists any data between processes, including chain and network data.
These big boxes share data either through channels or lower-level shared state. The client to/from node connection is obviously special, so I will save that for later. For the rest, the general pattern is for node to own the receiver of a channel and coordinate data flow between the subsystems. node has a pretty classic-looking event loop where it uses select! across the receiving ends of its channels to pick up the next piece of work to coordinate.
loop {
    // ...
    select! {
        peer = tokio::time::timeout(Duration::from_secs(LOOP_TIMEOUT), peer_recv.recv()) => {
            // Handle peer message
        },
        message = client_recv.recv() => {
            // Handle client message
        }
    }
}
Event loop lives in node.
Event Loop
Something interesting about these receivers: they are not simple fields on the node struct. They are wrapped with Arc and Mutex, like so: Arc<Mutex<Receiver<PeerThreadMessage>>>. I was just singing the praises of the borrow checker above, so why are all these complicated wrappers necessary now? There are quite a few things to keep in mind to explain the wrappers.
The first is that the borrow checker has to be conservative. If it is unsure, it rounds down to “naw”. For example, if you borrow a field of a struct mutably through a method that takes &mut self, it considers this a mutable borrow of the entire struct, not just the field. The checker only looks at the method's signature, not its body, so it cannot tell which fields are actually touched. So it just says naw. You generally have to give more information to the borrow checker in these scenarios to give it confidence nothing will explode. For the record, the borrow checker does support split borrows where you borrow different fields directly, but you cannot borrow the whole struct at the same time. But some of these smarts go out the window when async is introduced.
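A small synchronous sketch of the split borrow rule, using a hypothetical `Node` with two plain `Vec` fields standing in for the receivers. Direct field borrows are tracked separately, while the accessor methods borrow all of `self`.

```rust
// Hypothetical stand-in for a struct with two independently mutated fields.
struct Node {
    peers: Vec<String>,
    clients: Vec<String>,
}

impl Node {
    // Each accessor borrows the whole struct mutably, per its signature.
    fn peers_mut(&mut self) -> &mut Vec<String> {
        &mut self.peers
    }

    fn clients_mut(&mut self) -> &mut Vec<String> {
        &mut self.clients
    }
}

/// Returns the final lengths of both fields.
pub fn split_borrow_demo() -> (usize, usize) {
    let mut node = Node { peers: Vec::new(), clients: Vec::new() };

    // OK: two live mutable borrows, because the compiler can see
    // they point at different fields.
    let peers = &mut node.peers;
    let clients = &mut node.clients;
    peers.push("peer 1".to_string());
    clients.push("client 1".to_string());

    // Not OK: the same thing through methods is rejected with E0499,
    // since each call claims all of `node` for as long as its result lives.
    // let peers = node.peers_mut();
    // let clients = node.clients_mut();
    // peers.push("nope".to_string());

    // Fine again when the method borrows do not overlap.
    node.peers_mut().push("peer 2".to_string());
    node.clients_mut().push("client 2".to_string());

    (node.peers.len(), node.clients.len())
}
```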
The big shift is that async functions are turned into struct-based state machines. Borrows across await points (state transitions) are very different than regular sync code. Every local variable that is still alive across an await point becomes a field in the state machine struct under the hood. Their lifetimes are now tied to the state machine rather than a plain stack frame, so it is harder to see when a lifetime begins and ends within the function. It can even be impossible in some scenarios, like a select! across branches.
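One way to peek at the state machine is to measure the future itself. In this sketch (function names are made up, and exact sizes depend on the compiler version), a buffer that is still in use after an await has to be stored inside the future, while one that goes out of scope before the await does not.

```rust
use std::future::ready;
use std::mem::size_of_val;

// `buf` is used after the await, so it must live inside the state machine.
async fn held_across_await() -> u8 {
    let buf = [1u8; 1024];
    ready(()).await;
    buf[0]
}

// `buf` goes out of scope before the await, only the single byte is carried over.
async fn dropped_before_await() -> u8 {
    let first = {
        let buf = [1u8; 1024];
        buf[0]
    };
    ready(()).await;
    first
}

/// Returns the sizes in bytes of the two futures: (held, scoped).
pub fn future_sizes() -> (usize, usize) {
    // Futures are lazy: calling an async fn only builds the state machine.
    let held = held_across_await();
    let scoped = dropped_before_await();
    (size_of_val(&held), size_of_val(&scoped))
}
```

The first future has to be at least as big as the 1024-byte buffer it carries, while the second should be far smaller.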
The receiver’s recv function requires a mutable reference. I haven’t dug into the exact reason for the requirement, but it makes sense since you are pulling a message off the channel’s state. The receivers are fields of the node, so to get mutable access to them we usually need mutable access to the struct. The simplest way is a method with &mut self. Now add in the async event loop: the run() function with the select! across multiple branches, each polling a different channel receiver on the node struct. That is mutable references to the struct and to its fields, all held inside a state machine. The borrow checker cannot have any confidence those lifetimes will play nice.
Interior Mutability
Rust has a pattern for poking holes in the borrow checker called interior mutability. Generally it’s where an immutable container allows mutability to its contents during runtime. The big loss is compile time checking.
- Cell // Copy-based mutability. Copy the value out, replace.
- RefCell // Runtime-checked borrowing. Get references to the value.
- Mutex // Thread-safe blocking mutability.
- RwLock // Blocking writes, multiple reads.
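As a quick illustration of the simplest entry in the list, here is a sketch of `Cell` letting a method mutate through `&self`. The `Stats` struct and its field are hypothetical.

```rust
use std::cell::Cell;

// Hypothetical counter that can be bumped through a shared reference.
struct Stats {
    messages_seen: Cell<u32>,
}

impl Stats {
    // Takes `&self`, yet still mutates: copy the value out, write a new one in.
    fn record(&self) {
        self.messages_seen.set(self.messages_seen.get() + 1);
    }
}

/// Records two messages and returns the count.
pub fn cell_demo() -> u32 {
    let stats = Stats { messages_seen: Cell::new(0) };
    stats.record();
    stats.record();
    stats.messages_seen.get()
}
```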
Cell is very simple, and the other three are smart pointers. So how can these help the async scenario?
If we limit our application to a single thread, a big but possible assumption, we don’t have to look into the more complex Mutex (yet!). But Cell is probably too simple, since our receivers are complex objects which do not implement the Copy trait required to copy a value out. But RefCell? Yea, that works.
use std::cell::RefCell;

fn main() {
    let counter = RefCell::new(0);

    // Reading the value using borrow().
    {
        let value = counter.borrow(); // Value is a Ref<i32>, which implements Deref to i32
        println!("Current count: {}", *value);
    } // Value is dropped here, releasing the borrow.

    {
        let mut value = counter.borrow_mut(); // value is a RefMut<i32>, which implements DerefMut to i32
        *value += 1;
        println!("Updated count: {}", *value);
    }

    // We can borrow again after the previous borrow is released.
    println!("Final count: {}", *counter.borrow());
}
Basic RefCell usage, check out how the counter variable is not mutable.
If we wrap our receivers in RefCells, the event loop no longer needs to take a mutable reference to self (the node instance), since we no longer need mutable references to the receivers. The tradeoff is some more boilerplate to get a reference to the receiver when needed, but now the borrow checker is satisfied. It is now on the developer to ensure mutable borrows don’t overlap, which would result in a runtime panic. Specifically, borrows are scoped to select! branches.
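Here is a single-threaded sketch of that tradeoff. A `VecDeque` stands in for a channel receiver, and the `Node` struct and method names are hypothetical. `try_borrow_mut` is used to show the overlap check without actually panicking.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;

// Hypothetical node whose "receiver" is a queue behind a RefCell.
struct Node {
    inbox: RefCell<VecDeque<String>>,
}

impl Node {
    // Only needs `&self`: the mutable borrow is taken at runtime and
    // released as soon as this statement finishes.
    fn next_message(&self) -> Option<String> {
        self.inbox.borrow_mut().pop_front()
    }
}

/// Returns (first message, overlapping borrow rejected, borrow free after drop).
pub fn refcell_demo() -> (Option<String>, bool, bool) {
    let node = Node {
        inbox: RefCell::new(VecDeque::from(vec!["ping".to_string()])),
    };

    let msg = node.next_message();

    // Hold one mutable borrow open...
    let guard = node.inbox.borrow_mut();
    // ...and a second one is refused. A plain `borrow_mut()` would panic here.
    let overlap_rejected = node.inbox.try_borrow_mut().is_err();
    drop(guard);

    // Once the first borrow is released, borrowing works again.
    let free_again = node.inbox.try_borrow_mut().is_ok();

    (msg, overlap_rejected, free_again)
}
```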
And if the runtime is allowed to have multiple threads, a Mutex is required since different threads may be looking to mutate the values.
Ownership
Interior mutability handles the mutation challenge of async, but there can be some subtle ownership challenges too. Async runtimes make it super easy to spawn off new tasks, which are like lightweight threads. This is great for concurrency, but makes it more difficult to track ownership of resources since the ordering of task lifetimes can get complicated fast.
Generally you use a reference counter of some sort, Rc or Arc, when you have shared ownership of a resource. The lifetime is now managed by the wrapper, rather than implicitly tied to the context which created it. This makes a lot of sense for resources in an async runtime.
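A minimal sketch of shared ownership with `Arc`, using plain OS threads in place of async tasks to keep it dependency-free. The function name and the counter are made up for illustration.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Returns (strong count after all threads finish, final counter value).
pub fn shared_counter_demo() -> (usize, u32) {
    let counter = Arc::new(Mutex::new(0u32));

    let handles: Vec<_> = (0..4)
        .map(|_| {
            // Each thread gets its own handle, the count goes up by one.
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                *counter.lock().unwrap() += 1;
                // This thread's handle is dropped here, the count goes back down.
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Copy the value out so the lock guard is released before returning.
    let total = *counter.lock().unwrap();
    (Arc::strong_count(&counter), total)
}
```

Nobody has to decide up front which thread frees the counter: it goes away when the last handle is dropped.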
However, it is not clear to me why the channel receivers of the node struct need an Arc wrapper. By definition the receiver can only have one consumer. Also, the channel is created in the node instance, so the resource isn’t passed in from a shared context. Maybe it makes moving the node instance itself cheaper or safer?