跳到主要内容

概览

Python 环境下的 CloudTower SDK,适用于 2.7 和 3.4 及以上版本

安装

  • whl

    pip install cloudtower_sdk-1.9.0-py2.py3-none-any.whl
  • tar.gz

    tar xvzf cloudtower-sdk-1.9.0.tar.gz
    cd cloudtower-sdk-1.90.tar.gz
    python setup.py install
  • git 源码安装

    git clone https://github.com/smartxworks/cloudtower-python-sdk.git
    cd clodtower-python-sdk
    python setup.py install
  • git pip 安装

    pip install git+https://github.com/smartxworks/cloudtower-python-sdk.git
  • pypi 安装

    pip install cloudtower-sdk

使用

创建实例

创建 ApiClient 实例

from cloudtower.configuration import Configuration
from cloudtower import ApiClient
# 配置 operation-api endpoint
configuration = Configuration(host="http://192.168.96.133/v2/api")
client = ApiClient(configuration)

如果需要使用 https,可以安装证书,或者忽略证书验证

configuration = Configuration(host="https://192.168.96.133/v2/api")
configuration.verify_ssl = False
client = ApiClient(configuration)

创建对应的 API 实例

根据不同用途的操作创建相关的 API 实例,例如虚拟机相关操作需要创建一个 VmApi

from cloudtower.api.vm_api import VmApi
vm_api = VmApi(client)

鉴权

from cloudtower.api.user_api import UserApi
from cloudtower.models import UserSource
# 通过 UserApi 中的 login 方法来获得 token。
user_api = UserApi(client)
login_res = user_api.login({
"username": "your_username",
"password": "your_password",
"source": UserSource.LOCAL
})
# 将 token 配置在 configuration.api_key["Authorization"] 中,
# 这样所有使用当前 client 的 api 都会获得鉴权的 token 信息。
configuration.api_key["Authorization"] = login_res.data.token

发送请求

获取资源

vms = vm_api.get_vms({
"where": {
"id": "vm_id"
},
"first":1,
})

更新资源

资源更新会产生相关的异步任务,当异步任务结束时,代表资源操作完成且数据已更新。

start_res = vm_api.start_vm({
"where": {
"id": "stopped_vm_id"
},
})

可以通过提供的工具方法同步等待异步任务结束

from cloudtower.utils import wait_tasks
try:
wait_tasks([res.task_id for res in start_res], api_client)
except ApiException as e:
# 处理错误
else:
# task完成后的回调
方法参数说明
参数名类型是否必须说明
idslist[str]需查询的 task 的 id 列表
api_clientApiClient查询所使用的 ApiClient 实例
intervalint轮询的间隔时间,默认为 5s
timeoutint超时时间,默认为 300s
exit_on_errorbool是否在单个 Task 出错时立即退出,否则则会等待全部 Task 都完成后再退出,默认为 False。
错误说明
错误码说明
408超时
500异步任务内部错误

自定义 header

cloudtower api 支持通过设置 header 中的 content-language 来设置返回信息的语言, 可选值 en-US, zh-CN。默认为 en-US

通过 ApiClientset_default_header 方法

可以通过 ApiClientset_default_header 方法设置默认的 header 信息。

api_client.set_default_header("content_language","en-US")
alert_api = AlertApi(api_client)
# 此时得到的 alerts 中的 message, solution, cause, impact 将被转换为英文描述。
alerts = alert_api.get_alerts(
{
"where": {
"cluster": {
"id": "cluster_id"
}
},
"first": 100
},
)
通过设置请求的关键字参数

也可以通过设置请求的关键字参数 content_language 来设置返回信息的语言。

from cloudtower.api.alert_api import AlertApi

alert_api = AlertApi(api_client)
# 此时得到的 alerts 中的 message, solution, cause, impact 将被转换为中文描述。
alerts = alert_api.get_alerts(
{
"where": {
"cluster": {
"id": "cluster_id"
}
},
"first": 100
},
content_language="zh-CN"
)

其他

发送异步请求

上述请求的发送都是同步的请求,会堵塞当前进程。如果需要使用异步请求,请在对应请求的关键字参数中加上 async_req=True。 通过返回结果 ApplyResult.get() 来获取对应的结果。

vms = vm_api.get_vms(
{
"where": {
"id": "vm_id"
}
},
async_req=True
)
print(vms.get()[0].name)

使用完成后销毁 ApiClient 实例

client.close()

场景示例

虚拟机备份

