Coverage for gwcelery/voevent/util.py: 85%

13 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-11-14 05:52 +0000

1"""VOEvent-related utilities.""" 

2import ipaddress 

3import socket 

4 

5 

def get_host_port(address):
    """Break a ``host:port`` network address into its two components.

    The string is split at the first colon only; everything after that
    colon is interpreted as the port.

    Parameters
    ----------
    address : str
        The network address, of the form ``host:port``.

    Returns
    -------
    host : str
        The part before the first colon, or an empty string if missing.
    port : int, None
        The part after the first colon converted to an integer, or None if
        there is no colon or nothing follows it.

    Raises
    ------
    ValueError
        If the text after the first colon is not a valid integer.

    """
    hostname, _sep, port_text = address.partition(':')
    # An absent colon and a trailing colon both leave the port text empty.
    if not port_text:
        return hostname, None
    return hostname, int(port_text)

24 

25 

def get_local_ivo(app):
    """Build the IVOID that identifies this application in VOEvent
    Transport Protocol packets.

    Parameters
    ----------
    app : object
        The application whose ``main`` attribute supplies the name
        component of the IVOID (presumably the Celery application).

    Returns
    -------
    str
        A local IVOID composed of the machine's fully qualified domain name
        and the application name (for example,
        `ivo://emfollow.ligo.caltech.edu/gwcelery`).

    """
    fqdn = socket.getfqdn()
    app_name = app.main
    return 'ivo://{}/{}'.format(fqdn, app_name)

39 

40 

def get_network(address):
    """Determine the IP network prefix for a hostname or CIDR string.

    The input is first parsed directly as an IP network. If that fails, it
    is treated as a hostname, resolved with :func:`socket.gethostbyname`,
    and the resulting address is converted to a network.

    Parameters
    ----------
    address : str
        A hostname, such as ``ligo.org``, or an IP address prefix in CIDR
        notation, such as ``127.0.0.0/8``.

    Returns
    -------
    ipaddress.IPv4Network or ipaddress.IPv6Network
        An object representing the IP address prefix. Host bits in the
        input are masked off rather than rejected (``strict=False``).

    """
    try:
        return ipaddress.ip_network(address, strict=False)
    except ValueError:
        # Not an IP address or CIDR prefix: fall back to a hostname lookup.
        resolved_ip = socket.gethostbyname(address)
        return ipaddress.ip_network(resolved_ip, strict=False)