
Technologies Behind the Cluster Management System

 

I. Authentication (auth)

Cluster backend:

1. cloudtool sends username + pwd + dyn to auth-manager to obtain a session (in practice just a string):

"http://ip:port/clusterAuth/SessionGenerate?

Once the session is returned, build the request header: header = {"X-Auth-Session": session}

Every request to the cluster backend is then sent with this header.

 

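2. The cluster backend reads "X-Auth-Session" from the request header, verifies that the session is valid (that is, that the user is legitimate), and retrieves the user's role and other information: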

'/clusterAuth/SessionQuery?session_id=%s' % session_id)
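
Steps 1 and 2 can be sketched roughly as follows. This is a minimal in-memory illustration rather than the real auth-manager: the `FakeAuthManager` class, its method names, the helper functions, and the sample user record are all hypothetical stand-ins for the SessionGenerate and SessionQuery endpoints, whose exact parameters are not given here.

```python
import uuid


class FakeAuthManager(object):
    """Hypothetical in-memory stand-in for auth-manager."""

    def __init__(self, users):
        # users maps username -> {"pwd": ..., "dyn": ..., "role": ...}
        self.users = users
        self.sessions = {}

    def session_generate(self, username, pwd, dyn):
        """Stand-in for SessionGenerate: return a session string or None."""
        user = self.users.get(username)
        if user is None or user["pwd"] != pwd or user["dyn"] != dyn:
            return None
        session = uuid.uuid4().hex
        self.sessions[session] = {"username": username, "role": user["role"]}
        return session

    def session_query(self, session_id):
        """Stand-in for SessionQuery: return the user info or None."""
        return self.sessions.get(session_id)


def build_headers(session):
    # Step 1: the client attaches the session to every backend request.
    return {"X-Auth-Session": session}


def authenticate_request(auth, headers):
    # Step 2: the backend reads the header and asks auth-manager about it.
    session_id = headers.get("X-Auth-Session")
    if session_id is None:
        return None
    return auth.session_query(session_id)


auth = FakeAuthManager(
    {"alice": {"pwd": "secret", "dyn": "123456", "role": "admin"}})
session = auth.session_generate("alice", "secret", "123456")
headers = build_headers(session)
user_info = authenticate_request(auth, headers)
```

The backend never sees the password in this flow; it only forwards the opaque session string and trusts whatever role auth-manager reports for it.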

 

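3. Internal interfaces have no username/pwd/dyn information, so they authenticate with a token instead, by way of a handshake:

The cloud host API generates a token (a uuid) and sends it to auth-manager.

auth-manager takes the token back to the cloud host API and asks whether it is genuine.

If the cloud host API confirms that it issued the token, it returns true to auth-manager.

auth-manager then generates a session, which indicates that verification has passed.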
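
The handshake can be sketched as below. This is an illustration under assumptions, not the actual implementation: the `CloudHostApi` and `AuthManager` classes and their methods are hypothetical, the callback is a direct method call instead of an HTTP request, and making each token single-use is a choice of this sketch rather than something stated above.

```python
import uuid


class CloudHostApi(object):
    """Hypothetical internal caller that issues and later confirms tokens."""

    def __init__(self):
        self.issued = set()

    def new_token(self):
        token = uuid.uuid4().hex
        self.issued.add(token)
        return token

    def confirm(self, token):
        # True only for tokens this API issued. Tokens are single-use here,
        # which is an assumption of this sketch.
        if token in self.issued:
            self.issued.discard(token)
            return True
        return False


class AuthManager(object):
    """Hypothetical auth manager that verifies a token by calling back."""

    def __init__(self):
        self.sessions = set()

    def session_from_token(self, token, issuer):
        # Ask the issuer whether the token is really its own.
        if not issuer.confirm(token):
            return None
        session = uuid.uuid4().hex
        self.sessions.add(session)
        return session


api = CloudHostApi()
auth_manager = AuthManager()
token = api.new_token()
session = auth_manager.session_from_token(token, api)
forged = auth_manager.session_from_token("not-a-real-token", api)
```

The point of the callback is that auth-manager needs no shared secret with the internal service: a token is trusted only because its issuer vouches for it.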

 


II. RabbitMQ messaging
 
2.1 carrot_connection.BrokerConnection

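The broker is an AMQP server: rabbit-mq server must be installed and its service started. Create a Connection class that specifies the address of the rabbit-mq server, so that consumers can connect to it.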

 
from carrot import connection as carrot_connection  # AMQP client library used here

class Connection(carrot_connection.BrokerConnection):
    """
    Connection Instance Object
    """
    @classmethod
    def instance(cls, new=True):
        """
        Return a new connection when new is True, otherwise the cached one
        (creating it on first use).
        """
        if new or not hasattr(cls, '_instance'):
            params = {"hostname":config.getValue("BROKER_HOST", True),
                        "port":config.getValue("BROKER_PORT", True),
                        "virtual_host":config.getValue("BROKER_VHOST", True),
                        "userid":config.getValue("BROKER_USER", True),
                        "password":config.getValue("BROKER_PASSWORD", True)}
            if new:
                return cls(**params)
            else:
                cls._instance = cls(**params)
        return cls._instance

    @classmethod
    def recreate(cls):
        try:
            del cls._instance
        except AttributeError:
            pass

        return cls.instance()
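
To make the effect of the `new` flag concrete, here is a small sketch that copies the branching of `instance()` into a class that opens no network connection. `FakeConnection` and its placeholder parameters are hypothetical and exist only for illustration.

```python
class FakeConnection(object):
    """Hypothetical stand-in for the broker connection; opens no socket."""

    def __init__(self, **params):
        self.params = params

    @classmethod
    def instance(cls, new=True):
        # Same branching as Connection.instance() above.
        if new or not hasattr(cls, "_instance"):
            params = {"hostname": "localhost", "port": 5672}  # placeholders
            if new:
                return cls(**params)
            cls._instance = cls(**params)
        return cls._instance


fresh_a = FakeConnection.instance()
fresh_b = FakeConnection.instance()
shared_a = FakeConnection.instance(new=False)
shared_b = FakeConnection.instance(new=False)
```

With the default `new=True`, every call builds a separate connection; only `new=False` reuses the cached one. As written, `recreate()` deletes the cached object and then calls `instance()` with the default, so it appears to return a fresh connection without caching it again.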

 

2.2 compute-node, instance-manager, and volume-manager.py are three consumers
 

rpc.ManagerConsumer("Compute_Node",compute_node.ComuputeNode(), inner_ip).wait()

rpc.ManagerConsumer("INSTANCE_MANAGER",instance_manager.InstanceManager()).wait()

rpc.ManagerConsumer("VOLUME_MANAGE",volume.VolumeManager()).wait()

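ManagerConsumer inherits from carrot.messaging.Consumer. Its constructor signature is:

 def __init__(self, role, proxy, hostid=None)

The exchange is role, and queue and routing_key are both role + ": " + hostid (or just role when no hostid is given), with auto_declare=True and exchange_type="topic".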
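
As a quick illustration of the naming rule, the helper below reproduces how the constructor derives the queue name. The function name `queue_name` and the sample host address are hypothetical; the joining rule (a colon followed by a space) follows the ManagerConsumer code in this section.

```python
def queue_name(role, hostid=None):
    """Mirror how ManagerConsumer derives its queue and routing key."""
    if not hostid:
        return role
    return role + ": " + str(hostid)


shared = queue_name("INSTANCE_MANAGER")
per_host = queue_name("Compute_Node", "10.0.0.5")
```

A consumer started without a hostid listens on one queue named after its role, while each compute node gets its own queue, so a message can be addressed to one specific host.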

 

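ManagerConsumer overrides receive() to spawn a green thread (from an eventlet GreenPool) that deals with each message.

For a cast message, nothing more is done once it has been handled. For a call message, the result must also be sent back to the sender.

The reply itself is sent as a cast.

Consumer: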

import sys

import eventlet
from carrot import messaging

class ManagerConsumer(messaging.Consumer):
    """
    Base Consumer class for the various kinds of manager: Volume, Instance...
    """
    def __init__(self, role, proxy, hostid=None):
        if not proxy:
            LOG.error("No PROXY INPUT!")
            sys.exit(1)
        self.proxy = proxy
        self.pool = eventlet.greenpool.GreenPool(config.getValue("POOLS"))

        node_exchange = role
        if not hostid:
            node_queue = role
        else:
            node_queue = role + ": " + str(hostid)

        try:
            super(ManagerConsumer, self).__init__(connection = Connection.instance(),
                                            queue = node_queue,
                                            exchange = node_exchange,
                                            auto_declare = True,
                                            exchange_type = "topic",
                                            routing_key = node_queue)
        except Exception,e:
            print e
            LOG.error("ERROR IN __init__ ManagerConsumer for %s" %e)
            sys.exit(1)

    def receive(self, message_data, message):
        """
        spawn a thread to deal with the message by __receive
        """
        self.pool.spawn_n(self.__receive, message_data, message)
        eventlet.greenthread.sleep(0)

    def __receive(self, message_data, message):
        message.ack()
        result = self.proxy.dispatch(message_data["rpc_info"])
        if message_data["call_or_cast"] == "call":
            msg_id = message_data["msg_id"]
            msg_reply = {}
            msg_reply["result"] = result
            msg_reply["failure"] = False
            cast(msg_reply,msg_id)
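
The cast/call branch in `__receive` can be tried out without a broker. In the sketch below, `EchoProxy`, `handle`, and the `"method"` key inside `rpc_info` are hypothetical; only the `call_or_cast`, `msg_id`, `rpc_info`, `result`, and `failure` fields come from the code above.

```python
class EchoProxy(object):
    """Hypothetical proxy; the real ones are ComputeNode, InstanceManager..."""

    def dispatch(self, rpc_info):
        return "handled " + rpc_info["method"]


def handle(proxy, message_data, send_reply):
    """Dispatch one message and reply only when the sender used call."""
    result = proxy.dispatch(message_data["rpc_info"])
    if message_data["call_or_cast"] == "call":
        reply = {"result": result, "failure": False}
        send_reply(reply, message_data["msg_id"])


sent = []  # records (destination, reply) pairs instead of publishing them


def record(reply, dest):
    sent.append((dest, reply))


handle(EchoProxy(),
       {"call_or_cast": "cast", "rpc_info": {"method": "stop"}},
       record)
handle(EchoProxy(),
       {"call_or_cast": "call", "msg_id": "abc123",
        "rpc_info": {"method": "create"}},
       record)
```

Only the second message produces a reply, and that reply is addressed to the `msg_id` chosen by the caller.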
2.3 Producer: the publisher. Sending a message is really just a send operation. A message may be synchronous or asynchronous.
CAST:
def cast(message, dest=None):
    """
    send the message to the dest with no response
    if dest is None, the dest will be set by the message["dest"] if available
    """
    message["call_or_cast"] = "cast"
    if dest is None:
        dest_exchange = message["dest"]
        dest_queue = message["dest_queue"]
    else:
        dest_exchange = dest
        dest_queue = dest
    try:
        publish_cast = messaging.Publisher(connection = Connection.instance(),
                                                exchange = dest_exchange,
                                                auto_declare = False,
                                                routing_key = dest_queue,
                                                )
        publish_cast.send(message)

        publish_cast.close()
    except Exception,e:
        LOG.error("ERROR IN CAST %s for %s" %(message,e))
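
The way `cast()` chooses its destination is worth isolating, because it explains how a reply reaches its caller. The helper name `resolve_destination` and the sample values below are hypothetical; the logic is the `dest` branch of `cast()`.

```python
def resolve_destination(message, dest=None):
    """Return (exchange, routing_key) the way cast() picks them."""
    if dest is None:
        return message["dest"], message["dest_queue"]
    return dest, dest


# A normal request names its target inside the message itself.
request_target = resolve_destination(
    {"dest": "Compute_Node", "dest_queue": "Compute_Node: 10.0.0.5"})

# A reply passes the caller's msg_id, used as both exchange and routing key.
reply_target = resolve_destination({"result": 1, "failure": False}, "abc123")
```

Because an explicit `dest` is used for both the exchange and the routing key, a reply sent with `cast(msg_reply, msg_id)` lands on an exchange and queue that are both named after the message id.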
 
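CALL: the producer sends a message and expects the result of synchronous execution, for example when creating a volume synchronously.
In that case the message is sent to volume-manager with call. The flow is:
volume-api sends to volume-manager and, at the same time, sets up a consumer listening on msg_id.
volume-manager (the consumer) receives and dispatches the message, then casts the result to msg_id.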
def call(message):
    """
    Send a message to the manager and wait response
    """
    message["call_or_cast"] = "call"

    dest_exchange = message["dest"]
    dest_queue = message["dest_queue"]

    try:
        msg_id = uuid.uuid4().hex
        #Set up a temp queue by generating a uuid
        consum_call = messaging.Consumer(connection = Connection.instance(),
                                        queue = msg_id,
                                        exchange = msg_id,
                                        auto_declare = True,
                                        exchange_type = "direct",
                                        exclusive = True,
                                        routing_key = msg_id)
        class WaitMessage(object):
            def __call__(self, data, message):
                message.ack()
                if data["failure"]:
                #    self.result = RemoteError(*data["failure"])
                    pass
                else:
                    self.result = data["result"]

        wait_msg = WaitMessage()
        consum_call.register_callback(wait_msg)

        publish_call = messaging.Publisher(connection = Connection.instance(),
                                        exchange = dest_exchange,
                                        auto_declare = False,
                                        routing_key = dest_queue
                                        )
        message["msg_id"] = msg_id

        publish_call.send(message)
        publish_call.close()

        try:
            consum_call.wait(limit =1)
        except StopIteration:
            pass

        consum_call.close()
        return wait_msg.result

    except Exception, e:
        LOG.error("ERROR IN CALL for %s" %e)
        return None
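
The whole round trip can be simulated without RabbitMQ. The sketch below is written for Python 3 and replaces the broker with a dictionary of in-memory queues; `publish`, `manager_loop`, this simplified `call`, the `timeout` parameter, and the volume handler are all hypothetical and only imitate the message fields used above.

```python
import queue
import threading
import uuid

queues = {}  # routing key -> in-memory queue, standing in for the broker


def publish(routing_key, message):
    # Like a publisher with auto_declare=False: the queue must already exist.
    queues[routing_key].put(message)


def manager_loop(name, handler):
    """Hypothetical manager: handle one message, reply if it was a call."""
    message = queues[name].get()
    result = handler(message["rpc_info"])
    if message["call_or_cast"] == "call":
        publish(message["msg_id"], {"result": result, "failure": False})


def call(dest_queue, rpc_info, timeout=5):
    """Send a request and block until the reply arrives on a temp queue."""
    msg_id = uuid.uuid4().hex
    queues[msg_id] = queue.Queue()  # start listening before sending
    publish(dest_queue, {"call_or_cast": "call", "msg_id": msg_id,
                         "rpc_info": rpc_info})
    try:
        reply = queues[msg_id].get(timeout=timeout)
    finally:
        del queues[msg_id]  # drop the temporary reply queue
    return None if reply["failure"] else reply["result"]


queues["VOLUME_MANAGE"] = queue.Queue()
worker = threading.Thread(
    target=manager_loop,
    args=("VOLUME_MANAGE", lambda info: "created " + info["name"]))
worker.start()
result = call("VOLUME_MANAGE", {"name": "vol-1"})
worker.join()
```

Creating the reply queue before publishing matters: if the manager answered before the caller was listening, the reply would have nowhere to go.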
 
2.4 api-server.py, by contrast, is a web server that listens for HTTP requests on a port:
service = ApiService.create()
service.start()
service.wait()
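
ApiService itself is not shown here, so the following is only a guess at the general shape, written for Python 3 with the standard library. Everything in it is hypothetical except the create/start/wait call sequence: the class body, `PingHandler`, the `stop` method, the `ping` helper, and the choice of port 0 are assumptions made for illustration.

```python
import http.client
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer


class PingHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        body = b"ok"
        self.send_response(200)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, fmt, *args):
        pass  # keep the example quiet


class ApiService(object):
    """Hypothetical stand-in with the same create/start/wait shape."""

    def __init__(self, host, port):
        self.server = HTTPServer((host, port), PingHandler)
        self.port = self.server.server_address[1]
        self.thread = threading.Thread(target=self.server.serve_forever)
        self.thread.daemon = True

    @classmethod
    def create(cls, host="127.0.0.1", port=0):
        return cls(host, port)  # port 0 lets the OS pick a free port

    def start(self):
        self.thread.start()

    def wait(self):
        self.thread.join()  # blocks until the serving thread exits

    def stop(self):
        self.server.shutdown()
        self.server.server_close()


def ping(port):
    conn = http.client.HTTPConnection("127.0.0.1", port, timeout=5)
    try:
        conn.request("GET", "/")
        resp = conn.getresponse()
        return resp.status, resp.read()
    finally:
        conn.close()


service = ApiService.create()
service.start()
status, body = ping(service.port)
service.stop()
service.wait()
```

In a real server process, `wait()` would be the last call and would block for the lifetime of the service; here `stop()` is called first so that the example terminates.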
 
 