from cloudtower import ApiClient
from cloudtower.api.vm_api import VmApi
from cloudtower.api.vm_snapshot_api import VmSnapshotApi
from cloudtower.api.iscsi_lun_snapshot_api import IscsiLunSnapshotApi
from cloudtower.models import (
ConsistentType,
VmToolsStatus
)
from cloudtower.utils import wait_tasks


def create_vm_snapshot(
api_client: ApiClient,
target_vm_name: str,
target_snapshot_name: str,
consistent_type: ConsistentType
):
vm_api = VmApi(api_client)
vm_snapshot_api = VmSnapshotApi(api_client)
iscsi_lun_snapshot_api = IscsiLunSnapshotApi(api_client)
# 1. 获取所需备份的虚拟机的信息,这里我们需要vm的id来构建创建snapshot的参数
vm = vm_api.get_vms({
"where": {
"name": target_vm_name
},
"first": 1
})
# vm 已安装并启动 VMTools 时,consistent_type 可以使用 FILE_SYSTEM_CONSISTENT 代表文件系统一致性快照
if vm.vm_tools_status != VmToolsStatus.RUNNING and consistent_type == ConsistentType.FILE_SYSTEM_CONSISTENT:
consistent_type = ConsistentType.CRASH_CONSISTENT

# 2. 创建虚拟机快照
snapshots_with_task = vm_snapshot_api.create_vm_snapshot({
"data": [
{
"vm_id": vm.id,
"name": target_snapshot_name,
"consistent_type": consistent_type
}
]
})

# 3. 等待Task完成
wait_tasks([snapshots_with_task[0].task_id], api_client)

# 4. 根据返回的id查询生成的虚拟机快照
snapshot = vm_snapshot_api.get_vm_snapshots({
"where": {
"id": snapshots_with_task.data.id
}
})[0]
# 5. 根据返回的snapshot中的vm_disks包含了快照的虚拟盘信息
# type 为 DISK 表示对应一个卷,其中会包含一个 snapshot_local_id 则表示该虚拟卷对应的lun快照的 local_id
# type 为 CD-ROM则代表为被挂载的CD-ROM,不会产生lun快照
lun_snapshot_ids = []
for disk in snapshot.vm_disks:
if disk.type == "DISK":
lun_snapshot_ids.append(disk.snapshot_local_id)

lun_snapshots = iscsi_lun_snapshot_api.get_iscsi_lun_snapshots({
"where": {
"name_in": lun_snapshot_ids
}
})

return {
"vm_snapshot": snapshot,
"lun_snapshots": lun_snapshots
}

Dashboard 构建

定义工具方法

from functools import reduce
from datetime import datetime, timedelta
from cloudtower import ApiClient
from cloudtower.configuration import Configuration
from cloudtower.models import SeverityEnum, ClusterType, Hypervisor, DiskType, DiskUsageStatus, DiskHealthStatus
from cloudtower.api import VmApi, ClusterApi, AlertApi, HostApi, DiskApi, ClusterSettingsApi, GlobalSettingsApi

api_client = ApiClient(Configuration(host="http://192.168.96.133/v2/api"))

byte_units = ["B", "KiB", "MiB", "GiB", "TiB", "PiB"]
hz_units = ["Hz", "KHz", "MHz", "GHz", "THz"]


def format_unit(base: int, units, step=1024):
if not len(units):
raise Exception("no unit provided")
if base <= 0:
return "0" + units[0]
for unit in units:
if base < step:
return "{:.2f}{}".format(base, unit)
base /= step
return "{:.2f}{}".format(base, units[-1])

构建报警信息

def build_alerts(api_client: ApiClient, cluster_ids):
alert_api = AlertApi(api_client)
alerts = alert_api.get_alerts({
"where": {
"ended": False,
"cluster": {
"id_in": cluster_ids
},
}
})
critial_alerts = [
alert for alert in alerts if alert.severity == SeverityEnum.CRITICAL]
notice_alerts = [
alert for alert in alerts if alert.severity == SeverityEnum.NOTICE]
info_alerts = [
alert for alert in alerts if alert.severity == SeverityEnum.INFO]
return {
"critical": critial_alerts,
"notice": notice_alerts,
"info": info_alerts
}

构建硬盘信息

这里以机械硬盘为例

