Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions changelog.d/5-internal/WPB-21964-delete-conv
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
- Moved conversation deletion logic from `Galley.API.Action` to `Wire.ConversationSubsystem.Interpreter`
- Relocated remote member deletion utilities:
- `Galley.API.Action.deleteMembersInRemoteConversation` → `Wire.ConversationSubsystem.Util.deleteMembersInRemoteConversation`
- Updated conversation deletion implementation to handle:
- MLS group cleanup (clients and proposals)
- Sub-conversation removal
- Code key cleanup
- Team conversation vs regular conversation handling
- Added tests for conversation deletion edge cases
11 changes: 11 additions & 0 deletions charts/background-worker/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,15 @@ data:
{{- if .postgresMigration }}
postgresMigration: {{- toYaml .postgresMigration | nindent 6 }}
{{- end }}
settings:
{{- if .settings.conversationCodeURI }}
conversationCodeURI: {{ .settings.conversationCodeURI | quote }}
{{- else if .settings.multiIngress }}
multiIngress: {{- toYaml .settings.multiIngress | nindent 8 }}
{{- else }}
{{ fail "Either settings.conversationCodeURI or settings.multiIngress have to be set"}}
{{- end }}
{{- if (and .settings.conversationCodeURI .settings.multiIngress) }}
{{ fail "settings.conversationCodeURI and settings.multiIngress are mutually exclusive" }}
{{- end }}
{{- end }}
14 changes: 14 additions & 0 deletions charts/background-worker/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,20 @@ config:
migrateConversationsOptions:
pageSize: 10000
parallelism: 2

settings:
# Either `conversationCodeURI` or `multiIngress` must be set
# conversationCodeURI is the URI prefix for conversation invitation links
# It should be of form https://{ACCOUNT_PAGES}/conversation-join/
conversationCodeURI: null
# multiIngress is a Z-Host dependent setting of conversationCodeURI.
# Use this only if you want to expose the instance on multiple ingresses.
# If set it must be a map from Z-Host to URI prefix
# Example:
# multiIngress:
# wire.example: https://accounts.wire.example/conversation-join/
# example.net: https://accounts.example.net/conversation-join/
multiIngress: null
# This will start the migration of conversation codes.
# It's important to set `settings.postgresMigration.conversationCodes` to `migration-to-postgresql`
# before starting the migration.
Expand Down
2 changes: 2 additions & 0 deletions hack/helm_vars/wire-server/values.yaml.gotmpl
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,8 @@ background-worker:
conversation: {{ .Values.conversationStore }}
conversationCodes: {{ .Values.conversationCodesStore }}
teamFeatures: {{ .Values.teamFeaturesStore }}
settings:
conversationCodeURI: https://kube-staging-nginz-https.zinfra.io/conversation-join/
rabbitmq:
port: 5671
adminPort: 15671
Expand Down
1 change: 1 addition & 0 deletions libs/wire-subsystems/src/Wire/ConversationSubsystem.hs
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,6 @@ data ConversationSubsystem m a where
ConvId ->
UserId ->
ConversationSubsystem m (Maybe LocalMember)
DeleteConversation :: ConvId -> ConversationSubsystem m ()

