Coverage for gwcelery/tasks/core.py: 85%
46 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-14 05:52 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-14 05:52 +0000
1"""Base classes for other Celery tasks."""
2from celery import group
3from celery.utils.log import get_logger
5from .. import app
7log = get_logger(__name__)
@app.task(shared=False)
def identity(arg=None):
    """Return ``arg`` unchanged.

    Parameters
    ----------
    arg
        Any value; defaults to ``None``.

    Returns
    -------
    The same object that was passed in.

    """
    return arg
@app.task(shared=False)
def get_first(args):
    """Return the first element of ``args``, or ``args`` itself if scalar.

    Parameters
    ----------
    args
        An iterable (for example the results of a group) or a single,
        non-iterable value.

    Returns
    -------
    The first item yielded by ``args``. If ``args`` cannot be iterated
    (unpacking raises :class:`TypeError`), ``args`` is returned unchanged.

    Notes
    -----
    An empty iterable makes the unpacking raise :class:`ValueError`, which
    is not handled here and propagates to the caller.

    """
    try:
        head, *_rest = args
    except TypeError:
        # Not iterable: a scalar is its own first element.
        return args
    return head
@app.task(shared=False)
def get_last(args):
    """Return the last element of ``args``, or ``args`` itself if scalar.

    Parameters
    ----------
    args
        An iterable (for example the results of a group) or a single,
        non-iterable value.

    Returns
    -------
    The final item yielded by ``args``. If ``args`` cannot be iterated
    (unpacking raises :class:`TypeError`), ``args`` is returned unchanged.

    Notes
    -----
    An empty iterable makes the unpacking raise :class:`ValueError`, which
    is not handled here and propagates to the caller.

    """
    try:
        *_rest, tail = args
    except TypeError:
        # Not iterable: a scalar is its own last element.
        return args
    return tail
class DispatchHandler(dict):
    """Mapping from keys to lists of Celery tasks that handle them.

    Calling an instance with one or more keys returns a decorator that turns
    the decorated function into a task and registers it under each key.
    :meth:`dispatch` then looks up the key derived from its arguments and
    launches every task registered for it.
    """

    def process_args(self, *args, **kwargs):
        r"""Split the dispatch arguments into a key and callback arguments.

        This default implementation takes the first positional argument as
        the key and forwards everything else untouched.

        Parameters
        ----------
        \*args
            Arguments passed to :meth:`__call__`.
        \*\*kwargs
            Keyword arguments passed to :meth:`__call__`.

        Returns
        -------
        key
            The key to determine which callback to invoke.
        \*args
            The arguments to pass to the registered callback (a list of the
            remaining positional arguments).
        \*\*kwargs
            The keyword arguments to pass to the registered callback.

        Raises
        ------
        ValueError
            If no positional argument was supplied, so there is no key.

        """
        key, *remaining = args
        return key, remaining, kwargs

    def __call__(self, *keys, **kwargs):
        r"""Build a decorator that registers a new task under the given keys.

        Parameters
        ----------
        \*keys : list
            Keys to match
        \*\*kwargs
            Additional keyword arguments for `celery.Celery.task`.

        Returns
        -------
        callable
            A decorator that converts the function into a task created with
            ``ignore_result=True``, appends it to the handler list of every
            key, and returns the task.

        """
        def wrap(f):
            task = app.task(ignore_result=True, **kwargs)(f)
            for key in keys:
                # Create the handler list on first use of a key.
                self.setdefault(key, []).append(task)
            return task

        return wrap

    def dispatch(self, *args, **kwargs):
        r"""Invoke every handler registered for the key found in the arguments.

        The key and callback arguments are obtained from
        :meth:`process_args`. If that raises :class:`TypeError` or
        :class:`ValueError`, the error is logged and nothing is called. If no
        handlers are registered for the key, a warning is logged. Otherwise
        the signatures of all matching handlers are combined into a Celery
        group and submitted asynchronously with the callback arguments.

        Parameters
        ----------
        \*args
            Positional arguments forwarded to :meth:`process_args`.
        \*\*kwargs
            Keyword arguments forwarded to :meth:`process_args`.

        """
        log.debug('considering dispatch: args=%r, kwargs=%r', args, kwargs)
        try:
            key, cb_args, cb_kwargs = self.process_args(*args, **kwargs)
        except (TypeError, ValueError):
            log.exception('error unpacking key')
            return
        log.debug('unpacked: key=%r, args=%r, kwargs=%r',
                  key, cb_args, cb_kwargs)

        try:
            handlers = self[key]
        except KeyError:
            log.warning('ignoring unrecognized key: %r', key)
            return

        log.info('calling handlers %r for key %r', handlers, key)
        signatures = [handler.s() for handler in handlers]
        group(signatures).apply_async(cb_args, cb_kwargs)