forked from I2P_Developers/i2p.i2p
* SSU: Fix concurrency errors (ticket #536)
This commit is contained in:
@@ -172,15 +172,20 @@ class EstablishmentManager {
|
|||||||
if (_queuedOutbound.size() > MAX_QUEUED_OUTBOUND) {
|
if (_queuedOutbound.size() > MAX_QUEUED_OUTBOUND) {
|
||||||
rejected = true;
|
rejected = true;
|
||||||
} else {
|
} else {
|
||||||
List<OutNetMessage> newQueued = new ArrayList(1);
|
List<OutNetMessage> newQueued = new ArrayList(MAX_QUEUED_PER_PEER);
|
||||||
List<OutNetMessage> queued = _queuedOutbound.putIfAbsent(to, newQueued);
|
List<OutNetMessage> queued = _queuedOutbound.putIfAbsent(to, newQueued);
|
||||||
if (queued == null)
|
if (queued == null)
|
||||||
queued = newQueued;
|
queued = newQueued;
|
||||||
|
// this used to be inside a synchronized (_outboundStates) block,
|
||||||
|
// but that's now a CHM, so protect the ArrayList
|
||||||
|
// There are still races possible but this should prevent AIOOBE and NPE
|
||||||
|
synchronized (queued) {
|
||||||
queueCount = queued.size();
|
queueCount = queued.size();
|
||||||
if (queueCount < MAX_QUEUED_PER_PEER)
|
if (queueCount < MAX_QUEUED_PER_PEER)
|
||||||
queued.add(msg);
|
queued.add(msg);
|
||||||
}
|
|
||||||
deferred = _queuedOutbound.size();
|
deferred = _queuedOutbound.size();
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// must have a valid session key
|
// must have a valid session key
|
||||||
byte[] keyBytes = addr.getIntroKey();
|
byte[] keyBytes = addr.getIntroKey();
|
||||||
@@ -212,9 +217,14 @@ class EstablishmentManager {
|
|||||||
if (state != null) {
|
if (state != null) {
|
||||||
state.addMessage(msg);
|
state.addMessage(msg);
|
||||||
List<OutNetMessage> queued = _queuedOutbound.remove(to);
|
List<OutNetMessage> queued = _queuedOutbound.remove(to);
|
||||||
if (queued != null)
|
if (queued != null) {
|
||||||
for (int i = 0; i < queued.size(); i++)
|
// see comments above
|
||||||
state.addMessage(queued.get(i));
|
synchronized (queued) {
|
||||||
|
for (OutNetMessage m : queued) {
|
||||||
|
state.addMessage(m);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rejected) {
|
if (rejected) {
|
||||||
@@ -399,8 +409,12 @@ class EstablishmentManager {
|
|||||||
// there shouldn't have been queued messages for this active state, but just in case...
|
// there shouldn't have been queued messages for this active state, but just in case...
|
||||||
List<OutNetMessage> queued = _queuedOutbound.remove(state.getRemoteHostId());
|
List<OutNetMessage> queued = _queuedOutbound.remove(state.getRemoteHostId());
|
||||||
if (queued != null) {
|
if (queued != null) {
|
||||||
for (int i = 0; i < queued.size(); i++)
|
// see comments above
|
||||||
state.addMessage(queued.get(i));
|
synchronized (queued) {
|
||||||
|
for (OutNetMessage m : queued) {
|
||||||
|
state.addMessage(m);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//admitted = locked_admitQueued();
|
//admitted = locked_admitQueued();
|
||||||
|
Reference in New Issue
Block a user