1
0
Fork 0
nixos-config/nixos/roles/matrix-synapse/synapse-cleanup.hs
2021-05-23 03:19:54 +02:00

162 lines
6.7 KiB
Haskell
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE ExtendedDefaultRules #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PartialTypeSignatures #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE ViewPatterns #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# OPTIONS_GHC -Wall -Wno-missing-signatures -Wno-type-defaults -Wno-orphans #-}
import Control.Concurrent (threadDelay)
import Data.Aeson (FromJSON, decode)
import Data.String.Interpolate (i)
import Data.Time (UTCTime, addUTCTime, getCurrentTime, nominalDay)
import Data.Time.Clock.POSIX (posixSecondsToUTCTime, utcTimeToPOSIXSeconds)
import Database.PostgreSQL.Simple as PSQL (
Connection,
Only (Only, fromOnly),
connectPostgreSQL,
query,
query_,
)
import Network.HTTP (
Header (Header),
HeaderName (HdrAuthorization),
RequestMethod (DELETE),
Request_String,
Response (rspBody, rspReason),
getRequest,
insertHeaders,
postRequest,
postRequestWithBody,
rqMethod,
rspCode,
simpleHTTP,
)
import Relude
import Say (say, sayErr)
import Shh (ExecReference (Absolute), load, (|>))
import System.IO (BufferMode (LineBuffering), hSetBuffering)
-- External executables used by this script. Shh's 'load' splice generates a
-- Haskell function for each of them (the hyphenated name becomes
-- @synapse_compress_state@), plus 'missingExecutables', which 'main' uses.
-- NOTE(review): 'Absolute' presumably resolves the executables to absolute
-- paths at compile time — confirm against the Shh documentation.
load Absolute ["synapse-compress-state", "cat", "psql", "rm"]
-- | Body of a successful @purge_history@ admin API response. The contained
-- id is what the @purge_history_status@ endpoint is polled with.
newtype PurgeResult = PurgeResult
  { purge_id :: Text
  }
  deriving (Generic, FromJSON)

-- | Body of a @purge_history_status@ admin API response. A value of
-- @"active"@ means the purge is still running.
newtype Status = Status
  { status :: Text
  }
  deriving (Generic, FromJSON)
-- | Base URL of the Synapse admin API (v1) on the local homeserver.
apiUrl :: Text
apiUrl = "http://localhost:8008/_synapse/admin/v1"

-- | Age in days of the history cut-off. History is purged up to this point
-- unless that would leave fewer than 'lastMessages' messages. Cached remote
-- media older than this is pruned as well.
daysOld :: Integer
daysOld = 30

-- | Number of most recent @m.room.message@ events kept in a room regardless
-- of their age.
lastMessages :: Integer
lastMessages = 500

-- | Rooms with more than this many joined members get their history purged
-- and their state compressed.
minUsersToPurgeRoom :: Integer
minUsersToPurgeRoom = 5

-- | Scratch file into which @synapse-compress-state@ writes its SQL before it
-- is fed to @psql@.
filename :: FilePath
filename = "/var/lib/matrix-synapse/tmp-storage-compression.sql"