makeSem ''ConversationSubsystem
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import Control.Lens hiding ((??))
import Data.Default
import Data.Id
import Data.Json.Util (ToJSONObject (toJSONObject))
import Data.Map.Strict qualified as Map
import Data.Misc (FutureWork (FutureWork))
import Data.Qualified
import Data.Range
Expand All @@ -44,6 +45,7 @@ import Wire.API.Conversation qualified as Public
import Wire.API.Conversation.Action
import Wire.API.Conversation.CellsState
import Wire.API.Conversation.Config
import Wire.API.Conversation.Protocol (ConversationMLSData (cnvmlsGroupId))
import Wire.API.Conversation.Role
import Wire.API.Error
import Wire.API.Error.Galley
Expand All @@ -65,6 +67,8 @@ import Wire.API.Team.Permission hiding (self)
import Wire.API.User
import Wire.BackendNotificationQueueAccess (BackendNotificationQueueAccess, enqueueNotificationsConcurrently)
import Wire.BrigAPIAccess
import Wire.CodeStore (CodeStore)
import Wire.CodeStore qualified as CodeStore
import Wire.ConversationStore (ConversationStore)
import Wire.ConversationStore qualified as ConvStore
import Wire.ConversationSubsystem
Expand All @@ -76,6 +80,8 @@ import Wire.FeaturesConfigSubsystem
import Wire.FederationAPIAccess (FederationAPIAccess)
import Wire.LegalHoldStore (LegalHoldStore)
import Wire.NotificationSubsystem as NS
import Wire.ProposalStore (ProposalStore)
import Wire.ProposalStore qualified as ProposalStore
import Wire.Sem.Now (Now)
import Wire.Sem.Now qualified as Now
import Wire.Sem.Random (Random)
Expand Down Expand Up @@ -126,7 +132,9 @@ interpretConversationSubsystem ::
Member (Input ConversationSubsystemConfig) r,
Member LegalHoldStore r,
Member TeamStore r,
Member UserClientIndexStore r
Member UserClientIndexStore r,
Member CodeStore r,
Member ProposalStore r
) =>
Sem (ConversationSubsystem : r) a ->
Sem r a
Expand All @@ -145,6 +153,8 @@ interpretConversationSubsystem = interpret $ \case
internalGetClientIdsImpl uids
ConversationSubsystem.InternalGetLocalMember cid uid ->
ConvStore.getLocalMember cid uid
ConversationSubsystem.DeleteConversation cid ->
deleteConversationImpl cid

createGroupConversationGeneric ::
forall r.
Expand Down Expand Up @@ -820,3 +830,31 @@ internalGetClientIdsImpl users = do
if isInternal
then fromUserClients <$> lookupClients users
else UserClientIndexStore.getClients users

deleteConversationImpl ::
( Member ConversationStore r,
Member CodeStore r,
Member ProposalStore r
) =>
ConvId ->
Sem r ()
deleteConversationImpl cid = do
mConv <- ConvStore.getConversation cid
forM_ mConv $ \storedConv -> do
let deleteGroup groupId = do
ConvStore.removeAllMLSClients groupId
ProposalStore.deleteAllProposals groupId

for_ (storedConv & mlsMetadata <&> cnvmlsGroupId . fst) $ \gidParent -> do
sconvs <- ConvStore.listSubConversations cid
for_ (Map.assocs sconvs) $ \(subid, mlsData) -> do
let gidSub = cnvmlsGroupId mlsData
ConvStore.deleteSubConversation cid subid
deleteGroup gidSub
deleteGroup gidParent

key <- CodeStore.makeKey cid
CodeStore.deleteCode key
case Data.convTeam storedConv of
Nothing -> ConvStore.deleteConversation cid
Just tid -> ConvStore.deleteTeamConversation tid cid
1 change: 1 addition & 0 deletions services/background-worker/background-worker.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ library
, bytestring-conversion
, cassandra-util
, containers
, cql-io
, data-timeout
, exceptions
, extended
Expand Down
14 changes: 14 additions & 0 deletions services/background-worker/background-worker.integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,17 @@ postgresMigration:
conversation: postgresql
conversationCodes: postgresql
teamFeatures: postgresql

settings:
# Either `conversationCodeURI` or `multiIngress` must be set
# conversationCodeURI is the URI prefix for conversation invitation links
# It should be of form https://{ACCOUNT_PAGES}/conversation-join/
conversationCodeURI: https://account.wire.com/conversation-join/
# multiIngress is a Z-Host dependent setting of conversationCodeURI.
# Use this only if you want to expose the instance on multiple ingresses.
# If set it must be a map from Z-Host to URI prefix
# Example:
# multiIngress:
# wire.example: https://accounts.wire.example/conversation-join/
# example.net: https://accounts.example.net/conversation-join/
multiIngress: null
2 changes: 2 additions & 0 deletions services/background-worker/default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
, bytestring-conversion
, cassandra-util
, containers
, cql-io
, data-default
, data-timeout
, exceptions
Expand Down Expand Up @@ -70,6 +71,7 @@ mkDerivation {
bytestring-conversion
cassandra-util
containers
cql-io
data-timeout
exceptions
extended
Expand Down
47 changes: 42 additions & 5 deletions services/background-worker/src/Wire/BackgroundWorker/Env.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE RecordWildCards #-}

-- This file is part of the Wire Server implementation.
--
Expand All @@ -26,7 +25,8 @@ import Control.Monad.Base
import Control.Monad.Catch
import Control.Monad.Trans.Control
import Data.Domain (Domain)
import Data.Map.Strict qualified as Map
import Data.Map qualified as Map
import Data.Misc (HttpsUrl)
import HTTP2.Client.Manager
import Hasql.Pool qualified as Hasql
import Hasql.Pool.Extended
Expand Down Expand Up @@ -86,7 +86,8 @@ data Env = Env
gundeckEndpoint :: Endpoint,
sparEndpoint :: Endpoint,
galleyEndpoint :: Endpoint,
brigEndpoint :: Endpoint
brigEndpoint :: Endpoint,
convCodeURI :: Either HttpsUrl (Map Text HttpsUrl)
}

