* MessageHistory:

- Flush at shutdown
    - Fix file location, only delete if enabled
    - Cleanups, concurrent
This commit is contained in:
zzz
2011-12-13 15:25:56 +00:00
parent 15cbb6bb71
commit 900defcd42
3 changed files with 48 additions and 30 deletions

View File

@@ -8,13 +8,16 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.TimeZone;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.data.Hash;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.tunnel.HopConfig;
import net.i2p.util.Log;
import net.i2p.util.SecureFileOutputStream;
/**
* Simply act as a pen register of messages sent in and out of the router.
@@ -22,17 +25,20 @@ import net.i2p.util.Log;
* (with clock synchronization, this will generate a log that can be used to
* analyze the entire network, if everyone provides their logs honestly)
*
* This is always instantiated in the context and the WriteJob runs every minute,
* but unless router.keepHistory=true it does nothing.
* It generates a LARGE log file.
*/
public class MessageHistory {
private Log _log;
private RouterContext _context;
private final List _unwrittenEntries = new ArrayList(64); // list of raw entries (strings) yet to be written
private final Log _log;
private final RouterContext _context;
private final Queue<String> _unwrittenEntries; // list of raw entries (strings) yet to be written
private String _historyFile; // where to write
private String _localIdent; // placed in each entry to uniquely identify the local router
private boolean _doLog; // true == we want to log
private boolean _doPause; // true == briefly stop writing data to the log (used while submitting it)
private ReinitializeJob _reinitializeJob;
private WriteJob _writeJob;
private final ReinitializeJob _reinitializeJob;
private final WriteJob _writeJob;
//private SubmitMessageHistoryJob _submitMessageHistoryJob;
private volatile boolean _firstPass;
@@ -52,6 +58,7 @@ public class MessageHistory {
_log = context.logManager().getLog(getClass());
_fmt = new SimpleDateFormat("yy/MM/dd.HH:mm:ss.SSS");
_fmt.setTimeZone(TimeZone.getTimeZone("GMT"));
_unwrittenEntries = new LinkedBlockingQueue();
_reinitializeJob = new ReinitializeJob();
_writeJob = new WriteJob();
_firstPass = true;
@@ -59,9 +66,19 @@ public class MessageHistory {
initialize(true);
}
void setDoLog(boolean log) { _doLog = log; }
/** @since 0.8.12 */
public void shutdown() {
if (_doLog)
addEntry(getPrefix() + "** Router shutdown");
_doPause = false;
flushEntries();
_doLog = false;
}
private void setDoLog(boolean log) { _doLog = log; }
public boolean getDoLog() { return _doLog; }
/** @deprecated unused */
void setPauseFlushes(boolean doPause) { _doPause = doPause; }
String getFilename() { return _historyFile; }
@@ -78,10 +95,8 @@ public class MessageHistory {
public void initialize(boolean forceReinitialize) {
if (!forceReinitialize) return;
if (_context.router() == null) return;
if (_context.router().getRouterInfo() == null) {
_reinitializeJob.getTiming().setStartAfter(_context.clock().now()+5000);
_reinitializeJob.getTiming().setStartAfter(_context.clock().now() + 15*1000);
_context.jobQueue().addJob(_reinitializeJob);
} else {
_localIdent = getName(_context.routerHash());
@@ -90,11 +105,15 @@ public class MessageHistory {
// clear the history file on startup
if (_firstPass) {
File f = new File(_historyFile);
if (!f.isAbsolute())
f = new File(_context.getLogDir(), _historyFile);
f.delete();
_writeJob.getTiming().setStartAfter(_context.clock().now() + WRITE_DELAY);
_context.jobQueue().addJob(_writeJob);
_firstPass = false;
}
_firstPass = false;
addEntry(getPrefix() + "** Router initialized (started up or changed identities)");
_context.jobQueue().addJob(_writeJob);
if (_doLog)
addEntry(getPrefix() + "** Router initialized (started up or changed identities)");
//_submitMessageHistoryJob.getTiming().setStartAfter(_context.clock().now() + 2*60*1000);
//_context.jobQueue().addJob(_submitMessageHistoryJob);
}
@@ -589,11 +608,8 @@ public class MessageHistory {
*/
private void addEntry(String entry) {
if (entry == null) return;
int sz = 0;
synchronized (_unwrittenEntries) {
_unwrittenEntries.add(entry);
sz = _unwrittenEntries.size();
}
_unwrittenEntries.offer(entry);
int sz = _unwrittenEntries.size();
if (sz > FLUSH_SIZE)
flushEntries();
}
@@ -602,26 +618,25 @@ public class MessageHistory {
* Write out any unwritten entries, and clear the pending list
*/
private void flushEntries() {
if (_doPause) return;
List entries = null;
synchronized (_unwrittenEntries) {
entries = new ArrayList(_unwrittenEntries);
if (!_doLog)
_unwrittenEntries.clear();
}
writeEntries(entries);
else if ((!_unwrittenEntries.isEmpty()) && !_doPause)
writeEntries();
}
/**
* Actually write the specified entries
*
*/
private void writeEntries(List entries) {
if (!_doLog) return;
private synchronized void writeEntries() {
File f = new File(_historyFile);
if (!f.isAbsolute())
f = new File(_context.getLogDir(), _historyFile);
FileOutputStream fos = null;
try {
fos = new FileOutputStream(_historyFile, true);
for (Iterator iter = entries.iterator(); iter.hasNext(); ) {
String entry = (String)iter.next();
fos = new SecureFileOutputStream(f, true);
String entry;
while ((entry = _unwrittenEntries.poll()) != null) {
fos.write(entry.getBytes());
fos.write(NL);
}
@@ -638,7 +653,7 @@ public class MessageHistory {
public WriteJob() {
super(MessageHistory.this._context);
}
public String getName() { return "Write History Entries"; }
public String getName() { return _doLog ? "Message debug log" : "Message debug log (disabled)"; }
public void runJob() {
flushEntries();
updateSettings();
@@ -646,6 +661,7 @@ public class MessageHistory {
}
}
/****
public static void main(String args[]) {
RouterContext ctx = new RouterContext(null);
MessageHistory hist = new MessageHistory(ctx);
@@ -658,4 +674,5 @@ public class MessageHistory {
hist.addEntry("you smell finished");
hist.flushEntries();
}
****/
}

View File

@@ -1022,6 +1022,7 @@ public class Router implements RouterClock.ClockShiftListener {
try { _context.inNetMessagePool().shutdown(); } catch (Throwable t) { _log.error("Error shutting down the inbound net pool", t); }
try { _context.clientMessagePool().shutdown(); } catch (Throwable t) { _log.error("Error shutting down the client msg pool", t); }
try { _context.sessionKeyManager().shutdown(); } catch (Throwable t) { _log.error("Error shutting down the session key manager", t); }
try { _context.messageHistory().shutdown(); } catch (Throwable t) { _log.error("Error shutting down the message history logger", t); }
// do stat manager last to reduce chance of NPEs in other threads
try { _context.statManager().shutdown(); } catch (Throwable t) { _log.error("Error shutting down the stats manager", t); }
_context.deleteTempDir();

View File

@@ -36,7 +36,7 @@ import net.i2p.util.I2PProperties.I2PPropertyCallback;
*
*/
public class RouterContext extends I2PAppContext {
private Router _router;
private final Router _router;
private ClientManagerFacadeImpl _clientManagerFacade;
private ClientMessagePool _clientMessagePool;
private JobQueue _jobQueue;