-- | Content type of the JSON request bodies sent to the admin API.
contentType :: String
contentType = "application/json"
-- | Authenticate a request by adding @token@ as a bearer token in its
-- @Authorization@ header.
giveToken :: Text -> Request_String -> Request_String
giveToken token request = insertHeaders [bearer] request
  where
    bearer = Header HdrAuthorization [i|Bearer #{token}|]
-- | Fetch an access token belonging to a server admin, used to authenticate
-- all admin API calls.
--
-- Picks the admin token with the highest @id@ in @access_tokens@. If there is
-- none, the program exits immediately with an error message. The check is
-- done eagerly here, rather than through a lazy 'error' thunk, so a missing
-- token stops the run before any room is touched.
getToken :: PSQL.Connection -> IO Text
getToken conn = do
  rows <- query_ conn queryString
  case rows of
    Only token : _ -> pure token
    [] -> die "No admin token in database"
  where
    queryString = "SELECT token FROM access_tokens JOIN users ON user_id=name WHERE admin=1 ORDER BY id DESC LIMIT 1"
-- | Poll the status of the history purge @purgeId@ until it is no longer
-- @"active"@, then print the final status.
--
-- The first poll happens after 1 second. The delay doubles after every
-- @"active"@ answer, capped at 60 seconds. A failed request or an unparsable
-- response ends the polling, and its description is printed as the result.
waitForPurge :: Text -> Text -> IO ()
waitForPurge token purgeId = do
  outcome <- poll 1
  say [i|Purge result: #{outcome}|]
  where
    statusUrl = [i|#{apiUrl}/purge_history_status/#{purgeId}|]
    -- Wait @delaySecs@ seconds, query the status, and recurse while active.
    poll delaySecs = do
      say [i|purge #{purgeId} is going on waiting #{delaySecs} seconds|]
      threadDelay (delaySecs * 1000000)
      response <- simpleHTTP (giveToken token (getRequest statusUrl))
      case response of
        Left err -> pure [i|purge failed with error: #{err}|]
        Right resp -> do
          let current = maybe [i|couldnt parse purge response #{rspBody resp}|] status (decode . encodeUtf8 . rspBody $ resp)
          if current == "active"
            then poll (min (delaySecs * 2) 60)
            else pure current
-- | Find the 'lastMessages'-th most recent @m.room.message@ event of a room,
-- i.e. the oldest message that is kept by count. It is returned as its event
-- id and receive time.
--
-- Yields 'Nothing' when the room has fewer messages than that, or when the
-- matching row has no @received_ts@. @received_ts@ is interpreted as
-- milliseconds since the Unix epoch.
queryLastKeptEvent :: PSQL.Connection -> Text -> IO (Maybe (Text, UTCTime))
queryLastKeptEvent conn roomId = do
  rows <- query conn nthNewestMessageSql (roomId, lastMessages - 1)
  pure $ case mapMaybe sequence rows of
    (eventId, receivedMs) : _ -> Just (eventId, fromMillis receivedMs)
    [] -> Nothing
  where
    fromMillis = posixSecondsToUTCTime . (/ 1000) . realToFrac
    nthNewestMessageSql = "SELECT event_id, received_ts from events WHERE type='m.room.message' AND room_id =? ORDER BY received_ts DESC LIMIT 1 offset ?"
-- | Start purging the history of @roomId@ via the admin API, then wait for
-- the purge to finish (see 'waitForPurge').
--
-- The last argument is the oldest message that should be kept by count. If
-- it is older than @upToTime@, the history is purged up to that event.
-- Otherwise it is purged up to the @upToTime@ timestamp. In both cases, local
-- events are deleted as well. Request and parse failures are reported on
-- stderr and do not abort the run.
purgeUpToEvent :: Text -> Text -> UTCTime -> (Text, UTCTime) -> IO ()
purgeUpToEvent token roomId upToTime (eventName, eventTime) = do
  body <- chooseBody
  response <- simpleHTTP . giveToken token $ postRequestWithBody url contentType body
  handleResponse response
  where
    url = [i|#{apiUrl}/purge_history/#{roomId}|]
    -- Admin API expects milliseconds since the Unix epoch.
    upToTimeStamp = floor . (* 1000) . utcTimeToPOSIXSeconds $ upToTime
    -- `delete_local_events` is a JSON boolean in the purge history API.
    timeOutBody = [i|{"delete_local_events":true, "purge_up_to_ts":#{upToTimeStamp}}|]
    eventBody = [i|{"delete_local_events":true, "purge_up_to_event_id":"#{eventName}"}|]
    -- Purge up to whichever cut-off is older, so both the last messages and
    -- the recent days are kept.
    chooseBody
      | eventTime < upToTime = do
          say [i|Deleting up to #{eventName} in #{roomId}.|]
          pure eventBody
      | otherwise = do
          say [i|Deleting up to last #{daysOld} days in #{roomId}.|]
          pure timeOutBody
    handleResponse = \case
      -- This request starts the purge; it does not query its status.
      Left e -> sayErr [i|Could not start purge in #{roomId}. Error: #{e}|]
      Right resp ->
        case decode . encodeUtf8 . rspBody $ resp of
          Nothing -> sayErr [i|Could not parse purge result: #{rspBody resp}|]
          Just (PurgeResult purgeId) -> do
            say [i|Purging with id #{purgeId} for room #{roomId}.|]
            waitForPurge token purgeId
-- | Delete a room through the admin API (@DELETE /rooms/<room_id>@ with an
-- empty JSON body). On success, print the HTTP status and response body.
-- Connection errors are reported on stderr.
purgeRoom :: Text -> Text -> IO ()
purgeRoom token roomID = do
  say [i|Deleting #{roomID}...|]
  result <- simpleHTTP (giveToken token deleteRequest)
  case result of
    Left err -> sayErr [i|Could not purge room #{roomID}. Error #{err}|]
    Right resp -> say [i|#{rspCode resp}: #{rspReason resp}\n#{rspBody resp}|]
  where
    url = [i|#{apiUrl}/rooms/#{roomID}|]
    -- Build a request with a body, then switch its method to DELETE.
    deleteRequest = (postRequestWithBody url contentType "{}") {rqMethod = DELETE}
-- | Shrink a large room.
--
-- First, its history is trimmed when it has enough messages (see
-- 'queryLastKeptEvent' and 'purgeUpToEvent'). Then its state is compressed:
-- @synapse-compress-state@ writes SQL to 'filename', the SQL is piped into
-- @psql@, and the file is removed. A failing external command is not handled
-- here.
processRoom :: Text -> PSQL.Connection -> UTCTime -> Text -> IO ()
processRoom token conn upToTime roomId = do
  lastKept <- queryLastKeptEvent conn roomId
  traverse_ (purgeUpToEvent token roomId upToTime) lastKept
  say [i|Compressing state in room #{roomId} ...|]
  synapse_compress_state "-o" filename "-p" "host=/run/postgresql user=matrix-synapse dbname=matrix-synapse" "-r" (toString roomId)
  cat filename |> psql "matrix-synapse"
  rm filename
-- | Ids of rooms known to the server in which no local user is currently
-- joined.
locallyUnjoinedRoomsQuery =
  "SELECT r.room_id FROM rooms AS r \
  \LEFT JOIN (SELECT room_id FROM local_current_membership WHERE membership = 'join' GROUP BY room_id) AS l \
  \ON l.room_id = r.room_id \
  \WHERE l.room_id IS NULL"
-- | Ids of rooms with more than @?@ joined members according to the current
-- room state, largest first.
--
-- Each room is returned exactly once. Joining @room_aliases@ here would
-- produce one row per alias, so a room with several aliases would be purged
-- and compressed several times.
largeRoomsQuery =
  "SELECT q.room_id FROM \
  \(SELECT count(*) AS numberofusers, room_id FROM current_state_events \
  \WHERE type = 'm.room.member' AND membership = 'join' GROUP BY room_id) AS q \
  \WHERE q.numberofusers > ? \
  \ORDER BY q.numberofusers DESC"
-- | Clean up the local Synapse database:
--
-- 1. Delete rooms in which no local user is joined.
-- 2. For rooms with more than 'minUsersToPurgeRoom' joined members, purge old
--    history (keeping at least the last 'lastMessages' messages and
--    everything from the last 'daysOld' days) and compress their state.
-- 3. Purge cached remote media older than 'daysOld' days.
main :: IO ()
main = do
  -- Abort before any destructive work if an external tool is unavailable,
  -- instead of failing halfway through the run.
  missing <- missingExecutables
  unless (null missing) $
    die ("Missing executables: " <> show missing)
  hSetBuffering stdout LineBuffering
  upToTime <- addUTCTime ((-1) * realToFrac daysOld * nominalDay) <$> getCurrentTime
  -- Milliseconds since the Unix epoch, as expected by the admin API.
  let upToTimeStamp = floor . (* 1000) . utcTimeToPOSIXSeconds $ upToTime
  conn <- connectPostgreSQL "dbname='matrix-synapse'"
  token <- getToken conn
  -- Get rooms without local members and delete them
  say "Purging obsolete rooms ..."
  obsoleteRoomIds <- fromOnly @Text <<$>> query conn locallyUnjoinedRoomsQuery ()
  mapM_ (purgeRoom token) obsoleteRoomIds
  -- Get large rooms and then
  -- 1. only keep max(last 30 days, 500 events) of history
  -- 2. compress state events
  say "Compressing large rooms ..."
  largeRoomIds <- fromOnly @Text <<$>> query conn largeRoomsQuery (Only minUsersToPurgeRoom)
  mapM_ (processRoom token conn upToTime) largeRoomIds
  say "Pruning remote media ..."
  mediaResponse <- simpleHTTP . giveToken token . postRequest $ [i|#{apiUrl}/purge_media_cache/?before_ts=#{upToTimeStamp}|]
  -- Report the outcome instead of silently discarding it.
  either
    (\e -> sayErr [i|Could not prune remote media. Error: #{e}|])
    (\resp -> say [i|#{rspCode resp}: #{rspReason resp}\n#{rspBody resp}|])
    mediaResponse
  say "Finished"