forked from I2P_Developers/i2p.i2p
Some changes to make the SAM module never block if called on a socket
which select() says is safe to read/write or called in any case on a socket which is O_NONBLOCK Significant work is still required.
This commit is contained in:
@@ -10,6 +10,8 @@ package Net::SAM;
|
|||||||
|
|
||||||
use strict;
|
use strict;
|
||||||
|
|
||||||
|
use POSIX;
|
||||||
|
|
||||||
use Switch;
|
use Switch;
|
||||||
|
|
||||||
use IO::Socket;
|
use IO::Socket;
|
||||||
@@ -73,71 +75,199 @@ sub lookup {
|
|||||||
|
|
||||||
#}
|
#}
|
||||||
|
|
||||||
|
|
||||||
|
sub readprocesswrite {
|
||||||
|
my $self = shift;
|
||||||
|
$self->readprocess();
|
||||||
|
$self->dowrite();
|
||||||
|
}
|
||||||
|
|
||||||
|
sub doread {
|
||||||
|
my $self = shift;
|
||||||
|
my $rv;
|
||||||
|
my $data;
|
||||||
|
|
||||||
|
$rv = $self->recv($data, $POSIX::BUFSIZE, 0);
|
||||||
|
|
||||||
|
if ( defined($rv) && ( length($data) >= 1 ) ) {
|
||||||
|
# We received some data. Put it in our buffer.
|
||||||
|
${*$self}->{inbuffer} += $data;
|
||||||
|
} else {
|
||||||
|
# No data. Either we're on a non-blocking socket, or there
|
||||||
|
# was an error or EOF
|
||||||
|
if ( $!{EAGAIN} ) {
|
||||||
|
return 1;
|
||||||
|
} else {
|
||||||
|
# I suppose caller can look at $! for details
|
||||||
|
return undef;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
sub dowrite {
|
||||||
|
my $self = shift;
|
||||||
|
my $rv;
|
||||||
|
my $data;
|
||||||
|
|
||||||
|
$rv = $self->send(${*$self}->{outbuffer}, 0);
|
||||||
|
|
||||||
|
if ( ! defined($rv) ) {
|
||||||
|
warn "SAM::dowrite - Couldn't write for no apparent reason.\n";
|
||||||
|
return undef;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( $rv == length(${*$self}->{outbuffer}) || $!{EWOULDBLOCK} ) {
|
||||||
|
substr(${*$self}->{outbuffer},0, $rv) = ''; # Remove from buffer
|
||||||
|
|
||||||
|
# Nuke buffer if empty
|
||||||
|
delete ${*$self}->{outbuffer} unless length(${*$self}->{outbuffer});
|
||||||
|
} else {
|
||||||
|
# Socket closed on us or something?
|
||||||
|
return undef;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sub messages {
|
||||||
|
my $self = shift;
|
||||||
|
|
||||||
|
return @{ ${*$self}->{messages} };
|
||||||
|
}
|
||||||
|
|
||||||
|
sub queuemessage {
|
||||||
|
|
||||||
|
my $self = shift;
|
||||||
|
my $message = shift;
|
||||||
|
|
||||||
|
push @{ ${*$self}->{messages} } , $message;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub unqueuemessage {
|
||||||
|
my $self = shift;
|
||||||
|
|
||||||
|
return unshift(@{ ${*$self}->{messages} } );
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
sub readprocess {
|
sub readprocess {
|
||||||
my $self = shift;
|
my $self = shift;
|
||||||
my $chunk;
|
|
||||||
|
$self->doread();
|
||||||
|
$self->process();
|
||||||
|
}
|
||||||
|
|
||||||
|
sub process {
|
||||||
|
my $self = shift;
|
||||||
|
my %tvhash;
|
||||||
my $payload;
|
my $payload;
|
||||||
|
|
||||||
print "readprocess: " . $self->connected() . "\n";
|
|
||||||
|
|
||||||
# May block if the SAM bridge gets hosed
|
# Before we can read any new messages, if an existing message has payload
|
||||||
my $response = <$self>;
|
# we must read it in. Otherwise we'll create garbage messages containing
|
||||||
|
# the payload of previous messages.
|
||||||
|
|
||||||
print "readprocess: $!" . $self->connected() . "\n";
|
if ( ${*$self}->{payloadrequired} >= 1 ) {
|
||||||
|
|
||||||
chomp $response;
|
if ( length( ${*$self}->{inbuffer} ) >= ${*$self}->{payloadrequired} ) {
|
||||||
my ($primative, $more, $extra) = split (' ', $response, 3);
|
# Scarf payload from inbuffer into $payload
|
||||||
|
$payload = substr(${*$self}->{inbuffer}, 0,
|
||||||
|
${*$self}->{payloadrequired});
|
||||||
|
|
||||||
$primative = uc($primative);
|
# Nuke payload from inbuffer
|
||||||
|
substr(${*$self}->{inbuffer}, 0,
|
||||||
|
${*$self}->{payloadrequired} ) = '';
|
||||||
|
|
||||||
print "readprocess: " . $self->connected() . " -- $primative -- $more -- $extra\n";
|
# Put message with payload into spool
|
||||||
|
push @{ ${*$self}->{messages} } ,
|
||||||
|
${*$self}->{messagerequiringpayload}.$payload;
|
||||||
|
|
||||||
switch ($primative) {
|
# Delete the saved message requiring payload
|
||||||
|
delete ${*$self}->{messagerequiringpayload};
|
||||||
case "HELLO" {
|
} else {
|
||||||
if ($more !~ m/REPLY/ ) { die ("Bogus HELLO response") }
|
# Insufficient payload in inbuffer. Try again later.
|
||||||
if ($extra =~ m/NOVERSION/ ) {
|
return 1;
|
||||||
die("SAM Bridge Doesn't support my version") ;
|
|
||||||
}
|
}
|
||||||
$self->_hashtv($extra);
|
|
||||||
${*$self}->{greeted} = 1;
|
|
||||||
};
|
|
||||||
case "SESSION" {
|
|
||||||
if ( $more !~ m/STATUS/ ) {
|
|
||||||
die("Bogus SESSION response");
|
|
||||||
}
|
|
||||||
$self->_hashtv($extra);
|
|
||||||
}
|
|
||||||
case "STREAM" {};
|
|
||||||
case "DATAGRAM" {
|
|
||||||
if ( $more !~ m/RECEIVE/ ) {
|
|
||||||
die("Bogus DATAGRAM response.");
|
|
||||||
}
|
|
||||||
$self->_hashtv($extra);
|
|
||||||
push @{ ${*$self}->{incomingdatagram } },
|
|
||||||
[ ${*$self}->{DESTINATION},
|
|
||||||
$self->_readblock(${*$self}->{SIZE}) ];
|
|
||||||
|
|
||||||
};
|
|
||||||
case "RAW" {
|
|
||||||
if ( $more !~ m/RECEIVE/ ) {
|
|
||||||
die("Bogus RAW response.");
|
|
||||||
}
|
}
|
||||||
$self->_hashtv($extra);
|
|
||||||
|
|
||||||
push @{ $self->{incomingraw} }, $self->_readblock($self->{SIZE});
|
|
||||||
};
|
if ( ${*$self}->{inbuffer} =~ s/(.*\n)// ) {
|
||||||
case "NAMING" {
|
%tvhash = $self->_hashtv($1); # Returns a tag/value hash
|
||||||
if ( $more !~ m/REPLY/ ) {
|
if ( $tvhash{SIZE} ) {
|
||||||
die("Bogus NAMING response");
|
# We've got a message with payload on our hands. :(
|
||||||
|
${*$self}->{payloadrequired} = $tvhash{SIZE};
|
||||||
|
${*$self}->{messagerequiringpayload} = $1;
|
||||||
|
return 1; # Could call ourself here, but we'll get called again.
|
||||||
|
} else {
|
||||||
|
push @{ ${*$self}->{messages} } , $1;
|
||||||
}
|
}
|
||||||
$self->_hashtv($extra);
|
|
||||||
};
|
|
||||||
case "DEST" {};
|
|
||||||
}
|
}
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# sub junk {
|
||||||
|
|
||||||
|
|
||||||
|
# print "readprocess: " . $self->connected() . "\n";
|
||||||
|
|
||||||
|
# # May block if the SAM bridge gets hosed
|
||||||
|
# my $response = <$self>;
|
||||||
|
|
||||||
|
# print "readprocess: $!" . $self->connected() . "\n";
|
||||||
|
|
||||||
|
# chomp $response;
|
||||||
|
# my ($primative, $more, $extra) = split (' ', $response, 3);
|
||||||
|
|
||||||
|
# $primative = uc($primative);
|
||||||
|
|
||||||
|
# print "readprocess: " . $self->connected() . " -- $primative -- $more -- $extra\n";
|
||||||
|
|
||||||
|
# switch ($primative) {
|
||||||
|
|
||||||
|
# case "HELLO" {
|
||||||
|
# if ($more !~ m/REPLY/ ) { die ("Bogus HELLO response") }
|
||||||
|
# if ($extra =~ m/NOVERSION/ ) {
|
||||||
|
# die("SAM Bridge Doesn't support my version") ;
|
||||||
|
# }
|
||||||
|
# $self->_hashtv($extra);
|
||||||
|
# ${*$self}->{greeted} = 1;
|
||||||
|
# };
|
||||||
|
# case "SESSION" {
|
||||||
|
# if ( $more !~ m/STATUS/ ) {
|
||||||
|
# die("Bogus SESSION response");
|
||||||
|
# }
|
||||||
|
# $self->_hashtv($extra);
|
||||||
|
# }
|
||||||
|
# case "STREAM" {};
|
||||||
|
# case "DATAGRAM" {
|
||||||
|
# if ( $more !~ m/RECEIVE/ ) {
|
||||||
|
# die("Bogus DATAGRAM response.");
|
||||||
|
# }
|
||||||
|
# $self->_hashtv($extra);
|
||||||
|
# push @{ ${*$self}->{incomingdatagram } },
|
||||||
|
# [ ${*$self}->{DESTINATION},
|
||||||
|
# $self->_readblock(${*$self}->{SIZE}) ];
|
||||||
|
|
||||||
|
# };
|
||||||
|
# case "RAW" {
|
||||||
|
# if ( $more !~ m/RECEIVE/ ) {
|
||||||
|
# die("Bogus RAW response.");
|
||||||
|
# }
|
||||||
|
# $self->_hashtv($extra);
|
||||||
|
|
||||||
|
# push @{ $self->{incomingraw} }, $self->_readblock($self->{SIZE});
|
||||||
|
# };
|
||||||
|
# case "NAMING" {
|
||||||
|
# if ( $more !~ m/REPLY/ ) {
|
||||||
|
# die("Bogus NAMING response");
|
||||||
|
# }
|
||||||
|
# $self->_hashtv($extra);
|
||||||
|
# };
|
||||||
|
# case "DEST" {};
|
||||||
|
# }
|
||||||
|
# return 1;
|
||||||
|
# }
|
||||||
|
|
||||||
sub getfh {
|
sub getfh {
|
||||||
# Return the FH of the SAM socket so apps can select() or poll() on it
|
# Return the FH of the SAM socket so apps can select() or poll() on it
|
||||||
my $self = shift;
|
my $self = shift;
|
||||||
@@ -161,12 +291,14 @@ sub _readblock {
|
|||||||
|
|
||||||
sub _hashtv {
|
sub _hashtv {
|
||||||
my $self = shift;
|
my $self = shift;
|
||||||
my $extra = shift;
|
my $tvstring = shift;
|
||||||
|
my $tvhash;
|
||||||
|
|
||||||
while ( $extra=~ m/(\S+)=(\S+)/sg ) {
|
while ( $tvstring =~ m/(\S+)=(\S+)/sg ) {
|
||||||
${*$self}->{$1}=$2;
|
$tvhash->{$1}=$2;
|
||||||
print "$1=$2\n"
|
print "hashtv: $1=$2\n"
|
||||||
}
|
}
|
||||||
|
return $tvhash;
|
||||||
}
|
}
|
||||||
|
|
||||||
sub DESTROY {
|
sub DESTROY {
|
||||||
|
Reference in New Issue
Block a user