%%% Version = 1.2 declare Home={OS.getEnv 'HOME'} User={OS.getEnv 'USER'} %User='glynn' %OzStoreTicketFile = Home#'/public_html/quaver_ozstore' %OzStoreTicketFile = Home#'/public_html/OzStoreTicket_plab' OzStoreTicketFile = 'http://www.info.ucl.ac.be/~'#User#'/OzStoreTicket' %OzStoreTicketFile = '/tmp/OzStoreTicket' %OzStoreTicketFile = '/home/keving/public_html/quaver_ozstore' %Network=quaver %Network=durer Network=keving %Network='benoit' Verbose=10 %% %% [P2PClient Util] = {Module.link ['x-ozlib://keving/net/p2p/P2PKitClient.ozf' 'x-ozlib://keving/lib/Util.ozf']} %% If argument is error then raise it as an exception %% If success then return content GetS = Util.getS %% As GetS, but throws away any success value and kills app on exception GetS_ = Util.getS_ % %% Retrieve p2p network entry points from OzStore % ClientEntryPoints = {Map {GetS {Send {Connection.take {Pickle.load OzStoreTicketFile}} get(Network $)}} fun {$ _#(_#_#C)} C end} declare % %% Connect to Peer Network % NWHandle = {GetS {P2PClient.makeClient ClientEntryPoints Verbose}} {Property.put 'print.width' 1000} {Property.put 'print.depth' 1000} try {Wait NWHandle} {System.showInfo 'joined network'} catch E then {System.show 'failed to join network:'#E} end %% Reply to a ping message with a pong declare local fun {PingPongMaker Env} MyPeer = {Env resolve_peer} [System] = {Env resolve_module_list(['System'])} [SetColour GetColour MkGetS_ TraceInfo] = {Env resolve_proc_list(['SetColour' 'GetColour' 'MkGetSTrace_' 'TraceInfo'])} GetS_ = {MkGetS_ 'ERROR - PingPong' 0} in {System.show 'XXXXXXXXXX Installing PingPong Processor XXXXXXXXX'} {SetColour if {GetColour} == red then blue else red end} %% Message will contain a servicename to send response to desc(processor: proc {$ M} Msg = M.msg in case Msg of ping(pong:PongMsg) then {TraceInfo 9 'Replying to PING Request with '#PongMsg} {SetColour if {GetColour} == green then yellow else green end} {GetS_ {MyPeer reply(replyToMessage:M message:PongMsg)}} [] _ then {GetS_ {MyPeer reply(replyto:M message:unknownRequest(message: M))}} end end) end in PingPong = {GetS {NWHandle installService(maker: PingPongMaker)}} PingPongAP = {GetS {PingPong getServiceName}} end {Inspect {GetS {PingPong key(id: 4200000 message:ping(pong:pong) rpc:true)}}} %% %% flash: Changes the node's colour every 2 seconds %% local fun {FlashMaker Env} [System] = {Env resolve_module_list(['System'])} SetColour = {Env resolve_proc('SetColour')} proc {Loop} {SetColour yellow} {Delay 2*1000} {SetColour blue} {Delay 2*1000} {Loop} end in {System.show 'XXXXXXXXXX Installing Flash XXXXXXXXX'} thread {Loop} end desc(processor: proc {$ _} skip end) end in {GetS_ {NWHandle installService(maker: FlashMaker)}} end %% %% node_details sends the node's id and fully qualified domain name to %% the service in the incoming message %% declare local fun {NodeDetailsMaker Env} MyPeer = {Env resolve_peer} [OS System] = {Env resolve_module_list(['OS' 'System'])} NodeId = {Env resolve_constant('NodeId')} [SetColour MkGetS_] = {Env resolve_proc_list(['SetColour' 'MkGetSTrace_'])} GetS_ = {MkGetS_ 'ERROR - NodeDetails' 0} in {System.show 'XXXXXXXXXX Installing Node Details Processor XXXXXXXXX'} {SetColour red} %% Message will contain a servicename to send response to desc(processor: proc {$ M} {System.show 'XXXXXXXXXX Send my Details XXXXXXXXX'} %% Don't worry if message fails {GetS_ {MyPeer reply(replyToMessage:M message:NodeId#{OS.uName})}} end) end in Details = {GetS {NWHandle installService(maker: NodeDetailsMaker)}} end %% Make a port for replies declare ResponseStream ResponseAP = {GetS {NWHandle makeProxy(stream: ResponseStream)}} {GetS_ {Details broadcast(message: msg(replyto:ResponseAP))}} for Id#Nm in ResponseStream do {System.show Id#Nm} end declare Snapshot = {NewCell nil} for I in ResponseStream do Snapshot := I|@Snapshot end for Id#Nm in {List.sort @Snapshot fun {$ I#_ J#_} I < J end} do {System.showInfo Id#'\t'#Nm.nodename} end {Browse {Length @Snapshot}} {Browse {GetS {Details broadcast(message:null rpc:true)}}} local Resp = {GetS {Details hash(value:kevin message:null rpc:true)}} in {Wait Resp} {Show Resp} end %% Flip Colour declare local fun {FlipMkr Env} [SetColour GetColour] = {Env resolve_proc_list(['SetColour' 'GetColour'])} [System] = {Env resolve_module_list(['System'])} {System.show 'XXXXXXXXXX Installing Flip XXXXXXXXX'} in desc(processor: proc {$ _} OldC = {GetColour} NewC = if OldC == 'blue' then 'red' else 'blue' end in {System.showInfo 'XXXXXXXXXX Flip '#OldC#' To '#NewC#' XXXXXXXXX'} {SetColour NewC} end) end in Flipper = {GetS {NWHandle installService(name:flip maker:FlipMkr)}} end {GetS_ {Flipper broadcast(message:null)}} %% Swallows all messages declare fun {SwallowMkr _} desc(processor: proc {$ _} skip end) end Swallow = {GetS {NWHandle installService(name: swallow maker: SwallowMkr)}} %% %% Pass a message through a chain of nodes given as argument. %% declare local fun {ChainMaker Env} MyPeer = {Env resolve_peer} [OS System] = {Env resolve_module_list(['OS' 'System'])} NodeId = {Env resolve_constant('NodeId')} MyAp = {Env get_ap_name} [SetColour MkGetS_] = {Env resolve_proc_list(['SetColour' 'MkGetSTrace_'])} GetS_ = {MkGetS_ 'ERROR - ChainMaker' 0} in {System.show 'XXXXXXXXXX Installing Chain Processor XXXXXXXXX'} %% Message will contain a servicename to send response to desc(processor: proc {$ M} message(chain: Rest colour: NewColour endmsg: EndMsg replyto: EndAP) = M.msg in {System.show 'XXXXXXXXXX Chain XXXXXXXXX'} {SetColour NewColour} case Rest of nil then Src = EndAP.id AP = EndAP.ap in {GetS_ {MyPeer peer(id:Src ap:AP message:EndMsg)}} [] I | Is then {GetS_ {MyPeer key(id:I ap:MyAp message:message(chain: Is colour: NewColour endmsg: EndMsg replyto: EndAP))}} end end) end in Chain = {GetS {NWHandle installService(maker: ChainMaker)}} end %% Make a port for replies declare ResponseStream ResponseAP = {GetS {NWHandle makeProxy(stream: ResponseStream)}} MyNode = {GetS {NWHandle getPeerId}} {GetS_ {Chain peer(id: MyNode message: message(chain: [3515222 1886999 5222000] colour: yellow endmsg: done replyto: ResponseAP))}} {ForAll ResponseStream System.show} {Browse ResponseStream} %% count number of nodes in a network by passing a token through %% the successor chain declare local fun {CountMaker Colour} fun {$ Env} MyPeer = {Env resolve_peer} success(OP2PS) = {MyPeer getP2PSHandle} [System OS] = {Env resolve_module_list(['System' 'OS'])} NodeId = {Env resolve_constant('NodeId')} [SetColour GetColour MkGetS_ CWDistance] = {Env resolve_proc_list(['SetColour' 'GetColour' 'MkGetSTrace_' 'CWDistance'])} CountService = {Env get_ap_name} MaxNetSize = {OP2PS getNetConfig($)}.maxNetSize GetS_ = {MkGetS_ 'ERROR - Count Through Successors' 0} in {System.show 'XXXXXXXXXX Installing Count Nodes Processor XXXXXXXXX'} {SetColour Colour} %% Message will contain a servicename to send response to desc(processor: proc {$ M} Msg = M.msg in {SetColour Msg.colour} %{Delay 500} {System.show 'XXXXXXXXXX Add to counter and pass it on XXXXXXXXX'#{GetColour}} case Msg of %% Got back home counter(startId: Src counter: N progress: _ replyto: Reply colour: _) andthen N > 0 andthen Src == NodeId then {GetS_ {MyPeer peer(id:Reply.id ap:Reply.ap message:result(N))}} [] counter(startId: Src counter: N progress: Prog replyto: Reply colour: C) then SuccId = {OP2PS getSucc($)} HomeDist = {CWDistance Src} SuccDist = {CWDistance SuccId} in {System.showInfo 'XXXXXXXXXX HomeDist = '#HomeDist#' SuccDist = '#SuccDist#' XXXXXXXXX'} if SuccId == NodeId then %% (OP2PS returns NodeId for successor when Succ == s {GetS_ {MyPeer peer(id:Reply.id ap:Reply.ap message:result(bad_successor))}} elseif HomeDist >= SuccDist orelse HomeDist == 0 then {GetS_ {MyPeer peer(id:Prog.id ap:Prog.ap message:NodeId#{String.toAtom {OS.uName}.nodename}#SuccId)}} %% Send to successor node {GetS_ {MyPeer peer(id:SuccId ap:CountService message:counter(startId: Src counter: N+1 progress: Prog replyto: Reply colour:C))}} else %% Originating node has disappeared {GetS_ {MyPeer peer(id:Reply.id ap:Reply.ap message:result(lost_home))}} end [] _ then %% Unrecognised message skip end end) end end in Count = {GetS {NWHandle installService(maker: {CountMaker red})}} end %% Make a port for replies declare ResponseStream ResponseAP = {GetS {NWHandle makeProxy(stream: ResponseStream)}} ProgressStream ProgressAP = {GetS {NWHandle makeProxy(stream: ProgressStream)}} MyPeerId = {GetS {NWHandle getPeerId}} thread for Id#Nm#Succ in ProgressStream do {System.showInfo Id#'\t -> '#Succ#'\t'#Nm} end end thread for result(N) in ResponseStream do {System.showInfo "There are "#N#" nodes in the network"} end end {GetS_ {Count peer(id: MyPeerId message: counter(startId: MyPeerId counter: 0 progress: ProgressAP replyto: ResponseAP colour: blue))} } %% gui_details sends the node's id and fully qualified domain name to %% the service in the incoming message %% declare local fun {GuiDetailsMaker Env} PeerHandler = {Env resolve_peer} success(OP2PS) = {PeerHandler getP2PSHandle} [OS System Record] = {Env resolve_module_list(['OS' 'System' 'Record'])} [NodeId] = {Env resolve_constant_list(['NodeId'])} [GetColour SetColour AddNotification MkGetS_] = {Env resolve_proc_list(['GetColour' 'SetColour' 'AddNotification' 'MkGetSTrace_'])} GetS_ = {MkGetS_ 'ERROR - GUI Details' 0} in {System.show 'XXXXXXXXXX Installing Node Details Processor XXXXXXXXX'} %% Message will contain a servicename to send response to desc(processor: proc {$ M} LAP = {OP2PS getLocalAP($)} in {System.show 'XXXXXXXXXX Send my Details XXXXXXXXX'} if {GetColour} == yellow then {SetColour red} else {SetColour yellow} end {GetS_ {PeerHandler reply(replyToMessage: M message:details(id:NodeId uname:{String.toAtom {OS.uName}.nodename} fingertable: {Record.filter {OP2PS getFT($)} fun {$ V} V \= unit end} pred: {OP2PS getPred($)} succ: {OP2PS getSucc($)} colour : {GetColour} messageStats: {OP2PS getStatistics($)} ap: ap(ip: {String.toAtom LAP.ip} pn: LAP.pn) ))}} end) end in Details = {GetS {NWHandle installService(name:gui_details maker:GuiDetailsMaker)}} end %% Make a port for replies declare ResponseStream ResponseAP = {GetS {NWHandle makeProxy(stream: ResponseStream)}} {GetS_ {Details broadcast(message: msg(replyto:ResponseAP) ranges:[0#100000 3000000#5000000])}} {ForAll ResponseStream System.show} {System.showInfo 'Node\tPred\tSucc\tName'} for M in ResponseStream do case M of details(id: Id pred: P succ: S uname: Nm ...) then {System.showInfo Id#'\t'#P#'\t'#S#'\t'#Nm} [] notify(id:Id msg:newcolour(C)) then {System.showInfo Id#' is now '#C} end end %% Count number of requests received declare local fun {CountPingsMaker Env} PeerHandler = {Env resolve_peer} [System] = {Env resolve_module_list(['System'])} NodeId = {Env resolve_constant('NodeId')} [MkGetS_] = {Env resolve_proc_list(['MkGetSTrace_'])} GetS_ = {MkGetS_ 'ERROR - CountPings Fixed' 0} {System.showInfo 'XXXXXXXXXX Installing Ping Counter XXXXXXXXX'} in desc(initialState: 0 processor: fun {$ M Count} NewCount = Count+1 in {System.showInfo 'XXXXXXXXXX Ping '#Count#' XXXXXXXXX'} {GetS_ {PeerHandler reply(replyToMessage: M message:NodeId#NewCount)}} NewCount end) end in CountPingsFixed = {GetS {NWHandle installService(maker: CountPingsMaker upgradeable: false)}} end %% Make a port for replies declare ResponseStream ResponseAP = {GetS {NWHandle makeProxy(stream: ResponseStream)}} {GetS_ {CountPingsFixed broadcast(message: msg(replyto:ResponseAP))}} {FoldL ResponseStream fun {$ Line Id#Count } {System.showInfo Line#'\t'#Count#'\t'#Id} Line+1 end 1 _} %% Count number of requests received (mutable) declare local fun {CountPingsMaker Env} PeerHandler = {Env resolve_peer} [System] = {Env resolve_module_list(['System'])} NodeId = {Env resolve_constant('NodeId')} [MkGetS_] = {Env resolve_proc_list(['MkGetSTrace_'])} GetS_ = {MkGetS_ 'ERROR - CountPings Fixed' 0} {System.showInfo 'XXXXXXXXXX Installing Ping Counter XXXXXXXXX'} in desc(initialState: 0 processor: fun {$ M Count} NewCount = Count+1 in {System.showInfo 'XXXXXXXXXX Ping '#Count#' XXXXXXXXX'} {GetS_ {PeerHandler reply(replyToMessage: M message:NodeId#NewCount)}} NewCount end) end in CountPingsMutable = {GetS {NWHandle installService(maker: CountPingsMaker upgradeable: true)}} end %% Make a port for replies declare ResponseStream ResponseAP = {GetS {NWHandle makeProxy(stream: ResponseStream)}} {GetS_ {CountPingsMutable broadcast(message: msg(replyto:ResponseAP))}} {FoldL ResponseStream fun {$ Line Id#Count } {System.showInfo Line#'\t'#Count#'\t'#Id} Line+1 end 1 _} %% Upgrade Service local fun {CountPingsMaker Env} PeerHandler = {Env resolve_peer} [System] = {Env resolve_module_list(['System'])} NodeId = {Env resolve_constant('NodeId')} [MkGetS_] = {Env resolve_proc_list(['MkGetSTrace_'])} GetS_ = {MkGetS_ 'ERROR - CountPings Fixed' 0} {System.showInfo 'XXXXXXXXXX Installing Ping Counter XXXXXXXXX'} in desc(initialState: 0 upgradeState: fun {$ _} 100 end processor: fun {$ M Count} NewCount = Count+5 in {System.showInfo 'XXXXXXXXXX Ping '#Count#' XXXXXXXXX'} {GetS_ {PeerHandler reply(replyToMessage: M message:NodeId#NewCount)}} NewCount end) end in {GetS_ {CountPingsMutable updateService(maker: CountPingsMaker upgradeable: true)}} end %% Install a service from within a peer %% First we create a service to install services: declare local fun {ServiceInstallerMaker Env} PeerHandler = {Env resolve_peer} [System] = {Env resolve_module_list(['System'])} [MkGetS_] = {Env resolve_proc_list(['MkGetSTrace_'])} GetS_ = {MkGetS_ 'ERROR - Service Installer' 0} {System.showInfo 'XXXXXXXXXX Installing Service Installer XXXXXXXXX'} in desc(processor: proc {$ M} Mkr = M.msg.maker SName = {Value.condSelect M.msg name {NewName}} BCast = {Value.condSelect M.msg broadcast true} in {System.showInfo 'XXXXXXXXXX Installing Service '# if {IsName SName} then '' else SName end#' XXXXXXXXX'} {GetS_ {PeerHandler installService(maker: Mkr name: SName broadcast: BCast)}} {GetS_ {PeerHandler reply(replyToMessage: M message:SName)}} end) end in Installer = {GetS {NWHandle installService(maker: ServiceInstallerMaker)}} end %% Now install a ping pong service: declare local fun {PingPongMaker Env} MyPeer = {Env resolve_peer} [System] = {Env resolve_module_list(['System'])} [SetColour GetColour MkGetS_ TraceInfo] = {Env resolve_proc_list(['SetColour' 'GetColour' 'MkGetSTrace_' 'TraceInfo'])} GetS_ = {MkGetS_ 'ERROR - PingPong' 0} in {System.show 'XXXXXXXXXX Installing PingPong Processor XXXXXXXXX'} {SetColour if {GetColour} == red then blue else red end} %% Message will contain a servicename to send response to desc(processor: proc {$ M} Msg = M.msg in case Msg of ping(pong:PongMsg) then {TraceInfo 9 'Replying to PING Request with '#PongMsg} {SetColour if {GetColour} == green then yellow else green end} {GetS_ {MyPeer reply(replyToMessage:M message:PongMsg)}} [] _ then {GetS_ {MyPeer reply(replyto:M message:unknownRequest(message: M))}} end end) end in PName = {GetS {Installer key(id: 0 message:service(name: lping maker: PingPongMaker) rpc:true)}} PClient = {GetS {NWHandle clientOfService(name: PName)}} end {System.show PName} {Browse {GetS {PClient key(id: 0 message:ping(pong:loc_pong) rpc:true)}}} %% Misc stuff declare BadClient = {GetS {NWHandle clientOfService(name: xxx handle: $)}} {GetS_ {BadClient broadcast(message: msg)}} declare PingClient = {GetS {NWHandle clientOfService(name: ping handle: $)}} {Browse {GetS {PingClient key(key: 0 message: msg rpc: true)}}} %% Test out Failure Detector declare FDS = {GetS {NWHandle clientOfService(name: failureDetector)}} {Browse FDS} declare ResponseStream ResponseAP = {GetS {NWHandle makeProxy(stream: ResponseStream)}} {GetS_ {FDS peer(message: watch(w_id:4488436 notify:ResponseAP))}} {Browse ResponseStream} declare Nodes = {Map {List.take ResponseStream 7} fun {$ R} R.1 end} for I in Nodes do {GetS_ {FDS peer(message: watch(w_id:I notify:ResponseAP))}} end %% Sample for pushMessages. %% The Var starts off unbound, any read messages are delayed until %% the first set message. declare local fun {VarMkr Env} MyPeer = {Env resolve_peer} [System] = {Env resolve_module_list(['System'])} [SetColour GetColour MkGetS_] = {Env resolve_proc_list(['SetColour' 'GetColour' 'MkGetSTrace_'])} GetS_ = {MkGetS_ 'ERROR - Var' 0} UnBound = unb %%{NewName} in {System.show 'XXXXXXXXXX Installing Var Processor XXXXXXXXX'} {SetColour if {GetColour} == red then blue else red end} %% Message will contain a servicename to send response to desc(processor: fun {$ M State#StoredMsgs} {System.show '********* HERE *********'} Msg = M.msg in {System.show 'Var Service got '#Msg} case Msg of set(V) then {GetS_ {MyPeer pushMessages(messages: {Reverse StoredMsgs})}} V#nil [] get andthen State == UnBound then %% Remember Message State#(M|StoredMsgs) [] get then {GetS_ {MyPeer reply(replyToMessage:M message:State)}} State#StoredMsgs [] mkUnBound then UnBound#StoredMsgs else {GetS_ {MyPeer reply(replyto:M message:unknownRequest(message: M))}} State#StoredMsgs end end initialState: UnBound#nil) end in VarService = {GetS {NWHandle installService(maker:VarMkr)}} end for _ in 1 .. 10 do {Browse {GetS {VarService key(id:0 message:get rpc:true)}}} end {GetS_ {VarService key(id:0 message:set(33))}} %% Test out Failure Detector %% BROKEN for now. %% Exercise the network by flooding the pinger service %% local %% fun {PingerMaker Env} %% MyPeer = {Env resolve_peer} %% success(OP2PS) = {MyPeer getP2PSHandle} %% [System] = %% {Env resolve_module_list(['System'])} %% NodeId = {Env resolve_constant('NodeId')} %% [SetColour GetColour] = {Env resolve_proc_list(['SetColour' 'GetColour'])} %% PingerService = {Env get_ap_name} %% MaxNetSize = {OP2PS getNetConfig($)}.maxNetSize %% in %% {System.show 'XXXXXXXXXX Installing Pinger Processor XXXXXXXXX'} %% {SetColour green} %% %% Message will contain a servicename to send response to %% desc(initialState: 0 %% processor: fun {$ M InState} %% Msg = M.msg %% in %% case Msg of %% getState then %% _ = {MyPeer reply(replyto:M message:InState)} %% InState %% [] setState(newstate: NewState) then %% NewState %% [] broadcastPings then %% thread %% case {MyPeer broadcast(ap:PingPongAp %% message:ping(pong:incState) %% rpc:true)} of %% success(Pongs) then %% for P in Pong do %% _ = {MyPeer peer(ap:PingerService %% message: P)} %% end %% [] _ then %% skip %% end %% end %% InState %% end %% end) %% in %% Pinger = {GetS {NWHandle installService(maker: PingerMaker)}} %% end %% %% All nodes ping all other nodes. An even more broken attempt at the above. %% declare %% fun {BCastPingMkr Env} %% [P2PKitPeer] = %% {Env resolve_module_list(['P2PKitPeer'])} %% NodeId = {Env resolve_constant('NodeId')} %% SetColour = {Env resolve_proc('SetColour')} %% in %% desc(processor: proc {$ _} %% {SetColour red} %% %% Don't worry if message fails %% _ = {P2PKitPeer.broadcast ping o(replyto:o(id:NodeId %% ap:swallow))} %% end) %% end %% BCastPing = {GetS {NWHandle installService(maker: BCastPingMkr handle: $)}} %% %% {GetS_ {BCastPing broadcast(message: fred)}}