Sincronización de procesos y memoria compartida

A través del estudio de los artículos anteriores, creo que tiene una cierta comprensión del proceso de Swoole. Ya sea un proceso único o un grupo de procesos, nos enfocamos en la comunicación entre procesos. Después de todo, para los procesos, están aislados de memoria y la comunicación es un problema relativamente grande. El contenido del que hablamos antes en realidad no usa herramientas de terceros para la comunicación, pero de hecho, una forma más conveniente es usar directamente algunas herramientas de terceros como un medio de almacenamiento intermedio, para que diferentes procesos puedan leer directamente el contenido aquí. la capacidad de comunicarse. Por ejemplo, normalmente usamos Redis, pero incluso si se usa Redis, incluso si se usa el conjunto de conexiones, habrá un proceso de establecimiento de la conexión, por lo que no es el más eficiente. Hoy vamos a aprender una tabla de memoria compartida, que es un método de sincronización de datos más eficiente proporcionado por Swoole. Además, tenemos que aprender otras dos funciones de sincronización entre procesos muy comunes, una es un contador sin bloqueo y la otra es un bloqueo de proceso.

Sincronización de procesos

En cuanto al problema de sincronización de procesos, lo explicamos muy temprano. Fue en términos de variables globales y explicó por qué las constantes globales tradicionales no se pueden usar en Swoole. Para lograr funciones globales similares entre procesos, además de Table o herramientas externas de terceros que se discutirán más adelante, también hay algunas herramientas pequeñas que merecen nuestra atención.

Contador sin bloqueo entre procesos (atómico)

El contador sin bloqueo entre procesos es una clase de operación de conteo atómico proporcionada por la capa inferior de Swoole, que puede realizar fácilmente el incremento y decremento atómico sin bloqueo de números enteros. ¿Te suena familiar la palabra átomo? Así es, es esa atomicidad de ACID en la base de datos. Ya sea éxito o fracaso, las operaciones atómicas son operaciones que no se interrumpen por la programación de subprocesos o multiproceso, y se ejecutan hasta el final una vez que comienzan.

El contador atómico es en realidad una aplicación de función de contador con capacidad de operación atómica que simplemente se coloca en la memoria compartida, que es para implementar operaciones simples de asignación de sumas y restas.

$atomic = new Swoole\Atomic();

(new \Swoole\Process(function (\Swoole\Process $worker) use ($atomic) {
   while($atomic->get() < 5){
       $atomic->add();
       echo "Atomic Now: {$atomic->get()}, pid: {$worker->pid}", PHP_EOL;
       sleep(1);
   }
   echo "Shutdown {$worker->pid}", PHP_EOL;

}))->start();

(new \Swoole\Process(function (\Swoole\Process $worker) use ($atomic) {
   while($atomic->get() < 10){
       $atomic->add();
       echo "Atomic Now: {$atomic->get()}, pid: {$worker->pid}", PHP_EOL;
       sleep(1);
   }
   echo "Shutdown {$worker->pid}", PHP_EOL;
}))->start();

\Swoole\Process::wait();
\Swoole\Process::wait();

// [root@localhost source]# php 3.6进程同步与共享内存.php
// Atomic Now: 1, pid: 1469
// Atomic Now: 2, pid: 1468
// Atomic Now: 3, pid: 1468
// Atomic Now: 4, pid: 1469
// Atomic Now: 5, pid: 1468
// Atomic Now: 6, pid: 1469
// Shutdown 1468
// Atomic Now: 7, pid: 1469
// Atomic Now: 8, pid: 1469
// Atomic Now: 9, pid: 1469
// Atomic Now: 10, pid: 1469
// Shutdown 1469

Nada especial, puede pensar en un objeto Atomic como un Int, pero es más pequeño que un Int, solo un número entero sin signo de 32 bits. Si necesita números grandes, puede usar Swoole\Atomic\Long, que es un objeto entero con signo de 64 bits. Pero los objetos en formato largo no admiten las siguientes operaciones wait() y debilitamiento().

$atomic = new Swoole\Atomic();

//$atomic->cmpset(0, 1);

