Skip to content

Commit

Permalink
Allow changing recalculation_type from UI
Browse files Browse the repository at this point in the history
This is labeled in the UI as experimental with some description of
each current option.

This also refactors `Deployments` a few ways:
* Shared logic for doing the deployment update was consolidate into a
  `Repo.transaction` and only the branching calculation logic was split
* auditing the update was moved into a shared function
* For device calculated path (current default), the refactor changed
  to ensure the `deployments/*` broadcasted events are only sent once.
  Previously they could potentially be sent twice if `conditions` and
  `is_active` was changed at the same time
  • Loading branch information
jjcarstens authored and oestrich committed Sep 16, 2024
1 parent 4e7c82d commit 78ce1c1
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 177 deletions.
313 changes: 143 additions & 170 deletions lib/nerves_hub/deployments.ex
Original file line number Diff line number Diff line change
Expand Up @@ -110,196 +110,158 @@ defmodule NervesHub.Deployments do
Update a deployment
- Records audit logs depending on changes
- Will force a recalculation
- May force a recalculation if conditions or active changed
"""
@spec update_deployment(Deployment.t(), map) :: {:ok, Deployment.t()} | {:error, Changeset.t()}
def update_deployment(deployment, params) do
case deployment.recalculation_type do
"broadcast" ->
update_deployment_broadcast(deployment, params)
Repo.transaction(fn ->
device_count =
Device
|> select([d], count(d))
|> where([d], d.deployment_id == ^deployment.id)
|> Repo.one()

"calculator" ->
update_deployment_calculator(deployment, params)
end
end
changeset =
deployment
|> Deployment.with_firmware()
|> Deployment.changeset(params)
|> Ecto.Changeset.put_change(:total_updating_devices, device_count)

# in the process of being deprecated
@doc false
def update_deployment_broadcast(deployment, params) do
device_count =
Device
|> select([d], count(d))
|> where([d], d.deployment_id == ^deployment.id)
|> Repo.one()
case Repo.update(changeset) do
{:ok, deployment} ->
deployment = Repo.preload(deployment, [:firmware], force: true)

changeset =
deployment
|> Deployment.with_firmware()
|> Deployment.changeset(params)
|> Ecto.Changeset.put_change(:total_updating_devices, device_count)
audit_changes!(deployment, changeset)
recalculate_devices(deployment, changeset)

case Repo.update(changeset) do
{:ok, deployment} ->
deployment = Repo.preload(deployment, [:firmware], force: true)

payload = %{
id: deployment.id,
active: deployment.is_active,
product_id: deployment.product_id,
platform: deployment.firmware.platform,
architecture: deployment.firmware.architecture,
version: deployment.firmware.version,
conditions: deployment.conditions
}

# if the conditions changed, we should reset all devices and tell any connected
if Map.has_key?(changeset.changes, :conditions) do
Device
|> where([d], d.deployment_id == ^deployment.id)
|> Repo.update_all(set: [deployment_id: nil])

if deployment.conditions["version"] in [nil, ""] and deployment.is_active do
# The version condition is the only one not done with the DB.
# This opens up a minor optimization to preemptively set matching
# devices to the new deployment all at once since the version
# condition can be skipped.
#
# This also helps with offline devices by potentially reducing the
# need to do the expensive deployment check on next connect which
# reduces the load when a lot of devices come online at once.
Device
|> where([d], d.product_id == ^deployment.product_id)
|> where(
[d],
fragment("?->>'platform' = ?", d.firmware_metadata, ^deployment.firmware.platform)
)
|> where(
[d],
fragment(
"?->>'architecture' = ?",
d.firmware_metadata,
^deployment.firmware.architecture
)
)
|> where([d], fragment("? <@ ?", ^deployment.conditions["tags"], d.tags))
|> Repo.update_all(set: [deployment_id: deployment.id])
end
# Inform those who care that the deployment updated
_ = broadcast(deployment, "deployments/update")

_ = broadcast(deployment, "deployments/changed", payload)
_ = broadcast(:none, "deployments/changed", payload)
deployment

description = "deployment #{deployment.name} conditions changed and removed all devices"
AuditLogs.audit!(deployment, deployment, description)
end
{:error, changeset} ->
Repo.rollback(changeset)
end
end)
end

defp audit_changes!(deployment, changeset) do
Enum.each(changeset.changes, fn
{:archive_id, archive_id} ->
# Trigger the new archive to get downloaded by devices
if Map.has_key?(changeset.changes, :archive_id) do
payload = %{
archive_id: deployment.archive_id
}
payload = %{archive_id: archive_id}
_ = broadcast(deployment, "archives/updated", payload)

_ = broadcast(deployment, "archives/updated", payload)
description = "deployment #{deployment.name} has a new archive"
AuditLogs.audit!(deployment, deployment, description)

description = "deployment #{deployment.name} has a new archive"
AuditLogs.audit!(deployment, deployment, description)
end

# if is_active is false, wipe it out like above
# if its now true, tell the none deployment devices
if Map.has_key?(changeset.changes, :is_active) do
if deployment.is_active do
broadcast(:none, "deployments/changed", payload)
else
Device
|> where([d], d.deployment_id == ^deployment.id)
|> Repo.update_all(set: [deployment_id: nil])

_ = broadcast(deployment, "deployments/changed", payload)

description = "deployment #{deployment.name} is inactive and removed all devices"
AuditLogs.audit!(deployment, deployment, description)
end
end
{:conditions, _new_conditions} ->
description = "deployment #{deployment.name} conditions changed"
AuditLogs.audit!(deployment, deployment, description)

_ = broadcast(deployment, "deployments/update")
{:is_active, is_active} when is_active != true ->
description = "deployment #{deployment.name} is inactive"
AuditLogs.audit!(deployment, deployment, description)

{:ok, deployment}
_ ->
:ignore
end)
end

{:error, changeset} ->
{:error, changeset}
defp recalculate_devices(%{recalculation_type: :calculator_queue} = deployment, changeset) do
if Enum.any?(
[:conditions, :is_active, :recalculation_type],
&Map.has_key?(changeset.changes, &1)
) do
create_inflight_checks(deployment)
else
:ok
end
end

@doc false
def update_deployment_calculator(deployment, params) do
result =
Repo.transaction(fn ->
device_count =
Device
|> select([d], count(d))
|> where([d], d.deployment_id == ^deployment.id)
|> Repo.one()

changeset =
deployment
|> Deployment.with_firmware()
|> Deployment.changeset(params)
|> Ecto.Changeset.put_change(:total_updating_devices, device_count)

case Repo.update(changeset) do
{:ok, deployment} ->
messages = []

deployment = Repo.preload(deployment, [:firmware], force: true)

if Enum.any?([:conditions, :is_active], &Map.has_key?(changeset.changes, &1)) do
create_inflight_checks(deployment)
end

if Map.has_key?(changeset.changes, :conditions) do
description = "deployment #{deployment.name} conditions changed"
AuditLogs.audit!(deployment, deployment, description)
end

# Trigger the new archive to get downloaded by devices
messages =
if Map.has_key?(changeset.changes, :archive_id) do
description = "deployment #{deployment.name} has a new archive"
AuditLogs.audit!(deployment, deployment, description)

payload = %{archive_id: deployment.archive_id}
[{"archives/updated", payload} | messages]
else
messages
end

if Map.has_key?(changeset.changes, :is_active) do
if !deployment.is_active do
description = "deployment #{deployment.name} is inactive"
AuditLogs.audit!(deployment, deployment, description)
end
end

messages = [{"deployments/update", %{}} | messages]

{deployment, messages}

{:error, changeset} ->
Repo.rollback(changeset)
end
end)
# Default is to make connected devices perform the recalculation
# within the DeviceSocket process.
# This will eventually be deprecated
defp recalculate_devices(deployment, changeset) do
# Don't want the inflight calculator to continue through devices since
# we have changed calculation types
if changeset.changes[:recalculation_type], do: delete_inflight_checks(deployment)

payload = %{
id: deployment.id,
active: deployment.is_active,
product_id: deployment.product_id,
platform: deployment.firmware.platform,
architecture: deployment.firmware.architecture,
version: deployment.firmware.version,
conditions: deployment.conditions
}

case result do
{:ok, {deployment, messages}} ->
Enum.each(messages, fn {event, payload} ->
broadcast(deployment, event, payload)
end)
conditions_changed? = Map.has_key?(changeset.changes, :conditions)
is_active_changed? = Map.has_key?(changeset.changes, :is_active)
activated? = is_active_changed? and deployment.is_active
deactivated? = is_active_changed? and !deployment.is_active

{:ok, deployment}
if conditions_changed? or deactivated? do
# Wipe all devices attached to this deployment
Device
|> where([d], d.deployment_id == ^deployment.id)
|> Repo.update_all(set: [deployment_id: nil])

description = "deployment #{deployment.name} change removed all devices"
AuditLogs.audit!(deployment, deployment, description)
end

{:error, error} ->
{:error, error}
if conditions_changed? and deployment.conditions["version"] in [nil, ""] and
deployment.is_active do
# The version condition is the only one not done with the DB.
# This opens up a minor optimization to preemptively set matching
# devices to the new deployment all at once since the version
# condition can be skipped.
#
# This also helps with offline devices by potentially reducing the
# need to do the expensive deployment check on next connect which
# reduces the load when a lot of devices come online at once.
Device
|> where([d], d.product_id == ^deployment.product_id)
|> where(
[d],
fragment("?->>'platform' = ?", d.firmware_metadata, ^deployment.firmware.platform)
)
|> where(
[d],
fragment(
"?->>'architecture' = ?",
d.firmware_metadata,
^deployment.firmware.architecture
)
)
|> where([d], fragment("? <@ ?", ^deployment.conditions["tags"], d.tags))
|> Repo.update_all(set: [deployment_id: deployment.id])
end

# Make sure relevant changed messages are broadcast for devices to
# pickup and recalculate
_ =
cond do
conditions_changed? ->
# Conditions change needs attached and unattached devices to recalculate
_ = broadcast(deployment, "deployments/changed", payload)
broadcast(:none, "deployments/changed", payload)

activated? ->
# Now changed to active, so tell the none deployment devices
broadcast(:none, "deployments/changed", payload)

deactivated? ->
# Tell the attached devices to recalculate
broadcast(deployment, "deployments/changed", payload)

true ->
:no_broadcast
end

:ok
end

@doc """
Expand All @@ -311,9 +273,7 @@ defmodule NervesHub.Deployments do
Also clears any previous inflight checks for this deployment.
"""
def create_inflight_checks(deployment) do
InflightDeploymentCheck
|> where([idc], idc.deployment_id == ^deployment.id)
|> Repo.delete_all()
delete_inflight_checks(deployment)

query =
Device
Expand All @@ -328,6 +288,19 @@ defmodule NervesHub.Deployments do
Repo.insert_all(InflightDeploymentCheck, query)
end

@doc """
Delete any matching inflight deployment checks for devices
"""
@spec delete_inflight_checks(Deployment.t()) :: :ok
def delete_inflight_checks(deployment) do
_ =
InflightDeploymentCheck
|> where([idc], idc.deployment_id == ^deployment.id)
|> Repo.delete_all()

:ok
end

@spec create_deployment(map) :: {:ok, Deployment.t()} | {:error, Changeset.t()}
def create_deployment(params) do
changeset = Deployment.creation_changeset(%Deployment{}, params)
Expand Down
5 changes: 3 additions & 2 deletions lib/nerves_hub/deployments/deployment.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ defmodule NervesHub.Deployments.Deployment do
:penalty_timeout_minutes,
:connecting_code,
:total_updating_devices,
:current_updated_devices
:current_updated_devices,
:recalculation_type
]

schema "deployments" do
Expand All @@ -65,7 +66,7 @@ defmodule NervesHub.Deployments.Deployment do
field(:total_updating_devices, :integer, default: 0)
field(:current_updated_devices, :integer, default: 0)
field(:inflight_update_expiration_minutes, :integer, default: 60)
field(:recalculation_type, :string, default: "broadcast")
field(:recalculation_type, Ecto.Enum, values: [:device, :calculator_queue], default: :device)

timestamps()
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule NervesHubWeb.API.DeploymentController do
plug(:validate_role, [org: :manage] when action in [:create, :update, :delete])
plug(:validate_role, [org: :view] when action in [:index, :show])

@whitelist_fields [:name, :org_id, :firmware_id, :conditions, :is_active]
@whitelist_fields [:name, :org_id, :firmware_id, :conditions, :is_active, :recalculation_type]

def index(%{assigns: %{product: product}} = conn, _params) do
deployments = Deployments.get_deployments_by_product(product.id)
Expand Down
Loading

0 comments on commit 78ce1c1

Please sign in to comment.