diff --git a/apps/stasher/python/src/stasher.py b/apps/stasher/python/src/stasher.py
index 514ce276d..05eccfa6b 100644
--- a/apps/stasher/python/src/stasher.py
+++ b/apps/stasher/python/src/stasher.py
@@ -163,10 +163,10 @@ runCore = True
 
 # timeouts - calibrate as needed
 timeout = {
-    'ping' : 60,
-    'findNode' : 60,
-    'findData' : 60,
-    'store' : 60,
+    'ping' : 120,
+    'findNode' : 120,
+    'findData' : 120,
+    'store' : 120,
     }
 
 logToSocket = None
@@ -1186,7 +1186,7 @@ class KRpc(KBase):
 
     #@-node:terminate
     #@+node:returnValue
-    def returnValue(self, result=None, **kw):
+    def returnValue(self, res=None, **kw):
         """
         Passes a return value back to the original caller, be it the
         local application, or an upstream peer
@@ -1207,11 +1207,11 @@ class KRpc(KBase):
         self.terminate()
         if self.callback:
             if hasattr(self, 'cbArgs'):
-                self.callback(result, self.cbArgs)
+                self.callback(res, self.cbArgs)
             else:
-                self.callback(result)
+                self.callback(res)
         elif self.isLocal:
-            self.queue.put(result)
+            self.queue.put(res)
         else:
             self.upstreamPeer.send_reply(msgId=self.upstreamMsgId,
                                          **kw)
@@ -1986,7 +1986,7 @@ class KRpcFindNode(KRpc):
         peerRec.state = 'replied'
 
         # wrap the returned peers as KPeer objects
-        peersReturned = details.get('nodes', [])
+        peersReturned = details.get('result', [])
         peersReturned = [self.localNode._normalisePeer(p) for p in peersReturned]
 
         self.numPeersRecommended += len(peersReturned)
@@ -2001,6 +2001,7 @@ class KRpcFindNode(KRpc):
 
         # and check for and action possible end of query round
        self.checkEndOfRound()
+
     #@-node:on_reply
     #@+node:on_tick
     def on_tick(self):
@@ -2157,7 +2158,8 @@ class KRpcFindNode(KRpc):
 
         self.reportStats()
 
-        KRpc.returnValue(self, items, nodes=items)
+        KRpc.returnValue(self, items, result=items)
+
     #@-node:returnValue
     #@+node:reportStats
     def reportStats(self):
@@ -2201,7 +2203,7 @@ class KRpcFindData(KRpcFindNode):
         if value != None:
             self.log(4, "Found required value in local storage")
             self.log(4, "VALUE='%s'" % value)
-            self.returnValue(value)
+            self.on_gotValue(value, self.hashWanted.asHex())
             return
 
         # no such luck - pass on to parent
@@ -2213,13 +2215,132 @@ class KRpcFindData(KRpcFindNode):
         """
         Callback for FIND_NODE reply
         """
-        res = details.get('nodes', None)
+        res = details.get('result', None)
         if isinstance(res, str):
-            self.returnValue(res)
+            self.on_gotValue(res, self.hashWanted.asHex())
         else:
             KRpcFindNode.on_reply(self, peer, msgId, **details)
 
     #@-node:on_reply
+    #@+node:on_gotValue
+    def on_gotValue(self, value, hash=None):
+        """
+        Callback which fires when we get the value stored under a key
+
+        Value is either the real value, or a splitfile manifest
+        If a real value, just return it.
+        If a splitfile manifest, launch nested findValue RPCs to get each chunk
+        """
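+        # For illustration (example values, not normative): a value split by
+        # KRpcStore.storeSplit below arrives here as, e.g.:
+        #
+        #   chunks:2
+        #   da39a3ee5e6b4b0d3255bfef95601890afd80709
+        #   2aae6c35c94fcfb415dbe95f408b9ce91ee846ed
+        #
+        # i.e. a "chunks:<n>" header line, then one 40-digit hex SHA per line;
+        # an unsplit value arrives as "chunks:0\n" plus the raw value.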
+        nchunks = 0
+        try:
+            firstline, rest = value.split("\n", 1)
+            firstline = firstline.strip()
+            kwd, str_nchunks = firstline.split(":")
+            if kwd != 'chunks':
+                raise hell  # deliberately undefined - the NameError lands us in the except below
+            nchunks = int(str_nchunks)
+            value = rest
+        except:
+            pass # in this case, hell hath no fury at all
+
+        if nchunks == 0:
+            self.returnValue(value)
+            return
+
+        # now we get to the hard bit - we have to set up nested findData RPCs to
+        # get all the chunks and reassemble them
+        hashes = rest.strip().split("\n")
+
+        # do sanity checks
+        hashesAllValid = [len(h) == 40 for h in hashes]
+        if len(hashes) != nchunks:
+            self.log(
+                2,
+                "Splitfile retrieval failure\nmanifest contains %s hashes, should have been %s" % (
+                    len(hashes), nchunks))
+            self.returnValue(None)
+            return
+        if False in hashesAllValid:
+            self.log(2, "Splitfile retrieval failure - one or more invalid hashes")
+            self.returnValue(None)
+            return
+
+        # now this is a bit weird - we need to bind each chunk to its hash, so we create a
+        # class which produces callables which fire our on_gotChunk callback
+        class ChunkNotifier:
+            def __init__(me, h, cb):
+                me.h = h
+                me.cb = cb
+            def __call__(me, val):
+                me.cb(me.h, val)
+
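+        # (equivalently - shown only for illustration - a default-argument
+        #  lambda like "lambda val, h=h: self.on_gotChunk(h, val)" would
+        #  bind each hash the same way)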
+        # now launch the chunk retrieval RPCs
+        # result is that for each retrieved chunk, our on_gotChunk callback will
+        # be invoked with the arguments (hash, value), so we can tick them off
+        self.numChunks = nchunks
+        self.numChunksReceived = 0
+        self.chunkHashes = hashes
+        self.chunks = dict.fromkeys(hashes)
+        for h in hashes:
+            KRpcFindData(self.localNode, h, ChunkNotifier(h, self.on_gotChunk))
+
+        # now, we can sit back and receive the chunks
+
+    #@-node:on_gotValue
+    #@+node:on_gotChunk
+    def on_gotChunk(self, hexhash, value):
+        """
+        Callback which fires when a nested chunk findData returns
+        """
+        if value is None:
+            self.log(2, "Chunk retrieval failed, fatal to this findData")
+            self.returnValue(None)
+            return
+
+        # got a value - vet it against hash
+        if shahash(value) != hexhash:
+            self.log(2, "Got a chunk, but it doesn't hash right - fatal to this findData")
+            self.returnValue(None)
+            return
+
+        # it's valid - stash it
+        self.chunks[hexhash] = value
+        self.numChunksReceived += 1
+
+        # have we finished yet?
+        if self.numChunksReceived < self.numChunks:
+            # no
+            self.log(4, "Received chunk %s of %s" % (self.numChunksReceived, self.numChunks))
+            return
+
+        # maybe we have
+        self.log(4, "We appear to have all chunks, checking further")
+
+        # sanity check
+        if None in self.chunks.values():
+            self.log(2, "Fatal - reached chunk count, but chunks still missing")
+            self.returnValue(None)
+            return
+
+        # finally done - got all chunks, hashes are valid, reassemble in order
+        allChunks = [self.chunks[h] for h in self.chunkHashes]
+        reassembled = "".join(allChunks)
+        self.log(4, "Reassembled all %s chunks, SUCCESS" % self.numChunks)
+        self.returnValue(reassembled)
+
+    #@-node:on_gotChunk
     #@+node:returnValue
     def returnValue(self, items):
         """
@@ -2229,9 +2350,10 @@ class KRpcFindData(KRpcFindNode):
         # so we can introspect it
         self.localNode.lastrpc = self
 
+        # another debugging hack
         self.reportStats()
 
-        KRpc.returnValue(self, items, nodes=items)
+        KRpc.returnValue(self, items, result=items)
 
     #@-node:returnValue
     #@-others
@@ -2272,6 +2394,9 @@ class KRpcStore(KRpc):
         self.value = kw['value']
         self.isLocalOnly = kw.get('local', True)
 
+        # set 'splitting' flag to indicate if we need to insert as splitfiles
+        self.splitting = len(self.value) > maxValueSize
+
         self.log(4, "isLocalOnly=%s" % self.isLocalOnly)
 
         if kw.has_key('cbArgs'):
@@ -2285,6 +2410,14 @@ class KRpcStore(KRpc):
         """
         Kicks off this RPC
         """
+        # if too big, then break up into <30k chunks
+        if self.splitting:
+            self.storeSplit()
+            return
+
+        # not too big - prefix a 0 chunk count, and go ahead as a single entity
+        self.value = "chunks:0\n" + self.value
+
         # if local only, or no peers, just save locally
         if self.isLocalOnly or len(self.localNode.peers) == 0:
             result = self.localNode.storage.putKey(self.keyHashed, self.value, keyIsHashed=True)
@@ -2303,7 +2436,90 @@ class KRpcStore(KRpc):
                           hash=self.keyHashed, raw=True, local=False)
         return
+
     #@-node:start
+    #@+node:storeSplit
+    def storeSplit(self):
+        """
+        Gets called if we're splitting a big file into smaller chunks
+
+        Here, we:
+         - break the file up into chunks
+         - build a manifest
+         - launch store RPCs to store each chunk, where the key is SHA(chunk)
+         - launch a store RPC to store the 'manifest' (noting that if the manifest
+           is too big, it'll get recursively inserted as a splitfile as well)
+        """
+        # break up into chunks
+        chunks = []
+        hashes = []
+        size = len(self.value)
+        i = 0
+        self.nchunks = 0
+        while i < size:
+            chunks.append(self.value[i:i+maxValueSize])
+            hashes.append(shahash(chunks[-1]))
+            i += maxValueSize
+            self.nchunks += 1
+
+        # build the manifest
+        manifest = "chunks:%s\n%s\n" % (self.nchunks, "\n".join(hashes))
+
+        # set progress attributes
+        self.chunkManifestInserted = False
+        self.chunksInserted = 0
+
+        # launch nested Store RPCs for the manifest, and for each chunk
+        KRpcStore(self.localNode, self.on_doneChunkManifest,
+                  local=self.isLocalOnly,
+                  key=self.key,
+                  value=manifest)
+        i = 0
+        while i < self.nchunks:
+            KRpcStore(self.localNode, self.on_doneChunk,
+                      local=self.isLocalOnly,
+                      key=hashes[i],
+                      value=chunks[i])
+            i += 1
+
+        # now sit back and wait for the callbacks
+    #@-node:storeSplit
+    #@+node:on_doneChunkManifest
+    def on_doneChunkManifest(self, result):
+        """
+        Callback which fires when a manifest insert succeeds/fails
+        """
+        # the chunk callback handles all
+        self.on_doneChunk(result, isManifest=True)
+    #@-node:on_doneChunkManifest
+    #@+node:on_doneChunk
+    def on_doneChunk(self, result, isManifest=False):
+        """
+        Callback which fires when a single chunk insert succeeds/fails
+        """
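+        # worked example of the accounting below (illustrative only, and
+        # assuming maxValueSize is 30000 per the "<30k" comment in start()):
+        # a 70000-byte value splits into nchunks == 3, so success requires
+        # chunkManifestInserted == True and chunksInserted == 3, with the
+        # four callbacks arriving in any order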
+        # a failure either way means the whole RPC has failed
+        if not result:
+            # one huge fuck-up
+            self.returnValue(False)
+            return
+
+        # update our tally
+        if isManifest:
+            self.chunkManifestInserted = True
+        else:
+            self.chunksInserted += 1
+
+        # finished?
+        if self.chunkManifestInserted and (self.chunksInserted == self.nchunks):
+            # yep = success
+            self.returnValue(True)
+
+    #@-node:on_doneChunk
     #@+node:returnValue
     def returnValue(self, result):
         """