data BackendNotificationMetrics = BackendNotificationMetrics
Expand All @@ -106,6 +107,14 @@ mkWorkerRunningGauge :: IO (Vector Text Gauge)
mkWorkerRunningGauge =
register (vector "worker" $ gauge $ Prometheus.Info "wire_background_worker_running_workers" "Set to 1 when a worker is running")

validateSettings :: Opts -> IO (Either HttpsUrl (Map Text HttpsUrl))
validateSettings opts =
case (opts.settings.conversationCodeURI, opts.settings.multiIngress) of
(Nothing, Nothing) -> error "Either settings.conversationCodeURI or settings.multiIngress must be set"
(Just uri, Nothing) -> pure (Left uri)
(Nothing, Just mi) -> pure (Right mi)
(Just _, Just _) -> error "settings.conversationCodeURI and settings.multiIngress are mutually exclusive"

mkEnv :: Opts -> IO Env
mkEnv opts = do
logger <- Log.mkLogger opts.logLevel Nothing opts.logFormat
Expand All @@ -129,6 +138,8 @@ mkEnv opts = do
(BackgroundJobConsumer, False)
]
backendNotificationMetrics <- mkBackendNotificationMetrics
convCodeURI <- validateSettings opts
workerRunningGauge <- mkWorkerRunningGauge
let backendNotificationsConfig = opts.backendNotificationPusher
backgroundJobsConfig = opts.backgroundJobs
federationDomain = opts.federationDomain
Expand All @@ -137,15 +148,41 @@ mkEnv opts = do
galleyEndpoint = opts.galley
gundeckEndpoint = opts.gundeck
sparEndpoint = opts.spar
workerRunningGauge <- mkWorkerRunningGauge
hasqlPool <- initPostgresPool opts.postgresqlPool opts.postgresql opts.postgresqlPassword
amqpJobsPublisherChannel <-
mkRabbitMqChannelMVar logger (Just "background-worker-jobs-publisher") $
either id demoteOpts opts.rabbitmq.unRabbitMqOpts
amqpBackendNotificationsChannel <-
mkRabbitMqChannelMVar logger (Just "background-worker-backend-notifications") $
either id demoteOpts opts.rabbitmq.unRabbitMqOpts
pure Env {..}
pure
Env
{ http2Manager,
rabbitmqAdminClient,
rabbitmqVHost,
logger,
federatorInternal,
httpManager,
defederationTimeout,
backendNotificationMetrics,
backendNotificationsConfig,
backgroundJobsConfig,
workerRunningGauge,
statuses,
cassandra,
cassandraGalley,
cassandraBrig,
hasqlPool,
amqpJobsPublisherChannel,
amqpBackendNotificationsChannel,
federationDomain,
postgresMigration,
convCodeURI,
gundeckEndpoint,
sparEndpoint,
galleyEndpoint,
brigEndpoint
}

