Coverage for gwcelery/tools/flask.py: 100%

24 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-11-14 05:52 +0000

1"""Flask web application for manually triggering certain tasks.""" 

2from contextlib import ExitStack, nullcontext 

3 

4import click 

5from celery.bin.base import LOG_LEVEL, CeleryDaemonCommand, CeleryOption 

6from celery.platforms import detached 

7from flask.cli import FlaskGroup 

8 

9from .. import views as _ # noqa: F401 

10from ..flask import app 

11 

12 

13class CeleryDaemonFlaskGroup(FlaskGroup): 

14 

15 def __init__(self, *args, **kwargs): 

16 super().__init__(self, *args, **kwargs) 

17 self.params.extend(CeleryDaemonCommand(name='flask').params) 

18 

19 

20def maybe_detached(*, detach, **kwargs): 

21 return detached(**kwargs) if detach else nullcontext() 

22 

23 

24@click.group(cls=CeleryDaemonFlaskGroup, help=__doc__, 

25 context_settings={'auto_envvar_prefix': 'FLASK'}) 

26@click.option('-D', 

27 '--detach', 

28 cls=CeleryOption, 

29 is_flag=True, 

30 default=False, 

31 help="Start as a background process.") 

32@click.option('-l', 

33 '--loglevel', 

34 default='WARNING', 

35 cls=CeleryOption, 

36 type=LOG_LEVEL, 

37 help="Logging level.") 

38@click.pass_context 

39def flask(ctx, detach=None, loglevel=None, logfile=None, 

40 pidfile=None, uid=None, gid=None, umask=None, **kwargs): 

41 # Look up Celery app 

42 celery_app = ctx.parent.obj.app 

43 

44 # # Prepare to pass Flask app to Flask CLI 

45 ctx.obj.create_app = lambda *args, **kwargs: app 

46 

47 # Detach from the tty if requested. 

48 # FIXME: After we update to click >= 8, replace the elaborate construction 

49 # below with ctx.with_resource(maybe_detached(...)) 

50 exit_stack = ExitStack() 

51 ctx.call_on_close(exit_stack.close) 

52 exit_stack.enter_context(maybe_detached(detach=detach, 

53 logfile=logfile, pidfile=pidfile, 

54 uid=uid, gid=gid, umask=umask)) 

55 

56 celery_app.log.setup(loglevel, logfile)