abstract connect out into it's own function

git-svn-id: file:///home/svn/framework3/trunk@9617 4d416f70-5f16-0410-b530-b9f4589650da
unstable
James Lee 2010-06-25 00:39:48 +00:00
parent 01702e8506
commit 2638153f46
2 changed files with 59 additions and 44 deletions

View File

@ -312,6 +312,7 @@ function stdapi_sys_process_kill($req, &$pkt) {
if (!function_exists('stdapi_net_socket_tcp_shutdown')) { if (!function_exists('stdapi_net_socket_tcp_shutdown')) {
function stdapi_net_socket_tcp_shutdown($req, &$pkt) { function stdapi_net_socket_tcp_shutdown($req, &$pkt) {
global $channels; global $channels;
my_print("doing stdapi_net_socket_tcp_shutdown");
$cid_tlv = packet_get_tlv(TLV_TYPE_CHANNEL_ID, $req); $cid_tlv = packet_get_tlv(TLV_TYPE_CHANNEL_ID, $req);
$c = get_channel_by_id($cid_tlv['value']); $c = get_channel_by_id($cid_tlv['value']);
@ -360,27 +361,30 @@ function channel_create_stdapi_fs_file($req, &$pkt) {
if (!function_exists('channel_create_stdapi_net_tcp_client')) { if (!function_exists('channel_create_stdapi_net_tcp_client')) {
function channel_create_stdapi_net_tcp_client($req, &$pkt) { function channel_create_stdapi_net_tcp_client($req, &$pkt) {
global $channels, $readers; global $channels;
my_print("creating tcp client");
$peer_host_tlv = packet_get_tlv($req, TLV_TYPE_PEER_HOST); $peer_host_tlv = packet_get_tlv($req, TLV_TYPE_PEER_HOST);
$peer_port_tlv = packet_get_tlv($req, TLV_TYPE_PEER_PORT); $peer_port_tlv = packet_get_tlv($req, TLV_TYPE_PEER_PORT);
$local_host_tlv = packet_get_tlv($req, TLV_TYPE_LOCAL_HOST); $local_host_tlv = packet_get_tlv($req, TLV_TYPE_LOCAL_HOST);
$local_port_tlv = packet_get_tlv($req, TLV_TYPE_LOCAL_PORT); $local_port_tlv = packet_get_tlv($req, TLV_TYPE_LOCAL_PORT);
$retries_tlv = packet_get_tlv($req, TLV_TYPE_CONNECT_RETRIES); $retries_tlv = packet_get_tlv($req, TLV_TYPE_CONNECT_RETRIES);
if ($retries_tlv['value']) {
if (is_callable('socket_create')) { $retries = $retries_tlv['value'];
$sock=socket_create(AF_INET,SOCK_STREAM,SOL_TCP);
$res = socket_connect($sock, $peer_host_tlv['value'], $peer_port_tlv['value']);
if (!$res) {
return ERROR_FAILURE;
}
register_socket($sock);
} else { } else {
$sock = fsockopen($peer_host_tlv['value'], $peer_port_tlv['value']); $retries = 1;
}
for ($i = 0; $i < $retries; $i++) {
$sock = connect($peer_host_tlv['value'], $peer_port_tlv['value']);
if ($sock) {
break;
}
}
if (!$sock) { if (!$sock) {
return ERROR_FAILURE; return ERROR_FAILURE;
} }
register_stream($sock);
}
# #
# If we got here, the connection worked, respond with the new channel ID # If we got here, the connection worked, respond with the new channel ID
@ -390,7 +394,7 @@ function channel_create_stdapi_net_tcp_client($req, &$pkt) {
$id = count($channels) - 1; $id = count($channels) - 1;
my_print("Created new channel $sock, with id $id"); my_print("Created new channel $sock, with id $id");
packet_add_tlv($pkt, create_tlv(TLV_TYPE_CHANNEL_ID, $id)); packet_add_tlv($pkt, create_tlv(TLV_TYPE_CHANNEL_ID, $id));
array_push($readers, $sock); add_reader($sock);
return ERROR_SUCCESS; return ERROR_SUCCESS;
} }
} }

View File

@ -666,6 +666,33 @@ function register_stream($stream) {
$resource_type_map[(int)$stream] = 'stream'; $resource_type_map[(int)$stream] = 'stream';
} }
function connect($ipaddr, $port) {
my_print("Doing connect($ipaddr, $port)");
$sock = false;
# Prefer the stream versions so we don't have to use both select functions
# unnecessarily, but fall back to socket_create if they aren't available.
if (is_callable('stream_socket_client')) {
my_print("stream_socket_client");
$sock = stream_socket_client("tcp://{$ipaddr}:{$port}");
if (!$sock) { return false; }
register_stream($sock);
} else
if (is_callable('fsockopen')) {
my_print("fsockopen");
$sock = fsockopen($ipaddr,$port);
if (!$sock) { return false; }
register_stream($sock);
} elseif (is_callable('socket_create')) {
my_print("socket_create");
$sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
$res = socket_connect($sock, $ipaddr, $port);
if (!$res) { return false; }
register_socket($sock);
}
return $sock;
}
function close($resource) { function close($resource) {
my_print("Closing resource $resource"); my_print("Closing resource $resource");
global $readers, $resource_type_map; global $readers, $resource_type_map;
@ -758,7 +785,7 @@ function select(&$r, &$w, &$e, $tv_sec=0, $tv_usec=0) {
$n_sockets = count($sockets_r) + count($sockets_w) + count($sockets_e); $n_sockets = count($sockets_r) + count($sockets_w) + count($sockets_e);
$n_streams = count($streams_r) + count($streams_w) + count($streams_e); $n_streams = count($streams_r) + count($streams_w) + count($streams_e);
my_print("Selecting $n_sockets sockets and $n_streams streams with timeout $tv_sec.$tv_usec"); #my_print("Selecting $n_sockets sockets and $n_streams streams with timeout $tv_sec.$tv_usec");
$r = array(); $r = array();
$w = array(); $w = array();
$e = array(); $e = array();
@ -789,7 +816,7 @@ function select(&$r, &$w, &$e, $tv_sec=0, $tv_usec=0) {
if (is_array($e) && is_array($streams_e)) { $e = array_merge($e, $streams_e); } if (is_array($e) && is_array($streams_e)) { $e = array_merge($e, $streams_e); }
$count += $res; $count += $res;
} }
my_print(sprintf("total: $count, Modified counts: r=%s w=%s e=%s", count($r), count($w), count($e))); #my_print(sprintf("total: $count, Modified counts: r=%s w=%s e=%s", count($r), count($w), count($e)));
return $count; return $count;
} }
@ -839,35 +866,19 @@ if (!isset($msgsock)) {
# ipv6 requires brackets around the address # ipv6 requires brackets around the address
$ipaddr = "[".$ipaddr."]"; $ipaddr = "[".$ipaddr."]";
} }
if (is_callable('stream_socket_client')) { $msgsock = connect($ipaddr, $port);
$msgsock = stream_socket_client("tcp://{$ipaddr}:{$port}");
if (!$msgsock) { die(); } if (!$msgsock) { die(); }
$msgsock_type = 'stream'; } else {
} elseif (is_callable('fsockopen')) { switch ($msgsock_type) {
$msgsock = fsockopen($ipaddr,$port); case 'socket':
if (!$msgsock) { die(); }
$msgsock_type = 'stream';
} elseif (is_callable('socket_create')) {
$msgsock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
$res = socket_connect($msgsock, $ipaddr, $port);
if (!$res) { die(); }
$msgsock_type = 'socket';
} else {
die();
}
}
switch ($msgsock_type) {
case 'socket':
register_socket($msgsock); register_socket($msgsock);
break; break;
case 'stream': case 'stream':
# fall through # fall through
default: default:
register_stream($msgsock); register_stream($msgsock);
break; }
} }
add_reader($msgsock); add_reader($msgsock);
# #
@ -875,7 +886,7 @@ add_reader($msgsock);
# #
$r=$GLOBALS['readers']; $r=$GLOBALS['readers'];
while (false !== ($cnt = select($r, $w=null, $e=null, 1))) { while (false !== ($cnt = select($r, $w=null, $e=null, 1))) {
my_print(sprintf("Returned from select with %s readers", count($r))); #my_print(sprintf("Returned from select with %s readers", count($r)));
$read_failed = false; $read_failed = false;
for ($i = 0; $i < $cnt; $i++) { for ($i = 0; $i < $cnt; $i++) {
$ready = $r[$i]; $ready = $r[$i];