initHttp2Manager :: IO Http2Manager
initHttp2Manager = do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ import Data.Qualified
import Data.Tagged (Tagged)
import Data.Text qualified as T
import Data.Text.Lazy qualified as TL
import Database.CQL.IO (ClientState)
import Galley.Types.Error (InternalError, InvalidInput, internalErrorDescription, legalHoldServiceUnavailable)
import Hasql.Pool (UsageError)
import Hasql.Pool (Pool, UsageError)
import Imports
import Network.HTTP.Client qualified as Http
import OpenSSL.Session qualified as SSL
Expand All @@ -59,6 +60,10 @@ import Wire.BackgroundJobsRunner (runJob)
import Wire.BackgroundJobsRunner.Interpreter hiding (runJob)
import Wire.BackgroundWorker.Env (AppT, Env (..))
import Wire.BrigAPIAccess.Rpc
import Wire.CodeStore (CodeStore)
import Wire.CodeStore.Cassandra
import Wire.CodeStore.DualWrite
import Wire.CodeStore.Postgres
import Wire.ConversationStore.Cassandra
import Wire.ConversationStore.Postgres (interpretConversationStoreToPostgres)
import Wire.ConversationSubsystem.Interpreter (interpretConversationSubsystem)
Expand All @@ -76,6 +81,7 @@ import Wire.LegalHoldStore.Env (LegalHoldEnv (..))
import Wire.NotificationSubsystem.Interpreter
import Wire.ParseException
import Wire.PostgresMigrationOpts
import Wire.ProposalStore.Cassandra (interpretProposalStoreToCassandra)
import Wire.Rpc
import Wire.Sem.Concurrency (ConcurrencySafety (Unsafe))
import Wire.Sem.Concurrency.IO (unsafelyPerformConcurrency)
Expand Down Expand Up @@ -174,6 +180,19 @@ dispatchJob job = do
let makeReq fpr url rb = makeVerifiedRequestIO env.logger extEnv fpr url rb
makeReqFresh fpr url rb = makeVerifiedRequestFreshManagerIO env.logger fpr url rb
in LegalHoldEnv {makeVerifiedRequest = makeReq, makeVerifiedRequestFreshManager = makeReqFresh}
convCodesStoreInterpreter ::
( Member (Input (Either HttpsUrl (Map Text HttpsUrl))) r,
Member (Input Pool) r,
Member (Error UsageError) r,
Member (Input ClientState) r,
Member (Embed IO) r
) =>
InterpreterFor CodeStore r
convCodesStoreInterpreter =
case env.postgresMigration.conversationCodes of
CassandraStorage -> interpretCodeStoreToCassandra
MigrationToPostgresql -> interpretCodeStoreToCassandraAndPostgres
PostgresqlStorage -> interpretCodeStoreToPostgres
runFinal @IO
. unsafelyPerformConcurrency @_ @'Unsafe
. embedToFinal @IO
Expand Down Expand Up @@ -212,6 +231,7 @@ dispatchJob job = do
. runInputConst env.cassandraGalley
. runInputConst legalHoldEnv
. runInputConst (ExposeInvitationURLsAllowlist [])
. runInputConst env.convCodeURI
. interpretServiceStoreToCassandra env.cassandraBrig
. interpretUserStoreCassandra env.cassandraBrig
. interpretUserGroupStoreToPostgres
Expand Down Expand Up @@ -244,6 +264,8 @@ dispatchJob job = do
. runFeaturesConfigSubsystem
. runInputSem getAllTeamFeaturesForServer
. interpretTeamCollaboratorsSubsystem
. convCodesStoreInterpreter
. interpretProposalStoreToCassandra
. interpretConversationSubsystem
. interpretBackgroundJobsRunner

Expand Down
16 changes: 14 additions & 2 deletions services/background-worker/src/Wire/BackgroundWorker/Options.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ module Wire.BackgroundWorker.Options where

import Data.Aeson
import Data.Domain (Domain)
import Data.Map
import Data.Misc
import Data.Range (Range)
import GHC.Generics
import Hasql.Pool.Extended
import Imports
import Network.AMQP.Extended
import System.Logger.Extended
import System.Logger.Extended hiding (Settings)
import Util.Options
import Wire.Migration
import Wire.PostgresMigrationOpts
Expand Down Expand Up @@ -57,7 +58,8 @@ data Opts = Opts
migrateConversationCodes :: !Bool,
migrateTeamFeatures :: !Bool,
backgroundJobs :: BackgroundJobsConfig,
federationDomain :: Domain
federationDomain :: Domain,
settings :: !Settings
}
deriving (Show, Generic)
deriving (FromJSON) via Generically Opts
Expand Down Expand Up @@ -99,3 +101,13 @@ data BackgroundJobsConfig = BackgroundJobsConfig
}
deriving (Show, Generic)
deriving (FromJSON) via Generically BackgroundJobsConfig

data Settings = Settings
{ -- | URI prefix for conversations with access mode 'code'
conversationCodeURI :: !(Maybe HttpsUrl),
-- | Map from Z-Host header to URI prefix for conversations with access mode 'code'
-- conversationCodeURI and multiIngress are mutually exclusive. One of both must be configured.
multiIngress :: !(Maybe (Map Text HttpsUrl))
}
deriving (Show, Generic)
deriving (FromJSON) via Generically Settings
Loading