def build_hdd_info(api_client: ApiClient, cluster_ids):
disk_api = DiskApi(api_client)
disks = disk_api.get_disks({
"where": {
"host": {
"cluster": {
"id_in": cluster_ids
}
}
}
})
hdd = {
"healthy": 0,
"warning": 0,
"error": 0,
"total": 0,
}
for disk in disks:
if disk.type == DiskType.HDD:
if disk.health_status in [DiskHealthStatus.UNHEALTHY, DiskHealthStatus.SUBHEALTHY, DiskHealthStatus.SMART_FAILED]:
hdd['error'] += 1
elif disk.usage_status in [DiskUsageStatus.UNMOUNTED, DiskUsageStatus.PARTIAL_MOUNTED]:
hdd['warning'] += 1
else:
hdd['healthy'] += 1
hdd['total'] += 1
return hdd

构建性能指标

获取指定集群的 CPU 核数,CPU 频率总数,CPU 使用率,内存总量,内存使用量,存储资源总量,存储资源已使用量,存储资源失效量与存储资源可用量。

def build_metrics(api_client: ApiClient, clusters, cluster_ids):
result = {}
host_api = HostApi(api_client)
hosts = host_api.get_hosts({
"where": {
"cluster": {
"id_in": cluster_ids
}
}
})
cpu = {
"total_cpu_cores": 0,
"total_cpu_hz": 0,
"used_cpu_hz": 0,
}
memory = {
"total_memory": 0,
"used_memory": 0,
}
storage = {
"total": 0,
"used": 0,
"invalid": 0,
"available": 0
}

for host in hosts:
cluster = next(
cluster for cluster in clusters if cluster.id == host.cluster.id)
if cluster.hypervisor == Hypervisor.ELF:
memory['total_memory'] += 0 if host.total_memory_bytes is None else host.total_memory_bytes
memory['used_memory'] += (0 if host.running_pause_vm_memory_bytes is None else host.running_pause_vm_memory_bytes) + \
(0 if host.os_memory_bytes is None else host.os_memory_bytes)

for cluster in clusters:
if cluster.type == ClusterType.SMTX_OS:
cpu["total_cpu_cores"] += 0 if cluster.total_cpu_cores is None else cluster.total_cpu_cores
cpu["total_cpu_hz"] += 0 if cluster.total_cpu_hz is None else cluster.total_cpu_hz
cpu["used_cpu_hz"] += 0 if cluster.used_cpu_hz is None else cluster.used_cpu_hz
if cluster.hypervisor == Hypervisor.VMWARE:
memory["total_memory"] += 0 if cluster.total_memory_bytes is None else cluster.total_memory_bytes
memory["used_memory"] += 0 if cluster.used_memory_bytes is None else cluster.used_memory_bytes
storage["total"] += 0 if cluster.total_data_capacity is None else cluster.total_data_capacity
storage["used"] += 0 if cluster.used_data_space is None else cluster.used_data_space
storage["invalid"] += 0 if cluster.failure_data_space is None else cluster.failure_data_space
if len([cluster for cluster in clusters if cluster.type != ClusterType.SMTX_ZBS]) > 1:
cpu["cpu_usage"] = "{:.2f}%".format(
cpu["used_cpu_hz"] / cpu["total_cpu_hz"])
cpu["total_cpu_hz"] = format_unit(cpu["total_cpu_hz"], hz_units, 1000)
cpu["used_cpu_hz"] = format_unit(cpu["used_cpu_hz"], hz_units, 1000)
result['cpu'] = cpu
memory["memory_usage"] = "{:.2f}%".format(
memory["used_memory"] / memory["total_memory"])
memory["total_memory"] = format_unit(
memory["total_memory"], byte_units)
memory["used_memory"] = format_unit(
memory["used_memory"], byte_units)
result["memory"] = memory
storage["available"] = format_unit(
storage["total"] - storage["used"] - storage["invalid"], byte_units)
storage["total"] = format_unit(storage["total"], byte_units)
storage["used"] = format_unit(storage["used"], byte_units)
storage["invalid"] = format_unit(storage["invalid"], byte_units)
result["storage"] = storage
return result

构建 Dashboard

def build_dashboard(api_client: ApiClient, datacenter_id: str = None, cluster_id: str = None):
result = {}
cluster_api = ClusterApi(api_client)
clusters = cluster_api.get_clusters({
"where": {"id": cluster_id} if cluster_id is not None else {"datacenters_some": {"id": datacenter_id}} if datacenter_id is not None else None
})
cluster_ids = [cluster.id for cluster in clusters]

result["alerts"] = build_alerts(api_client, cluster_ids)
result["hdd"] = build_hdd_info(api_client, cluster_ids)
metric = build_metrics(api_client, clusters, cluster_ids)
if "cpu" in metric:
result["cpu"] = metric["cpu"]
if "memory" in metric:
result["memory"] = metric["memory"]
if "storage" in metric:
result["storage"] = metric["storage"]
return result