1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247
/* * Copyright (C) 2019 Open Whisper Systems * Copyright (C) 2021 jessa0 * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. */ //! Higher-level API for a Raft node. use alloc::collections::BTreeSet; use bytes::Bytes; use core::fmt::Display; use crate::core::{RaftState, ReplicationState}; use crate::message::{LogIndex, RaftMessage, SendableRaftMessage, TermId}; use crate::log::{CommittedIter, RaftLog}; use rand_core::RngCore; /// A Raft node, used for replicating a strongly-consistent distributed log of entries with arbitrary data amongst its /// peers. /// /// The distributed log can be used, for example, to replicate transactions in a database. /// /// # Appending entries to the distributed log /// /// Log entries passed to [`append`] are not guaranteed to ultimately be appended to the distributed log, and may be /// cancelled any time [`receive`] is called before they are "committed". The provided [`RaftLog`] should provide an API /// to find out which log entries have been cancelled. Only log entries passed to [`append`] on a particular node are /// guaranteed to appear as cancelled in its own [`RaftLog`], but entries appended on other nodes may appear as well. /// /// The distributed log may only be appended to by the node returned by [`leader`], but even that node is not guaranteed /// to be able to append to the log, since it must be able to send each new entry to a majority of its peers before /// losing leadership in order for the entry to become committed. The leader may change at any time, and therefore an /// entry may be first returned from [`take_committed`] on a node different than that to which it was submitted. /// However, [`take_committed`] is guaranteed to return the same entries in the same order on every node. /// /// # Timer ticks /// /// Timeouts in [`RaftNode`] are driven by a timer ticking at fixed interval, with the number of ticks between timeouts /// configured by the provided [`RaftConfig`]. Any consistent time interval between ticks may be chosen, but the time /// interval and [`RaftConfig`] must be the same on all peers in a group. Shorter timeouts will allow Raft to react /// quicker to network disruptions, but may result in spurious leadership changes when the network latency exceeds /// `time_interval * election_timeout_ticks`. /// /// # Message delivery /// /// Unicast message delivery is assumed to be non-lossy in order for replication to make progress. In other words, once /// a non-broadcast [`SendableRaftMessage`] is returned from an API such as [`append`], [`receive`], or [`timer_tick`], /// it must be retained and retransmitted until it is confirmed to have been processed by [`receive`] on its /// destination. Messages may be safely delivered out-of-order or more than once, however. /// /// To prevent unbounded queueing, the API is designed to only ever return a bounded amount of unacknowledged unicast /// message data. This amount can be approximately controlled by [`replication_chunk_size`]. /// /// [`append`]: Self::append /// [`leader`]: Self::leader /// [`receive`]: Self::receive /// [`replication_chunk_size`]: RaftConfig::replication_chunk_size /// [`SendableRaftMessage`]: crate::message::SendableRaftMessage /// [`take_committed`]: Self::take_committed /// [`timer_tick`]: Self::timer_tick pub struct RaftNode<Log, Random, NodeId> { state: RaftState<Log, Random, NodeId>, } /// Configurable parameters of a Raft node. #[derive(Clone, Eq, PartialEq)] pub struct RaftConfig { /// The minimum number of timer ticks between leadership elections. pub election_timeout_ticks: u32, /// The number of timer ticks between sending heartbeats to peers. pub heartbeat_interval_ticks: u32, /// The maximum number of bytes to replicate to a peer at a time. pub replication_chunk_size: usize, } /// An error returned while attempting to append to a Raft log. pub enum AppendError<E> { /// The append to the Raft log was cancelled and should be resubmitted to the current Raft leader. Cancelled { /// Arbitrary data associated with the log entry. data: Bytes, }, /// An error was returned by the [`RaftLog`](crate::log::RaftLog) implementation. RaftLogErr(E), } impl<Log, Random, NodeId> RaftNode<Log, Random, NodeId> where Log: RaftLog, Random: RngCore, NodeId: Ord + Clone + Display, { /// Constructs a new Raft node with specified peers and configuration. /// /// The Raft node will start with an empty initial state. The `log` provided should also be in an empty initial /// state. Each Raft node in a group must be constructed with the same set of peers and `config`. `peers` may /// contain `node_id` or omit it to the same effect. `rand` must produce different values on every node in a group. pub fn new( node_id: NodeId, peers: BTreeSet<NodeId>, log: Log, random: Random, config: RaftConfig, ) -> Self { Self { state: RaftState::new( node_id, peers, log, random, config, ), } } /// Request appending an entry with arbitrary `data` to the Raft log, returning messages to be sent. /// /// See ["Message delivery"] for details about delivery requirements for the returned messages. /// /// # Errors /// /// If this request would immediately be cancelled, then an error is returned. /// /// ["Message delivery"]: RaftNode#message-delivery #[must_use = "This function returns Raft messages to be sent."] pub fn append<T: Into<Bytes>>(&mut self, data: T) -> Result<impl Iterator<Item = SendableRaftMessage<NodeId>> + '_, AppendError<Log::Error>> { let () = self.state.client_request(data.into())?; Ok(self.append_entries()) } /// Returns this node's configurable parameters. pub fn config(&self) -> &RaftConfig { self.state.config() } /// Returns whether this node is the leader of the latest known term. pub fn is_leader(&self) -> bool { self.state.is_leader() } /// Returns the index of the last [`LogEntry`] which has been committed and thus may be returned by /// [`take_committed`]. /// /// [`take_committed`]: Self::take_committed /// [`LogEntry`]: crate::message::LogEntry pub fn last_committed_log_index(&self) -> LogIndex { *self.state.commit_idx() } /// Returns the ID of the leader, if there is one, of the latest known term, along with the term. pub fn leader(&self) -> (Option<&NodeId>, TermId) { let (leader, term) = self.state.leader(); (leader, *term) } /// Returns a reference to the Raft log storage. pub fn log(&self) -> &Log { self.state.log() } /// Returns a mutable reference to the Raft log storage. pub fn log_mut(&mut self) -> &mut Log { self.state.log_mut() } /// Returns this node's ID. pub fn node_id(&self) -> &NodeId { self.state.node_id() } /// Returns the IDs of this node's peers. pub fn peers(&self) -> &BTreeSet<NodeId> { self.state.peers() } /// Processes receipt of a `message` from a peer with ID `from`, returning messages to be sent. /// /// See ["Message delivery"] for details about delivery requirements for the returned messages. /// /// ["Message delivery"]: RaftNode#message-delivery #[must_use = "This function returns Raft messages to be sent."] pub fn receive( &mut self, message: RaftMessage, from: NodeId, ) -> impl Iterator<Item = SendableRaftMessage<NodeId>> + '_ { let message = self.state.receive(message, from); message.into_iter().chain(self.append_entries()) } /// Returns the replication state corresponding to the peer with ID `peer_node_id`. pub fn replication_state(&self, peer_node_id: &NodeId) -> Option<&ReplicationState> { self.state.replication_state(peer_node_id) } /// Returns a reference to the low-level state of the Raft node. pub fn state(&mut self) -> &RaftState<Log, Random, NodeId> { &self.state } /// Returns a mutable reference to the low-level state of the Raft node. pub fn state_mut(&mut self) -> &mut RaftState<Log, Random, NodeId> { &mut self.state } /// Returns an iterator yielding committed [log entries][`LogEntry`]. A given [`LogEntry`] will be yielded only once /// over the lifetime of a [`RaftNode`]. See ["Appending entries to the distributed log"] for details about log /// commital. /// /// ["Appending entries to the distributed log"]: RaftNode#appending-entries-to-the-distributed-log /// [`LogEntry`]: crate::message::LogEntry pub fn take_committed(&mut self) -> CommittedIter<'_, Log> { self.state.take_committed() } /// Ticks forward this node's internal clock by one tick, returning messages to be sent. /// /// See ["Message delivery"] for details about delivery requirements for the returned messages. /// /// ["Message delivery"]: RaftNode#message-delivery #[must_use = "This function returns Raft messages to be sent."] pub fn timer_tick(&mut self) -> impl Iterator<Item = SendableRaftMessage<NodeId>> + '_ { let message = self.state.timer_tick(); message.into_iter().chain(self.append_entries()) } #[must_use = "This function returns Raft messages to be sent."] fn append_entries( &mut self, ) -> impl Iterator<Item = SendableRaftMessage<NodeId>> + '_ { let peers = self.state.peers().clone().into_iter(); peers.flat_map(move |peer| self.state.append_entries(peer)) } }