Updates.
[cas.git] / src / ssh_keys / utils.py
1 from datetime import datetime, timedelta
2 import re
3 import subprocess
4
5
6 def get_key_details(key):
7     """
8     >>> get_key_details('ssh-dss AAAAB3NzaC1kc3MAAACBAJxrocPXtCxwgg5yvOc1NLFFz/Fql4+7sOgMOkwWO6pxpJ4bPZgzZ0B17/HGKxQaot3Nc7vzdkC3MBrDDbKrX4n9qB9yJBd0Kkr5X0K7SnBKU+7fbg+rloUdYE78LS6ap05+xlJ8dU918DnS3KqcT/YQQXaTLrt/2DUOM1qxCI1XAAAAFQCJXLN0vYx7SIYMQ0zhv9IUT5WhgQAAAIAT2new16avxvs56zU87t1QQe0qwbQEIUygWW6vqnc9Lo9aSf21sM5WAHTkEnTVyiFSI6K6Q6bD2OUMvS2oWaoariW8EFKzg7/pufmThG0oAxkloc3j8gMO2+xuw7yHzP2pd6xgosNkqivpsGT1PKo+vM6x8p9B6PvipHPqhgFHWQAAAIEAhgFE3+gfPpfDIhaPP5Adx4Hm0VO3xBgOtafvunv3kP54kvHuTaD2uLwgcdOsMedv1/tqhJddh4+9ibwhlKbxKLHrQIcGSHCIY/BoA4RnpSBlGoXEc2buLoZ9IwANCIa2mp19Q/v4wwLnTJHabdMkNCiUn8NPEiHUPjgIj1uoCgo= epsilon')
9     {'algo': 'DSA', 'bits': 1024, 'sha256': 'o2DscHLwvUsYJ7a5eYgc2B4hQO4rf19u17W9Hh/H2h8', 'comment': 'epsilon', 'md5': 'f6:78:a7:9e:6b:41:dc:31:f4:39:12:5f:2b:0c:7a:11'}
10     """
11     process = subprocess.run(
12         ['ssh-keygen', '-lEsha256', '-f-'],
13         input=key.encode('utf-8'),
14         stdout=subprocess.PIPE,
15         stderr=subprocess.PIPE)
16     if process.returncode != 0:
17         raise ValueError(process.stderr.decode('utf-8'))
18
19     output = process.stdout.decode('utf-8').rstrip()
20     m = re.match(
21         r'^(?P<bits>\d+) SHA256:(?P<sha256>[^ ]+) (?P<comment>.*) \((?P<algo>.*)\)$',
22         output
23     )
24     data = {
25         'algo':  m.group('algo'),
26         'bits': int(m.group('bits')),
27         'sha256': m.group('sha256'),
28         'comment': m.group('comment'),
29     }
30
31     process = subprocess.run(
32         ['ssh-keygen', '-lEmd5', '-f-'],
33         input=key.encode('utf-8'),
34         stdout=subprocess.PIPE,
35         stderr=subprocess.PIPE)
36     if process.returncode != 0:
37         raise ValueError(process.stderr.decode('utf-8'))
38
39     output = process.stdout.decode('utf-8').rstrip()
40     m = re.match(
41         r'^(?P<bits>\d+) MD5:(?P<md5>[a-f0-9:]+) (?P<comment>.*) \((?P<algo>.*)\)$',
42         output
43     )
44     data['md5'] = m.group('md5')
45
46     return data
47
48
49 def parse_log_line(line, year=None, allow_future_days=7):
50     """
51     >>> sorted(parse_log_line(
52     ...     'Jan  1 2:34:56 heta sshd[4112]: Accepted publickey for localuser from 0.0.0.0 port 33980 ssh2: RSA f6:78:a7:9e:6b:41:dc:31:f4:39:12:5f:2b:0c:7a:11',
53     ...     year=2019).items())
54     [('algo', 'RSA'), ('datetime', datetime.datetime(2019, 1, 1, 2, 34, 56)), ('host', 'heta'), ('ip', '0.0.0.0'), ('md5', 'f6:78:a7:9e:6b:41:dc:31:f4:39:12:5f:2b:0c:7a:11'), ('user', 'localuser')]
55     >>> sorted(parse_log_line(
56     ...     'Jan  1 2:34:56 heta sshd[4112]: Accepted publickey for localuser from 0.0.0.0 port 33980 ssh2: RSA SHA256:9FLLdMdArtybwaoS45qtNZSmcBsr1pR2t8c9O+XODBw',
57     ...     year=2019).items())
58     [('algo', 'RSA'), ('datetime', datetime.datetime(2019, 1, 1, 2, 34, 56)), ('host', 'heta'), ('ip', '0.0.0.0'), ('sha256', '9FLLdMdArtybwaoS45qtNZSmcBsr1pR2t8c9O+XODBw'), ('user', 'localuser')]
59
60     """
61     logline_re = r'^(?P<time>\w{3}\s+\d+\s+\d?\d:\d\d:\d\d) (?P<host>\S+) .*: Accepted publickey for (?P<user>\S*) from (?P<ip>[\S]+) port .* ssh2: (?P<algo>\w+) ((?P<md5>[a-f0-9:]+)|SHA256:(?P<sha256>[a-zA-Z0-9+/]+))$'
62
63     m = re.match(logline_re, line)
64     if m is None:
65         return None
66     dt = datetime.strptime(m['time'], '%b %d %H:%M:%S')
67     if year is not None:
68         dt = dt.replace(year=year)
69     else:
70         now = datetime.now()
71         dt = dt.replace(year=now.year)
72         if (dt - now).days > allow_future_days:
73             dt = dt.replace(year=dt.year - 1)
74
75     data = {
76         'datetime': dt,
77         'host': m['host'],
78         'user': m['user'],
79         'ip': m['ip'],
80         'algo': m['algo'],
81     }
82     if m['md5']:
83         data['md5'] = m['md5']
84     if m['sha256']:
85         data['sha256'] = m['sha256']
86
87     return data