直接使用
// ! 测试开始
$prama = [
'get_list' => "par1",
'get_info' => "par2"
];
$back_data = [];
$wg = new \Swoole\Coroutine\WaitGroup();
foreach ($prama as $key => $value) {
go(function () use ($wg, $key, $value, &$back_data) {
$wg->add();
if ($key == 'get_list') {
\co::sleep(10);
} else {
\co::sleep(10);
}
$data = $key . '_data';
$back_data[$key] = $data;
// 该协程处理完了
$wg->done();
});
}
// 等待协程 如果是携程0.9就执行完了
$wg->wait(15);
// 返回协程的数据
return json($back_data);
use function Swoole\Coroutine\run;
use function Swoole\Coroutine\go;
// ! 协程
$wg = new \Swoole\Coroutine\WaitGroup();
$channel = new \Swoole\Coroutine\Channel(800);
foreach ($list_data as $_key => $account_id) {
go(function () use (&$wg, &$channel, $account_id) {
$wg->add();
try {
// TODO: 操作数据
} catch (\Throwable $th) {
error_log($th->getMessage());
} finally {
$wg->done();
}
if (!empty($one_go_data)) {
$channel->push($one_go_data);
}
});
}
$wg->wait(); // 等待所有协程完成
$channel->close(); // 关闭Channel
// 从Channel中读取所有结果
$csvData = []; // 存储所有CSV行
while ($ongodata = $channel->pop()) {
$csvData[] = $ongodata;
}
csv转Excel
use Vtiful\Kernel\Excel;
use Vtiful\Kernel\Format;
// php bin/hyperf.php test csv2Xlsx
public function csv2Xlsx($csvPath = '')
{
// $csvPath = BASE_PATH . "/.vscode/20250806_213328_明细.csv";
$re_path = BASE_PATH . '/.vscode/';
$config = ['path' => $re_path]; // 输出目录
$excel = new Excel($config);
// 创建 Excel 文件对象
$filename = pathinfo($csvPath)['filename'] ?? time();
$file_name = "{$filename}.xlsx";
$fileObject = $excel->fileName($file_name);
// 打开 CSV 文件
$csvFile = fopen($csvPath, "r");
if ($csvFile === false) {
die("无法打开 CSV 文件");
}
// 读取第一行作为表头
$headers = fgetcsv($csvFile);
$fileObject->header($headers); // 写入表头
// 逐行读取并写入 Excel
$row_all = [];
while (($row = fgetcsv($csvFile)) !== false) {
$row_all[] = $row;
}
$fileObject->data($row_all);
// 关闭 CSV 文件
fclose($csvFile);
// 保存 Excel 文件
$fileObject->output();
return $re_path . $file_name;
}
协程数据迭代器
<?php
namespace App\Common\Service;
use function Swoole\Coroutine\go;
/**
* 协程数据迭代器
* 实例化这个类, 通过addTask()的方式对数据进行协程获取
* @author jcleng
*/
class ConcurrentDataFetcher
{
private $wg;
private $channel;
private $taskList = [];
private $errorList = []; // 每个协程的异常message
/**
* 底层使用 PHP 引用计数来保存变量,缓存区只需要占用 $capacity * sizeof(zval) 字节的内存,PHP7 版本下 zval 为 16 字节,如 $capacity = 1024 时,Channel 最大将占用 16K 内存
*
* @param integer $capacity
* @author jcleng
*/
public function __construct($capacity = 1024)
{
$this->wg = new \Swoole\Coroutine\WaitGroup();
$this->channel = new \Swoole\Coroutine\Channel($capacity);
}
/**
* 添加任务
*
* @param \Closure $function
* @return void
* @author jcleng
*/
public function addTask(\Closure $function)
{
array_push($this->taskList, $function);
}
/**
* 执行所有任务并获取最终的任务结果
*
* @return array
* @author jcleng
*/
public function execute()
{
foreach ($this->taskList as $_key => $function) {
go(function () use ($function) {
$this->wg->add();
try {
$one_go_data = null;
$one_go_data = $function();
if (!empty($one_go_data)) {
$res_push = $this->channel->push($one_go_data, -1);
if (empty($res_push)) {
throw new \Exception('channel push error: ' . $this->channel->errCode);
}
}
} catch (\Throwable $th) {
error_log($th->getMessage());
array_push($this->errorList, $th);
} finally {
$this->wg->done();
}
});
}
$this->wg->wait();
$this->channel->close();
if (!empty($this->errorList)) {
throw $this->errorList[0];
}
// 从Channel中读取所有结果
$retData = []; // 存储所有CSV行
while ($ongodata = $this->channel->pop()) {
$retData[] = $ongodata;
}
return $retData;
}
}