socket是操作系统提供的通信层的一组抽象API
函数介绍
socket_create(int $domain , int $type , int $protocol)
正确时返回一个套接字(通讯),失败时返回 FALSE。要读取错误代码,可以调用 socket_last_error()。这个错误代码可以通过 socket_strerror() 读取文字的错误说明。
创建一个通讯节点,socket_create包含三个参数
- $domain 指定哪个协议用在当前套接字(通讯节点),有以下三种:
- AF_INET:IPv4网络协议。TCP 和 UDP 都可使用此协议。
- AF_INET6: IPv6网络协议。TCP 和 UDP 都可使用此协议。
- AF_UNIX: 本地通讯协议。具有高性能和低成本的 IPC(进程间通讯)。
- $type 选择套接字(通讯节点)使用的类型,有以下五种:
- SOCK_STREAM:提供一个顺序化的、可靠的、全双工的、基于连接的字节流。支持数据传送流量控制机制。TCP 协议即基于这种流式套接字。
- SOCK_DGRAM:提供数据报文的支持。(无连接,不可靠、固定最大长度).UDP协议即基于这种数据报文套接字。
- SOCK_SEQPACKET:提供一个顺序化的、可靠的、全双工的、面向连接的、固定最大长度的数据通信;数据端通过接收每一个数据段来读取整个数据包。
- SOCK_RAW:提供读取原始的网络协议。这种特殊的套接字可用于手工构建任意类型的协议。一般使用这个套接字来实现 ICMP 请求(例如 ping)。
- SOCK_RDM:提供一个可靠的数据层,但不保证到达顺序。一般的操作系统都未实现此功能。
- $protocol 设定$domain套接字(通讯节点)下的具体协议。这个值可以使用 getprotobyname() 函数进行读取。如果所需的协议是 TCP 或 UDP,可以直接使用常量 SOL_TCP 和 SOL_UDP 。
- icmp:Internet Control Message Protocol 主要用于网关和主机报告错误的数据通信。
- udp(SOL_UDP):User Datagram Protocol 是一个无连接的、不可靠的、具有固定最大长度的报文协议。
- tcp(SOL_TCP):Transmission Control Protocol 是一个可靠的、基于连接的、面向数据流的全双工协议
- $domain 指定哪个协议用在当前套接字(通讯节点),有以下三种:
socket_set_option ( resource $socket , int $level , int $optname , mixed $optval )
成功时返回 TRUE, 或者在失败时返回 FALSE。
设置套接字的套接字选项
- $socket: 套接节(通讯节点)
- $level: 指定选项所在的协议级别
- $optname: 可用的套接字选项与 socket_get_option() 选项相同
- $optval: 可选项值
socket_read ( resource $socket , int $length [, int $type = PHP_BINARY_READ ] ) 从套接字读取最大长度字节
- $socket: 套接节(通讯节点)
- $length: socket资源中的buffer的长度
- $type: 可选类型参数
- PHP_BINARY_READ 默认值,安全的读取二进制数
- PHP_NORMAL_READ 读取停止
socket_getpeername ( resource $socket , string &$address [, int &$port ] ) 查询远端套接字
- $socket: 套接节(通讯节点)
- $address: 查询地址
- $port: 查询端口(非必填)
socket_recv ( resource $socket , string &$buf , int $len , int $flags ) 从已连接的socket接收数据
- $socket: 套接节(通讯节点)
- $buf: 从socket中获取的数据将被保存在由 buf 制定的变量中
- $len: 长度最多为 len 字节的数据将被接收
- $flags: 可以为下列任意flag的组合:
- MSG_OOB 处理超出边界的数据
- MSG_PEEK 从接受队列的起始位置接收数据,但不将他们从接受队列中移除。
- MSG_WAITALL 在接收到至少 len 字节的数据之前,造成一个阻塞,并暂停脚本运行(block)
- MSG_DONTWAIT 如果制定了该flag,函数将不会造成阻塞,即使在全局设置中指定了阻塞设置
socket_select ( array &$read , array &$write , array &$except , int $tv_sec [, int $tv_usec = 0 ] ) 系统调用给定的套接字数组并指定超时
- $read: 监听读取数组中列出的套接字
- $write: 将监听写入数组中列出的套接字以查看写入是否不会阻塞
- $except: 将监视except数组中列出的套接字是否有异常。
- $tv_sec: tv_sec和tv_usec一起形成超时参数。 超时是socket_select()返回之前经过的时间量的上限。 tv_sec可能为零,导致socket_select()立即返回。 这对轮询非常有用。 如果tv_sec为NULL(无超时),则socket_select()可以无限期地阻塞。
- $tv_usec 同上
socket_accept ( resource $socket ) 接受套接字上的连接
socket_write ( resource $socket , string $buffer [, int $length = 0 ] ) 写入套接字
- $socket: 套接节(通讯节点)
- $buffer:要写入的缓冲区。
- $length:可选参数 长度
socket_close
TODO : 关闭 socket 资源 函数原型: void socket_close ( resource $socket )
- socket: socket_accept或者socket_create产生的资源,不能用于stream资源的关闭
stream_socket_server
由于创建一个SOCKET的流程总是 socket、bind、listen,所以PHP提供了一个非常方便的函数一次性创建、绑定端口、监听端口
函数原型: resource stream_socket_server ( string $local_socket [, int &$errno [, string &$errstr [, int $flags = STREAM_SERVER_BIND | STREAM_SERVER_LISTEN [, resource $context ]]]] )
- local_socket: 协议名://地址:端口号
- errno: 错误码
- errstr: 错误信息
- flags: 只使用该函数的部分功能
- context: 使用stream_context_create函数创建的资源流上下文
socket通信示例
<?php
class SocketServer
{
protected $address;
protected $port;
public function __construct($address = '127.0.0.1', $port = '8080')
{
$this->address = $address;
$this->port = $port;
}
public function startSocket()
{
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
socket_bind($socket, $this->address, $this->port);
socket_listen($socket);
for ( ; ; ) {
$conn = socket_accept($socket);
$write_buffer = "HTTP/1.0 200 OK\r\nServer: my_server\r\nContent-Type: text/html; charset=utf-8\r\n\r\n{'code':100020,'data':{'name':'艾瑞可erik','url':'https://erik.xyz'},'msg':'ok'}";
socket_write($conn, $write_buffer);
socket_close($conn);
}
}
public function run(){
$this->startSocket();
}
}
$sock=new SocketServer();
$sock->run();
运行结果如下:
也可以用stream,即流集成实现。
<?php
class StreamSocketServer
{
protected $address;
protected $port;
public function __construct($address='127.0.0.1',$port=8080)
{
$this->address=$address;
$this->port=$port;
}
public function startSocket(){
$socket=stream_socket_server("tcp://".$this->address.":".$this->port,$errno,$errstr);
$data="{'code':100020,'msg':'ok','data':{'name':'艾瑞可erik','url':'https://erik.xyz'}}";
for ( ; ; ) {
$conn = stream_socket_accept($socket);
$write_buffer = "HTTP/1.0 200 OK\r\nServer: my_server\r\nContent-Type: text/html; charset=utf-8\r\n\r\n".$data;
fwrite($conn, $write_buffer);
fclose($conn);
}
}
public function run(){
$this->startSocket();
}
}
$socket=new StreamSocketServer();
$socket->run();
多进程
多进程示例
<?php
header("Content-type:text/html;charset=utf-8");
class ProcessTest
{
public function add(){
$pid=pcntl_fork();
if($pid){
echo "这是一个父进程\n";
pcntl_waitpid($pid,$status);
}elseif ($pid==0){
echo "这是子进程\n";
}else{
die("进程结束\n");
}
}
public function run(){
$this->add();
}
}
$process=new ProcessTest();
$process->run();
运行的效果
pcntl_fork
函数原型: int pcntl_fork ( void )
执行该函数,会复制当前进程产生另一个进程,称之为当前进程的子进程,该函数在父进程和子进程的返回值不相同,在父进程中返回的是fork出的子进程的进程ID,而在子进程中返回值为0。
要注意的是在复制进程时,会复制该进程的数据(堆数据、栈数据和静态数据),包括在父进程打开的文件描述符,在子进程中也是打开的,这意味着当你在父进程使用了大量内存时,fork出来的子进程必须拥有等量的内存资源,否则可能会导致fork失败。
pcntl_waitpid
函数原型: int pcntl_waitpid ( int $pid , int &$status [, int $options = 0 ] )
- pid: 进程ID
- status: 子进程的退出状态
- option: 取决于操作系统是否提供wait3函数,如果提供该函数,则该选项参数才生效.
上面的进程还是有缺陷,处理多任务时并不太合适。
就用一个非常简单的leader-follower模型,创建一个进程池,随机选出一个进程作为leader进程,该进程监听是否有新连接,如果有则提升另一个follower为leader进程来继续监听,而原leader进程则去处理新连接的请求
socket多任务示例
<?php
class StreamSocketServer
{
protected $address;
protected $port;
public function __construct($address='127.0.0.1',$port=8080)
{
$this->address=$address;
$this->port=$port;
}
public function startSocket(){
$socket=stream_socket_server("tcp://".$this->address.":".$this->port,$errno,$errstr);
$data="{'code':100020,'msg':'ok','data':{'name':'艾瑞可erik','url':'https://erik.xyz'}}";
$pids=[];
for($i=0;$i<10;$i++){
$pid=pcntl_fork();
$pids[]=$pid;
if($pid==0){
for ( ; ; ) {
$conn = stream_socket_accept($socket);
$write_buffer = "HTTP/1.0 200 OK\r\nServer: my_server\r\nContent-Type: text/html; charset=utf-8\r\n\r\n".$data;
fwrite($conn, $write_buffer);
fclose($conn);
}
exit("结束了\n");
}
}
foreach ($pids as $pid){
pcntl_waitpid($pid,$status);
}
}
public function run(){
$this->startSocket();
}
}
$socket=new StreamSocketServer();
$socket->run();
运行结果
运行10个任务没问题,但是多进程是消耗cpu资源的,如果任务多,进程不断的增加,服务器是无法承受的。这时候多进程处理大的并发就不合适了。那就用IO复用。
IO复用
阻塞/非阻塞
这两个概念是针对 IO 过程中进程的状态来说的,阻塞 IO 是指调用结果返回之前,当前线程会被挂起;相反,非阻塞指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回。
同步/异步
这两个概念是针对调用如果返回结果来说的,所谓同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不返回;相反,当一个异步过程调用发出后,调用者不能立刻得到结果,实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。
阻塞与非阻塞
在介绍IO复用技术之前,先介绍一下阻塞和非阻塞,在我们前几节的WEB服务器中,调用socket_accept函数会使整个进程阻塞,直到有新连接,操作系统才唤醒进程继续执行。而非阻塞模式, stream_socket_accept的行为就不一样了,如果没有新连接,不会阻塞进程,而是马上返回false。
I/O多路复用
多路复用(IO/Multiplexing):为了提高数据信息在网络通信线路中传输的效率,在一条物理通信线路上建立多条逻辑通信信道,同时传输若干路信号的技术就叫做多路复用技术。对于 Socket 来说,应该说能同时处理多个连接的模型都应该被称为多路复用,目前比较常用的有 select/poll/epoll/kqueue 这些 IO 模型(目前也有像 Apache 这种每个连接用单独的进程/线程来处理的 IO 模型,但是效率相对比较差,也很容易出问题,所以暂时不做介绍了)。在这些多路复用的模式中,异步阻塞/非阻塞模式的扩展性和性能最好。
select 轮询
使用select会轮询连接池,当有连接可读或可写时,select函数返回可读写的连接数,然后再轮询一遍连接池,查找活动连接进行读写操作
socket_select只支持socket类型的资源,而不支持stream类型的资源,所以这里需要使用socket_create创建socket资源
select轮询示例
<?php
class SocketServer
{
protected $address;
protected $port;
public function __construct($address = '127.0.0.1', $port = '8080')
{
$this->address = $address;
$this->port = $port;
}
public function startSocket()
{
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
socket_bind($socket, $this->address, $this->port);
socket_listen($socket);
$reads = [];
$clients = [];
$writes = null;
$exceptions = null;
socket_set_nonblock($socket);
$write_buffer = "HTTP/1.0 200 OK\r\nServer: my_server\r\nContent-Type: text/html; charset=utf-8\r\n\r\n{'code':100,'data':{'name':'艾瑞可erik'},'msg':'ok'}";
for (; ;) {
$reads = array_merge(array($socket), $clients);
$activity_counts = socket_select($reads, $writes, $exceptions, 0);
if ($activity_counts > 0) {
if (($conn = socket_accept($socket)) !== false) {
$clients[] = $conn;
}
}
$this->client($clients,$write_buffer);
}
}
/**
* @param $clients
* @param $write_buffer
*/
public function client($clients,$write_buffer){
$length = count($clients);
for ($i = 0; $i < $length; $i++) {
$client = $clients[$i];
if (($read_buff = socket_read($client, 1024))!=false) {
socket_write($client, $write_buffer);
socket_close($client);
break;
}
}
}
public function run()
{
$this->startSocket();
}
}
$sock = new SocketServer();
$sock->run();
select虽然可以监听多个连接,但是它最多只能监听1024个连接。这虽然在poll中得到了改进,但是select和poll本质上都是通过轮询的方式进行监听,这意味着当监听了上万连接时,就算只有一个连接是活动的,依然要把上万连接都遍历一次。显然,这无疑是极大的性能浪费,而epoll的出现彻底地解决了这个问题
epoll
epoll并不是只有一个函数来实现,而是多个函数。我们这里并不讨论epoll相关的函数,因为PHP并不提供相关的函数,但它提供了基于libevent库的libevent扩展,以及基于libevent库的event扩展。libevent库实现了Reactor模型,关于Reactor模型,这里只作简单的介绍Reactor模型,包含了几个组件:句柄,事件分发器,事件处理器。
- 句柄,就是文件描述符,在Socket编程中,就是使用socket_create创建的socket资源.
- 事件分发器, 通过事件循环,事件循环是通过诸如epoll
Select
Poll等IO复用技术实现的,监听句柄期待的事件是否发生,发生了则将事件分发给事件处理器. - 事件处理器,当事件发生时,处理相关的逻辑.
libevent库已经实现了Reactor模型,安装event扩展即可。
示例
<?php
$address = '127.0.0.1';
$port = 8080;
//创建句柄
$data = "{'code':100020,'msg':'ok','data':{'name':'艾瑞可erik','url':'https://erik.xyz'}}";
$write_buffer = "HTTP/1.0 200 OK\r\nServer: my_server\r\nContent-Type: text/html; charset=utf-8\r\n\r\n" . $data;
$socket = @stream_socket_server("tcp://" . $address . ":" . $port, $errno, $errstr);
stream_set_blocking($socket, 0);
//创建事件循环器
$event_base = new EventBase();
//创建事件,并指定事件监听的事件类型及注册事件处理器
$event = new Event($event_base, $socket, Event::READ | Event::PERSIST, function ($socket) use (&$event_base, $write_buffer) {
$conn = stream_socket_accept($socket);
fwrite($conn, $write_buffer);
fclose($conn);
}, $socket);
//向循环器中添加事件
$event->add();
$event_base->loop();
在浏览器运行请求,或者用工具检测。
运行结果
这时需要考虑一个问题:如果进程断了怎么办?
那就需要进程守护。
一般PPID为0的,都是内核态进程。一般PPID为1的都是守护进程
守护进程创建的标准流程
让WEB服务器进程变为守护进程,成为守护进程有几个标准的步骤:- 设置文件创建掩码,一般设置为0,umask(0)
- pcntl_fork一个子进程,并马上退出,这样做的目的是让子进程继承进程组ID并获取一个新的进程ID,这样就可以确保子进程一定不是进程组组长,因为进程组组长不能创建新会话
- posix_setsid创建新会话和新进程组,并成为会话组长和进程组组长,并和原来的控制终端脱离关系,这样该进程就不会被原来终端的控制信号中断
- pcntl_fork,再fork一次并不是必须的,只是在基于System-V的系统上,有人建议再fork一次,避免打开终端设备,使程序的通用性更强。
守护进程示例
<?php
header('Content-type:text/html;charset:utf-8');
//守护进程
function daemon(){
umask(0);
//创建进程,并退出进程
if(pcntl_fork()){
exit("退出进程\n");
}
//创建新的会话和进程组,并退出原来的控制端
posix_setsid();
//再次创建进程,并退出
if(pcntl_fork()){
exit("再次创建进程,并退出\n");
}
}
daemon();
$address = '127.0.0.1';
$port = 8080;
//创建句柄
$data = "{'code':100020,'msg':'ok','data':{'name':'艾瑞可erik','url':'https://erik.xyz'}}";
$write_buffer = "HTTP/1.0 200 OK\r\nServer: my_server\r\nContent-Type: text/html; charset=utf-8\r\n\r\n" . $data;
$socket = @stream_socket_server("tcp://" . $address . ":" . $port, $errno, $errstr);
stream_set_blocking($socket, 0);
//创建事件循环器
$event_base = new EventBase();
//创建事件,并指定事件监听的事件类型及注册事件处理器
$event = new Event($event_base, $socket, Event::READ | Event::PERSIST, function ($socket) use (&$event_base, $write_buffer) {
$conn = stream_socket_accept($socket);
fwrite($conn, $write_buffer);
fclose($conn);
}, $socket);
//向循环器中添加事件
$event->add();
$event_base->loop();
运行结果
那么守护进程有了,如何重启、关闭呢。那就调用函数发送信号
posix_kill
函数原型: bool posix_kill ( int $pid , int $sig )- pid: 进程ID
- sig: 系统预定义的信号常量
pcntl_signal
函数原型: bool pcntl_signal ( int $signo , callback $handler [, bool $restart_syscalls = true ] )- signo: 系统预定义的信号常量
- handler: 信号处理器,一个回调函数
- restart_syscalls: 当进程在进行系统调用时,被信号中断时,系统调用是否重新调用,一般默认为true
那么根据以上几个步骤,我做个整合。完整版的socket进程及多进程控制
EventServer
<?php
class EventServer
{
public $event_base;
public $events = [];
public function __construct()
{
$this->event_base = new EventBase();
}
public function add($fd, $what, $callback, $callback_arg)
{
$event = new Event($this->event_base, $fd, $what, $callback, $callback_arg);
$this->events[intval($fd)] = $event;
$event->add();
}
public function remove($fb)
{
$event = $this->events[intval($fb)];
$event->free();
}
public function loop()
{
$this->event_base->loop();
}
}
StreamServer
<?php
require "EventServer.php";
class StreamServer
{
protected $ip = '127.0.0.1';
protected $port = 8080;
protected $path = './pid.txt';
protected $event;
protected $data = "{'code':100020,'msg':'ok','data':{'name':'艾瑞可erik','url':'https://erik.xyz'}}";
protected $write_buffer = "HTTP/1.0 200 OK\r\nServer: my_server\r\nContent-Type: text/html; charset=utf-8\r\n\r\n";
public static function daemon()
{
umask(0);
$pid = pcntl_fork();
if ($pid) {
exit(0);
} elseif ($pid < 0) {
die("进程启动失败\n");
}
$sid = posix_setsid();
$pid = pcntl_fork();
if ($pid) {
exit(0);
} elseif ($pid < 0) {
die("进程启动失败\n");
}
if ($sid < 0) {
die("创建服务失败\n");
}
}
public function __construct($ip, $port = 80)
{
$this->ip = $ip;
$this->port = $port;
$this->event = new EventServer();
}
/**
* 启动
*/
public function run()
{
if ($GLOBALS['argc'] > 1) {
$this->sendSignal();
exit(0);
} else {
self::daemon();
}
$this->installSignalHandler();
$this->recordPid();
$this->start();
}
//存储信号
public function sendSignal()
{
if (posix_kill($this->getPid(), 0)) {
if (strpos($GLOBALS['argv'][1], "stop") !== false) {
posix_kill($this->getPid(), SIGUSR1);
}
}
}
//启动进程
public function start()
{
$domain = sprintf("tcp://%s:%d", $this->ip, $this->port);
$fd = stream_socket_server($domain, $errno, $errstr);
if (!$fd) {
die("$errno $errstr\n");
}
stream_set_blocking($fd, 0);
$this->event->add($fd, Event::READ | Event::PERSIST, [$this, 'requestHandler'], $fd);
$this->event->loop();
}
/**响应信息
* @param $fd
*/
public function requestHandler($fd)
{
$write_buffer = $this->write_buffer . $this->data;
$conn = stream_socket_accept($fd);
fwrite($conn, $write_buffer);
fclose($conn);
}
//添加信号
public function installSignalHandler()
{
$this->event->add(SIGUSR1, Event::SIGNAL, [$this, "handler"], SIGUSR1);
}
/**终止信号
* @param $signo
*/
public function handler($signo)
{
switch ($signo) {
default:
case SIGUSR1:
$this->event->remove($signo);
$this->stop();
break;
}
}
public function stop()
{
exit("终止信号\n");
}
public function getPid()
{
return file_get_contents($this->path);
}
private function recordPid()
{
file_put_contents($this->path, posix_getpid());
}
}
$server = new StreamServer("127.0.0.1", 8080);
$server->run();
运行结果
本文链接: https://erik.xyz/2019/09/18/socket-jin-cheng-tong-xin-ji-zhi/
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!