hook up core and backend

This commit is contained in:
Zlatin Balevsky
2019-11-11 22:42:55 +00:00
parent 3db167bade
commit 29e499fe9d
12 changed files with 301 additions and 36 deletions

View File

@@ -311,7 +311,7 @@ public class Core {
connectionEstablisher = new ConnectionEstablisher(eventBus, i2pConnector, props, connectionManager, hostCache)
log.info("initializing chat server")
chatServer = new ChatServer(eventBus, props, trustService, me)
chatServer = new ChatServer(eventBus, props, trustService, me, spk)
eventBus.with {
register(ChatMessageEvent.class, chatServer)
register(ChatDisconnectionEvent.class, chatServer)

View File

@@ -1,5 +1,17 @@
package com.muwire.core.chat;
enum ChatAction {
JOIN, LEAVE, SAY
JOIN(true, false),
LEAVE(false, false),
SAY(false, false),
LIST(true, true),
HELP(true, true),
INFO(true, true);
final boolean console;
final boolean stateless;
ChatAction(boolean console, boolean stateless) {
this.console = console;
this.stateless = stateless;
}
}

View File

@@ -3,16 +3,26 @@ package com.muwire.core.chat
class ChatCommand {
private final ChatAction action
private final String payload
final String source
ChatCommand(String source) {
int space = source.indexOf(' ')
if (space < 0)
throw new Exception("Invalid command $source")
String command = source.substring(0, space)
if (command.charAt(0) != '/')
if (source.charAt(0) != '/')
throw new Exception("command doesn't start with / $source")
command = command.substring(1)
int position = 1
StringBuilder sb = new StringBuilder()
while(position < source.length()) {
char c = source.charAt(position)
if (c == ' ')
break
sb.append(c)
position++
}
String command = sb.toString().toUpperCase()
action = ChatAction.valueOf(command)
payload = source.substring(space + 1)
if (position < source.length())
payload = source.substring(position + 1)
else
payload = ""
this.source = source
}
}

View File

@@ -23,7 +23,7 @@ import net.i2p.data.Signature
import net.i2p.data.SigningPrivateKey
@Log
class ChatConnection implements Closeable {
class ChatConnection implements ChatLink {
private static final long PING_INTERVAL = 20000
private static final long MAX_CHAT_AGE = 5 * 60 * 1000
@@ -39,6 +39,7 @@ class ChatConnection implements Closeable {
private final BlockingQueue messages = new LinkedBlockingQueue()
private final Thread reader, writer
private final LinkedList<Long> timestamps = new LinkedList<>()
private final BlockingQueue incomingEvents = new LinkedBlockingQueue()
private final DataInputStream dis
private final DataOutputStream dos
@@ -77,6 +78,11 @@ class ChatConnection implements Closeable {
writer.start()
}
@Override
public boolean isUp() {
running.get()
}
@Override
public void close() {
if (!running.compareAndSet(true, false)) {
@@ -193,11 +199,13 @@ class ChatConnection implements Closeable {
def event = new ChatMessageEvent( uuid : uuid, payload : payload, sender : sender,
host : host, room : room, chatTime : chatTime, sig : sig)
eventBus.publish(event)
incomingEvents.put(event)
}
private void handleLeave(def json) {
Persona leaver = fromString(json.persona)
eventBus.publish(new UserDisconnectedEvent(user : leaver, host : persona))
incomingEvents.put(leaver)
}
private static Persona fromString(String base64) {
@@ -257,4 +265,8 @@ class ChatConnection implements Closeable {
leave.persona = p.toBase64()
messages.put(leave)
}
public Object nextEvent() {
incomingEvents.take()
}
}

View File

@@ -6,5 +6,5 @@ import com.muwire.core.Persona
class ChatConnectionEvent extends Event {
ChatConnectionAttemptStatus status
Persona persona
ChatConnection connection
ChatLink connection
}

View File

@@ -0,0 +1,13 @@
package com.muwire.core.chat;
import java.io.Closeable;
import com.muwire.core.Persona;
public interface ChatLink extends Closeable {
public boolean isUp();
public void sendChat(ChatMessageEvent e);
public void sendLeave(Persona p);
public void sendPing();
public Object nextEvent() throws InterruptedException;
}

View File

@@ -30,10 +30,13 @@ class ChatManager {
}
void onUIConnectChatEvent(UIConnectChatEvent e) {
if (e.host == me)
return
ChatClient client = new ChatClient(connector, eventBus, e.host, me, trustService, settings)
clients.put(e.host, client)
if (e.host == me) {
eventBus.publish(new ChatConnectionEvent(status : ChatConnectionAttemptStatus.SUCCESSFUL,
persona : me, connection : LocalChatLink.INSTANCE))
} else {
ChatClient client = new ChatClient(connector, eventBus, e.host, me, trustService, settings)
clients.put(e.host, client)
}
}
void onUIDisconnectChatEvent(UIDisconnectChatEvent e) {
@@ -44,7 +47,7 @@ class ChatManager {
}
void onChatMessageEvent(ChatMessageEvent e) {
if (e.host == me)
if (e.host == me)
return
if (e.sender != me)
return

View File

@@ -18,6 +18,7 @@ import com.muwire.core.util.DataUtil
import groovy.util.logging.Log
import net.i2p.data.Base64
import net.i2p.data.Destination
import net.i2p.data.SigningPrivateKey
import net.i2p.util.ConcurrentHashSet
@Log
@@ -27,18 +28,20 @@ class ChatServer {
private final MuWireSettings settings
private final TrustService trustService
private final Persona me
private final SigningPrivateKey spk
private final Map<Destination, ChatConnection> connections = new ConcurrentHashMap()
private final Map<Destination, ChatLink> connections = new ConcurrentHashMap()
private final Map<String, Set<Persona>> rooms = new ConcurrentHashMap<>()
private final Map<Persona, Set<String>> memberships = new ConcurrentHashMap<>()
private final AtomicBoolean running = new AtomicBoolean()
ChatServer(EventBus eventBus, MuWireSettings settings, TrustService trustService, Persona me) {
ChatServer(EventBus eventBus, MuWireSettings settings, TrustService trustService, Persona me, SigningPrivateKey spk) {
this.eventBus = eventBus
this.settings = settings
this.trustService = trustService
this.me = me
this.spk = spk
Timer timer = new Timer("chat-server-pinger", true)
timer.schedule({sendPings()} as TimerTask, 1000, 1000)
@@ -46,6 +49,7 @@ class ChatServer {
public void start() {
running.set(true)
connections.put(me.destination, LocalChatLink.INSTANCE)
}
private void sendPings() {
@@ -175,11 +179,18 @@ class ChatServer {
log.log(Level.WARNING, "bad chat command",badCommand)
return
}
if ((command.action.console && e.room != CONSOLE) ||
(!command.action.console && e.room == CONSOLE))
return
switch(command.action) {
case ChatAction.JOIN : processJoin(command.payload, e); break
case ChatAction.LEAVE : processLeave(e); break
case ChatAction.SAY : processSay(e); break
case ChatAction.LIST : processList(e); break
case ChatAction.INFO : processInfo(e); break
case ChatAction.HELP : processHelp(e); break
}
}
@@ -215,6 +226,38 @@ class ChatServer {
}
}
private void processList(ChatMessageEvent e) {
String roomList = "/SAY " + String.join("\n", rooms.keySet())
echo(roomList, e)
}
private void processInfo(ChatMessageEvent e) {
String info = "/SAY The address of this server is \n${me.toBase64()}\nCopy/paste this and share it"
echo(info, e)
}
private void processHelp(ChatMessageEvent e) {
String help = "/SAY Available commands: /JOIN /LEAVE /SAY /LIST /INFO /HELP"
echo(help, e)
}
private void echo(String payload, ChatMessageEvent e) {
log.info "echoing $payload"
UUID uuid = UUID.randomUUID()
long now = System.currentTimeMillis()
byte [] sig = ChatConnection.sign(uuid, now, CONSOLE, payload, me, me, spk)
ChatMessageEvent echo = new ChatMessageEvent(
uuid : uuid,
payload : payload,
sender : me,
host : me,
room : CONSOLE,
chatTime : now,
sig : sig
)
connections[e.sender.destination]?.sendChat(echo)
}
void stop() {
if (running.compareAndSet(true, false)) {
connections.each { k, v ->

View File

@@ -0,0 +1,45 @@
package com.muwire.core.chat
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import com.muwire.core.Persona
import groovy.util.logging.Log
@Log
class LocalChatLink implements ChatLink {
public static final LocalChatLink INSTANCE = new LocalChatLink()
private final BlockingQueue messages = new LinkedBlockingQueue()
private LocalChatLink() {}
@Override
public void close() throws IOException {
}
@Override
public void sendChat(ChatMessageEvent e) {
messages.put(e)
}
@Override
public void sendLeave(Persona p) {
messages.put(p)
}
@Override
public void sendPing() {}
@Override
public Object nextEvent() {
messages.take()
}
@Override
public boolean isUp() {
true
}
}

View File

@@ -4,18 +4,24 @@ import griffon.core.artifact.GriffonController
import griffon.core.controller.ControllerAction
import griffon.inject.MVCMember
import griffon.metadata.ArtifactProviderFor
import groovy.util.logging.Log
import net.i2p.crypto.DSAEngine
import net.i2p.data.DataHelper
import net.i2p.data.Signature
import java.nio.charset.StandardCharsets
import java.util.logging.Level
import javax.annotation.Nonnull
import com.muwire.core.Persona
import com.muwire.core.chat.ChatCommand
import com.muwire.core.chat.ChatAction
import com.muwire.core.chat.ChatConnection
import com.muwire.core.chat.ChatMessageEvent
import com.muwire.core.chat.ChatServer
@Log
@ArtifactProviderFor(GriffonController)
class ChatRoomController {
@MVCMember @Nonnull
@@ -28,25 +34,78 @@ class ChatRoomController {
String words = view.sayField.text
view.sayField.setText(null)
ChatCommand command
try {
command = new ChatCommand(words)
} catch (Exception nope) {
command = new ChatCommand("/SAY $words")
}
long now = System.currentTimeMillis()
UUID uuid = UUID.randomUUID()
String room = model.console ? ChatServer.CONSOLE : model.room
byte [] sig = ChatConnection.sign(uuid, now, room, words, model.core.me, mvcGroup.parentGroup.model.host, model.core.spk)
byte [] sig = ChatConnection.sign(uuid, now, room, command.source, model.core.me, mvcGroup.parentGroup.model.host, model.core.spk)
def event = new ChatMessageEvent(uuid : uuid,
payload : words,
sender : model.core.me,
host : mvcGroup.parentGroup.model.host,
room : room,
chatTime : now,
sig : sig)
payload : command.source,
sender : model.core.me,
host : mvcGroup.parentGroup.model.host,
room : room,
chatTime : now,
sig : sig)
model.core.eventBus.publish(event)
if (command.payload.length() > 0) {
String toShow = DataHelper.formatTime(now) + " <" + model.core.me.getHumanReadableName() + "> "+command.payload
view.roomTextArea.append(toShow)
view.roomTextArea.append('\n')
}
}
void handleChatMessage(ChatMessageEvent e) {
ChatCommand command
try {
command = new ChatCommand(e.payload)
} catch (Exception bad) {
log.log(Level.WARNING,"bad chat command",bad)
return
}
model.core.eventBus.publish(event)
String toShow = DataHelper.formatTime(now) + " <" + model.core.me.getHumanReadableName() + "> "+words
view.roomTextArea.append(toShow)
view.roomTextArea.append('\n')
switch(command.action) {
case ChatAction.SAY : processSay(e, command.payload);break
case ChatAction.JOIN : processJoin(e.timestamp, e.sender); break
case ChatAction.LEAVE : processLeave(e.timestamp, e.sender); break
}
}
private void processSay(ChatMessageEvent e, String text) {
log.info "processing say $text"
String toDisplay = DataHelper.formatTime(e.timestamp) + " <"+e.sender.getHumanReadableName()+"> " + text + "\n"
runInsideUIAsync {
view.roomTextArea.append(toDisplay)
}
}
private void processJoin(long timestamp, Persona p) {
String toDisplay = DataHelper.formatTime(timestamp) + " " + p.getHumanReadableName() + " joined the room\n"
runInsideUIAsync {
view.roomTextArea.append(toDisplay)
}
}
private void processLeave(long timestamp, Persona p) {
String toDisplay = DataHelper.formatTime(timestamp) + " " + p.getHumanReadableName() + " left the room\n"
runInsideUIAsync {
view.roomTextArea.append(toDisplay)
}
}
void handleLeave(Persona p) {
String toDisplay = DataHelper.formatTime(System.currentTimeMillis()) + " " + p.getHumanReadableName() + " disconnected\n"
runInsideUIAsync {
view.roomTextArea.append(toDisplay)
}
}
}

View File

@@ -2,20 +2,83 @@ package com.muwire.gui
import com.muwire.core.Core
import com.muwire.core.Persona
import com.muwire.core.chat.ChatConnectionAttemptStatus
import com.muwire.core.chat.ChatConnectionEvent
import com.muwire.core.chat.ChatLink
import com.muwire.core.chat.ChatMessageEvent
import com.muwire.core.chat.UIConnectChatEvent
import griffon.core.artifact.GriffonModel
import griffon.transform.Observable
import groovy.util.logging.Log
import griffon.metadata.ArtifactProviderFor
@Log
@ArtifactProviderFor(GriffonModel)
class ChatServerModel {
Persona host
Core core
@Observable boolean disconnectActionEnabled
@Observable ChatConnectionAttemptStatus status
volatile ChatLink link
volatile Thread poller
volatile boolean running
void mvcGroupInit(Map<String, String> params) {
disconnectActionEnabled = host != core.me // can't disconnect from myself
core.eventBus.with {
register(ChatConnectionEvent.class, this)
publish(new UIConnectChatEvent(host : host))
}
running = true
poller = new Thread({eventLoop()} as Runnable)
poller.setDaemon(true)
poller.start()
}
void mvcGroupDestroy() {
running = false
poller?.interrupt()
}
void onChatConnectionEvent(ChatConnectionEvent e) {
if (e.connection != null)
link = e.connection
runInsideUIAsync {
status = e.status
}
}
private void eventLoop() {
while(running) {
ChatLink link = this.link
if (link == null || !link.isUp()) {
Thread.sleep(100)
continue
}
Object event = link.nextEvent()
if (event instanceof ChatMessageEvent)
handleChatMessage(event)
else if (event instanceof Persona)
handleLeave(event)
else
throw new IllegalArgumentException("event type $event")
}
}
private void handleChatMessage(ChatMessageEvent e) {
log.info("dispatching to room ${e.room}")
mvcGroup.childrenGroups[e.room]?.controller?.handleChatMessage(e)
}
private void handleLeave(Persona p) {
mvcGroup.childrenGroups.each {
it.controller.handleLeave(p)
}
}
}

View File

@@ -5,6 +5,8 @@ import griffon.inject.MVCMember
import griffon.metadata.ArtifactProviderFor
import javax.swing.SwingConstants
import com.muwire.core.chat.ChatServer
import java.awt.BorderLayout
import javax.annotation.Nonnull
@@ -24,7 +26,10 @@ class ChatServerView {
borderLayout()
tabbedPane(id : model.host.getHumanReadableName()+"-chat-rooms", constraints : BorderLayout.CENTER)
panel(constraints : BorderLayout.SOUTH) {
gridLayout(rows : 1, cols : 3)
panel {}
button(text : "Disconnect", enabled : bind {model.disconnectActionEnabled}, disconnectAction)
label(text : bind {model.status.toString()})
}
}
}
@@ -55,7 +60,7 @@ class ChatServerView {
params['tabName'] = model.host.getHumanReadableName() + "-chat-rooms"
params['room'] = 'Console'
params['console'] = true
mvcGroup.createMVCGroup("chat-room","Console", params)
mvcGroup.createMVCGroup("chat-room",ChatServer.CONSOLE, params)
}
def closeTab = {