Coverage for gwcelery/email/bootsteps.py: 29%
58 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
1from threading import Thread
3from celery import bootsteps
4from celery.utils.log import get_logger
5from safe_netrc import netrc
7from .signals import email_received
9__all__ = ('Receiver',)
11log = get_logger(__name__)
class EmailBootStep(bootsteps.ConsumerStep):
    """Base class for boot steps that belong to email-handling workers.

    A step derived from this class is enabled only when the worker was
    started with the ``--email`` command line option; in every other
    worker it stays disabled.
    """

    def __init__(self, consumer, email=False, **kwargs):
        """Enable the step according to the truthiness of *email*.

        Parameters
        ----------
        consumer
            The consumer this step is attached to (not used here).
        email
            Value of the ``--email`` option; any truthy value enables
            the step.
        **kwargs
            Remaining worker options, accepted and ignored.
        """
        self.enabled = True if email else False

    def start(self, consumer):
        """Log that the step named ``self.name`` is starting."""
        step_name = self.name
        log.info('Starting %s', step_name)

    def stop(self, consumer):
        """Log that the step named ``self.name`` is stopping."""
        step_name = self.name
        log.info('Stopping %s', step_name)
class Receiver(EmailBootStep):
    """Run the global email receiver in background thread.

    While the step is running, a dedicated thread keeps an IMAP connection
    to the host configured as ``email_host`` open, emits the
    ``email_received`` signal for every message found in the inbox, deletes
    the message, and then waits in IMAP IDLE mode for more mail.
    """

    name = 'email client'

    # Seconds to wait after a failed connection before reconnecting, so
    # that a persistent outage does not turn into a tight retry loop that
    # floods the log and hammers the server.
    _reconnect_delay = 5

    def _runloop(self):
        """Body of the background thread: serve the inbox until stopped.

        Looks up the login credentials for ``self._host`` in the netrc
        file, then connects, processes and idles in a loop for as long as
        ``self._running`` is true. Connection failures are logged and
        followed by a reconnection attempt.

        Raises
        ------
        RuntimeError
            If the netrc file has no entry for ``self._host``.
        """
        from imapclient import IMAPClient
        from imapclient.exceptions import IMAPClientAbortError

        credentials = netrc().authenticators(self._host)
        if credentials is None:
            # Without this check the unpacking below fails with an opaque
            # "cannot unpack non-iterable NoneType" TypeError.
            raise RuntimeError(
                f'No netrc credentials found for email host {self._host!r}')
        username, _, password = credentials

        while self._running:
            try:
                log.debug('Starting new connection')
                with IMAPClient(self._host, use_uid=True, timeout=30) as conn:
                    log.debug('Logging in')
                    conn.login(username, password)
                    log.debug('Selecting inbox')
                    conn.select_folder('inbox')
                    while self._running:
                        self._process_inbox(conn)
                        self._idle(conn)
            except IMAPClientAbortError:
                log.exception('IMAP connection aborted')
            except ConnectionResetError:
                log.exception('IMAP connection reset')
            except OSError:
                # Timeouts, broken pipes, DNS and TLS failures are all
                # OSError subclasses; retry instead of letting them kill
                # the receiver thread.
                log.exception('IMAP connection failed')
            else:
                # Clean exit from the connection: we were asked to stop.
                continue
            self._pause_before_reconnect()

    def _process_inbox(self, conn):
        """Signal, delete and expunge every message currently in the inbox."""
        log.debug('Searching for new messages')
        messages = conn.search()
        log.debug('Fetching new messages')
        deleted_any = False
        for msgid, data in conn.fetch(messages, ['RFC822']).items():
            log.debug('Sending signal for new email')
            email_received.send(None, rfc822=data[b'RFC822'])
            log.debug('Deleting email')
            conn.delete_messages(msgid)
            deleted_any = True
        if deleted_any:
            # delete_messages() only sets the \Deleted flag. Unless the
            # server expunges automatically, the flagged messages would
            # still match the next search and be signalled again.
            log.debug('Expunging deleted emails')
            conn.expunge()

    def _idle(self, conn):
        """Wait in IMAP IDLE mode until mail arrives, we stop, or 5 minutes."""
        log.debug('Starting idle')
        conn.idle()
        # Stay in IDLE mode for at most 5 minutes (60 checks of 5 seconds).
        # According to the imapclient documentation:
        #
        # > Note that IMAPClient does not handle low-level
        # > socket errors that can happen when maintaining
        # > long-lived TCP connections. Users are advised to
        # > renew the IDLE command every 10 minutes to avoid
        # > the connection from being abruptly closed.
        for _ in range(60):
            if not self._running or conn.idle_check(timeout=5):
                break
        log.debug('Idle done')
        conn.idle_done()

    def _pause_before_reconnect(self):
        """Sleep up to ``_reconnect_delay`` seconds, waking early on stop."""
        from time import sleep

        # Sleep in one-second slices so that stop() is not held up by the
        # full delay.
        for _ in range(self._reconnect_delay):
            if not self._running:
                break
            sleep(1)

    def start(self, consumer):
        """Read ``email_host`` from the app configuration and start the thread."""
        super().start(consumer)
        self._host = consumer.app.conf['email_host']
        self._running = True
        self._thread = Thread(target=self._runloop, name='EmailClientThread')
        self._thread.start()

    def stop(self, consumer):
        """Ask the receiver thread to finish and wait until it has exited."""
        super().stop(consumer)
        self._running = False
        self._thread.join()