直接使用

      // ! 测试开始
      $prama = [
        'get_list' => "par1",
        'get_info' => "par2"
      ];
      $back_data = [];
      $wg = new \Swoole\Coroutine\WaitGroup();
      foreach ($prama as $key => $value) {
        go(function () use ($wg, $key, $value, &$back_data) {
          $wg->add();
          if ($key == 'get_list') {
            \co::sleep(10);
          } else {
            \co::sleep(10);
          }
          $data = $key . '_data';
          $back_data[$key] = $data;
          // 该协程处理完了
          $wg->done();
        });
      }
      // 等待协程 如果是携程0.9就执行完了
      $wg->wait(15);
      // 返回协程的数据
      return json($back_data);
use function Swoole\Coroutine\run;
use function Swoole\Coroutine\go;

// ! 协程
$wg = new \Swoole\Coroutine\WaitGroup();
$channel = new \Swoole\Coroutine\Channel(800);


foreach ($list_data as $_key => $account_id) {
    go(function () use (&$wg, &$channel, $account_id) {
        $wg->add();
        try {
            // TODO: 操作数据
        } catch (\Throwable $th) {
            error_log($th->getMessage());
        } finally {
            $wg->done();
        }
        if (!empty($one_go_data)) {
            $channel->push($one_go_data);
        }
    });
}

$wg->wait(); // 等待所有协程完成
$channel->close(); // 关闭Channel
// 从Channel中读取所有结果
$csvData = []; // 存储所有CSV行
while ($ongodata = $channel->pop()) {
    $csvData[] = $ongodata;
}
  • csv转Excel

use Vtiful\Kernel\Excel;
use Vtiful\Kernel\Format;
// php bin/hyperf.php test csv2Xlsx
public function csv2Xlsx($csvPath = '')
{
    // $csvPath = BASE_PATH . "/.vscode/20250806_213328_明细.csv";
    $re_path = BASE_PATH . '/.vscode/';
    $config = ['path' => $re_path]; // 输出目录
    $excel = new Excel($config);

    // 创建 Excel 文件对象
    $filename = pathinfo($csvPath)['filename'] ?? time();
    $file_name = "{$filename}.xlsx";
    $fileObject = $excel->fileName($file_name);

    // 打开 CSV 文件
    $csvFile = fopen($csvPath, "r");

    if ($csvFile === false) {
        die("无法打开 CSV 文件");
    }

    // 读取第一行作为表头
    $headers = fgetcsv($csvFile);
    $fileObject->header($headers); // 写入表头

    // 逐行读取并写入 Excel
    $row_all = [];
    while (($row = fgetcsv($csvFile)) !== false) {
        $row_all[] = $row;
    }
    $fileObject->data($row_all);

    // 关闭 CSV 文件
    fclose($csvFile);

    // 保存 Excel 文件
    $fileObject->output();
    return $re_path . $file_name;
}
  • 协程数据迭代器

<?php

namespace App\Common\Service;
use function Swoole\Coroutine\go;

/**
 * 协程数据迭代器
 * 实例化这个类, 通过addTask()的方式对数据进行协程获取
 * @author jcleng
 */
class ConcurrentDataFetcher
{
    private $wg;
    private $channel;
    private $taskList = [];
    private $errorList = []; // 每个协程的异常message

    /**
     * 底层使用 PHP 引用计数来保存变量,缓存区只需要占用 $capacity * sizeof(zval) 字节的内存,PHP7 版本下 zval 为 16 字节,如 $capacity = 1024 时,Channel 最大将占用 16K 内存
     *
     * @param integer $capacity
     * @author jcleng
     */
    public function __construct($capacity = 1024)
    {
        $this->wg = new \Swoole\Coroutine\WaitGroup();
        $this->channel = new \Swoole\Coroutine\Channel($capacity);
    }
    /**
     * 添加任务
     *
     * @param \Closure $function
     * @return void
     * @author jcleng
     */
    public function addTask(\Closure $function)
    {
        array_push($this->taskList, $function);
    }
    /**
     * 执行所有任务并获取最终的任务结果
     *
     * @return array
     * @author jcleng
     */
    public function execute()
    {
        foreach ($this->taskList as $_key => $function) {
            go(function () use ($function) {
                $this->wg->add();
                try {
                    $one_go_data = null;
                    $one_go_data = $function();
                    if (!empty($one_go_data)) {
                        $res_push = $this->channel->push($one_go_data, -1);
                        if (empty($res_push)) {
                            throw new \Exception('channel push error: ' . $this->channel->errCode);
                        }
                    }
                } catch (\Throwable $th) {
                    error_log($th->getMessage());
                    array_push($this->errorList, $th);
                } finally {
                    $this->wg->done();
                }
            });
        }
        $this->wg->wait();
        $this->channel->close();
        if (!empty($this->errorList)) {
            throw $this->errorList[0];
        }
        // 从Channel中读取所有结果
        $retData = []; // 存储所有CSV行
        while ($ongodata = $this->channel->pop()) {
            $retData[] = $ongodata;
        }
        return $retData;
    }
}