如何将Swoole队列实现单生产多消费者模式,以应对长尾词查询需求?
- 内容介绍
- 文章标签
- 相关推荐
本文共计383个文字,预计阅读时间需要2分钟。
使用Swoole多进程队列消费,采用非阻塞模式。生产者结束,向消费者发送消息,消费者递归传递消息。代码示例:
php// 生产者结束,发送消息给消费者$worker->push($data);echo 'push data_'.$data.'\n';usleep(1000);// 消息生产结束,记录
请注意,这段代码是简化后的,可能需要根据实际情况调整和完善。
swoole 多进程队列消费,使用非阻塞模式,生产者结束,发送消息给消费者,消费者传递结束消息给其他消费者push($data); echo '< push data_'.$data."\n"; usleep(1000); } // 消息生产结束 $worker->push('exit'); $worker->exit(0); }); $producer->useQueue($queueName, 2 | swoole_process::IPC_NOWAIT); $processCollect[$producer->start()] = 'product'; // 多个消费者 for ($i = 0; $i < $max_precess; ++$i) { $process = new swoole_process(function (swoole_process $worker) use ($process, $i): void { swoole_set_process_name('test process_'.$i); echo 'test process_'.$i.'_'.posix_getpid()." start\n"; while (true) { $recv = $worker->pop(); if ($recv == false) { continue; } if ($recv == 'exit') { // 传递消息生产结束给其他消费者 $worker->push('exit'); break; } echo '> process_'.$i.'_'.posix_getpid().' pull '.$recv."\n"; } $worker->exit(0); }); $process->useQueue($queueName, 2 | swoole_process::IPC_NOWAIT); $processCollect[$process->start()] = $i; } while (0 != count($processCollect)) { $ret = swoole_process::wait(); if ($ret !== false) { echo '#'.$processCollect[$ret['pid']].' '.$ret['pid']." exit\n"; unset($processCollect[$ret['pid']]); } } $producer->freeQueue();
本文共计383个文字,预计阅读时间需要2分钟。
使用Swoole多进程队列消费,采用非阻塞模式。生产者结束,向消费者发送消息,消费者递归传递消息。代码示例:
php// 生产者结束,发送消息给消费者$worker->push($data);echo 'push data_'.$data.'\n';usleep(1000);// 消息生产结束,记录
请注意,这段代码是简化后的,可能需要根据实际情况调整和完善。
swoole 多进程队列消费,使用非阻塞模式,生产者结束,发送消息给消费者,消费者传递结束消息给其他消费者push($data); echo '< push data_'.$data."\n"; usleep(1000); } // 消息生产结束 $worker->push('exit'); $worker->exit(0); }); $producer->useQueue($queueName, 2 | swoole_process::IPC_NOWAIT); $processCollect[$producer->start()] = 'product'; // 多个消费者 for ($i = 0; $i < $max_precess; ++$i) { $process = new swoole_process(function (swoole_process $worker) use ($process, $i): void { swoole_set_process_name('test process_'.$i); echo 'test process_'.$i.'_'.posix_getpid()." start\n"; while (true) { $recv = $worker->pop(); if ($recv == false) { continue; } if ($recv == 'exit') { // 传递消息生产结束给其他消费者 $worker->push('exit'); break; } echo '> process_'.$i.'_'.posix_getpid().' pull '.$recv."\n"; } $worker->exit(0); }); $process->useQueue($queueName, 2 | swoole_process::IPC_NOWAIT); $processCollect[$process->start()] = $i; } while (0 != count($processCollect)) { $ret = swoole_process::wait(); if ($ret !== false) { echo '#'.$processCollect[$ret['pid']].' '.$ret['pid']." exit\n"; unset($processCollect[$ret['pid']]); } } $producer->freeQueue();