(new \Swoole\Process(function (\Swoole\Process $worker) use ($atomic) {
   $atomic->wait(3);
   echo "Shutdown wait Process: {$worker->pid}", PHP_EOL;

}))->start();

(new \Swoole\Process(function (\Swoole\Process $worker) use ($atomic) {
   sleep(2);
   $atomic->wakeup();
//    $atomic->cmpset(0, 1);
   echo "Shutdown other Process: {$worker->pid}", PHP_EOL;
}))->start();

\Swoole\Process::wait();
\Swoole\Process::wait();

//  [root@localhost source]# php 3.6进程同步与共享内存.php
//  Shutdown other Process: 1511
//  Shutdown wait Process: 1510

¿Qué significan estos dos métodos? Cuando el valor de atomic es 0, si se llama a wait(), comenzará a entrar en el estado de espera. ¿Que estas esperando? wait() finaliza cuando se llama al método debilitado() o el valor atómico se establece en 1. Si el valor atómico no es 0 al principio, entonces el método wait() no funcionará.

En este código de prueba, nuestro resultado final es que otro se ejecuta primero, es decir, después de esperar 2 segundos, después de llamar al método debilitado (), finaliza el proceso anterior que internamente llamó al método esperar (). El parámetro de espera () indica cuánto tiempo esperar. Si se establece en -1, esperará para siempre. De lo contrario, esperará de acuerdo con la cantidad de segundos del valor del parámetro. Después del tiempo de espera, no esperará y seguir corriendo Aquí, puede probar que atomic está configurado en un valor distinto de cero al principio, es decir, la línea que llama al método cmpset() en el comentario está activada. Luego, si lo vuelve a ejecutar, encontrará que wait() no funciona y el primer proceso se ejecuta directamente.

De hecho, esta función puede realizar una especie de capacidad de bloqueo, pero no es particularmente flexible.Después de todo, necesita esperar () por un tiempo y debilitar () por un tiempo, y tal vez el valor se cambie directamente. También tenemos una función más conveniente de operar bloqueos directamente en Swoole, que es el bloqueo entre procesos del que hablaremos a continuación.

Bloqueo entre procesos (Lock)

Las operaciones de bloqueo son muy importantes para las operaciones relacionadas con procesos múltiples y subprocesos múltiples. ¿por qué? Uno de los problemas más importantes al ejecutar código en paralelo es que es posible modificar una cosa al mismo tiempo. Puede ser una base de datos, datos de memoria o recursos de archivo. El MySQL que usamos tiene varios mecanismos de bloqueo. Los hilos son utilizado para el procesamiento, incluidos los mecanismos de procesamiento de transacciones, niveles, etc., para resolver varios problemas en la lectura y escritura de datos bajo alta concurrencia. Después de esto, lo estudiaremos cuidadosa y exhaustivamente cuando aprendamos contenido relacionado con MySQL. En el código del programa, es la situación más común que las operaciones de memoria y las operaciones de archivos operen al mismo tiempo y ocurra un conflicto.Por ejemplo, dos procesos modificamos un valor al mismo tiempo, un proceso se cambia a 2, un proceso es cambiado a 3, ¿cuál es el resultado final?, ¿es así?

Esta situación en realidad se ve en base al escenario empresarial ¿Es un cambio acumulativo de 3? ¿O el 3 representa un estado? En cualquier caso, el procesamiento de los dos procesos debe ser en orden, y no pueden operarse al mismo tiempo.El resultado obtenido por la operación simultánea real es ambiguo y no puede ser predicho por nuestra deducción. En este momento, podemos bloquear este tipo de operación, de modo que solo un proceso pueda operar este recurso al mismo tiempo, de modo que su resultado sea determinista en lugar de ambiguo.

$lock = new Swoole\Lock();

(new \Swoole\Process(function (\Swoole\Process $worker) use ($lock) {
    echo "Process: {$worker->pid} Wait", PHP_EOL;
    $lock->lock();
    echo "Process: {$worker->pid} Locked", microtime(true), PHP_EOL;
    sleep(3);
    $lock->unlock();
    echo "Process: {$worker->pid} exit;", PHP_EOL;
}))->start();

