Coverage for gwcelery/tools/flask.py: 100%
24 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-14 05:52 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-14 05:52 +0000
1"""Flask web application for manually triggering certain tasks."""
2from contextlib import ExitStack, nullcontext
4import click
5from celery.bin.base import LOG_LEVEL, CeleryDaemonCommand, CeleryOption
6from celery.platforms import detached
7from flask.cli import FlaskGroup
9from .. import views as _ # noqa: F401
10from ..flask import app
13class CeleryDaemonFlaskGroup(FlaskGroup):
15 def __init__(self, *args, **kwargs):
16 super().__init__(self, *args, **kwargs)
17 self.params.extend(CeleryDaemonCommand(name='flask').params)
20def maybe_detached(*, detach, **kwargs):
21 return detached(**kwargs) if detach else nullcontext()
24@click.group(cls=CeleryDaemonFlaskGroup, help=__doc__,
25 context_settings={'auto_envvar_prefix': 'FLASK'})
26@click.option('-D',
27 '--detach',
28 cls=CeleryOption,
29 is_flag=True,
30 default=False,
31 help="Start as a background process.")
32@click.option('-l',
33 '--loglevel',
34 default='WARNING',
35 cls=CeleryOption,
36 type=LOG_LEVEL,
37 help="Logging level.")
38@click.pass_context
39def flask(ctx, detach=None, loglevel=None, logfile=None,
40 pidfile=None, uid=None, gid=None, umask=None, **kwargs):
41 # Look up Celery app
42 celery_app = ctx.parent.obj.app
44 # # Prepare to pass Flask app to Flask CLI
45 ctx.obj.create_app = lambda *args, **kwargs: app
47 # Detach from the tty if requested.
48 # FIXME: After we update to click >= 8, replace the elaborate construction
49 # below with ctx.with_resource(maybe_detached(...))
50 exit_stack = ExitStack()
51 ctx.call_on_close(exit_stack.close)
52 exit_stack.enter_context(maybe_detached(detach=detach,
53 logfile=logfile, pidfile=pidfile,
54 uid=uid, gid=gid, umask=umask))
56 celery_app.log.setup(loglevel, logfile)