%%
%% Command-line tool that installs the 'dsm_manager' service on a P2PKit
%% network.
%%
%% Steps performed when the functor is applied:
%%   1. parse the command line (see the usage text below);
%%   2. obtain the client entry points, either from an explicit client
%%      ticket, from an ozstore, or from the Twiki cache;
%%   3. create a P2PKit client handle on those entry points;
%%   4. install DSM_Manager_Maker as the upgradeable service dsm_manager;
%%   5. exit.
%%
functor
import
   Application
   Connection
   Pickle
   System
   Property
   P2PKitClient at 'x-ozlib://keving/net/p2p/P2PKitClient.ozf'
   Util at 'x-ozlib://keving/lib/Util.ozf'
   TwikiStore at 'x-ozlib://keving/net/TwikiStore.ozf'
export
define
   GetTwikiCacheURL = 'http://renoir.info.ucl.ac.be/twiki/pub/INGI/TwikiCache/'
   DefaultOzStoreTicketFile = 'http://www.info.ucl.ac.be/~glynn/OzStoreTicket'
   OzStoreTicket

   {Property.put 'print.width' 1000}
   {Property.put 'print.depth' 1000}

   %% Unwrap a result record of the form error(E) or success(S).
   %% The case runs in its own thread, so GetS returns at once even when R
   %% is still unbound; the result becomes available once R is determined.
   %%   error(E)   -> a failed value: E is raised in whoever needs the result
   %%   success(S) -> S
   fun {GetS R}
      thread
         case R
         of error(E) then {Value.failed E}
         [] success(S) then S
         end
      end
   end

   %%
   %% As GetS, but throws away any success value and kills app on exception
   %%
   %% proc {GetS_ R}
   %%    thread
   %%       case R of
   %%          error(E) then raise E end
   %%       [] success(...) then skip end
   %%    end
   %% end
   %%

   %% Parsed command line. If the arguments cannot be parsed we fall back to
   %% a record that forces the usage text and carries a non-zero exit code,
   %% so that a bad command line is reported as a failure to the caller.
   Args = try
             {Application.getArgs
              record(network(single char:&n type:atom default:default) % P2P network name
                     ozstore(single type:bool default: false)
                     ozstoreticket(single type:atom default:DefaultOzStoreTicketFile)
                     ticket(single type: atom char:&t default:unit) % client ticket
                     help(single char:[&? &h] default:false)
                     verbose(single char:&v type:int default:0) % verbosity
                    )}
          catch _ then
             {System.showInfo 'Unrecognised arguments'}
             optRec(help:true exitCode:1)
          end

   if Args.help then
      {System.showInfo "Usage: "#{Property.get 'application.url'}#" [option]"}
      {System.showInfo "Options:"}
      {System.showInfo "\t"#"-n <network>, --network <network>"#"\t"#"Connect to <network>"}
      {System.showInfo "\t"#"-t <ticket>, --ticket <ticket>"#"\t\t"#"Force Connection to this client ticket"}
      {System.showInfo "\t"#"--ozstore"#"\t\t\t\t"#"Use ozstore to find clients (default: false)"}
      {System.showInfo "\t"#"--ozstoreticket <ticketfile>"#"\t\t"#"ozstore ticket file (default: "#DefaultOzStoreTicketFile#")"}
      {System.showInfo "\t"#"-v <level>, --verbose <level>"#"\t\t"#"Verbosity level"}
      {System.showInfo "\t"#"-h, --help"#"\t\t\t\t"#"This help"}
      %% 0 when help was requested explicitly, 1 when we got here because
      %% the arguments were rejected (exitCode is only set by the catch above).
      {Application.exit {CondSelect Args exitCode 0}}
   end

   %% Print all messages with Lvl less than verbosity cutoff
   %% Trace = {Util.mkTrace const(Args.verbose)}
   TraceInfo = {Util.mkTraceInfo const(Args.verbose)}

   %% NOTE(review): Pickle.load fetches and unpickles whatever URL/file the
   %% user names with --ozstoreticket; only point it at trusted locations.
   OzStoreTicket = if Args.ozstore then
                      {Pickle.load Args.ozstoreticket}
                   else
                      noTicket
                   end

   %% Retrieve p2p network entry points from Twiki (or from the ozstore when
   %% --ozstore is given), unless a client ticket was forced with --ticket.
   %% Each store entry has the shape _#(_#_#C); only C is kept.
   ClientEntryPoints =
   if Args.ticket == unit then
      {Map
       if Args.ozstore then
          {GetS {Send {Connection.take OzStoreTicket} get(Args.network $)}}
       else
          {TwikiStore.get GetTwikiCacheURL Args.network}
       end
       fun {$ _#(_#_#C)} C end}
   else
      [Args.ticket]
   end

   NWHandle = {GetS {P2PKitClient.makeClient ClientEntryPoints Args.verbose}}

   {TraceInfo 10 'Install DSM on all nodes'}

   %%
   %% DSM_Manager_Maker is the maker handed to installService. Given the
   %% environment Env it resolves the modules and constants it needs, asks
   %% its successor for a copy of the store, and returns a service
   %% descriptor desc(processor:DSMMgr initialState:...).
   %%
   %% DSMMgr keeps a state record with the fields
   %%   store : dictionary Key -> Val (the shared memory)
   %%   tsvec : dictionary NodeId -> update counter (only once state is ok)
   %%   state : init while waiting for a store, ok afterwards
   %%   todo  : messages/updates that could not be processed yet
   %%
   local
      fun {DSM_Manager_Maker Env}
         Version = 1.3
         [System OP2PS P2PKitPeer Record OS] =
            {Env resolve_module_list(['System' 'OP2PS' 'P2PKitPeer' 'Record' 'OS'])}
         NodeId = {Env resolve_constant('NodeId')}
         MyServiceName = {Env get_ap_name}
         MaxNetSize = {OP2PS getNetConfig($)}.maxNetSize
         {System.showInfo 'XXXXXXXXXX Installing DSM Manager '#MyServiceName#
          ' Version '#Version#' XXXXXXXXX'}
         %% Lower bound of OS.rand, used to normalise random delays below
         MinR
         {OS.randLimits MinR _}

         %% Build the access procedure handed out to users of the store.
         %% The returned procedure accepts
         %%   read(Key Val)              Val is bound to the stored value, or
         %%                              to undefined if Key is absent
         %%   write(Key Val)             store Val and broadcast the update
         %%   exchange(Key OldVal NewVal) swap in NewVal, bind OldVal to the
         %%                              previous value (undefined if absent),
         %%                              and broadcast the update
         fun {MakeDSM Store TSVec}
            proc {$ Request}
               case Request
               of read(Key Val) then
                  {System.showInfo 'Reading Location '#Key}
                  {Dictionary.condGet Store Key undefined Val}
               [] write(Key Val) then OTS NTS UpdM in
                  {System.showInfo 'Writing Location '#Key}
                  %% Swap our counter for the still unbound NTS: a concurrent
                  %% writer receives NTS as its OTS and waits on it.
                  {Dictionary.exchange TSVec NodeId OTS NTS}
                  {Wait OTS} %% Block until we have unique access
                  {Dictionary.put Store Key Val}
                  NTS = OTS+1 %% And free any concurrent write
                  UpdM = updLoc(src: NodeId
                                key:Key
                                val:Val
                                tsvec:{Dictionary.toRecord upd TSVec})
                  thread
                     %% Delay random time up to a second
                     {Delay {Int.'mod' {OS.rand}-MinR 1000}}
                     %% TODO: fix broadcast so that we don't receive our own updLoc
                     {P2PKitPeer.broadcast MyServiceName UpdM _}
                  end
               [] exchange(Key OldVal NewVal) then OTS NTS UpdM in
                  {System.showInfo 'Exchanging Location '#Key}
                  {Dictionary.exchange TSVec NodeId OTS NTS}
                  {Wait OTS} %% Block until we have unique access
                  {Dictionary.condExchange Store Key undefined OldVal NewVal}
                  NTS = OTS+1 %% And free any concurrent write
                  UpdM = updLoc(src: NodeId
                                key:Key
                                val:NewVal
                                tsvec:{Dictionary.toRecord upd TSVec})
                  thread
                     %% Delay random time up to a second
                     {Delay {Int.'mod' {OS.rand}-MinR 1000}}
                     {P2PKitPeer.broadcast MyServiceName UpdM _}
                  end
               end
            end
         end

         %% Service processor: takes the incoming message wrapper M (the
         %% payload is M.msg) and the current State, returns the next state.
         fun {DSMMgr M State}
            {System.show 'DSMMgr'(M)}
            Msg = M.msg
         in
            {System.showInfo 'XXXXXXXXXX DSM Manager '#{Label Msg}#' XXXXXXXXX'}
            case Msg
            of getStore(src:SrcId ...) andthen SrcId == NodeId then
               %% Our own request came back: no one is started, so start
               %% with an empty store and replay what we queued meanwhile.
               {System.showInfo 'DSM: Creating Fresh Store'}
               {P2PKitPeer.pushMessages {Reverse State.todo}}
               state(store: {NewDictionary}
                     tsvec: local T = {NewDictionary} in T.NodeId := 0 T end
                     state: ok
                     todo: nil)
            [] getStore(src:SrcId replyto:ReplyAP) then
               SuccId = {OP2PS getSucc($)}
               %% Distances from this node to the requester and to our
               %% successor, wrapped around at MaxNetSize.
               %% NOTE(review): presumably identifiers live on a ring of
               %% size MaxNetSize - confirm against OP2PS.
               HomeDist = if SrcId-NodeId =< 0 then
                             MaxNetSize+(SrcId-NodeId)
                          else
                             SrcId-NodeId
                          end
               SuccDist = if SuccId-NodeId < 0 then
                             MaxNetSize+(SuccId-NodeId)
                          else
                             SuccId-NodeId
                          end
            in
               if HomeDist < SuccDist then
                  {System.showInfo 'DSM: TTL passed'}
                  %% Originating node must have died, let it go
               elseif State.state == init then
                  {System.showInfo 'DSM: Pass getStore to successor'}
                  %% Send on to successor
                  {P2PKitPeer.sendKey SuccId MyServiceName Msg _}
               else
                  {System.showInfo 'DSM: This is a newbie, send our store :-)'}
                  %% Reply with our store
                  {P2PKitPeer.sendPeer ReplyAP.id ReplyAP.ap
                   setStore(store:{Dictionary.toRecord store State.store}
                            tsvec:{Dictionary.toRecord tsvec State.tsvec}
                            todo:State.todo)
                   _}
               end
               State
            [] setStore(store: Store tsvec:RemTS todo: RemTodo) then
               {System.showInfo 'DSM: Set store from good Samaritan'}
               %% Replay the messages we queued while in init, then adopt the
               %% sender's store, timestamp vector and outstanding updates.
               {P2PKitPeer.pushMessages {Reverse State.todo}}
               state(store: {Record.toDictionary Store}
                     tsvec: local
                               NewTS = {Record.toDictionary RemTS}
                            in
                               NewTS.NodeId := 0
                               NewTS
                            end
                     state: ok
                     todo: RemTodo)
            elseif State.state == init then
               %% Add to todo list until we are ready
               {AdjoinAt State todo M|State.todo}
            else
               case Msg
               of getDSM(replyto:ReplyAP) then
                  {System.showInfo 'DSM: Give our store to friend'}
                  {P2PKitPeer.sendPeer ReplyAP.id ReplyAP.ap
                   dsm(dsm:{MakeDSM State.store State.tsvec})
                   _}
                  State
               [] updLoc(src:SrcId ...) andthen SrcId == NodeId then
                  {System.show 'Discarding Update'}
                  %% TODO: fix broadcast so that we don't receive our own updLoc
                  State
               [] updLoc(...) then
                  MyTSVec = State.tsvec
                  {System.show 'msg: '#Msg}
                  %% Try to apply one update; returns true iff it was applied.
                  %% An update is applicable when it is the next one from its
                  %% sender (sender's counter = ours for the sender + 1) and
                  %% it depends on nothing we have not seen (every other
                  %% entry of the remote vector is =< our own entry).
                  fun {ApplyUpd Upd}
                     case Upd
                     of updLoc(src: SrcId key:Key val:Val tsvec:RemTSVec) then
                        {System.show 'upd: '#Upd}
                        if RemTSVec.SrcId == {Dictionary.condGet MyTSVec SrcId 0}+1
                           andthen
                           {Record.allInd RemTSVec
                            fun {$ Id _}
                               if Id \= SrcId then
                                  RemTSVec.Id =< {Dictionary.condGet MyTSVec Id 0}
                               else
                                  true
                               end
                            end}
                        then
                           %% This record can be applied
                           State.store.Key := Val
                           {Dictionary.put MyTSVec SrcId RemTSVec.SrcId}
                           true
                        else
                           false
                        end
                     end
                  end
               in
                  if {ApplyUpd Msg} then
                     %% Check if we can now apply other updates to the store
                     %% If we order todo in applicability order then we only need
                     %% make one pass through.
                     %% Repeats passes over the pending updates until a pass
                     %% applies none; returns the updates still outstanding.
                     fun {ApplyPending Us}
                        Changed#NewUpds =
                           {List.foldL Us
                            fun {$ C#Ns U}
                               Updated = {ApplyUpd U}
                            in
                               {Bool.'or' C Updated}#
                               if Updated then Ns else U|Ns end
                            end
                            false#nil
                           }
                     in
                        if Changed then
                           {ApplyPending NewUpds}
                        else
                           NewUpds
                        end
                     end
                  in
                     %% todo contains outstanding updates
                     {AdjoinAt State todo {ApplyPending State.todo}}
                  else
                     %% As above it would be better to add in 'applicability'
                     %% order
                     {AdjoinAt State todo Msg|State.todo}
                  end
               end
            end
         end
      in
         %% Ask around the circle for a copy of the store
         {P2PKitPeer.sendKey {OP2PS getSucc($)} MyServiceName
          getStore(src:NodeId replyto:ap(id: NodeId ap: MyServiceName))
          _}
         desc(processor: DSMMgr
              initialState: state(store:{NewDictionary}
                                  state:init
                                  todo:nil))
      end
   in
      %% Forcing the result through GetS/_ = makes an installation error
      %% surface here before we exit.
      _ = {GetS {NWHandle installService(name:dsm_manager
                                         upgradeable: true
                                         maker: DSM_Manager_Maker
                                         handle: $)}}
   end

   {Application.exit 0}
end