(new \Swoole\Process(function (\Swoole\Process $worker) use ($lock) {
    sleep(1);
    echo "Process: {$worker->pid} Wait ", PHP_EOL;
    $lock->lock();
    echo "Process: {$worker->pid} Locked",microtime(true), PHP_EOL;
    $lock->unlock();
    echo "Process: {$worker->pid} exit;", PHP_EOL;
}))->start();

\Swoole\Process::wait();
\Swoole\Process::wait();

//[root@localhost source]# php 3.6进程同步与共享内存.php
//Process: 1611 Wait
//Process: 1611 Locked1640572026.9681
//Process: 1612 Wait
//Process: 1611 exit;
//Process: 1612 Locked1640572029.9771
//Process: 1612 exit;

En este código de prueba, los métodos lock() y unlock() del objeto Swoole\Lock se utilizan para bloquear y liberar el bloqueo. El primer proceso se bloquea después de que comienza y luego descansa durante 3 segundos. El segundo proceso también quiere bloquearse después de entrar, pero el primer proceso ya ha agregado el bloqueo, por lo que tiene que esperar a que el primer proceso libere el bloqueo. Se puede ver que después de 3 segundos, el segundo proceso adquirió el bloqueo.

Los estudiantes que hayan estudiado C/C++ o Java y Go deberían entender esto fácilmente. Si hay algunas operaciones de recursos de IO, especialmente escribir datos, deben bloquearse para evitar la confusión causada por múltiples procesos que escriben datos al mismo tiempo. Al mismo tiempo, los bloqueos entre procesos no se pueden usar en corrutinas. Intente no usar API relacionadas con corrutinas en este bloqueo, de lo contrario,  se producirán interbloqueos fácilmente  .

Para obtener más información, puede consultar la documentación oficial y buscar conocimientos relacionados para un aprendizaje y una comprensión más profundos.

Memoria compartida (Tabla)

Los contadores sin bloqueo y las funciones de bloqueo anteriores son en realidad algunas funciones proporcionadas para compartir datos o comunicación entre procesos. Por ejemplo, el contador se puede usar simplemente acumulando números, y cuando se opera el mismo archivo de identificador, se agrega un bloqueo y todos los procesos en este archivo pueden leer sus datos. De hecho, esta es también una forma de comunicación entre procesos y sincronización de datos. Además de estos, Swoole proporciona una herramienta de tabla, que es una estructura de datos en memoria de rendimiento ultraalto implementada directamente en función de la memoria y los bloqueos compartidos. Puede resolver el problema del intercambio de datos multiproceso/multiproceso y el bloqueo de sincronización.

Se caracteriza por un rendimiento potente, bloqueo giratorio de bloqueo de fila incorporado (sin necesidad de una operación de bloqueo por separado), soporte para múltiples procesos y es una herramienta poderosa para compartir datos y comunicarse entre procesos.

$table = new Swoole\Table(1024);
$table->column('worker_id', Swoole\Table::TYPE_INT);
$table->column('count', Swoole\Table::TYPE_INT);
$table->column('data', Swoole\Table::TYPE_STRING, 64);
$table->create();

$ppid = getmypid();
$table->set($ppid, ['worker_id'=>getmypid(), 'count'=>0'data'=>"这里是 " . $ppid]);


(new \Swoole\Process(function (\Swoole\Process $worker) use ($table) {
    $table->set($worker->pid, ['worker_id'=>$worker->pid, 'count'=>0'data'=>"这里是 {$worker->pid}"]);
    sleep(1);
    $table->incr($worker->pid, 'count');
    print_r($table->get($worker->pid));
}))->start();

(new \Swoole\Process(function (\Swoole\Process $worker) use ($table, $ppid) {
    $table->set($worker->pid, ['worker_id'=>$worker->pid, 'count'=>3'data'=>"这里是 {$worker->pid}"]);
    sleep(1);
    $table->decr($worker->pid, 'count');
    print_r($table->get($worker->pid));
    sleep(1);

    echo "{$worker->pid} 内部循环:", PHP_EOL;
    foreach($table as $t){
        print_r($t);
    }
    if($table->exist($ppid)){
        $table->del($ppid);
    }
}))->start();

