%%% Version = 1.1
%%
%% P2PKit Node: starts a P2PKit node with three basic services:
%%
%%   __control:         allows for shutdown, installation of further services
%%
%%   __persist network: receives node information and updates OzStore
%%                      as appropriate.
%%
%%   __client_service:  receives and forwards messages from P2PKit clients
%%
%% Additionally it periodically sends its contact info to the persist service.
%%
%% Author: Kevin Glynn (glynn@info.ucl.ac.be)
%%

functor
import
   Application
   System
   OS
   DPInit
   Property
   Pickle
   P2PKitPeer at 'x-ozlib://keving/net/p2p/P2PKitPeer.ozf'
   Util at 'x-ozlib://keving/lib/Util.ozf'
   %DPStatistics
   Connection
define
   NetworkSize = 6625109 %% Copied from P2P/Lib/GSChord/GSCParam.oz
   DefaultOzStoreTicketFile = 'http://www.info.ucl.ac.be/~glynn/OzStoreTicket'

   {Property.put 'dp.tcpHardLimit' 2000}
   {Property.put 'dp.tcpWeakLimit' 1800}

   Verbosity = {NewCell 0} %% To set verbosity on the fly.
   Trace = {Util.mkTrace cell(Verbosity) nothing}
   TraceInfo = {Util.mkTraceInfo cell(Verbosity) nothing}

   %% Bounds of the values produced by OS.rand; used below to scale
   %% random numbers into seeds and node ids.
   RMin RMax
   {OS.randLimits RMin RMax}

   %% {GetS R}: unwrap an operation result.
   %%   R = success(S)  ==> returns S
   %%   R = error(E)    ==> traces the error and returns a failed value
   %%                       wrapping E, so E is raised when the result
   %%                       is first used.
   %% The case runs in its own thread, so the caller gets the result
   %% variable back immediately even if R is not yet determined.
   fun {GetS R}
      thread
         case R
         of error(E) then
            {Trace 0 'GetS: Operation failed, returning failed value: '#E}
            {Value.failed E}
         [] success(S) then
            S
         end
      end
   end

   %% {GetS_ R}: as GetS, but throws away any success value.
   %% On error(E) the exception E is raised inside the spawned thread
   %% (the original author notes that this kills the application).
   proc {GetS_ R}
      thread
         case R
         of error(E) then
            {Trace 0 'GetS_: Operation failed, about to raise exception: '#E}
            raise E end
         [] success(...) then
            skip
         end
      end
   end

   %% Command line options. If parsing fails we fall back to a record
   %% that only requests the usage text.
   Args = try
             {Application.getArgs
              record(network(single char:&n type:atom default:default) % P2P network name
                     master(single char:&m type:bool default:false)
                     peers(single char:&p type:int default:1)
                     ozstoreticket(single type:atom default:DefaultOzStoreTicketFile)
                     verbose(single char:&v type:int default:0) % verbosity
                     help(single char:[&? &h] default:false)
                    )
             }
          catch _ then
             {System.showInfo 'Unrecognised arguments'}
             optRec(help:true)
          end

   %% Print usage and exit. The option-argument placeholders (<int>,
   %% <file|url>, <network>) were missing from the text and are restored
   %% here so that the usage message tells the user which options take a
   %% value.
   if Args.help then
      {System.showInfo "Usage: "#{Property.get 'application.url'}#" [option]"}
      {System.showInfo "Options:"}
      {System.showInfo "\t"#"-m, --master"#"\t\t\t\t"#"Create New Network (default: false)"}
      {System.showInfo "\t"#"-p <int>, --peers <int>"#"\t\t\t"#"Start <int> virtual peers (default: 1)"}
      {System.showInfo "\t"#"--ozstoreticket <file|url>"#"\t\t"#"ozstore ticket file (default: "#DefaultOzStoreTicketFile#")"}
      {System.showInfo "\t"#"-n <network>, --network <network>"#"\t"#"Create/join <network>"}
      {System.showInfo "\t"#"-v <int>, --verbose <int>"#"\t\t"#"Verbosity level"}
      {System.showInfo "\t"#"-h, --help"#"\t\t\t\t"#"This help"}
      {Application.exit 0}
   end

   %% Must set before first call to Trace .....
   Verbosity := Args.verbose

   {TraceInfo 10 'pid: '#{OS.getPID}}
   {Trace 10 Args}

   %% Ticket used to connect to the OzStore server, loaded from the
   %% file/URL given by --ozstoreticket.
   OzStoreTicket = {Pickle.load Args.ozstoreticket}

   %% {OzStoreTake Retries}: take the OzStore ticket, retrying on failure.
   %% Makes at most Retries attempts (at least one); the exception of the
   %% last failed attempt is re-raised.
   %% NOTE(review): Delay takes milliseconds, so the back-off between
   %% attempts is below 30 ms -- confirm that this is intended.
   fun {OzStoreTake Retries}
      try
         {Connection.take OzStoreTicket}
      catch E then
         {Trace 5 'Failed to take OzStore Ticket'#E}
         if Retries =< 1 then
            raise E end
         else
            {Delay {OS.rand} mod 30}
            {OzStoreTake Retries-1}
         end
      end
   end

   %% {CreatePeer NodeId Master}: start one (virtual) peer.
   %%   NodeId: requested node id for the peer.
   %%   Master: true  ==> create a new p2p network,
   %%           false ==> join the network Args.network using the entry
   %%                     points stored in OzStore.
   %% Raises virtual_peer_failed(E) if the network could not be
   %% created/joined. On success it installs the '__persist network'
   %% service on the peer and starts a thread that sends the peer's
   %% contact info to that service.
   proc {CreatePeer NodeId Master}
      %% Peer-local tracers, tagged with the requested node id.
      Trace = {Util.mkTrace cell(Verbosity) just(NodeId)}
      TraceInfo = {Util.mkTraceInfo cell(Verbosity) just(NodeId)}
      NetParams = if Master then
                     {P2PKitPeer.createWithConfig netConfig nodeConfig(nodeId: NodeId) apConfig @Verbosity}
                  else
                     %% Retrieve p2p network entry points from OzStore
                     EntryPoints = {Map {GetS {Send {OzStoreTake 5} get(Args.network $)}}
                                    fun {$ _#(A#B#_)} A#B end}
                     {System.gcDo} %% Drop connections to OzStore Server
                  in
                     {P2PKitPeer.joinWithConfig EntryPoints joinOnly nodeConfig(nodeId: NodeId) apConfig @Verbosity}
                  end
   in
      case NetParams
      of error(E) then
         {System.showInfo 'Failed to '#
          if Master then 'create' else 'join' end#
          ' a p2p network'}
         {Trace 10 'Reason: '#E}
         raise virtual_peer_failed(E) end
      else
         %% OP2PS     an instance of class P2PS.p2pServices
         %% MS        Message Stream
         %% ES        Event Stream
         %% CPTicket  Ticket for Client Port
         success(OP2PS#MS#ES) = NetParams
         P2PNodeConfig = {OP2PS getNodeConfig($)}
         %% From here on NodeId is the id reported by the node
         %% configuration; it shadows the NodeId parameter.
         NodeId = P2PNodeConfig.nodeId
         %% Start receiving/processing messages
         MyPeer = {GetS {P2PKitPeer.makePeer OP2PS just(MS) @Verbosity}}
         CPTicket = {GetS {MyPeer getClientTicket}}
      in
         {TraceInfo 3 'In P2P network '#Args.network#' Node Id = '#NodeId#' ('#
          if P2PNodeConfig.useProxy then useproxy else fullmember end#')'}

         %% We should support some events (such as newsucc) in the notifier dictionary
         %% For now every event is only traced; for newft events the
         %% fields whose value is unit are dropped before tracing.
         thread
            for E in ES do
               {Trace 10 event(case {Label E}
                               of newft then {Record.filter E fun {$ F} F \= unit end}
                               [] _ then E
                               end)}
            end
         end

         %% comment out tracing of all messages received
         %% thread
         %%    for M in MS do
         %%       {Trace 10 msg(M)}
         %%    end
         %% end

         local
            %% Update cache of live nodes (if we are the responsible ....)
            %%
            %% This function is pickled and shipped to the peer through
            %% the '__control' access point (see below), so modules,
            %% procedures and constants are resolved through Env rather
            %% than taken from this functor's imports.
            %% Returns desc(processor:P) where P handles one incoming
            %% contact-info message.
            fun {PersistProcessorMkr Env}
               [Cell Time Record Connection Port OS System] =
                  {Env resolve_module_list(['Cell' 'Time' 'Record' 'Connection' 'Port' 'OS' 'System'])}
               [Trace] = {Env resolve_proc_list(['Trace'])}
               [NodeId] = {Env resolve_constant_list(['NodeId'])}
               %% Previous: last node set successfully sent to OzStore.
               %% Current:  node set accumulated from received reports.
               Previous = {Cell.new running_nodes}
               Current = {Cell.new running_nodes}

               %% As the top-level OzStoreTake, but using the modules
               %% resolved through Env.
               %% NOTE(review): the delay is in milliseconds (< 10 ms)
               %% -- confirm that this is intended.
               fun {OzStoreTake Retries}
                  try
                     {Connection.take OzStoreTicket}
                  catch E then
                     {Trace 5 'Failed to take OzStore Ticket'#E}
                     if Retries =< 1 then
                        raise E end
                     else
                        {Time.delay {OS.rand} mod 10}
                        {OzStoreTake Retries-1}
                     end
                  end
               end
            in
               thread
                  %% Every 45 seconds push the node set to OzStore if
                  %% it changed since the last successful push.
                  proc {Loop}
                     %% Read Current exactly once per iteration: the
                     %% processor below may replace it concurrently, and
                     %% the value that is compared, sent and remembered
                     %% in Previous must be one and the same.
                     Snapshot = {Cell.access Current}
                  in
                     if Snapshot \= running_nodes then
                        %% We received some live reports
                        if Snapshot \= {Cell.access Previous} then
                           %% Update the cache
                           {Trace 5 update_cache(NodeId info:Snapshot)}
                           try
                              {Port.send {OzStoreTake 5} put(Args.network {Record.toListInd Snapshot})}
                              %% Only remember the snapshot once it has
                              %% been handed to OzStore; after a failure
                              %% the update is retried on the next cycle.
                              {Cell.assign Previous Snapshot}
                           catch E then
                              {Trace 0 'Could not update cache, exception: '#E}
                           end
                           {System.gcDo} %% Kill connections to OzStore server
                        end
                        %% Reset
                        %% keving: comment out for now, never forget fallen comrades!
                        %%Current := running_nodes
                     end
                     {Time.delay 45*1000}
                     {Loop}
                  end
               in
                  {Time.delay 2*1000} %% So that our details have arrived
                  {Loop}
               end
               desc(processor:
                       proc {$ M}
                          Msg = M.msg
                          New
                          %% Atomically swap in the (still unbound) new
                          %% value and obtain the old one.
                          Old = Current := New
                       in
                          %% Space Leak due to record arities :-(
                          New = {AdjoinAt Old Msg.id (Msg.ip#Msg.pn#Msg.cpticket)}
                       end)
            end

            BasicServices = builtin_services('__persist network':PersistProcessorMkr)
         in
            %% Install every basic service on our own peer via '__control'.
            {Record.forAllInd BasicServices
             proc {$ Name ProcMkr}
                {GetS_ {MyPeer peer(id:NodeId
                                    ap:'__control'
                                    message:install_processors(Name ['initial'#{Pickle.pack ProcMkr}#false#true]))}}
             end}
         end

         %% Periodically send our AP to a central point
         %% (eight times, 30 seconds apart).
         thread
            {Wait CPTicket}
            %% CPTicket is calculated asynchronously.
            %% Make sure it is resolved before sending it.
            MyInfo = {GetS {MyPeer getPeerInfo}}
            ContactInfo = node(id:MyInfo.id ip:MyInfo.ip pn:MyInfo.pn cpticket:CPTicket)
         in
            %% for _ in [1 2] do
            for _ in [1 2 3 4 5 6 7 8] do
               Result = {MyPeer hash(value:'__persist network' ap:'__persist network' message:ContactInfo)}
            in
               %% Inspect the result in a separate thread so that a slow
               %% reply does not hold up the sending schedule.
               thread
                  case Result
                  of error(E) then
                     {Trace 3 'Sending details to persist service failed: '#E}
                  else
                     skip
                  end
               end
               {Delay 30*1000}
            end
         end
      end
   end

   local
      %% Create a unique seed from fqdn/ip/pid/time
      UName = {OS.uName}
      DPStatIP = {Value.condSelect {DPInit.getSettings} ip "127.0.0.1"}
      DPStatPort = {Value.condSelect {DPInit.getSettings} port 9000}
      RawSeed = {GetS {P2PKitPeer.hash UName.nodename#UName.domainname#
                       DPStatIP#DPStatPort#
                       {OS.getPID}#{OS.time}}}
      Seed = (RawSeed mod (RMax - RMin)) + RMin
      {TraceInfo 3 'Seed: '#Seed}
      {OS.srand Seed}
   in
      %% Start Args.peers virtual peers, five seconds apart. Only the
      %% first one creates the network, and only when --master is given.
      %% A peer that fails to start is traced and skipped.
      for I in 1..Args.peers do
         %% Random node id, scaled from the OS.rand range to
         %% 0..NetworkSize-1.
         NID = {Float.toInt {Int.toFloat {OS.rand}-RMin}/{Int.toFloat RMax - RMin}*{Int.toFloat NetworkSize - 1}}
         IsMaster = Args.master andthen I == 1
      in
         {TraceInfo 3 '================================= Starting '#
          if IsMaster then 'Master' else 'Peer' end#
          ': '#NID#' =============================='}
         try
            {CreatePeer NID IsMaster}
         catch E then
            {Trace 5 'Failed to start peer, reason:'#E}
         end
         {Delay 5000}
      end
   end

   {TraceInfo 3 '=*==*==*==*==*==*==*==*==*==*==*= Created All Nodes =*==*==*==*==*==*==*==*==*==*='}
end