与solr配合使用Rabbitmq消息队列

:::brush:php;toolbar:false
receive();

    }   

    //发送消息
    $message = array("id"=>1,"content"=>array("set"=>"msg"));
    $rabbitmq->send($message);
/**
 *  消息队列操作类(需要php amqp扩展)
 *
 */
class Rabbitmq{ 
    //消息队列连接配置
    public $conn_args = array( 'host'=>'10.10.10.1' ,'port'=> '5672', 'login'=>'guest' , 'password'=> 'guest','vhost' =>'/');
    public $ex_name = "postmessage";
    public $chroute = "key1";
    public $queue_name = "queue_namea1";
    public $type2method = array(        //操作类型=>方法
                        'update' => 'updateMessage',
                        'updateAll'=>'updateMessageById',
                        'del'    => 'deleteMessage',
                        );

    function __construct(){
        $this->switch = 1;          //启用rabbitmq开关
        if(class_exists("AMQPConnection")  && $this->switch){
            //连接RabbitMQ
            $this->conn = new AMQPConnection($this->conn_args);
            $this->conn->connect();
            //创建channel
            $this->channel = new AMQPChannel($this->conn);
                
            //创建exchange名称和类型
                
            $this->ex = new AMQPExchange($this->channel);
            $this->ex->setName($this->ex_name);
            $this->ex->setType(AMQP_EX_TYPE_DIRECT);
            $this->ex->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
            $this->ex->declare();
            
        }
        
    }

    //消息发布
    public function send($message = ""){
        if(class_exists("AMQPConnection")  && $this->switch){
            
            $this->channel->startTransaction();
            $message = json_encode($message);
            $this->ex->publish($message, $this->chroute);
            $stat = $this->channel->commitTransaction();
            return $stat;
            //          $this->conn->disconnect();
        }
    }
    
    //消息接收
    public function receive(){
        if(class_exists("AMQPConnection")  && $this->switch){
            //创建消息队列
            $this->q = new AMQPQueue($this->channel);
            $this->q->setName($this->queue_name);
            $this->q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
            $this->q->declare();
            $this->q->bind($this->ex_name,$this->chroute);

            //消息获取
            $messages = $this->q->get(AMQP_AUTOACK) ;
            if ($messages){
//              print_r($messages->getRoutingKey());
//              print_r($messages->getExchangeName());
                return $messages->getBody();
            }
            return false;
//          while ($envelope = $this->q->get(AMQP_AUTOACK)) {
//              echo ($envelope->isRedelivery()) ? 'Redelivery' : 'New Message';
//              echo PHP_EOL;
//              echo $envelope->getBody(), PHP_EOL;
//          }
        }
    }
    

    /**
     * 更新单个字段
    */
    public function updateMessage($id = 0,$type = "", $field_type = "",$data = array()){
        if(class_exists("AMQPConnection")  && $this->switch){
            $post_message = array();
            $post_message['type'] = $type ;
            
            $updateData = array();
        
            foreach($data as $field=>$value){
                $updateData[$field]=array("set"=>$value);
            }
            $updateData['id'] = $ id;
            $post_message['params'] = $updateData ;
            return $post_message;
        }
    }


    
    /**
     * 删除
     * @param int $id        id
     * @param string $type   操作类型
     * @param string $fields 指定更新字段
     */
    public function deleteMessage($id = 0,$type = "", $fields = ""){
        if(class_exists("AMQPConnection")  && $this->switch){
            $post_message = array();
            if(class_exists("AMQPConnection") || $this->unverify_class){
                $post_message['type'] = $type ;
                $post_message['params']['id'] = $id;
            }
            return $post_message;
        }   
    }
    
}

This article is my 7th oldest. It is 1 words long