\Swoole\Process::wait();
\Swoole\Process::wait();

echo "Talbe 数量:",$table->count(), PHP_EOL;
echo "主进程循环:", PHP_EOL;
foreach($table as $t){
    print_r($t);
}
echo "Table 状态:", PHP_EOL;
print_r($table->stats());

Después de instanciar el objeto Swoole\Table, necesitamos especificar la información de la columna. El parámetro de instanciación es el número máximo de filas en la Tabla. Este número de filas no es necesariamente exacto y está relacionado con el tamaño de la memoria reservada. Tenga en cuenta que no asigna memoria dinámicamente. Abre un espacio de contenido fijo cuando se instancia directamente. Es necesario planificar el espacio de memoria que necesitamos con anticipación. La operación de especificar una columna es especialmente similar a la operación de construir una tabla de base de datos.Este paso es para serializar fácilmente los datos en la memoria.

Luego, podemos establecer los datos de cada fila mediante el método set(). En diferentes procesos, los datos se comparten y se pueden visualizar.

Finalmente, también implementa funciones relacionadas con el iterador, que pueden ser atravesadas por foreach(), devueltas por count() y devueltas por stats().

El resultado final debería verse así.

//  [root@localhost source]# php 3.6进程同步与共享内存.php
//  Array
//  (
//      [worker_id] => 1551
//      [count] => 1
//      [data] => 这里是 1551
//  )
//  Array
//  (
//      [worker_id] => 1552
//      [count] => 2
//      [data] => 这里是 1552
//  )
//  1552 内部循环:
//  Array
//  (
//      [worker_id] => 1550
//      [count] => 0
//      [data] => 这里是 1550
//  )
//  Array
//  (
//     [worker_id] => 1551
//      [count] => 1
//      [data] => 这里是 1551
//  )
//  Array
//  (
//      [worker_id] => 1552
//     [count] => 2
//     [data] => 这里是 1552
//  )
//  Talbe 数量:2
//  主进程循环:
//  Array
//  (
//      [worker_id] => 1551
//      [count] => 1
//      [data] => 这里是 1551
//  )
//  Array
//  (
//      [worker_id] => 1552
//      [count] => 2
//      [data] => 这里是 1552
//  )
//  Table 状态:
//  Array
//  (
//      [num] => 2
//      [conflict_count] => 0
//      [conflict_max_level] => 0
//      [insert_count] => 3
//      [update_count] => 2
//      [delete_count] => 1
//     [available_slice_num] => 204
//     [total_slice_num] => 204
//  )

Resumir

El contenido del estudio de hoy está relacionado con la sincronización entre procesos, con estos la comunicación y sincronización entre procesos es mucho más conveniente. Sin embargo, debe tenerse en cuenta que si Atomic y Lock se usan en aplicaciones de servidor, no los cree en funciones de devolución de llamada como onReceive, de lo contrario, la memoria puede seguir creciendo, que es el legendario desbordamiento de pérdida de memoria. ¿por qué? De hecho, es porque son compartidos globalmente por el proceso y no se reciclan.Si se crean todo el tiempo, el proceso seguirá creándose sin parar, y finalmente reventará la memoria física de toda la aplicación y el servidor.

Bueno, ya hemos aprendido sobre el contenido básico relacionado con el proceso, y luego entraremos en el siguiente gran capítulo, que es el aprendizaje del contenido relacionado con la rutina. Más contenido emocionante ya está frente a ti, ¡no te lo puedes perder si prestas atención a Sanlian!

Código de prueba:

https://github.com/zhangyue0503/swoole/blob/main/3.Swoole%E8%BF%9B%E7%A8%8B/source/3.6%E8%BF%9B%E7%A8%8B%E5% 90%8C%E6%AD%A5%E4%B8%8E%E5%85%B1%E4%BA%AB%E5%86%85%E5%AD%98.php

Documentación de referencia:

https://wiki.swoole.com/#/process/process_pool

https://wiki.swoole.com/#/process/process_manager