通过本文,你将了解到: 点对点通信的特点;libp2p的基本介绍; 以及Substrate如何使用libp2p实现点对点的通信。
在为以上问题寻找解决方案时,点对点(即peer to peer)的通信机制逐渐走进了技术先驱们的视野。在互联网早期的时候,点对点通信主要用于文件共享,如音乐共享服务Napster和流媒体下载服务BitTorrent。点对点服务更加广泛的应用,还需要一定的治理机制,来处理资源的版权问题和现实世界的监管,这些不是本文的重点,不做过多地介绍。
点对点通信模型如下图:
在点对点的网络里,所有的节点都是对等的,即任何节点都可以存储和处理数据(作为服务端);也可以发送待处理的数据给网络中的其它节点,获取经过网络处理后的数据(作为客户端)。通过这样的通信机制,可以保证,
- 网络具备开放性,节点可以自由加入和退出;
- 不依赖单一服务节点,网络的服务更加可靠、高效;
- 节点运行的程序代码公开可见,规则更加透明。
根据网络中传输的数据和提供服务的不同,点对点应用出现了不同的应用场景,包括文件存储和读取、数据计算、内容共享、数据交易等服务。在开发这些应用的过程中,可能涉及到的技术要点有:
- 节点身份,唯一地标识网络中的节点及地址格式;
- 发现机制,在没有中心化的协调服务存在的情况下,如何发现新的节点;
- 路由,本地节点无法存储网络中所有节点的信息,通过路由算法查找需要的节点;
- 多种通信协议比如TCP、UDP、WebSocket、QUIC等等;
- 加密和认证,保证消息的可靠和安全;
- NAT穿透,解决NAT后面的内部IP无法访问的难题;
- 多路复用以节省资源;
- 消息订阅,高效的获取更新而不会给网络造成负担;
- 中继,当需要建立通信的两个节点都无法直接被访问,比如都在NAT网络中,需要通过中继节点传递信息;
- ……
以上列出的这些技术要点/需求并不会出现在每个点对点应用里,大多数只会使用其中的一部分功能,尽管如此,还是存在严重地重复造轮子的现象。也有一些应用为了避免重复开发,选择了fork已有开源应用的功能代码,这种方式引入了原有应用的技术债,难于定制和扩展。
复杂多变的网络拓扑和膨胀的应用状态导致了点对点应用的开发、推广和普及都极为困难,出现一个高度模块化的点对点通信开发框架也就不足为奇,也就是接下来我们要介绍的libp2p。
Libp2p 介绍
Libp2p是一个开发点对点应用的框架,它最早源于去中心的文件共享服务IPFS,把网络通信相关的内容抽离并重新设计,形成了现在的libp2p,目前比较成熟的几个语言版本包括js-libp2p、go-libp2p、rust-libp2p,并且定义了一套参考规范,不同语言的实现版本只要符合这一规范,就可以实现互通信。
Libp2p提供的核心功能包括,
- 在节点之间建立安全可复用的网络连接;
- 可验证的节点身份和可连接的地址。
安全可复用的连接
Libp2p支持的底层(传输层)协议包括TCP/IP、UDP、WebSocket、QUIC等,不同语言版本的实现完成度不尽相同。连接的安全性是通过对传输内容进行加密来保证的,节点的身份也会进行相应的验证。
为了提升连接的利用率以及应对复杂的网络场景如各种形式的防火墙和NAT,对建立的底层连接进行多路复用十分有必要,stream就是可实现复用的一种上层连接形式,它可以是双向的,也可以是单向的。
QUIC协议有内置的安全和复用组件,对于没有此类功能的协议,使用libp2p可以对原始连接进行upgrade,添加所需的安全和可复用的套件,安全套件有secio、Noise,可复用套件有yamux和mplex。
Upgrade协议的流程如下图:
在stream里可以传输各种各样的libp2p内置或用户自定义的应用层协议,这些协议定义了节点间交换信息的方式和内容,比如:
- ping,用来定时检查节点是否在线;
- identity,用于节点间交换信息如节点的public key和网络中的地址;
- kad-dht,基于Kademlia算法的分布哈希表,用于节点间路由;
- ……
以identity协议为例,它的协议id(具有路径格式的字符串)为/ipfs/id/1.0.0
,消息的表示和序列化使用的是protocol buffer,
message Identify { optional string protocolVersion = 5; optional string agentVersion = 6; optional bytes publicKey = 1; repeated bytes listenAddrs = 2; optional bytes observedAddr = 4; repeated string protocols = 3; }
节点身份
节点启动时需要提供一个private key(也可以随机生成),主要用于
- 将节点双方的公钥通过Diffie-Helman key exchange对消息进行加解密;
- 对节点的public key进行哈希,生成PeerId即节点身份。
Libp2p支持的公钥加密算法包括RSA、Ed25519、Secp256k1等。PeerId的生成采用了multihashes的形式,即支持多种哈希算法,经过base 58 编码后的格式如QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N
。
将PeerId与multiaddr结合可以用来在网络中定位节点和验证身份,例如IP地址为7.7.7.7、监听在4242端口、拥有上述PeerId的节点的multiaddr(多层次地址)为:
ip4/7.7.7.7/tcp/4242/p2p/QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N
以上只列出了libp2p提供的部分功能,更多内容例如消息订阅、中继、NAT穿透等等可以参考相关文档,使用libp2p开发点对点应用可以解决以上提到的大部分难题和技术点,节约大量的开发时间,增加系统的可维护性和可扩展性。接下来,我们看一下如何使用rust-libp2p实现简单的自定义应用协议。
简单应用
这里我们基于rust-libp2p,编写一个简单的点对点应用,可以完成回声(echo)的功能,即其中一个节点发送一个字符串,另一个节点接收该字符串并回复相同的字符,这里我们需要自定义一个应用层的协议EchoProtocol
,需要实现libp2p提供的UpgradeInfo
接口,
#[derive(Default, Debug, Copy, Clone)] pub struct EchoProtocol; impl UpgradeInfo for EchoProtocol { type Info = &'static [u8]; type InfoIter = iter::Once<Self::Info>; fn protocol_info(&self) -> Self::InfoIter { iter::once(b"/ipfs/echo/1.0.0") } }
这里的protocol_info
方法返回了协议的名字和格式。接着实现InboundUpgrade
和OutboundUpgrade
,这两个接口都继承自UpgradeInfo
,
impl InboundUpgrade<NegotiatedSubstream> for EchoProtocol { type Output = NegotiatedSubstream; type Error = Void; type Future = future::Ready<Result<Self::Output, Self::Error>>; fn upgrade_inbound(self, stream: NegotiatedSubstream, _: Self::Info) -> Self::Future { future::ok(stream) } } impl OutboundUpgrade<NegotiatedSubstream> for EchoProtocol { type Output = NegotiatedSubstream; type Error = Void; type Future = future::Ready<Result<Self::Output, Self::Error>>; fn upgrade_outbound(self, stream: NegotiatedSubstream, _: Self::Info) -> Self::Future { future::ok(stream) } }
NegotiatedSubstream
表示协商好的某个协议将会使用的I/O流。当远端的节点支持当前协议时,调用upgrade_inbound
和upgrade_outbound
分别在listener和dialer端开启握手信号。
之后,定义处理连接请求的handler,也就是我们这里的结构体EchoHandler
,它保存了处理过程中所使用的状态信息。
pub struct EchoHandler { inbound: Option<EchoFuture>, outbound: Option<EchoFuture>, init_echo: bool, already_echo: bool, } type EchoFuture = BoxFuture<'static, Result<NegotiatedSubstream, io::Error>>; impl EchoHandler { pub fn new(init_echo: bool) -> Self { EchoHandler { inbound: None, outbound: None, init_echo, already_echo: false, } } }
还需要一个自定义的枚举event枚举类型,
#[derive(Debug)] pub enum EchoHandlerEvent { Success, }
接着就可以实现libp2p::swarm里所提供的ProtocolsHandler
接口了,
impl ProtocolsHandler for EchoHandler { type InEvent = Void; type OutEvent = EchoHandlerEvent; type Error = ReadOneError; type InboundProtocol = EchoProtocol; type OutboundProtocol = EchoProtocol; type OutboundOpenInfo = (); type InboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol<EchoProtocol, ()> { SubstreamProtocol::new(EchoProtocol, ()) } fn inject_fully_negotiated_inbound(&mut self, stream: NegotiatedSubstream, (): ()) { if self.inbound.is_some() { panic!("already have inbound"); } log::debug!("ProtocolsHandler::inject_fully_negotiated_inbound"); self.inbound = Some(recv_echo(stream).boxed()); } fn inject_fully_negotiated_outbound(&mut self, stream: NegotiatedSubstream, (): ()) { if self.outbound.is_some() { panic!("already have outbound"); } log::debug!("ProtocolsHandler::inject_fully_negotiated_outbound"); self.outbound = Some(send_echo(stream).boxed()); } fn inject_event(&mut self, _: Void) { } fn inject_dial_upgrade_error(&mut self, _info: (), error: ProtocolsHandlerUpgrErr<Void>) { log::debug!("ProtocolsHandler::inject_dial_upgrade_error: {:?}", error); } fn connection_keep_alive(&self) -> KeepAlive { KeepAlive::Yes } fn poll(&mut self, cx: &mut Context<'_>) -> Poll< ProtocolsHandlerEvent< EchoProtocol, (), EchoHandlerEvent, Self::Error > > { if let Some(fut) = self.inbound.as_mut() { match fut.poll_unpin(cx) { Poll::Pending => { log::debug!("ProtocolsHandler::poll, inbound is some but pending..."); } Poll::Ready(Err(e)) => { log::error!("ProtocolsHandler::poll, inbound is some but resolve with error: {:?}", e); self.inbound = None; panic!(); } Poll::Ready(Ok(stream)) => { self.inbound = Some(recv_echo(stream).boxed()); return Poll::Ready(ProtocolsHandlerEvent::Custom(EchoHandlerEvent::Success)) } } } match self.outbound.take() { Some(mut send_echo_future) => { match send_echo_future.poll_unpin(cx) { Poll::Pending => { // The future has not yet finished. Make sure // to poll it again on the next iteration. self.outbound = Some(send_echo_future); }, Poll::Ready(Ok(_stream)) => { return Poll::Ready( ProtocolsHandlerEvent::Custom( EchoHandlerEvent::Success ) ) }, Poll::Ready(Err(e)) => { log::error!("ProtocolsHandler::poll, outbound is some but resolve with error: {:?}", e); panic!(); } } }, None => { if self.init_echo && !self.already_echo { self.already_echo = true; let protocol = SubstreamProtocol::new(EchoProtocol, ()); return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol }) } }, } Poll::Pending } }
当节点为dialer,handler在轮询(ProtocolsHandler::poll()
)时,需要返回包含EchoProtocol
实例的ProtocolsHandlerEvent::OutboundSubstreamRequest
,用于发起并协商连接使用的协议。如果协商成功,调用ProtocolsHandler::inject_fully_negotiated_outbound
,在这里我们将handler保存的outbount状态由None更新为Some(send_echo(stream).boxed())
,其中send_echo
接收协商好的IO stream,无错误发生时返回该stream。
const ECHO_SIZE: usize = 12; pub async fn send_echo<S>(mut stream: S) -> io::Result<S> where S: AsyncRead + AsyncWrite + Unpin { // mxinden: A bit of a hack. Likely nicer to do somewhere else. futures_timer::Delay::new(std::time::Duration::from_secs(3)).await; let payload = "hello world!"; log::info!("send_echo, preparing send payload: {:?}, in bytes: {:?}", payload, payload.as_bytes()); stream.write_all(payload.as_bytes()).await?; stream.flush().await?; let mut recv_payload = [0u8; ECHO_SIZE]; log::info!("send_echo, awaiting echo for {:?}", payload); stream.read_exact(&mut recv_payload).await?; log::info!("send_echo, received echo: {:?}", str::from_utf8(&recv_payload)); if str::from_utf8(&recv_payload) == Ok(payload) { Ok(stream) } else { Err(io::Error::new(io::ErrorKind::InvalidData, "Echo payload mismatch")) } }
我们接着看ProtocolsHandler::poll
里的实现,当outbound为Some,send_echo返回的future轮询的结果为Poll::Pending
时,更新outbound为self.outbound = Some(send_echo_future)
,保证下次轮询时依然有效,当结果为Poll::Ready
时返回相应的事件信息。
当节点为listener,连接中出现新的请求流时,自动调用ProtocolsHandler::listen_protocol
返回一个InboundUpgrade
的实例来协商流使用的协议。协商成功之后,调用inject_fully_negotiated_inbound
,其中一个参数为协商好的stream,在该方法内,将handler的inbound属性状态更新为Some(recv_echo(stream).boxed())
,recv_echo
方法的实现为,
pub async fn recv_echo<S>(mut stream: S) -> io::Result<S> where S: AsyncRead + AsyncWrite + Unpin { let mut payload = [0u8; ECHO_SIZE]; log::info!("recv_echo, waiting for echo..."); stream.read_exact(&mut payload).await?; log::info!("recv_echo, receive echo request for payload: {:?}", payload); stream.write_all(&payload).await?; stream.flush().await?; log::info!("recv_echo, echo back successfully for payload: {:?}", payload); Ok(stream) }
这里泛型S
需要满足futures_io提供的AsyncRead
和AsyncWrite
约束。
点对点网络就像一个蜂群(Swarm),而蜂群的整体行为是由单一个体的行为所组成的,单一个体的行为由一系列的规则所制定,此类的规则可以组合使用,在rust-libp2p中,规定的定义需要实现NetworkBehaviour
接口,这里我们先定义一个结构体,对规则的状态进行保存。
pub struct EchoBehaviour { events: VecDeque<EchoBehaviourEvent>, config: EchoBehaviourConfig, } pub struct EchoBehaviourConfig { init_echo: bool, } impl EchoBehaviour { pub fn new(config: EchoBehaviourConfig) -> Self { EchoBehaviour { events: VecDeque::new(), config, } } } #[derive(Debug)] pub struct EchoBehaviourEvent { pub peer: PeerId, pub result: EchoHandlerEvent, }
本结构体包含了与Swarm沟通的消息events
,行为定义所需要的初始配置。接着,就可以实现NetworkBehaviour
接口了,
impl NetworkBehaviour for EchoBehaviour { type ProtocolsHandler = EchoHandler; type OutEvent = EchoBehaviourEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { EchoHandler::new(self.config.init_echo) } fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec<Multiaddr> { Vec::new() } fn inject_connected(&mut self, _: &PeerId) { log::debug!("NetworkBehaviour::inject_connected"); } fn inject_disconnected(&mut self, _: &PeerId) { log::debug!("NetworkBehaviour::inject_disconnected"); } fn inject_event(&mut self, peer: PeerId, _: ConnectionId, result: EchoHandlerEvent) { log::debug!("NetworkBehaviour::inject_event"); self.events.push_front(EchoBehaviourEvent { peer, result }) } fn poll(&mut self, _: &mut Context<'_>, _: &mut impl PollParameters) -> Poll<NetworkBehaviourAction<Void, EchoBehaviourEvent>> { log::debug!("NetworkBehaviour::poll, events: {:?}", self.events); if let Some(e) = self.events.pop_back() { Poll::Ready(NetworkBehaviourAction::GenerateEvent(e)) } else { Poll::Pending } } }
当连接建立或者尝试去呼叫节点时会调用new_handler
,返回我们之前定义的handler即EchoHandler
,作为该连接的后台处理线程,behaviour和handler通过消息传递的机制进行通信,inject_event
可以把handler的消息传给behaviour,behaviour在poll的时候返回SendEvent
将消息传递给handler。
到这里,我们已经完成了一个简单的echo点对点通信协议,现在我们看一下main函数里如何使用。
fn main() -> Result<(), Box<dyn Error>> { env_logger::init(); // create a random peerid. let id_keys = identity::Keypair::generate_ed25519(); let peer_id = PeerId::from(id_keys.public()); log::info!("Local peer id: {:?}", peer_id); // create a transport. let transport = libp2p::build_development_transport(id_keys)?; let mut behaviour_config = EchoBehaviourConfig { init_echo: false }; // get the multi-address of remote peer given as the second cli argument. let target = std::env::args().nth(1); // if remote peer exists, the peer can initialize an echo request. if target.is_some() { behaviour_config = EchoBehaviourConfig { init_echo: true }; } // create a echo network behaviour. let behaviour = EchoBehaviour::new(behaviour_config); // create a swarm that establishes connections through the given transport // and applies the echo behaviour on each connection. let mut swarm = Swarm::new(transport, behaviour, peer_id); // if the remote peer exists, dial it. if let Some(addr) = target { let remote = addr.parse()?; Swarm::dial_addr(&mut swarm, remote)?; log::info!("Dialed {}", addr) } // Tell the swarm to listen on all interfaces and a random, OS-assigned port. Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse()?)?; let mut listening = false; task::block_on(future::poll_fn(move |cx: &mut Context<'_>| { loop { match swarm.poll_next_unpin(cx) { Poll::Ready(Some(event)) => log::info!("Get event: {:?}", event), Poll::Ready(None) => { log::info!("Swam poll next ready none"); return Poll::Ready(()) }, Poll::Pending => { if !listening { for addr in Swarm::listeners(&swarm) { log::info!("Listening on {}", addr); listening = true; } } return Poll::Pending } } } })); Ok(()) }
代码的简单说明如下:
- 通过
Keypair::generate_ed25519
生成用于节点间通信加密的密钥,其中的公钥可以派生出节点的PeerId
。 libp2p::build_development_transport
构建了开发常用的传输层,支持TCP/IP、WebSocket,使用noise协议作为加密层,yamux和mplex多路复用协议。- 解析传入参数,如果包含呼叫的节点信息,则是dialer(客户端),将构造behaviour的初始参数
init_echo
设置为true。 - 使用上面构造的传输层、behaviour、节点id,调用
Swarm::new(transport, behaviour, peer_id)
构造模拟网络的swarm。 - 当节点为dialer时,呼叫传入的远端节点
Swarm::dial_addr(&mut swarm, remote)?
,将该节点加入swarm节点池中。 - 对swarm进行轮询
swarm.poll_next_unpin(cx)
,如果有behaviour触发的消息,处理对应的消息。
小结,libp2p对点对点通信进行了高度的抽象,在开始接触这些概念时,容易摸不着头脑,需要不断去熟悉划分的层次和常用的协议;rust-libp2p的实现,针对libp2p定义的层次和协议,封装出了不同的接口,在开发自定义协议的同时,需要深入去了解这些抽象的接口及接口间通信的方式。总体来说,点对点通信开发的难度比传统的客户端-服务器通信形式高很多,libp2p的设计在于弥合这其中的一些痛点,但也还有很长的路要走,应用开发者需要更多地了解底层的机制才能更好的开发应用协议。目前,使用libp2p的应用包括IPFS,Substrate/Polkadot,Libra,Ethereum 2.0等等,接下来我们来了解下Substrate如何使用的libp2p。