Skip to content

Commit

Permalink
feat: 订阅下发支持动态分组 (closed #2507)
Browse files Browse the repository at this point in the history
# Reviewed, transaction id: 27913
  • Loading branch information
ping15 committed Dec 26, 2024
1 parent b5814a6 commit 9564f6e
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 14 deletions.
206 changes: 192 additions & 14 deletions apps/backend/subscription/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,14 @@ def get_host_detail_by_template(bk_obj_id, template_info_list: list, bk_biz_id:
host_info_result = batch_request(
call_func, dict(bk_service_template_ids=template_ids, bk_biz_id=bk_biz_id, fields=fields)
)
elif bk_obj_id == models.Subscription.NodeType.DYNAMIC_GROUP:
# 集群模板
call_func = client_v2.cc.find_host_by_set_template
template_ids = [info["bk_inst_id"] for info in template_info_list]
bk_set_ids = [info["bk_set_id"] for info in template_info_list]
host_info_result = batch_request(
call_func, dict(bk_set_template_ids=template_ids, bk_set_ids=bk_set_ids, bk_biz_id=bk_biz_id, fields=fields)
)
else:
# 集群模板
call_func = client_v2.cc.find_host_by_set_template
Expand All @@ -489,6 +497,26 @@ def get_host_detail_by_template(bk_obj_id, template_info_list: list, bk_biz_id:
return host_info_result


def get_host_module_info_by_host_ids(bk_host_id_chunks, bk_biz_id):
"""
根据主机id列表查询主机业务关系信息
:param bk_host_id_chunks: 主机id列表
:param bk_biz_id: 业务ID
"""
# 补充实例所属模块ID
host_biz_relations = []

with ThreadPoolExecutor(max_workers=settings.CONCURRENT_NUMBER) as ex:
tasks = [
ex.submit(client_v2.cc.find_host_biz_relations, dict(bk_host_id=chunk, bk_biz_id=bk_biz_id))
for chunk in bk_host_id_chunks
]
for future in as_completed(tasks):
host_biz_relations.extend(future.result())

return host_biz_relations


def get_service_instances_by_template(bk_obj_id, template_info_list: list, bk_biz_id: int = None):
"""
根据集群模板ID/服务模板ID获得服务实例列表
Expand Down Expand Up @@ -718,6 +746,14 @@ def set_template_scope_nodes(scope):
# 兼容 service_template_id 不存在的场景
if "service_template_id" in node and node["service_template_id"] in template_ids
]
elif scope["node_type"] == models.Subscription.NodeType.DYNAMIC_GROUP:
# 转化服务模板为node
scope["nodes"] = [
{"bk_inst_id": node["bk_module_id"], "bk_obj_id": "module"}
for node in modules_info
# 兼容 bk_set_id 不存在的场景
if "bk_set_id" in node and node["bk_set_id"] in template_ids
]
else:
# 转化集群模板为node
scope["nodes"] = [
Expand Down Expand Up @@ -823,6 +859,35 @@ def get_scope_labels_func(
}


def execute_dynamic_groups(nodes: List[dict], bk_biz_id: int, bk_obj_id: str, fields: List[str]):
"""
执行动态分组
:param nodes: 节点列表
:param bk_biz_id: 业务id
:param bk_obj_id: 分组目标(目前只支持 host 和 set )
:param fields: 属性列表
"""
params = [
{
"func": CCApi.execute_dynamic_group,
"params": {
"fields": fields,
"bk_biz_id": bk_biz_id,
"id": node["bk_inst_id"],
"no_request": True,
},
"sort": "id",
"limit": constants.LIST_SERVICE_INSTANCE_DETAIL_LIMIT,
}
for node in nodes
if node["bk_obj_id"] == bk_obj_id
]

return batch_call(
batch_request, params, extend_result=True, interval=constants.LIST_SERVICE_INSTANCE_DETAIL_INTERVAL
)


def get_instances_by_scope_with_checker(
scope: Dict[str, Union[Dict, int, Any]], steps: List[models.SubscriptionStep], *args, **kwargs
) -> Dict[str, Dict[str, Union[Dict, Any]]]:
Expand Down Expand Up @@ -888,7 +953,7 @@ def get_instances_by_scope(scope: Dict[str, Union[Dict, int, Any]]) -> Dict[str,
else:
module_to_topo = {}

nodes = scope["nodes"]
nodes: List[dict] = scope["nodes"]
if not nodes:
# 兼容节点为空的情况
return {}
Expand Down Expand Up @@ -935,27 +1000,21 @@ def get_instances_by_scope(scope: Dict[str, Union[Dict, int, Any]]) -> Dict[str,
# 校验是否都选择了同一种模板
bk_obj_id_set = check_instances_object_type(nodes)
if scope["object_type"] == models.Subscription.ObjectType.HOST:
# 补充实例所属模块ID
host_biz_relations = []
instances.extend(
[
{"host": inst}
for inst in get_host_detail_by_template(list(bk_obj_id_set)[0], nodes, bk_biz_id=bk_biz_id)
]
)
bk_host_id_chunks = chunk_lists([instance["host"]["bk_host_id"] for instance in instances], 500)
with ThreadPoolExecutor(max_workers=settings.CONCURRENT_NUMBER) as ex:
tasks = [
ex.submit(client_v2.cc.find_host_biz_relations, dict(bk_host_id=chunk, bk_biz_id=bk_biz_id))
for chunk in bk_host_id_chunks
]
for future in as_completed(tasks):
host_biz_relations.extend(future.result())

host_biz_relations = get_host_module_info_by_host_ids(
bk_host_id_chunks=chunk_lists([instance["host"]["bk_host_id"] for instance in instances], 500),
bk_biz_id=bk_biz_id,
)

# 转化模板为节点
nodes = set_template_scope_nodes(scope)
instances = add_host_module_info(host_biz_relations, instances)

else:
# 补充服务实例中的信息
# 转化模板为节点,**注意不可在get_service_instance_by_inst之后才转换**
Expand All @@ -964,6 +1023,125 @@ def get_instances_by_scope(scope: Dict[str, Union[Dict, int, Any]]) -> Dict[str,
[{"service": inst} for inst in get_service_instance_by_inst(bk_biz_id, nodes, module_to_topo)]
)

# 按照动态分组查询
elif scope["node_type"] == models.Subscription.NodeType.DYNAMIC_GROUP:
# 获取动态分组主机信息
host_infos = execute_dynamic_groups(
nodes=nodes,
bk_biz_id=bk_biz_id,
bk_obj_id=constants.CmdbGroupObjId.HOST.value,
fields=["bk_host_id"],
)

# 获取动态分组集群信息
set_infos = execute_dynamic_groups(
nodes=nodes,
bk_biz_id=bk_biz_id,
bk_obj_id=constants.CmdbGroupObjId.SET.value,
fields=["bk_set_id", "set_template_id"],
)

if scope["object_type"] == models.Subscription.ObjectType.HOST:
# 根据主机信息填充主机实例
if host_infos:
instances.extend(
[
{"host": inst, "source": "host_infos"}
for inst in get_host_detail(
host_info_list=[
{
"bk_biz_id": bk_biz_id,
"bk_host_id": host_info["bk_host_id"],
}
for host_info in host_infos
],
bk_biz_id=bk_biz_id,
source="get_instances_by_scope",
)
]
)

# 根据集群信息填充主机实例
if set_infos:
instances.extend(
[
{"host": inst, "source": "set_infos"}
for inst in get_host_detail_by_template(
bk_obj_id=models.Subscription.NodeType.DYNAMIC_GROUP,
template_info_list=[
{
"bk_set_id": set_info["bk_set_id"],
"bk_inst_id": set_info["set_template_id"],
}
for set_info in set_infos
],
bk_biz_id=bk_biz_id,
)
]
)

host_biz_relations = get_host_module_info_by_host_ids(
bk_host_id_chunks=chunk_lists([instance["host"]["bk_host_id"] for instance in instances], 500),
bk_biz_id=bk_biz_id,
)

# 转化模板为节点
nodes = set_template_scope_nodes(
scope={
"bk_biz_id": bk_biz_id,
"node_type": models.Subscription.NodeType.DYNAMIC_GROUP,
"nodes": [
{
"bk_inst_id": set_info["bk_set_id"],
}
for set_info in set_infos
],
}
)
instances = add_host_module_info(host_biz_relations, instances)

# 去重主机id去重
instances = list({instance["host"]["bk_host_id"]: instance for instance in instances}.values())

else:
# 根据主机信息填充服务实例
if host_infos:
instances.extend(
[
{"service": inst, "source": "host_infos"}
for inst in get_service_instances(
bk_biz_id=bk_biz_id,
filter_id_list=[host_info["bk_host_id"] for host_info in host_infos],
filter_field_name=FilterFieldName.BK_HOST_LIST,
ignore_exception=False,
)
]
)

# 根据集群信息填充服务实例
if set_infos:
nodes = set_template_scope_nodes(
scope={
"bk_biz_id": bk_biz_id,
"node_type": models.Subscription.NodeType.DYNAMIC_GROUP,
"nodes": [
{
"bk_inst_id": set_info["bk_set_id"],
}
for set_info in set_infos
],
}
)
instances.extend(
[
{"service": inst, "source": "set_infos"}
for inst in get_service_instance_by_inst(bk_biz_id, nodes, module_to_topo)
]
)

# 根据服务实例id去重
instances = list({instance["service"]["id"]: instance for instance in instances}.values())

if not need_register:
# 补充必要的主机或实例相关信息

Expand Down Expand Up @@ -1010,7 +1188,7 @@ def get_instances_by_scope(scope: Dict[str, Union[Dict, int, Any]]) -> Dict[str,
return instances_dict


def add_host_info_to_instances(bk_biz_id: int, scope: Dict, instances: Dict):
def add_host_info_to_instances(bk_biz_id: int, scope: Dict, instances: List):
"""
补全实例的主机信息
:param bk_biz_id: 业务ID
Expand Down Expand Up @@ -1087,7 +1265,7 @@ def add_scope_info_to_instances(nodes: List, scope: Dict, instances: List[Dict],
:return:
"""
for instance in instances:
if scope["node_type"] == models.Subscription.NodeType.INSTANCE:
if scope["node_type"] == models.Subscription.NodeType.INSTANCE or instance.pop("source", "") == "host_infos":
_add_scope_info_to_inst_instances(scope, instance)
else:
_add_scope_info_to_topo_instances(scope, instance, nodes, module_to_topo)
Expand Down
11 changes: 11 additions & 0 deletions apps/node_man/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,17 @@ def _get_member__alias_map(cls) -> Dict[Enum, str]:
return {cls.V4: _("IPv4"), cls.V6: _("IPv6")}


class CmdbGroupObjId(EnhanceEnum):
"""CMDB 动态分组目标"""

HOST = "host"
SET = "set"

@classmethod
def _get_member__alias_map(cls) -> Dict[Enum, str]:
return {cls.HOST: _("主机"), cls.SET: _("集群")}


class PolicyRollBackType:
SUPPRESSED = "SUPPRESSED"
LOSE_CONTROL = "LOSE_CONTROL"
Expand Down
18 changes: 18 additions & 0 deletions apps/node_man/migrations/0085_alter_subscription_node_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 3.2.4 on 2024-12-26 08:38

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('node_man', '0084_update_isp_and_accesspoint_regionid_cityid'),
]

operations = [
migrations.AlterField(
model_name='subscription',
name='node_type',
field=models.CharField(choices=[('TOPO', '动态实例(拓扑)'), ('INSTANCE', '静态实例'), ('SERVICE_TEMPLATE', '服务模板'), ('SET_TEMPLATE', '集群模板'), ('DYNAMIC_GROUP', '动态分组')], db_index=True, max_length=20, verbose_name='节点类型'),
),
]
2 changes: 2 additions & 0 deletions apps/node_man/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1831,12 +1831,14 @@ class NodeType(object):
INSTANCE = "INSTANCE"
SERVICE_TEMPLATE = "SERVICE_TEMPLATE"
SET_TEMPLATE = "SET_TEMPLATE"
DYNAMIC_GROUP = "DYNAMIC_GROUP"

NODE_TYPE_CHOICES = (
(NodeType.TOPO, _("动态实例(拓扑)")),
(NodeType.INSTANCE, _("静态实例")),
(NodeType.SERVICE_TEMPLATE, _("服务模板")),
(NodeType.SET_TEMPLATE, _("集群模板")),
(NodeType.DYNAMIC_GROUP, _("动态分组")),
)

class CategoryType(object):
Expand Down
9 changes: 9 additions & 0 deletions common/api/modules/cc.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,12 @@ def __init__(self):
before_request=add_esb_info_before_request,
api_name="list_service_instance_detail",
)
self.execute_dynamic_group = DataAPI(
method="POST",
url=CC_APIGATEWAY_ROOT_V2 + "execute_dynamic_group/",
module=self.MODULE,
simple_module=self.SIMPLE_MODULE,
description="执行动态分组",
before_request=add_esb_info_before_request,
api_name="search_set_v2",
)

0 comments on commit 9564f6e

Please sign in to comment.