Add schedule module move some ExceptTs around
This commit is contained in:
parent
368bbb0124
commit
67127fd28d
@ -44,3 +44,5 @@ By setting `$BASE_DIR` you can persist the database for later runs.
|
||||
- CI
|
||||
- Docker container (in flake)
|
||||
- try fourmolu
|
||||
- Test leap second handling in the `time` package
|
||||
- use queue to sync logging of indexer and main thread
|
||||
|
||||
91
app/Main.hs
91
app/Main.hs
@ -11,10 +11,10 @@
|
||||
module Main (main) where
|
||||
|
||||
import Control.Monad.IO.Class (liftIO)
|
||||
import Control.Monad.Trans.Except (ExceptT (..), runExceptT)
|
||||
import Control.Monad.Trans.Except (ExceptT (..), catchE, runExceptT)
|
||||
import Data.Proxy (Proxy (..))
|
||||
import Data.Text (Text)
|
||||
import Data.Time (Day, toGregorian)
|
||||
import Data.Time (Day, addDays, toGregorian)
|
||||
import GHC.Generics (Generic)
|
||||
import Network.HTTP.Media ((//), (/:))
|
||||
import Network.HTTP.Types (Status (..), mkStatus, status200)
|
||||
@ -54,10 +54,12 @@ import qualified Text.Blaze.Html5 as H
|
||||
import qualified Text.Blaze.Html5.Attributes as A
|
||||
|
||||
import Envy (type (=@!), type (=@@), type (?))
|
||||
import Yore.DB (DayFile (..))
|
||||
import Yore.DB (DB, DayFile (..))
|
||||
import Yore.Error (Error (..))
|
||||
import Yore.Schedule (schedule)
|
||||
import Yore.Time (addYears, getCurrentDay, getSecondsUntilMidnight)
|
||||
|
||||
import Control.Concurrent (forkIO)
|
||||
import Control.Monad (forM, forM_)
|
||||
import Data.Bifunctor (Bifunctor (..))
|
||||
import qualified Envy
|
||||
@ -93,7 +95,37 @@ main = do
|
||||
Right c ->
|
||||
pure c
|
||||
|
||||
let
|
||||
db <- DB.initDB cfg.yoreDb
|
||||
_ <-
|
||||
runExceptT $
|
||||
catchE
|
||||
( DB.withConn db $ \conn -> do
|
||||
dayThen <- getTodayWithYearOffset (-100)
|
||||
indexDay cfg dayThen conn
|
||||
)
|
||||
(Log.error . show)
|
||||
|
||||
_ <- forkIO $ runIndexer cfg db
|
||||
runServer cfg db
|
||||
|
||||
runIndexer :: Config -> DB -> IO ()
|
||||
runIndexer cfg db = schedule (const True) $ do
|
||||
_ <-
|
||||
runExceptT $
|
||||
catchE
|
||||
( DB.withTransaction db $ \conn -> do
|
||||
dayThen <- getTodayWithYearOffset (-100)
|
||||
indexDay cfg dayThen conn
|
||||
dayThen2 <- addDays 1 <$> getTodayWithYearOffset (-100)
|
||||
indexDay cfg dayThen2 conn
|
||||
)
|
||||
(Log.error . show)
|
||||
pure ()
|
||||
|
||||
runServer :: Config -> DB -> IO ()
|
||||
runServer cfg db =
|
||||
Warp.runSettings settings $ serve (Proxy @API) $ hoistServer (Proxy @API) nt $ server cfg db
|
||||
where
|
||||
settings =
|
||||
foldr
|
||||
($)
|
||||
@ -104,13 +136,6 @@ main = do
|
||||
, Warp.setOnException onException
|
||||
]
|
||||
|
||||
db <- DB.initDB cfg.yoreDb
|
||||
DB.withConn db (runExceptT . indexDayWithOffset cfg (-100)) >>= \case
|
||||
Left err -> Log.error $ show err
|
||||
Right _ -> pure ()
|
||||
|
||||
Warp.runSettings settings $ serve (Proxy @API) $ hoistServer (Proxy @API) nt $ server cfg db
|
||||
where
|
||||
logger req status _ = do
|
||||
Log.info $
|
||||
printf
|
||||
@ -166,20 +191,20 @@ handlerToRaw handler = Tagged $ \_ respond -> do
|
||||
Right response ->
|
||||
respond response
|
||||
|
||||
server :: Config -> DB.DB -> ServerT API (ExceptT Error IO)
|
||||
server :: Config -> DB -> ServerT API (ExceptT Error IO)
|
||||
server cfg db = rootR :<|> todayR :<|> apiTodayR
|
||||
where
|
||||
rootR = todayR 0
|
||||
|
||||
todayR issue = do
|
||||
dateThen <- ExceptT get100YearsAgo
|
||||
count <- ExceptT $ DB.withConn' db $ DB.getNumberOfIssues dateThen
|
||||
dayFile <- ExceptT $ DB.withConn' db $ DB.getDayFileByIssue dateThen issue
|
||||
count <- DB.withConn db $ DB.lift . DB.getNumberOfIssues dateThen
|
||||
dayFile <- DB.withConn db $ DB.lift . DB.getDayFileByIssue dateThen issue
|
||||
pure $ RootModel dateThen dayFile issue count
|
||||
|
||||
apiTodayR issue = handlerToRaw $ do
|
||||
dateThen <- ExceptT get100YearsAgo
|
||||
dayFile <- ExceptT $ DB.withConn' db $ DB.getDayFileByIssue dateThen issue
|
||||
dayFile <- DB.withConn db $ DB.lift . DB.getDayFileByIssue dateThen issue
|
||||
let fullPath = cfg.yoreDownloadDir </> dayFile.relative_path
|
||||
secondsUntilMidnight <- liftIO getSecondsUntilMidnight
|
||||
pure $
|
||||
@ -231,36 +256,30 @@ instance Accept HTML where
|
||||
contentType _ = "text" // "html" /: ("charset", "utf-8")
|
||||
|
||||
get100YearsAgo :: IO (Either Error Day)
|
||||
get100YearsAgo = getTodayWithYearOffset (-100)
|
||||
get100YearsAgo = runExceptT $ getTodayWithYearOffset (-100)
|
||||
|
||||
getTodayWithYearOffset :: Integer -> IO (Either Error Day)
|
||||
getTodayWithYearOffset :: Integer -> ExceptT Error IO Day
|
||||
getTodayWithYearOffset offset =
|
||||
first (GenericError . (Text.pack (printf "can't go back %s years: " (-offset)) <>)) . addYears offset <$> getCurrentDay
|
||||
ExceptT $
|
||||
first (GenericError . (Text.pack (printf "can't go back %s years: " (-offset)) <>)) . addYears offset <$> getCurrentDay
|
||||
|
||||
indexDayWithOffset :: Config -> Integer -> Opium.Connection -> ExceptT Error IO ()
|
||||
indexDayWithOffset cfg offset conn = do
|
||||
dayThen <- ExceptT $ getTodayWithYearOffset offset
|
||||
runDb $ Opium.execute_ "BEGIN" conn
|
||||
indexDay :: Config -> Day -> Opium.Connection -> ExceptT Error IO ()
|
||||
indexDay cfg dayThen conn = do
|
||||
-- Transaction-level lock released automatically after transaction
|
||||
runDb $ Opium.execute_ "SELECT pg_advisory_xact_lock(42);" conn
|
||||
mbDi <- runDb $ DB.readDayIndex dayThen conn
|
||||
DB.lift $ Opium.execute_ "SELECT pg_advisory_xact_lock(42);" conn
|
||||
mbDi <- DB.lift $ DB.readDayIndex dayThen conn
|
||||
case mbDi of
|
||||
Just _ ->
|
||||
liftIO $ Log.info $ printf "index for %s already exists." (show dayThen)
|
||||
Log.info $ printf "index for %s already exists." (show dayThen)
|
||||
Nothing -> do
|
||||
liftIO $ Log.info $ printf "scraping issues for %s" (show dayThen)
|
||||
Log.info $ printf "scraping issues for %s" (show dayThen)
|
||||
issues <- liftIO $ getIssuesByDay dayThen
|
||||
paths <- forM issues $ \issue -> do
|
||||
liftIO $ Log.info $ printf "downloading %s" issue.url
|
||||
Log.info $ printf "downloading %s" issue.url
|
||||
path <- liftIO $ downloadInto cfg.yoreDownloadDir issue.url
|
||||
pure (issue.label, path)
|
||||
liftIO $ Log.info "creating DB entries"
|
||||
dayIndex <- runDb $ DB.createDayIndex dayThen conn
|
||||
Log.info "creating DB entries"
|
||||
dayIndex <- DB.lift $ DB.createDayIndex dayThen conn
|
||||
forM_ paths $ \(text, url) ->
|
||||
runDb $ DB.createDayFile dayIndex.day_index_id text url conn
|
||||
runDb $ Opium.execute_ "COMMIT" conn
|
||||
liftIO $ Log.info "done."
|
||||
|
||||
runDb :: IO (Either Opium.Error a) -> ExceptT Error IO a
|
||||
runDb f =
|
||||
ExceptT $ first DBError <$> f
|
||||
DB.lift $ DB.createDayFile dayIndex.day_index_id text url conn
|
||||
Log.info "done."
|
||||
|
||||
@ -1,3 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
YORE_DB="host=localhost port=${DB_PORT} user=${DB_USER} dbname=${DB_DBNAME}" cabal repl exe:yore
|
||||
@ -26,4 +26,6 @@ fi
|
||||
|
||||
DATABASE_URL="postgres://${DB_USER}@localhost:${DB_PORT}/${DB_DBNAME}?sslmode=disable" dbmate up
|
||||
|
||||
export YORE_DB="host=localhost port=${DB_PORT} user=${DB_USER} dbname=${DB_DBNAME}"
|
||||
|
||||
"$@"
|
||||
|
||||
@ -9,7 +9,7 @@ module Yore.DB
|
||||
, Error (..)
|
||||
, initDB
|
||||
, withConn
|
||||
, withConn'
|
||||
, withTransaction
|
||||
, DayIndex (..)
|
||||
, DayFile (..)
|
||||
, createDayFile
|
||||
@ -18,9 +18,11 @@ module Yore.DB
|
||||
, readDayPaths
|
||||
, getDayFileByIssue
|
||||
, getNumberOfIssues
|
||||
, lift
|
||||
) where
|
||||
|
||||
import Control.Concurrent (getNumCapabilities)
|
||||
import Control.Monad.Trans.Except (ExceptT (..), catchE, runExceptT, throwE)
|
||||
import Data.Functor.Identity (Identity (..))
|
||||
import Data.Pool (Pool, defaultPoolConfig, newPool, withResource)
|
||||
import Data.Text (Text)
|
||||
@ -49,22 +51,33 @@ initDB connString = do
|
||||
|
||||
-- TODO: This should probably also do something like bracket...
|
||||
-- For now let's assume no exceptions are thrown in f.
|
||||
withConn :: DB -> (Opium.Connection -> IO (Either Error a)) -> IO (Either Error a)
|
||||
withConn :: DB -> (Opium.Connection -> ExceptT Error IO a) -> ExceptT Error IO a
|
||||
withConn (DB connPool) f =
|
||||
withResource connPool $ \case
|
||||
ExceptT $ withResource connPool $ \case
|
||||
Left connectionError ->
|
||||
pure $ Left $ ConnectionError connectionError
|
||||
Right conn ->
|
||||
f conn >>= \case
|
||||
Left err -> do
|
||||
-- rollback open transactions and release transaction level locks.
|
||||
_ <- Opium.execute_ "ROLLBACK" conn
|
||||
pure $ Left err
|
||||
Right x ->
|
||||
pure $ Right x
|
||||
runExceptT $ f conn
|
||||
|
||||
withConn' :: DB -> (Opium.Connection -> IO (Either Opium.Error a)) -> IO (Either Error a)
|
||||
withConn' db f = withConn db $ fmap (first DBError) . f
|
||||
withTransaction :: DB -> (Opium.Connection -> ExceptT Error IO a) -> ExceptT Error IO a
|
||||
withTransaction db f =
|
||||
withConn db $ \conn ->
|
||||
( do
|
||||
lift $ Opium.execute_ "BEGIN" conn
|
||||
x <- f conn
|
||||
lift $ Opium.execute_ "COMMIT" conn
|
||||
pure x
|
||||
)
|
||||
`catchE` ( \err -> do
|
||||
-- rollback open transactions and release transaction level locks...
|
||||
lift $ Opium.execute_ "ROLLBACK" conn
|
||||
-- ...then rethrow the error so the app can handle it too.
|
||||
throwE err
|
||||
)
|
||||
|
||||
lift :: IO (Either Opium.Error a) -> ExceptT Error IO a
|
||||
lift f =
|
||||
ExceptT $ first DBError <$> f
|
||||
|
||||
data DayIndex = DayIndex
|
||||
{ day_index_id :: Int
|
||||
|
||||
@ -1,18 +1,19 @@
|
||||
module Yore.Log (Yore.Log.error, info) where
|
||||
|
||||
import Control.Monad.IO.Class (MonadIO (..))
|
||||
import Data.Time (ZonedTime, getZonedTime)
|
||||
import Data.Time.Format (defaultTimeLocale, formatTime)
|
||||
import GHC.Stack (HasCallStack, SrcLoc (..), callStack, getCallStack)
|
||||
import Text.Printf (printf)
|
||||
|
||||
info :: (HasCallStack) => String -> IO ()
|
||||
info :: (HasCallStack, MonadIO m) => String -> m ()
|
||||
info = doLog "INF"
|
||||
|
||||
error :: (HasCallStack) => String -> IO ()
|
||||
error :: (HasCallStack, MonadIO m) => String -> m ()
|
||||
error = doLog "ERR"
|
||||
|
||||
doLog :: (HasCallStack) => String -> String -> IO ()
|
||||
doLog level msg = do
|
||||
doLog :: (HasCallStack, MonadIO m) => String -> String -> m ()
|
||||
doLog level msg = liftIO $ do
|
||||
now <- getZonedTime
|
||||
let location = getLocation $ getCallStack callStack
|
||||
printf "(%s) (%s) (%s) %s\n" (iso8601Show now) level location msg
|
||||
|
||||
21
src/Yore/Schedule.hs
Normal file
21
src/Yore/Schedule.hs
Normal file
@ -0,0 +1,21 @@
|
||||
module Yore.Schedule (schedule) where
|
||||
|
||||
import Control.Monad (when)
|
||||
import Data.Time (ZonedTime, getZonedTime, secondsToNominalDiffTime, nominalDiffTimeToSeconds)
|
||||
import Control.Concurrent (threadDelay, forkIO)
|
||||
import Data.Time.Clock.POSIX (getPOSIXTime)
|
||||
|
||||
schedule :: (ZonedTime -> Bool) -> IO () -> IO ()
|
||||
schedule shouldRunAt f = everyMinute $ \now -> when (shouldRunAt now) f
|
||||
|
||||
everyMinute :: (ZonedTime -> IO ()) -> IO ()
|
||||
everyMinute f = do
|
||||
-- Use POSIX time to avoid having to handle leap seconds
|
||||
now <- getPOSIXTime
|
||||
let posixSeconds :: Int
|
||||
posixSeconds = floor $ nominalDiffTimeToSeconds now
|
||||
lastFullMinute = secondsToNominalDiffTime $ fromIntegral $ posixSeconds - posixSeconds `mod` 60
|
||||
secondsSinceLastFullMinute = now - lastFullMinute
|
||||
threadDelay $ ceiling $ (60 - secondsSinceLastFullMinute) * 1_000_000
|
||||
_ <- forkIO $ f =<< getZonedTime
|
||||
everyMinute f
|
||||
@ -40,6 +40,7 @@ library
|
||||
, Yore.Index
|
||||
, Yore.Log
|
||||
, Yore.Repl
|
||||
, Yore.Schedule
|
||||
, Yore.Scrape
|
||||
, Yore.Time
|
||||
hs-source-dirs:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user