Coverage for gwcelery/voevent/util.py: 85%
13 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
1"""VOEvent-related utilities."""
2import ipaddress
3import socket
def get_host_port(address):
    """Split a network address of the form ``host:port``.

    A bracketed IPv6 literal such as ``[::1]:8099`` is also accepted; the
    surrounding brackets are stripped from the returned host.

    Parameters
    ----------
    address : str
        The network address.

    Returns
    -------
    host : str
        The hostname, or an empty string if missing.
    port : int, None
        The port number, or None if missing.

    Raises
    ------
    ValueError
        If the port is not a valid integer, or if a bracketed host is
        malformed (no closing bracket, or trailing text that is not
        ``:port``).

    """
    if address.startswith('['):
        # IPv6 literals contain colons themselves, so the host part is
        # delimited by brackets rather than by the first colon.
        host, closing, rest = address[1:].partition(']')
        if not closing or (rest and not rest.startswith(':')):
            raise ValueError(
                'Malformed bracketed address: {!r}'.format(address))
        port = rest[1:]
    else:
        host, _, port = address.partition(':')
    return host, (int(port) if port else None)
def get_local_ivo(app):
    """Build the IVOID that identifies this application in VOEvent
    Transport Protocol packets.

    Parameters
    ----------
    app
        The Celery application. Only its ``main`` attribute (the
        application name) is read.

    Returns
    -------
    str
        A local IVOID made of the machine's fully qualified domain name
        followed by the Celery application name (for example,
        `ivo://emfollow.ligo.caltech.edu/gwcelery`).

    """
    fqdn = socket.getfqdn()
    return f'ivo://{fqdn}/{app.main}'
def get_network(address):
    """Determine the IP network prefix for a hostname or CIDR notation.

    Parameters
    ----------
    address : str
        Either a hostname, such as ``ligo.org``, or an IP address prefix
        in CIDR notation, such as ``127.0.0.0/8``.

    Returns
    -------
    ipaddress.IPv4Network or ipaddress.IPv6Network
        An object representing the IP address prefix. Host bits in a CIDR
        prefix are masked off rather than rejected (``strict=False``).

    Notes
    -----
    A value that does not parse as an address or prefix is treated as a
    hostname and resolved with :func:`socket.gethostbyname`.

    """
    try:
        return ipaddress.ip_network(address, strict=False)
    except ValueError:
        # Not an address or CIDR prefix: fall back to a hostname lookup
        # and wrap the resolved address as a single-host network.
        resolved = socket.gethostbyname(address)
        return ipaddress.ip_network(resolved, strict=False)