P2PKit API
~~~~~~~~~~

Date: 16th March 2005

NOTES
~~~~~

errors:
    The P2PKit library API adopts a standard convention for returning
    results and reporting errors. If a call is successful then the
    result will be bound to a record success(R), where R is the output
    of the call. If there is no output (i.e., the call is a procedure)
    then the result is bound to the atom 'success'. If a call fails
    then the result is bound to a record error(E), where E is a
    description of the error. This API definition does not (yet) list
    the possible errors each call can return; they are left as an
    exercise for the reader.

synchronisation:
    Some calls are asynchronous. These calls may return before the
    result is known (for example, after sending a message to another
    process) and the result will be bound later, asynchronously, once
    it is known. In this document each operation is marked synchronous
    or asynchronous in a comment when it is introduced.

debugging:
    Calls to create/join a P2PS network and initialise a P2PKit node
    have a Verbose argument. This is an integer that specifies the
    level of tracing information that the node should write to the
    output. If this argument is 0 then no tracing information is
    written. Currently setting this to 10 or higher will print ALL
    available tracing information. You will have to look at the source
    to see what intermediate values will do!

Creating/Joining a P2PS network
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

To become a peer in a P2PKit network we must first either create a new
P2PS network or join an existing one. The P2PKitPeer functor provides
procedures to help applications create or join a P2PS network.
Successful calls return the necessary P2PS entities (a handle to the
P2PS subsystem, this peer's message stream, this peer's event stream)
for initialising a P2PKit peer.

  proc {P2PKitPeer.create     %% synchronous operation
        Verbose               %% Verbosity level (0=quiet, 10=very verbose)
        %% Out:
        Result                %% success(OP2PS # MS # ES) | error(E)
                              %% OP2PS: P2P Object
                              %% MS:    Message Stream
                              %% ES:    Event Stream
       }

  %% JoinType is
  %%   joinOnly   - join existing network, or fail
  %%   createJoin - join existing network, or create new one
  proc {P2PKitPeer.join       %% synchronous operation
        EntryPoints           %% List of [IP#Port Number]
        JoinType              %% see above comment
        Verbose               %% Verbosity level (0=quiet, 10=very verbose)
        %% Out:
        Result                %% success(OP2PS # MS # ES) | error(E)
                              %% OP2PS: P2P Object
                              %% MS:    Message Stream
                              %% ES:    Event Stream
       }

The operation of P2PKitPeer.join depends on JoinType:

joinOnly:
    EntryPoints contains a list of (IP address, Port Number) pairs.
    These are the addresses of peers already in the network.
    P2PKitPeer.join randomises this list and then tries each entry in
    turn until it succeeds in entering the network. If none of the
    entries is successful then the call fails.

createJoin:
    Like joinOnly, but if all entries fail then a new network is
    created.

If the call was successful the final parameter, Result, is set to a
record success(OP2PS # MS # ES), where OP2PS is the handle for the P2PS
subsystem, and MS and ES are this peer's message and event streams
respectively.

The Verbose argument has effect only during the creation / joining of
the P2PS network. Once the call returns it has no further effect on
tracing.

Initialising a P2PKit Peer
~~~~~~~~~~~~~~~~~~~~~~~~~~

The P2PKitPeer functor provides a procedure to initialise the P2PKit
system:

  proc {P2PKitPeer.init       %% synchronous operation
        OP2PS                 %% P2PS Object
        OptMS                 %% just(MS) | nothing
                              %% MS: P2PS Message Stream
        Verbose
        %% Out:
        Result                %% success(CTicket) | error(E)
                              %% CTicket: Client Ticket
       }

It takes the P2P object and, optionally, the message stream returned by
the P2PS create or join procedures. If successful it returns
success(CTicket), otherwise error(E), where E is a description of the
error encountered.
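
The join and init calls described above are normally chained. The
following is a minimal sketch of that sequence, not a definitive
implementation. It assumes that the compiled P2PKitPeer functor can be
resolved from the importing functor, and it uses a made-up entry point;
writing the IP address as an atom is also an assumption, since the
source only says IP#Port.

```oz
functor
import
   Application
   System
   P2PKitPeer   %% assumption: P2PKitPeer.ozf resolves relative to this functor
define
   JoinRes InitRes
   %% Hypothetical entry point; the atom form of the IP is an assumption.
   EntryPoints = ['192.0.2.10'#9000]
   Verbose     = 0   %% 0 = quiet
in
   %% Join an existing network, or create one if no entry point answers.
   {P2PKitPeer.join EntryPoints createJoin Verbose JoinRes}
   case JoinRes
   of success(OP2PS#MS#_) then
      %% Pass the message stream returned by join straight on to init.
      {P2PKitPeer.init OP2PS just(MS) Verbose InitRes}
      case InitRes
      of success(CTicket) then
         {System.showInfo "client ticket: "#CTicket}
      [] error(E) then
         {System.show initFailed(E)}
         {Application.exit 1}
      end
   [] error(E) then
      {System.show joinFailed(E)}
      {Application.exit 1}
   end
end
```

Every result is matched against both success(...) and error(E), following
the convention given in the notes at the top of this document.
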

If OptMS is the atom 'nothing' then P2PKitPeer.init gets the current
message stream from OP2PS; if it is just(MS) (i.e., a record with label
just and an argument MS) then it takes MS as the message stream.
Passing in the message stream returned from the P2PS joining routine
avoids a race condition whereby messages that arrive between joining
the network and calling P2PKitPeer.init are lost.

The Verbose argument sets the tracing level for all future operations
in this P2PKit peer. Currently, it is not possible to alter the tracing
level later, but this will be possible in future releases.

The ticket CTicket is a string that uniquely references an entity
inside the peer that accepts client connections. Its use is described
later.

Once a peer has successfully initialised its P2PKit subsystem it can
start receiving and sending messages to access points on its peers. The
next section describes the operations provided by the P2PKitPeer
functor that are used by peer services.

P2PKit Peer Operations
~~~~~~~~~~~~~~~~~~~~~~

  proc {P2PKitPeer.getStream  %% synchronous operation
        AccessPoint           %% Name of access point
        %% Out:
        Result                %% success(MessageStream) | error(E)
       }

GetStream returns the current end of the message stream associated with
AccessPoint. All future messages received at the local peer for this
access point will appear on this stream.

Received messages are records with three fields:

    src: set to client or peer(PeerId), depending on the message source
    dst: the key or peer id the message was originally sent to (for a
         broadcast message this is always the receiving peer's id)
    msg: the message as sent

  proc {P2PKitPeer.sendPeer   %% asynchronous operation
        PeerId                %% Peer to send message to
        AccessPoint           %% Name of access point
        Message               %% Message ....
        %% Out:
        Result                %% success | error(E)
       }

P2PKitPeer.sendPeer sends Message to AccessPoint on peer PeerId. If
there is no peer with identifier PeerId, or message delivery fails,
then an error will be returned.
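
As an illustration of getStream and sendPeer, the sketch below prints
every message arriving on an access point and sends a single message to
another peer. The helper names Watch and Notify are hypothetical, and
the code assumes that the P2PKitPeer functor is resolvable and that
P2PKitPeer.init has already succeeded on this peer.

```oz
functor
import
   System
   P2PKitPeer   %% assumption: resolvable; P2PKitPeer.init already succeeded
export
   watch:  Watch
   notify: Notify
define
   %% Hypothetical helper: print every message arriving on AccessPoint.
   proc {Watch AccessPoint}
      Res
   in
      {P2PKitPeer.getStream AccessPoint Res}
      case Res
      of success(Stream) then
         thread
            %% The loop suspends whenever the stream has no further messages yet.
            for M in Stream do
               {System.show received(src:M.src dst:M.dst msg:M.msg)}
            end
         end
      [] error(E) then
         {System.show getStreamFailed(E)}
      end
   end

   %% Hypothetical helper: send Msg and return true if delivery succeeded.
   fun {Notify PeerId AccessPoint Msg}
      Res
   in
      {P2PKitPeer.sendPeer PeerId AccessPoint Msg Res}
      case Res   %% suspends here until Res is bound
      of success then true
      [] error(E) then
         {System.show sendFailed(E)}
         false
      end
   end
end
```

Notify waits for the result and so behaves like a synchronous send; a
caller that does not need the delivery status can ignore Res and carry
on immediately.
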

Note that P2PKitPeer.sendPeer is an asynchronous call. Result will not
be bound until the sender knows whether the message has been
successfully delivered. However, the call returns as soon as the
message has been handed to P2PS.

  proc {P2PKitPeer.sendKey    %% asynchronous operation
        Key                   %% Send to peer responsible for Key
        AccessPoint           %% Name of access point
        Message               %% Message ....
        %% Out:
        Result                %% success | error(E)
       }

P2PKitPeer.sendKey sends Message to AccessPoint on the peer responsible
for Key. Key is a positive integer. Due to current P2PS limitations,
P2PKitPeer.sendKey is not reliable and the message may be silently
dropped, even though success is returned.

  proc {P2PKitPeer.rpcPeer    %% asynchronous operation
        PeerId                %% Peer to send message to
        AccessPoint           %% Name of access point
        Message               %% Message ....
        %% Out:
        Result                %% success(Reply) | error(E)
       }

  proc {P2PKitPeer.rpcKey     %% asynchronous operation
        Key                   %% Send to peer responsible for Key
        AccessPoint           %% Name of access point
        Message               %% Message ....
        %% Out:
        Result                %% success(Reply) | error(E)
       }

These are Remote Procedure Call variants of sendPeer and sendKey. The
call adds an extra field, replyto, to the sent Message. replyto is
bound to a record containing two fields, id and ap (ap is a fresh
access point created on the fly by the call). The receiving message
processor should send its reply to this id and ap. For example, suppose
InMsg is bound to the received message in the receiving processor:

      . . . . .
      %% Calculate Response
      . . . . .
      {P2PKitPeer.sendPeer InMsg.replyto.id InMsg.replyto.ap Response Res}
      . . . . .

The first message received in reply will be bound to the result of the
rpc call, i.e., success(Reply). This assumes that there is only one
message processor replying. In any case, all responses after the first
are silently ignored.

  proc {P2PKitPeer.broadcast  %% asynchronous operation
        AccessPoint           %% Name of access point
        Message               %% Message .....
        %% Out:
        Result                %% success | error(E)
       }

Broadcast sends Message to AccessPoint on all peers in the network. Due
to current P2PS limitations, Broadcast is not reliable. It is possible
that some peers will not receive the message, even if success is
returned.

  proc {P2PKitPeer.getPeerInfo  %% synchronous operation
        %% Out:
        Result                  %% success(NodeInfo) | error(E)
       }

GetPeerInfo returns a record with various statistics about the local
peer. These include:

    id: The local peer id
    ip: The local peer's IP address
    pn: The local peer's Port Number

The rest of the statistics are from the P2PS getStatistics call.

  proc {P2PKitPeer.hash       %% synchronous operation
        Entity                %% A stateless Oz entity (that can be pickled)
        %% Out:
        Result                %% success(Hash) | error(E)
       }

Hash is a common hashing function (an implementation of the MD5
message-digest algorithm). It will hash any stateless Oz entity to an
integer.

  proc {P2PKitPeer.pushMessages  %% synchronous operation
        MsgList                  %% List of Messages
        %% Out:
        Result                   %% success | error(E)
       }

This routine can only be called by a message processor. It pushes the
list of messages, MsgList, onto the front of the message processor's
pending Message Stream. That is, the messages in MsgList will be the
next messages to be processed by this message processor. (This is
useful for message processors that save received messages until some
condition is true. Once the condition is true they can push the saved
messages onto the front of their own message stream and then reprocess
them as if they had just arrived.)

P2PKit Clients
~~~~~~~~~~~~~~

In order to interact with a P2PS network it is necessary to be in the
P2PS network. That is, an application must become a full peer in the
network. This is often undesirable. Firstly, an application using the
network might only enter the network for a short time (e.g., while it
performs a query). Such short-lived members disrupt the smooth
operation of routing in the network.
Secondly, each full peer must be able to offer the full services of the
network.

We solve this problem by introducing P2PKit clients. A P2PKit client
uses an existing peer in the network as a proxy. The proxy initiates
requests for P2PKit services on behalf of the client and forwards
responses back to the client. The client has a handle which it uses to
communicate with the proxy peer. The proxy peer has one or more proxy
access points; messages received from peers on these access points are
forwarded to message streams in the client.

Becoming a P2PKit Client
~~~~~~~~~~~~~~~~~~~~~~~~

The P2PKitClient functor provides a procedure for clients to connect to
a P2PS peer via the CTicket returned by the P2PKitPeer.init procedure
described previously.

  proc {P2PKitClient.makeClient  %% synchronous
        ClientTickets            %% [Client Ticket]
        Verbose
        %% Out:
        Result                   %% success(NWHandle) | error(E)
                                 %% NWHandle: P2PKit Network Handle
       }

ClientTickets is a list of Client Tickets. As before, this list is
randomised and the prospective client attempts to connect to each
ticket in turn until one connection succeeds or all have failed.

If the call is successful NWHandle is a handle to the proxy peer. This
handle is used to install / upgrade services, get the proxy peer's id
and set up proxy services:

  {NWHandle getPeerId(
              %% Out:
              PeerId             %% Set to the Proxy Peer's Id
           )}

This call binds PeerId to our proxy peer's id.

  {NWHandle makeProxy(
              %% Out:
              proxy: ProxyService  %% o(id: ProxyId ap: ProxyAP)
              stream: MsgStream    %% Stream of forwarded messages
           )}

This creates a fresh proxying service. Any messages sent by peers to
peer ProxyId and access point ProxyAP will be forwarded to the client
and appear on the MsgStream message stream. To use this the client
includes the ProxyService in messages it sends to peers. Those peers
can send replies to the client via the id and ap in the ProxyService
record.
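
The client connection steps above can be sketched as follows. This is
an illustration under stated assumptions rather than a reference
implementation: Connect is a hypothetical helper, the P2PKitClient
functor is assumed to be resolvable from the importing functor, and the
client tickets are assumed to have been obtained out of band from peers
that have called P2PKitPeer.init.

```oz
functor
import
   System
   P2PKitClient   %% assumption: P2PKitClient.ozf resolves relative to this functor
export
   connect: Connect
define
   %% Hypothetical helper: connect through a proxy peer and print every
   %% message that peers send back via the proxy access point.
   proc {Connect ClientTickets}
      Res
   in
      {P2PKitClient.makeClient ClientTickets 0 Res}
      case Res
      of success(NWHandle) then PeerId Proxy Stream in
         {NWHandle getPeerId(PeerId)}
         {NWHandle makeProxy(proxy:Proxy stream:Stream)}
         %% Proxy has the form o(id:ProxyId ap:ProxyAP); include it in
         %% outgoing messages so that peers know where to reply.
         {System.show proxyReady(peer:PeerId replyTo:Proxy)}
         thread
            for M in Stream do
               {System.show forwarded(M)}
            end
         end
      [] error(E) then
         {System.show connectFailed(E)}
      end
   end
end
```
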

  {NWHandle clientOfService(
              name: AccessPoint  %% Name of Access Point
              %% Out:
              handle:            %% success(ServiceHandle) | error(E)
                                 %% ServiceHandle: Handle for Service
           )}

This creates a service handler (described below) for an existing
service in the P2PKit network.

  {NWHandle installService(
              maker: ServiceProc        %% Service Maker
              name: AP <= {NewName}     %% Name of Access Point
              instance: Inst <= 1       %% Message Processor instance
              upgradeable: Upg <= true  %% Upgradeable?
              sharable: Share <= true   %% Sharable?
              %% Out:
              handle:                   %% success(SHandle) | error(E)
                                        %% SHandle: Handle for Service
           )}

This installs a new message processor for an access point AP on all
nodes in the network. If the message processor already exists (for that
AP and with the same instance name) then it is upgraded by this call.

If the name parameter is not given then the service is installed with a
fresh name. In that case the service can only be accessed / upgraded
via this SHandle. (This may change in the future if P2PKit supports an
interface to query running services, though I expect we will add an
option to allow services to remain hidden.)

The call can specify whether the service is upgradeable (if it is not
upgradeable then attempts to upgrade it are silently ignored). The
default is true.

The call can specify whether the service is sharable, i.e., whether it
can be passed to other peers if they ask for it (lazy loading of
services). The default is true.

The call can name the message processor to be installed / upgraded. The
default name is 1. This is in contrast to the service name, which is a
fresh name by default.

A message processor is a procedure or function that processes each
message received at an access point in turn. It can be stateful or
state-free.

A state-free processor is a procedure that takes a message as argument,
processes it and terminates. It will then be called again on the next
message received.

A stateful processor is a function that takes two arguments, a message
and an input state, processes the message and returns a new output
state. This output state will be the input state when the next message
is processed. In this way the processor can have knowledge of its
previous actions. For example, the state may be a simple counter of the
number of messages processed.

The ServiceProc is a function, run on each peer when the processor is
installed / upgraded, which returns a descriptor. The descriptor
contains up to three fields:

initialState:
    If the processor is stateful then this is the initial state.
    **This field must be present if and only if the processor is
    stateful.** The initialState is used when the processor is first
    started; it is not used when upgrading an existing processor.

upgradeState:
    Used only when upgrading an existing, stateful, message processor.
    This function transforms the current value of its state into a
    format suitable for the updated processor. If this field is not
    present then no state transformation is done.

processor:
    The actual processor. This field must be present. If the
    initialState field is present then the processor must be a
    two-argument function (taking a Message and InputState, returning
    OutputState), otherwise it must be a single-argument procedure
    (taking InputMessage).

The client communicates with services in the P2PKit network through the
SHandles returned by clientOfService and installService. Communications
go to the access point specified when the SHandle was created.

  {SHandle peer(                   %% asynchronous
              id: PeerId           %%
              message: Msg         %% The message ....
              rpc: IsRPC <= false  %% Is this an RPC?
           )
           %% Out:
           Result                  %% success(R) | error(E)
                                   %% R if RPC
  }

Sends message to the peer with id PeerId. If rpc is false (default)
then Result is either success or error(E). If rpc is true then a
replyto field will be added to the outgoing message and the first reply
received will be returned as the argument of success().
Compare with P2PKitPeer.rpcPeer.

  {SHandle key(                    %% asynchronous
              key: Key             %%
              message: Msg         %% The message ....
              rpc: IsRPC <= false  %% Is this an RPC?
           )
           %% Out:
           Result                  %% success(R) | error(E)
                                   %% R if RPC
  }

As {SHandle peer(...)} except key is an integer key and the message is
sent to the peer responsible for that key.

  {SHandle hash(                   %% asynchronous
              value: OzValue       %%
              message: Msg         %% The message ....
              rpc: IsRPC <= false  %% Is this an RPC?
           )
           %% Out:
           Result                  %% success(R) | error(E)
                                   %% R if RPC
  }

Same as {SHandle key(...)} but value is first converted to an integer
key using the same hash function as P2PKitPeer.hash, as described
previously.

  {SHandle broadcast(              %% asynchronous
              message: Msg         %% The message ....
           )
           %% Out:
           Result                  %% success | error(E)
  }

Broadcasts message to all nodes in the P2PKit network.

  {SHandle updateService(              %% asynchronous
              maker: ServiceProc       %% Service Maker
              instance: Inst <= 1      %% Message Processor instance
              upgradeable: Upg <= true %% Upgradeable?
              sharable: Share <= true  %% Sharable?
           )
           %% Out:
           Result                      %% success | error(E)
  }

Upgrades or installs a message processor via an existing SHandle. See
documentation for {NWHandle installService(...)}.
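
To tie the service descriptor and the SHandle operations together, here
is a hedged sketch of a stateful message-counting service and a client
that installs and uses it. Several details are assumptions and are not
confirmed by this document: the descriptor's record label (service),
the one-argument shape of the upgradeState function, the access point
name counter, and the helper names MakeCounter and Demo. NWHandle is
assumed to come from a successful P2PKitClient.makeClient call.

```oz
functor
import
   System
export
   makeCounter: MakeCounter
   demo:        Demo
define
   %% Service maker: run on each peer, returns the descriptor.
   %% The label 'service' is an assumption; the source names only the fields.
   fun {MakeCounter}
      service(
         %% Present because the processor is stateful.
         initialState: 0
         %% Stateful processor: takes Message and InputState, returns OutputState.
         processor: fun {$ _ Count} Count + 1 end
         %% Assumed signature: old state in, new state out. Keeps the count.
         upgradeState: fun {$ OldCount} OldCount end)
   end

   %% Hypothetical client-side demo: install the service, then message it.
   proc {Demo NWHandle PeerId}
      HRes
   in
      {NWHandle installService(maker:MakeCounter name:counter handle:HRes)}
      case HRes
      of success(SHandle) then R1 R2 in
         {SHandle peer(id:PeerId message:hello) R1}
         {SHandle broadcast(message:hello) R2}
         %% Both calls are asynchronous: wait until the results are bound.
         {Wait R1}
         {Wait R2}
         {System.show results(peer:R1 broadcast:R2)}
      [] error(E) then
         {System.show installFailed(E)}
      end
   end
end
```

The processor is kept free of references to system modules so that it
remains a plain stateless value; whether this is required for a service
to be shared between peers is not stated in this document.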