Coverage for gwcelery/email/bootsteps.py: 29%

58 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-11-14 05:52 +0000

1from threading import Thread 

2 

3from celery import bootsteps 

4from celery.utils.log import get_logger 

5from safe_netrc import netrc 

6 

7from .signals import email_received 

8 

9__all__ = ('Receiver',) 

10 

11log = get_logger(__name__) 

12 

13 

14class EmailBootStep(bootsteps.ConsumerStep): 

15 """Generic boot step to limit us to appropriate kinds of workers. 

16 

17 Only include this bootstep in workers that are started with the 

18 ``--email`` command line option. 

19 """ 

20 

21 def __init__(self, consumer, email=False, **kwargs): 

22 self.enabled = bool(email) 

23 

24 def start(self, consumer): 

25 log.info('Starting %s', self.name) 

26 

27 def stop(self, consumer): 

28 log.info('Stopping %s', self.name) 

29 

30 

31class Receiver(EmailBootStep): 

32 """Run the global email receiver in background thread.""" 

33 

34 name = 'email client' 

35 

36 def _runloop(self): 

37 from imapclient import IMAPClient 

38 from imapclient.exceptions import IMAPClientAbortError 

39 

40 username, _, password = netrc().authenticators(self._host) 

41 while self._running: 

42 try: 

43 log.debug('Starting new connection') 

44 with IMAPClient(self._host, use_uid=True, timeout=30) as conn: 

45 log.debug('Logging in') 

46 conn.login(username, password) 

47 log.debug('Selecting inbox') 

48 conn.select_folder('inbox') 

49 while self._running: 

50 log.debug('Searching for new messages') 

51 messages = conn.search() 

52 log.debug('Fetching new messages') 

53 for msgid, data in conn.fetch( 

54 messages, ['RFC822']).items(): 

55 log.debug('Sending signal for new email') 

56 email_received.send(None, rfc822=data[b'RFC822']) 

57 log.debug('Deleting email') 

58 conn.delete_messages(msgid) 

59 log.debug('Starting idle') 

60 conn.idle() 

61 # Stay in IDLE mode for at most 5 minutes. 

62 # According to the imapclient documentation: 

63 # 

64 # > Note that IMAPClient does not handle low-level 

65 # > socket errors that can happen when maintaining 

66 # > long-lived TCP connections. Users are advised to 

67 # > renew the IDLE command every 10 minutes to avoid 

68 # > the connection from being abruptly closed. 

69 for _ in range(60): 

70 if not self._running or conn.idle_check(timeout=5): 

71 break 

72 log.debug('Idle done') 

73 conn.idle_done() 

74 except IMAPClientAbortError: 

75 log.exception('IMAP connection aborted') 

76 except ConnectionResetError: 

77 log.exception('IMAP connection reset') 

78 

79 def start(self, consumer): 

80 super().start(consumer) 

81 self._host = consumer.app.conf['email_host'] 

82 self._running = True 

83 self._thread = Thread(target=self._runloop, name='EmailClientThread') 

84 self._thread.start() 

85 

86 def stop(self, consumer): 

87 super().stop(consumer) 

88 self._running = False 

89 self._thread.join()