Workerman UDP 服务器的流程和概念
先运行php 1.php start
在运行 php 2.php start
1.php
/**
* Workerman UDP 服务器示例
* 演示 UDP 协议的基本使用
*/
require_once __DIR__ . '/vendor/autoload.php';
use WorkermanWorker;
// 创建 UDP Worker,监听 8080 端口
$udp_worker = new Worker('udp://0.0.0.0:8080');
// 设置进程名称
$udp_worker->name = 'UDPServer';
// 设置进程数(UDP 可以多进程处理)
$udp_worker->count = 1;
// 统计信息
$stats = [
'total_packets' => 0,
'total_bytes' => 0,
'clients' => [],
'start_time' => 0,
];
// Worker 启动时
$udp_worker->onWorkerStart = function($worker) use (&$stats) {
$stats['start_time'] = time();
echo "
";
echo "╔════════════════════════════════════════╗
";
echo "║ Workerman UDP 服务器 ║
";
echo "╚════════════════════════════════════════╝
";
echo "服务器启动成功!
";
echo "监听地址: udp://0.0.0.0:8080
";
echo "进程 ID: " . posix_getpid() . "
";
echo "启动时间: " . date('Y-m-d H:i:s') . "
";
echo "========================================
";
echo "⚠ UDP 特性提示:
";
echo " • 无连接状态(无 onConnect/onClose)
";
echo " • 不保证送达(可能丢包)
";
echo " • 不保证顺序(可能乱序)
";
echo " • 低延迟、高性能
";
echo "========================================
";
echo "等待 UDP 数据包...
";
};
// 收到 UDP 数据包时触发
$udp_worker->onMessage = function($connection, $data) use (&$stats) {
// 更新统计信息
$stats['total_packets']++;
$stats['total_bytes'] += strlen($data);
// 获取客户端地址
$remote_address = $connection->getRemoteAddress();
$remote_ip = $connection->getRemoteIp();
$remote_port = $connection->getRemotePort();
// 记录客户端
if (!isset($stats['clients'][$remote_address])) {
$stats['clients'][$remote_address] = [
'first_seen' => time(),
'packet_count' => 0,
];
}
$stats['clients'][$remote_address]['packet_count']++;
$stats['clients'][$remote_address]['last_seen'] = time();
echo "====================================
";
echo "收到 UDP 数据包 #{$stats['total_packets']}
";
echo "来源地址: {$remote_address}
";
echo "来源 IP: {$remote_ip}
";
echo "来源端口: {$remote_port}
";
echo "数据长度: " . strlen($data) . " 字节
";
echo "数据内容: " . trim($data) . "
";
echo "接收时间: " . date('Y-m-d H:i:s') . "
";
// 尝试解析 JSON
$json = json_decode($data, true);
if ($json) {
echo "JSON 解析:
";
print_r($json);
}
// 处理不同类型的请求
$response = processUdpRequest($data, $stats);
// 发送响应(UDP 也可以回复)
$connection->send($response);
echo "已发送响应: " . strlen($response) . " 字节
";
echo "====================================
";
};
/**
* 处理 UDP 请求
*/
function processUdpRequest($data, &$stats) {
// 尝试解析 JSON
$request = json_decode($data, true);
if ($request && isset($request['type'])) {
switch ($request['type']) {
case 'ping':
// 心跳检测
return json_encode([
'type' => 'pong',
'timestamp' => microtime(true),
'server_time' => date('Y-m-d H:i:s')
]);
case 'stats':
// 返回统计信息
return json_encode([
'type' => 'stats',
'data' => [
'total_packets' => $stats['total_packets'],
'total_bytes' => $stats['total_bytes'],
'client_count' => count($stats['clients']),
'uptime' => time() - $stats['start_time'],
]
]);
case 'echo':
// 回显消息
return json_encode([
'type' => 'echo',
'data' => $request['data'] ?? ''
]);
default:
return json_encode([
'type' => 'error',
'message' => 'Unknown request type'
]);
}
}
// 纯文本回显
return "UDP Echo: {$data}";
}
// Worker 停止时
$udp_worker->onWorkerStop = function($worker) use (&$stats) {
echo "
";
echo "========================================
";
echo "服务器正在关闭...
";
echo "总接收数据包: {$stats['total_packets']}
";
echo "总接收字节: {$stats['total_bytes']}
";
echo "服务过的客户端: " . count($stats['clients']) . "
";
echo "运行时长: " . (time() - $stats['start_time']) . " 秒
";
echo "========================================
";
};
// 运行
Worker::runAll();
2.php
/**
* UDP 客户端测试工具
*/
echo "╔════════════════════════════════════════╗
";
echo "║ UDP 客户端测试工具 ║
";
echo "╚════════════════════════════════════════╝
";
$server_ip = '127.0.0.1';
$server_port = 8080;
// 创建 UDP Socket
$socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);
if (!$socket) {
die("❌ 创建 Socket 失败: " . socket_strerror(socket_last_error()) . "
");
}
echo "✓ Socket 创建成功
";
echo "目标服务器: {$server_ip}:{$server_port}
";
echo "========================================
";
// 设置超时
socket_set_option($socket, SOL_SOCKET, SO_RCVTIMEO, ['sec' => 2, 'usec' => 0]);
// 测试用例
$tests = [
[
'name' => '纯文本消息',
'data' => 'Hello UDP Server!'
],
[
'name' => 'Ping 请求',
'data' => json_encode(['type' => 'ping'])
],
[
'name' => 'Echo 请求',
'data' => json_encode([
'type' => 'echo',
'data' => 'Test message 123'
])
],
[
'name' => '统计信息请求',
'data' => json_encode(['type' => 'stats'])
],
[
'name' => '中文消息',
'data' => '你好,UDP 服务器!'
],
];
foreach ($tests as $index => $test) {
echo "【测试 #" . ($index + 1) . "】{$test['name']}
";
echo "------------------------------------
";
// 发送数据
$sent = socket_sendto(
$socket,
$test['data'],
strlen($test['data']),
0,
$server_ip,
$server_port
);
if ($sent === false) {
echo "❌ 发送失败: " . socket_strerror(socket_last_error($socket)) . "
";
continue;
}
echo "发送: {$test['data']}
";
echo "✓ 已发送 {$sent} 字节
";
// 接收响应
$response = '';
$from_ip = '';
$from_port = 0;
$received = @socket_recvfrom(
$socket,
$response,
8192,
0,
$from_ip,
$from_port
);
if ($received === false) {
echo "⚠ 未收到响应(超时或丢包)
";
} else {
echo "收到响应: {$response}
";
echo "✓ 接收 {$received} 字节(来自 {$from_ip}:{$from_port})
";
// 尝试解析 JSON
$json = json_decode($response, true);
if ($json) {
echo "JSON 数据:
";
print_r($json);
}
}
echo "
";
usleep(300000); // 300ms 间隔
}
// 性能测试
echo "【性能测试】发送 1000 个数据包
";
echo "------------------------------------
";
$start_time = microtime(true);
$success_count = 0;
$lost_count = 0;
for ($i = 0; $i < 1000; $i++) {
$data = "Packet #{$i}";
$sent = socket_sendto($socket, $data, strlen($data), 0, $server_ip, $server_port);
if ($sent !== false) {
$success_count++;
}
}
$duration = microtime(true) - $start_time;
$pps = 1000 / $duration; // Packets per second
echo "发送完成!
";
echo "总数据包: 1000
";
echo "发送成功: {$success_count}
";
echo "发送失败: " . (1000 - $success_count) . "
";
echo "耗时: " . round($duration, 3) . " 秒
";
echo "速率: " . round($pps, 2) . " 包/秒
";
echo "平均延迟: " . round($duration / 1000 * 1000, 3) . " ms
";
// 关闭 Socket
socket_close($socket);
echo "========================================
";
echo "测试完成!
";
📋 整体架构
这是一个基于 WorkerMan 框架的 UDP 服务器示例,包含:
- UDP 服务器(监听并处理数据包)
- 测试客户端(发送各种请求测试服务器)
🔄 完整工作流程
1️⃣ 服务器启动阶段
$udp_worker = new Worker(‘udp://0.0.0.0:8080’);
- 创建 UDP Worker,监听所有网卡的 8080 端口
- UDP 协议特点:无连接、不可靠、快速
udpworker−>onWorkerStart=function(udp_worker->onWorkerStart = function(udpworker−>onWorkerStart=function(worker) { … }
- Worker 进程启动时执行
- 初始化统计数据(数据包数、字节数、客户端列表)
- 打印启动信息和 UDP 特性提示
2️⃣ 接收数据阶段
udpworker−>onMessage=function(udp_worker->onMessage = function(udpworker−>onMessage=function(connection, $data) { … }
每收到一个 UDP 数据包时触发,执行以下步骤:
步骤 A:更新统计信息
$stats[‘total_packets’]++; // 总数据包数 +1
stats[′totalbytes′]+=strlen(stats['total_bytes'] += strlen(stats[′totalbytes′]+=strlen(data); // 累计字节数
步骤 B:记录客户端信息
$remote_address = $connection->getRemoteAddress(); // 获取客户端地址
stats[′clients′][stats['clients'][stats[′clients′][remote_address] = […]; // 记录首次/最后访问时间
步骤 C:打印接收详情
- 数据包编号
- 来源 IP 和端口
- 数据内容和长度
- 接收时间
- JSON 解析结果(如果是 JSON 格式)
步骤 D:处理请求并响应
response=processUdpRequest(response = processUdpRequest(response=processUdpRequest(data, $stats);
connection−>send(connection->send(connection−>send(response);
3️⃣ 请求处理逻辑
processUdpRequest() 函数根据请求类型分发处理:
| 请求类型 | 功能 | 响应内容 |
|---|---|---|
| ping | 心跳检测 | {“type”:“pong”, “timestamp”:…, “server_time”:…} |
| stats | 获取统计信息 | {“type”:“stats”, “data”:{总包数, 总字节, 客户端数, 运行时长}} |
| echo | 回显消息 | {“type”:“echo”, “data”:“原始数据”} |
| 其他 | 未知类型 | {“type”:“error”, “message”:“Unknown request type”} |
| 纯文本 | 非 JSON 数据 | “UDP Echo: {原始数据}” |
4️⃣ 服务器关闭阶段
udpworker−>onWorkerStop=function(udp_worker->onWorkerStop = function(udpworker−>onWorkerStop=function(worker) { … }
- 打印最终统计信息
- 显示总数据包数、总字节数、服务客户端数、运行时长
🧪 测试客户端流程
客户端初始化
socket=socketcreate(AFINET,SOCKDGRAM,SOLUDP);socketsetoption(socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP); socket_set_option(socket=socketcreate(AFINET,SOCKDGRAM,SOLUDP);socketsetoption(socket, SOL_SOCKET, SO_RCVTIMEO, [‘sec’=>2, ‘usec’=>0]);
- 创建 UDP socket
- 设置接收超时为 2 秒(避免永久阻塞)
测试用例
客户端发送 4 种类型的测试请求:
- Ping 请求 → 测试连通性
- Echo 请求 → 测试数据回显
- 统计信息请求 → 获取服务器状态
- 中文消息 → 测试编码兼容性
发送-接收流程
socket_sendto($socket, data,...);//发送数据包socketrecvfrom(data, ...); // 发送数据包 socket_recvfrom(data,...);//发送数据包socketrecvfrom(socket, $response, …); // 接收响应
性能测试
发送 1000 个数据包,统计:
- 成功数量
- 丢包数量
- 总耗时
⚠️ UDP 关键特性
● 与 TCP 的区别
| 特性 | UDP | TCP |
|---|---|---|
| 连接 | 无连接(无需握手) | 面向连接(三次握手) |
| 可靠性 | 不保证送达 | 保证送达 |
| 顺序 | 可能乱序 | 保证顺序 |
| 速度 | 快速、低延迟 | 较慢 |
| 事件 | 只有 onMessage | 有 onConnect、onMessage、onClose |
UDP 适用场景
- 实时游戏(位置同步)
- 视频直播(少量丢包可接受)
- DNS 查询
- 物联网设备通信
🎯 数据流转示意图
客户端 服务器
│ │
├─发送 ping 请求─────────>│
│ ├─收到数据包
│ ├─更新统计
│ ├─processUdpRequest()
│ ├─返回 pong 响应
│<────────────────────────┤
├─接收响应 │
│ │
├─发送 stats 请求────────>│
│<────────────────────────┤
│ 返回统计数据 │
💡 关键代码说明
为什么使用 &$stats?
udpworker−>onMessage=function(udp_worker->onMessage = function(udpworker−>onMessage=function(connection, KaTeX parse error: Expected 'EOF', got '&' at position 12: data) use (&̲stats)
- & 表示引用传递,允许在闭包内修改外部变量
- 所有回调函数共享同一份统计数据
为什么 UDP 能 send() 响应?
connection−>send(connection->send(connection−>send(response);
- 虽然 UDP 无连接,但 WorkerMan 记录了客户端地址
- send() 实际是 sendto() 的封装,自动发送到来源地址







