-
Notifications
You must be signed in to change notification settings - Fork 686
/
build_yaml.py
1709 lines (1438 loc) · 67.3 KB
/
build_yaml.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from __future__ import absolute_import
from __future__ import print_function
from collections import defaultdict
from copy import deepcopy
import datetime
import json
import os
import os.path
import re
import sys
from xml.sax.saxutils import escape
import yaml
from .build_cpe import CPEDoesNotExist
from .constants import XCCDF_REFINABLE_PROPERTIES, SCE_SYSTEM
from .rules import get_rule_dir_id, get_rule_dir_yaml, is_rule_dir, get_rule_path_by_id
from .rule_yaml import parse_prodtype
from .cce import is_cce_format_valid, is_cce_value_valid
from .yaml import DocumentationNotComplete, open_and_expand, open_and_macro_expand
from .utils import required_key, mkdir_p
from .xml import ElementTree as ET
from .shims import unicode_func
def add_sub_element(parent, tag, data):
    """
    Build a ``tag`` element whose content is ``data`` parsed as XML and
    append it to ``parent``.

    Because ``data`` is parsed rather than stored as text, any markup it
    contains ends up as real child elements of the new element.  When
    ``data`` must be taken literally, escape it before calling this
    function, or use ElementTree.SubElement() instead.

    Raises RuntimeError when the element cannot be built from the string.

    Returns the newly created element.
    """
    # Our YAML data contain XML and XHTML fragments.  ET.SubElement() would
    # escape the '<' and '>' characters into &lt; and &gt;, so no child
    # elements would be created; serializing and re-parsing is the hack
    # that works around it.
    # TODO: Remove this function after we move to Markdown everywhere in SSG
    markup = unicode_func("<{0}>{1}</{0}>").format(tag, data)
    try:
        child = ET.fromstring(markup.encode("utf-8"))
    except Exception:
        raise RuntimeError(
            "Error adding subelement to an element '{0}' from string: '{1}'"
            .format(parent.tag, markup))
    parent.append(child)
    return child
def reorder_according_to_ordering(unordered, ordering, regex=None):
    """
    Return the items of ``unordered`` as a list, prioritized items first.

    unordered -- items (strings) to arrange
    ordering  -- literal substrings, listed from highest to lowest priority
    regex     -- optional pattern (string or compiled) selecting the items
                 eligible for prioritization; when omitted, a pattern that
                 matches any of the ``ordering`` entries is used

    An item is prioritized when ``regex`` matches at its start and it
    contains an ``ordering`` entry; such items are emitted grouped by the
    first entry they contain, in ``ordering`` order.  All other items
    follow, sorted.
    """
    if regex is None:
        # The entries are literal substrings (they are tested with ``in``
        # below), so escape them: an entry such as "c++" must not be read
        # as regex syntax.
        regex = "|".join(
            "({0})".format(re.escape(entry)) for entry in ordering)
    pattern = re.compile(regex)

    candidates = [item for item in unordered if pattern.match(item)]
    remaining = set(unordered)
    ordered = []
    for priority_type in ordering:
        for item in candidates:
            # ``remaining`` guards against emitting an item twice when it
            # contains more than one priority substring.
            if priority_type in item and item in remaining:
                ordered.append(item)
                remaining.remove(item)
    ordered.extend(sorted(remaining))
    return ordered
def add_warning_elements(element, warnings):
    """Append one <warning> child to ``element`` for each entry of ``warnings``.

    ``warnings`` is a list of dictionaries; the first key of each dictionary
    becomes the ``category`` attribute and the matching value becomes the
    (XML-parsed) content of the warning.
    """
    # A list of single-item dictionaries is used instead of one dictionary
    # so that several warnings can share a category, which is valid in SCAP
    # and in our content:
    #
    # warnings:
    #     - general: Some general warning
    #     - general: Some other general warning
    #     - general: |-
    #         Some really long multiline general warning
    #
    # Each dictionary should hold exactly one key/value pair.
    for entry in warnings:
        category = list(entry.keys())[0]
        text = list(entry.values())[0]
        warning_el = add_sub_element(element, "warning", text)
        warning_el.set("category", category)
def add_nondata_subelements(element, subelement, attribute, attr_data):
    """Add one empty ``subelement`` child per item of ``attr_data``.

    Each child carries no content, only ``attribute`` set to the item,
    for example <requires id="my_required_id"/>.
    """
    for attr_value in attr_data:
        ET.SubElement(element, subelement).set(attribute, attr_value)
class Profile(object):
    """Represents XCCDF profile
    """

    def __init__(self, id_):
        self.id_ = id_
        self.title = ""
        self.description = ""
        self.extends = None
        self.selected = []
        self.unselected = []
        self.variables = dict()
        self.refine_rules = defaultdict(list)
        self.metadata = None
        self.reference = None
        # self.platforms is used further in the build system
        # self.platform is merged into self.platforms
        # it is here for backward compatibility
        self.platforms = set()
        self.cpe_names = set()
        self.platform = None
        self.rule_filter = noop_rule_filterfunc

    def read_yaml_contents(self, yaml_contents, env_yaml):
        """Move the profile keys of ``yaml_contents`` into this object.

        Every key handled here is removed from ``yaml_contents`` so that the
        caller can detect leftover, unparsed data.
        """
        self.title = required_key(yaml_contents, "title")
        del yaml_contents["title"]
        self.description = required_key(yaml_contents, "description")
        del yaml_contents["description"]
        self.extends = yaml_contents.pop("extends", None)
        selection_entries = required_key(yaml_contents, "selections")
        if selection_entries:
            self._parse_selections(selection_entries, env_yaml)
        del yaml_contents["selections"]
        self.platforms = yaml_contents.pop("platforms", set())
        self.platform = yaml_contents.pop("platform", None)
        self.rule_filter = rule_filter_from_def(
            yaml_contents.pop("filter_rules", None))

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml):
        """Create a profile from ``yaml_file``.

        The profile id is the file name without its extension.  Returns None
        when the expanded file has no contents.  Raises RuntimeError when the
        file contains keys that were not consumed, and re-raises
        CPEDoesNotExist for a platform that cannot be mapped to a CPE name.
        """
        yaml_contents = open_and_expand(yaml_file, env_yaml)
        if yaml_contents is None:
            return None

        basename, _ = os.path.splitext(os.path.basename(yaml_file))

        profile = cls(basename)
        profile.read_yaml_contents(yaml_contents, env_yaml)
        profile.reference = yaml_contents.pop("reference", None)

        # ensure that content of profile.platform is in profile.platforms as
        # well
        if profile.platform is not None:
            profile.platforms.add(profile.platform)

        if env_yaml:
            for platform in profile.platforms:
                try:
                    profile.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform))
                except CPEDoesNotExist:
                    print("Unsupported platform '%s' in profile '%s'." % (platform, profile.id_))
                    raise

        # At the moment, metadata is not used to build content
        if "metadata" in yaml_contents:
            del yaml_contents["metadata"]

        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))

        return profile

    def dump_yaml(self, file_name, documentation_complete=True):
        """Write this profile to ``file_name`` in the profile YAML format."""
        to_dump = {}
        to_dump["documentation_complete"] = documentation_complete
        to_dump["title"] = self.title
        to_dump["description"] = self.description
        to_dump["reference"] = self.reference
        if self.metadata is not None:
            to_dump["metadata"] = self.metadata

        if self.extends is not None:
            to_dump["extends"] = self.extends

        if self.platforms:
            to_dump["platforms"] = self.platforms

        # Selections are written back in the same textual forms that
        # apply_selection() understands.
        selections = []
        for item in self.selected:
            selections.append(str(item))
        for item in self.unselected:
            selections.append("!"+str(item))
        for varname, value in self.variables.items():
            selections.append(varname+"="+value)
        for rule, refinements in self.refine_rules.items():
            for prop, val in refinements:
                selections.append("{rule}.{property}={value}"
                                  .format(rule=rule, property=prop, value=val))
        to_dump["selections"] = selections
        with open(file_name, "w+") as f:
            yaml.dump(to_dump, f, indent=4)

    def _parse_selections(self, entries, env_yaml):
        """Apply every selection entry in order."""
        for item in entries:
            self.apply_selection(item, env_yaml)

    def apply_selection(self, item, env_yaml):
        """Record one entry of the ``selections`` list.

        Recognized forms:
          rule.property=value -- refinement of a rule property
          variable=selector   -- variable assignment
          rule / !rule        -- rule selection / unselection

        Raises ValueError for a property that cannot be refined and for a
        rule that cannot be found.
        """
        # Only the part before the first "=" decides whether the entry is a
        # refinement; a "." inside the value of a variable assignment
        # (e.g. "var_x=1.2") must not turn it into one.
        key_part = item.split("=", 1)[0]
        if "." in key_part:
            rule, refinement = item.split(".", 1)
            property_, value = refinement.split("=", 1)
            if property_ not in XCCDF_REFINABLE_PROPERTIES:
                msg = ("Property '{property_}' cannot be refined. "
                       "Rule properties that can be refined are {refinables}. "
                       "Fix refinement '{rule_id}.{property_}={value}' in profile '{profile}'."
                       .format(property_=property_, refinables=XCCDF_REFINABLE_PROPERTIES,
                               rule_id=rule, value=value, profile=self.id_)
                       )
                raise ValueError(msg)
            self.refine_rules[rule].append((property_, value))
        elif "=" in item:
            varname, value = item.split("=", 1)
            self.variables[varname] = value
        else:
            product_dir = env_yaml.get('product_dir', None)
            benchmark_root = env_yaml.get('benchmark_root', None)
            content_dir = os.path.join(product_dir, benchmark_root)
            if item.startswith("!"):
                rule_id = item[1:]
            else:
                rule_id = item
            rule_yaml = get_rule_path_by_id(content_dir, rule_id)
            if rule_yaml is None:
                raise ValueError("Unable to find rule '{}'".format(rule_id))
            rule = Rule.from_yaml(rule_yaml, env_yaml)
            if item.startswith("!"):
                self.unselected.append(rule)
            else:
                self.selected.append(rule)

    def to_xml_element(self):
        """Return this profile as a <Profile> element."""
        element = ET.Element('Profile')
        element.set("id", self.id_)
        if self.extends:
            element.set("extends", self.extends)
        title = add_sub_element(element, "title", self.title)
        title.set("override", "true")
        desc = add_sub_element(element, "description", self.description)
        desc.set("override", "true")

        if self.reference:
            add_sub_element(element, "reference", escape(self.reference))

        for cpe_name in self.cpe_names:
            plat = ET.SubElement(element, "platform")
            plat.set("idref", cpe_name)

        for selection in self.selected:
            select = ET.Element("select")
            select.set("idref", str(selection))
            select.set("selected", "true")
            element.append(select)

        for selection in self.unselected:
            unselect = ET.Element("select")
            unselect.set("idref", str(selection))
            unselect.set("selected", "false")
            element.append(unselect)

        for value_id, selector in self.variables.items():
            refine_value = ET.Element("refine-value")
            refine_value.set("idref", value_id)
            refine_value.set("selector", selector)
            element.append(refine_value)

        for refined_rule, refinement_list in self.refine_rules.items():
            refine_rule = ET.Element("refine-rule")
            refine_rule.set("idref", refined_rule)
            for refinement in refinement_list:
                refine_rule.set(refinement[0], refinement[1])
            element.append(refine_rule)

        return element

    def get_rule_selectors(self):
        """Return the ids of all selected and unselected rules."""
        return [rule.id_ for rule in self.selected + self.unselected]

    def get_variable_selectors(self):
        """Return the mapping of variable id to chosen selector."""
        return self.variables

    def validate_refine_rules(self, rules):
        """Raise ValueError when a refinement targets an unknown or unselected rule."""
        existing_rule_ids = {r.id_ for r in rules}
        for refine_rule, refinement_list in self.refine_rules.items():
            # Take first refinement to illustrate where the error is
            # all refinements in list are invalid, so it doesn't really matter
            a_refinement = refinement_list[0]

            if refine_rule not in existing_rule_ids:
                msg = (
                    "You are trying to refine a rule that doesn't exist. "
                    "Rule '{rule_id}' was not found in the benchmark. "
                    "Please check all rule refinements for rule: '{rule_id}', for example: "
                    "- {rule_id}.{property_}={value}' in profile {profile_id}."
                    .format(rule_id=refine_rule, profile_id=self.id_,
                            property_=a_refinement[0], value=a_refinement[1])
                    )
                raise ValueError(msg)

            if refine_rule not in self.get_rule_selectors():
                msg = ("- {rule_id}.{property_}={value}' in profile '{profile_id}' is refining "
                       "a rule that is not selected by it. The refinement will not have any "
                       "noticeable effect. Either select the rule or remove the rule refinement."
                       .format(rule_id=refine_rule, property_=a_refinement[0],
                               value=a_refinement[1], profile_id=self.id_)
                       )
                raise ValueError(msg)

    def validate_variables(self, variables):
        """Raise ValueError when a variable or its selector is not defined.

        ``variables`` are the known variable definitions; each has an ``id_``
        and an ``options`` mapping keyed by selector.
        """
        variables_by_id = {var.id_: var for var in variables}

        for var_id, our_val in self.variables.items():
            if var_id not in variables_by_id:
                all_vars_list = [" - %s" % v for v in variables_by_id.keys()]
                msg = (
                    "Value '{var_id}' in profile '{profile_name}' is not known. "
                    "We know only variables:\n{var_names}"
                    .format(
                        var_id=var_id, profile_name=self.id_,
                        var_names="\n".join(sorted(all_vars_list)))
                )
                raise ValueError(msg)

            allowed_selectors = [str(s) for s in variables_by_id[var_id].options.keys()]
            if our_val not in allowed_selectors:
                msg = (
                    "Value '{var_id}' in profile '{profile_name}' "
                    "uses the selector '{our_val}'. "
                    "This is not possible, as only selectors {all_selectors} are available. "
                    "Either change the selector used in the profile, or "
                    "add the selector-value pair to the variable definition."
                    .format(
                        var_id=var_id, profile_name=self.id_, our_val=our_val,
                        all_selectors=allowed_selectors,
                    )
                )
                raise ValueError(msg)

    def validate_rules(self, rules, groups):
        """Raise ValueError when a (un)selected id is a group or an unknown rule."""
        existing_rule_ids = {r.id_ for r in rules}
        rule_selectors = self.get_rule_selectors()
        for id_ in rule_selectors:
            if id_ in groups:
                msg = (
                    "You have selected a group '{group_id}' instead of a "
                    "rule. Groups have no effect in the profile and are not "
                    "allowed to be selected. Please remove '{group_id}' "
                    "from profile '{profile_id}' before proceeding."
                    .format(group_id=id_, profile_id=self.id_)
                )
                raise ValueError(msg)
            if id_ not in existing_rule_ids:
                msg = (
                    "Rule '{rule_id}' was not found in the benchmark. Please "
                    "remove rule '{rule_id}' from profile '{profile_id}' "
                    "before proceeding."
                    .format(rule_id=id_, profile_id=self.id_)
                )
                raise ValueError(msg)

    def __sub__(self, other):
        """Return a profile holding what this profile has and ``other`` lacks.

        Selections and unselections are set differences; a variable is kept
        when ``other`` does not define it or defines it with another value.
        """
        profile = Profile(self.id_)
        profile.title = self.title
        profile.description = self.description
        profile.extends = self.extends
        profile.platforms = self.platforms
        profile.platform = self.platform
        profile.selected = list(set(self.selected) - set(other.selected))
        profile.selected.sort()
        profile.unselected = list(set(self.unselected) - set(other.unselected))
        profile.variables = dict((k, v) for (k, v) in self.variables.items()
                                 if k not in other.variables or v != other.variables[k])
        return profile
class ResolvableProfile(Profile):
    """Profile whose controls and extended profile can be folded into it."""

    def __init__(self, * args, ** kwargs):
        super(ResolvableProfile, self).__init__(* args, ** kwargs)
        self.resolved_selections = set()
        self.resolved = False

    def _controls_ids_to_controls(self, controls_manager, policy_id, control_id_list):
        """Look up each control id of ``policy_id`` in ``controls_manager``."""
        controls = []
        for control_id in control_id_list:
            controls.append(controls_manager.get_control(policy_id, control_id))
        return controls

    def _merge_control(self, control):
        """Select the control's rules and adopt its variables.

        A variable that the profile already sets keeps the profile's value.
        """
        self.selected.extend(control.rules)
        for varname, value in control.variables.items():
            self.variables.setdefault(varname, value)

    def resolve_controls(self, controls_manager):
        """Hook for subclasses; the base implementation does nothing."""
        pass

    def extend_by(self, extended_profile):
        """Merge selections, variables and refinements of ``extended_profile``.

        Values defined by this profile take precedence over inherited ones.
        """
        self.resolved_selections.update(set(extended_profile.selected))

        merged_variables = dict(extended_profile.variables)
        merged_variables.update(self.variables)
        self.variables = merged_variables

        # Work on a copy so the extended profile itself is left untouched.
        merged_refinements = self._subtract_refinements(
            deepcopy(extended_profile.refine_rules))
        merged_refinements.update(self.refine_rules)
        self.refine_rules = merged_refinements

    def resolve(self, all_profiles, controls_manager=None):
        """Resolve controls and the ``extends`` chain of this profile in place.

        Afterwards ``selected`` holds the sorted final selection while
        ``unselected`` and ``extends`` are cleared.  Calling it again is a
        no-op.  Raises RuntimeError when the extended profile is not among
        ``all_profiles``.
        """
        if self.resolved:
            return

        self.resolve_controls(controls_manager)

        self.resolved_selections = set(
            rule for rule in self.selected if self.rule_filter(rule))

        parent_id = self.extends
        if parent_id:
            if parent_id not in all_profiles:
                msg = (
                    "Profile {name} extends profile {extended}, but "
                    "only profiles {known_profiles} are available for resolution."
                    .format(name=self.id_, extended=self.extends,
                            known_profiles=list(all_profiles.keys())))
                raise RuntimeError(msg)
            parent = all_profiles[parent_id]
            # The parent has to be complete before it can be inherited from.
            parent.resolve(all_profiles, controls_manager)
            self.extend_by(parent)

        self.resolved_selections.difference_update(self.unselected)

        self.unselected = []
        self.extends = None

        self.selected = sorted(self.resolved_selections)

        self.resolved = True

    def _subtract_refinements(self, extended_refinements):
        """
        Given a dict of rule refinements from the extended profile,
        "undo" every refinement prefixed with '!' in this profile.
        """
        negated_rules = [rule for rule in self.refine_rules if rule.startswith("!")]
        for negated in negated_rules:
            target = negated[1:]
            for prop, val in self.refine_rules[negated]:
                extended_refinements[target].remove((prop, val))
            del self.refine_rules[negated]
        return extended_refinements
class ProfileWithSeparatePolicies(ResolvableProfile):
    """Resolvable profile that lists its policies under a ``policies`` key."""

    def __init__(self, * args, ** kwargs):
        super(ProfileWithSeparatePolicies, self).__init__(* args, ** kwargs)
        self.policies = {}

    def read_yaml_contents(self, yaml_contents, env_yaml):
        """Consume ``policies`` first, then let the parent read the rest."""
        policies_yaml = yaml_contents.pop("policies", None)
        if policies_yaml:
            self._parse_policies(policies_yaml)
        super(ProfileWithSeparatePolicies, self).read_yaml_contents(yaml_contents, env_yaml)

    def _parse_policies(self, policies_yaml):
        """Store the controls of every policy, keyed by the policy id.

        ``controls`` has to be a list or the string "all"; anything else
        raises ValueError.
        """
        for policy in policies_yaml:
            id_ = required_key(policy, "id")
            controls_ids = required_key(policy, "controls")
            if not isinstance(controls_ids, list) and controls_ids != "all":
                msg = (
                    "Policy {id_} contains invalid controls list {controls}."
                    .format(id_=id_, controls=str(controls_ids)))
                raise ValueError(msg)
            self.policies[id_] = controls_ids

    def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids):
        """Expand control ids, "all" and "all:<level>" into control objects."""
        controls = []
        for cid in controls_ids:
            if cid.startswith("all"):
                if ":" in cid:
                    level_id = cid.split(":", 1)[1]
                    found = controls_manager.get_all_controls_of_level(policy_id, level_id)
                else:
                    found = controls_manager.get_all_controls(policy_id)
            else:
                found = self._controls_ids_to_controls(controls_manager, policy_id, [cid])
            controls.extend(found)
        return controls

    def resolve_controls(self, controls_manager):
        """Merge the controls of every stored policy into this profile."""
        for policy_id, controls_ids in self.policies.items():
            if isinstance(controls_ids, list):
                wanted_ids = controls_ids
            elif controls_ids.startswith("all"):
                wanted_ids = [controls_ids]
            else:
                msg = (
                    "Unknown policy content {content} in profile {profile_id}"
                    .format(content=controls_ids, profile_id=self.id_))
                raise ValueError(msg)

            controls = self._process_controls_ids_into_controls(
                controls_manager, policy_id, wanted_ids)
            for control in controls:
                self._merge_control(control)

    def extend_by(self, extended_profile):
        """Take over the extended profile's policies, then merge as the parent does."""
        self.policies.update(extended_profile.policies)
        super(ProfileWithSeparatePolicies, self).extend_by(extended_profile)
class ProfileWithInlinePolicies(ResolvableProfile):
    """Resolvable profile that selects controls as ``policy:control`` entries."""

    def __init__(self, * args, ** kwargs):
        super(ProfileWithInlinePolicies, self).__init__(* args, ** kwargs)
        self.controls_by_policy = defaultdict(list)

    def apply_selection(self, item, env_yaml):
        """Record a ``policy:control`` entry, or defer to the parent class."""
        # ":" is the delimiter for controls but not when the item is a variable
        if "=" in item or ":" not in item:
            super(ProfileWithInlinePolicies, self).apply_selection(item, env_yaml)
            return
        policy_id, _, control_id = item.partition(":")
        self.controls_by_policy[policy_id].append(control_id)

    def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids):
        """Expand control ids, "all" and "all:<level>" into control objects."""
        controls = []
        for cid in controls_ids:
            if cid.startswith("all"):
                if ":" in cid:
                    level_id = cid.split(":", 1)[1]
                    found = controls_manager.get_all_controls_of_level(policy_id, level_id)
                else:
                    found = controls_manager.get_all_controls(policy_id)
            else:
                found = self._controls_ids_to_controls(controls_manager, policy_id, [cid])
            controls.extend(found)
        return controls

    def resolve_controls(self, controls_manager):
        """Merge every control collected per policy into this profile."""
        for policy_id, controls_ids in self.controls_by_policy.items():
            resolved = self._process_controls_ids_into_controls(
                controls_manager, policy_id, controls_ids)
            for control in resolved:
                self._merge_control(control)
class Value(object):
    """Represents XCCDF Value
    """

    def __init__(self, id_):
        self.id_ = id_
        self.title = ""
        self.description = ""
        self.type_ = "string"
        self.operator = "equals"
        self.interactive = False
        self.options = {}
        self.warnings = []

    @staticmethod
    def _parse_interactive(raw):
        """Interpret the ``interactive`` field of the YAML as a boolean.

        The field may arrive as a real boolean (``interactive: true``) or as
        a string (``interactive: "true"``); strings compare case-insensitively
        against "true".
        """
        if isinstance(raw, bool):
            return raw
        return raw.lower() == "true"

    @staticmethod
    def from_yaml(yaml_file, env_yaml=None):
        """Create a Value from ``yaml_file``.

        The value id is the file name without its extension.  Returns None
        when the expanded file has no contents.  Raises ValueError for an
        unknown operator or a warning entry that does not hold exactly one
        key/value pair, and RuntimeError when unparsed keys are left over.
        """
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
        if yaml_contents is None:
            return None

        value_id, _ = os.path.splitext(os.path.basename(yaml_file))
        value = Value(value_id)
        value.title = required_key(yaml_contents, "title")
        del yaml_contents["title"]
        value.description = required_key(yaml_contents, "description")
        del yaml_contents["description"]
        value.type_ = required_key(yaml_contents, "type")
        del yaml_contents["type"]
        value.operator = yaml_contents.pop("operator", "equals")
        possible_operators = ["equals", "not equal", "greater than",
                              "less than", "greater than or equal",
                              "less than or equal", "pattern match"]

        if value.operator not in possible_operators:
            raise ValueError(
                "Found an invalid operator value '%s' in '%s'. "
                "Expected one of: %s"
                % (value.operator, yaml_file, ", ".join(possible_operators))
            )

        value.interactive = Value._parse_interactive(
            yaml_contents.pop("interactive", "false"))

        value.options = required_key(yaml_contents, "options")
        del yaml_contents["options"]
        value.warnings = yaml_contents.pop("warnings", [])

        for warning_list in value.warnings:
            if len(warning_list) != 1:
                raise ValueError("Only one key/value pair should exist for each dictionary")

        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))

        return value

    def to_xml_element(self):
        """Return this object as a <Value> element."""
        value = ET.Element('Value')
        value.set('id', self.id_)
        value.set('type', self.type_)
        if self.operator != "equals":  # equals is the default
            value.set('operator', self.operator)
        if self.interactive:  # False is the default
            value.set('interactive', 'true')
        title = ET.SubElement(value, 'title')
        title.text = self.title
        add_sub_element(value, 'description', self.description)
        add_warning_elements(value, self.warnings)

        for selector, option in self.options.items():
            # do not confuse Value with big V with value with small v
            # value is child element of Value
            value_small = ET.SubElement(value, 'value')
            # by XCCDF spec, default value is value without selector
            if selector != "default":
                value_small.set('selector', str(selector))
            value_small.text = str(option)

        return value

    def to_file(self, file_name):
        """Write the XML form of this value to ``file_name``."""
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)
class Benchmark(object):
    """Represents XCCDF Benchmark
    """

    def __init__(self, id_):
        self.id_ = id_
        self.title = ""
        self.status = ""
        self.description = ""
        self.notice_id = ""
        self.notice_description = ""
        self.front_matter = ""
        self.rear_matter = ""
        self.version = "0.1"
        self.cpes = []
        self.profiles = []
        self.product_cpe_names = []
        self.values = {}
        self.groups = {}
        self.rules = {}

        # OCIL clauses need this placeholder value to exist in the benchmark.
        placeholder = Value("conditional_clause")
        placeholder.title = "A conditional clause for check statements."
        placeholder.description = placeholder.title
        placeholder.type_ = "string"
        placeholder.options = {"": "This is a placeholder"}
        self.add_value(placeholder)

    @classmethod
    def from_yaml(cls, yaml_file, id_, env_yaml=None):
        """Create a benchmark with id ``id_`` from ``yaml_file``.

        Returns None when the expanded file has no contents and raises
        RuntimeError when keys are left over after parsing.
        """
        yaml_contents = open_and_macro_expand(yaml_file, env_yaml)
        if yaml_contents is None:
            return None

        def consume(mapping, key):
            # Fetch a mandatory key and drop it, so that leftovers can be
            # reported at the end.
            found = required_key(mapping, key)
            del mapping[key]
            return found

        benchmark = cls(id_)
        benchmark.title = consume(yaml_contents, "title")
        benchmark.status = consume(yaml_contents, "status")
        benchmark.description = consume(yaml_contents, "description")

        notice_contents = required_key(yaml_contents, "notice")
        benchmark.notice_id = consume(notice_contents, "id")
        benchmark.notice_description = consume(notice_contents, "description")
        # A notice with unknown extra keys stays in place and is reported
        # as unparsed data below.
        if not notice_contents:
            del yaml_contents["notice"]

        benchmark.front_matter = consume(yaml_contents, "front-matter")
        benchmark.rear_matter = consume(yaml_contents, "rear-matter")
        benchmark.version = str(consume(yaml_contents, "version"))

        if env_yaml:
            benchmark.product_cpe_names = env_yaml["product_cpes"].get_product_cpe_names()

        if yaml_contents:
            raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                               % (yaml_file, yaml_contents))

        return benchmark

    def add_profiles_from_dir(self, dir_, env_yaml):
        """Load every ``*.profile`` file found directly in ``dir_``.

        Files with another extension are reported on stderr and skipped,
        profiles whose documentation is not complete are skipped silently,
        and any other loading error is re-raised as RuntimeError.
        """
        for entry in sorted(os.listdir(dir_)):
            entry_path = os.path.join(dir_, entry)
            if not os.path.isfile(entry_path):
                continue

            ext = os.path.splitext(entry_path)[1]
            if ext != '.profile':
                sys.stderr.write(
                    "Encountered file '%s' while looking for profiles, "
                    "extension '%s' is unknown. Skipping..\n"
                    % (entry, ext)
                )
                continue

            try:
                new_profile = ProfileWithInlinePolicies.from_yaml(entry_path, env_yaml)
            except DocumentationNotComplete:
                continue
            except Exception as exc:
                msg = ("Error building profile from '{fname}': '{error}'"
                       .format(fname=entry_path, error=str(exc)))
                raise RuntimeError(msg)

            if new_profile is not None:
                self.profiles.append(new_profile)

    def to_xml_element(self):
        """Return the benchmark with all of its content as a <Benchmark> element."""
        root = ET.Element('Benchmark')
        root_attributes = (
            ('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance'),
            ('xmlns:xhtml', 'http://www.w3.org/1999/xhtml'),
            ('xmlns:dc', 'http://purl.org/dc/elements/1.1/'),
            ('id', 'product-name'),
            ('xsi:schemaLocation',
             'http://checklists.nist.gov/xccdf/1.1 xccdf-1.1.4.xsd'),
            ('style', 'SCAP_1.1'),
            ('resolved', 'false'),
            ('xml:lang', 'en-US'),
        )
        for attr_name, attr_value in root_attributes:
            root.set(attr_name, attr_value)

        status = ET.SubElement(root, 'status')
        status.set('date', datetime.date.today().strftime("%Y-%m-%d"))
        status.text = self.status
        add_sub_element(root, "title", self.title)
        add_sub_element(root, "description", self.description)
        notice = add_sub_element(root, "notice", self.notice_description)
        notice.set('id', self.notice_id)
        add_sub_element(root, "front-matter", self.front_matter)
        add_sub_element(root, "rear-matter", self.rear_matter)

        # The Benchmark applicability is determined by the CPEs
        # defined in the product.yml
        for cpe_name in self.product_cpe_names:
            ET.SubElement(root, "platform").set("idref", cpe_name)

        ET.SubElement(root, 'version').text = self.version
        ET.SubElement(root, "metadata")

        for profile in self.profiles:
            root.append(profile.to_xml_element())

        for value in self.values.values():
            root.append(value.to_xml_element())

        # Make system group the first, followed by services group
        group_ids = reorder_according_to_ordering(
            list(self.groups.keys()), ["system", "services"])
        for group_id in group_ids:
            group = self.groups.get(group_id)
            # Products using application benchmark don't have system or services group
            if group is not None:
                root.append(group.to_xml_element())

        for rule in self.rules.values():
            root.append(rule.to_xml_element())

        return root

    def to_file(self, file_name, ):
        """Write the XML form of the benchmark to ``file_name``."""
        ET.ElementTree(self.to_xml_element()).write(file_name)

    def add_value(self, value):
        """Register ``value`` under its id; None is ignored."""
        if value is not None:
            self.values[value.id_] = value

    # The benchmark is also considered a group, so this function signature needs to match
    # Group()'s add_group()
    def add_group(self, group, env_yaml=None):
        """Register ``group`` under its id; None is ignored."""
        if group is not None:
            self.groups[group.id_] = group

    def add_rule(self, rule):
        """Register ``rule`` under its id; None is ignored."""
        if rule is not None:
            self.rules[rule.id_] = rule

    def to_xccdf(self):
        """We can easily extend this script to generate a valid XCCDF instead
        of SSG SHORTHAND.
        """
        raise NotImplementedError

    def __str__(self):
        return self.id_
class Group(object):
"""Represents XCCDF Group
"""
ATTRIBUTES_TO_PASS_ON = (
"platforms",
)
def __init__(self, id_):
self.id_ = id_
self.prodtype = "all"
self.title = ""
self.description = ""
self.warnings = []
self.requires = []
self.conflicts = []
self.values = {}
self.groups = {}
self.rules = {}
# self.platforms is used further in the build system
# self.platform is merged into self.platforms
# it is here for backward compatibility
self.platforms = set()
self.cpe_names = set()
self.platform = None
@classmethod
def from_yaml(cls, yaml_file, env_yaml=None):
    """Create a group from ``yaml_file``.

    The group id is the name of the directory holding the file.  Returns
    None when the expanded file has no contents.  Raises ValueError for a
    malformed warning entry or prodtype, RuntimeError for unparsed keys,
    and re-raises CPEDoesNotExist for a platform without a CPE name.
    """
    contents = open_and_macro_expand(yaml_file, env_yaml)
    if contents is None:
        return None

    group = cls(os.path.basename(os.path.dirname(yaml_file)))
    group.prodtype = contents.pop("prodtype", "all")
    group.title = required_key(contents, "title")
    del contents["title"]
    group.description = required_key(contents, "description")
    del contents["description"]
    group.warnings = contents.pop("warnings", [])
    group.conflicts = contents.pop("conflicts", [])
    group.requires = contents.pop("requires", [])
    group.platform = contents.pop("platform", None)
    group.platforms = contents.pop("platforms", set())

    # Keep the legacy single ``platform`` visible through ``platforms`` too.
    if group.platform is not None:
        group.platforms.add(group.platform)

    if env_yaml:
        for platform_name in group.platforms:
            try:
                cpe_name = env_yaml["product_cpes"].get_cpe_name(platform_name)
                group.cpe_names.add(cpe_name)
            except CPEDoesNotExist:
                print("Unsupported platform '%s' in group '%s'." % (platform_name, group.id_))
                raise

    if any(len(warning_entry) != 1 for warning_entry in group.warnings):
        raise ValueError("Only one key/value pair should exist for each dictionary")

    if contents:
        raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s"
                           % (yaml_file, contents))
    group.validate_prodtype(yaml_file)
    return group
def validate_prodtype(self, yaml_file):
for ptype in self.prodtype.split(","):
if ptype.strip() != ptype:
msg = (
"Comma-separated '{prodtype}' prodtype "
"in {yaml_file} contains whitespace."
.format(prodtype=self.prodtype, yaml_file=yaml_file))
raise ValueError(msg)
def to_xml_element(self):
    """Return this group with its values, rules and sub-groups as a
    <Group> element.
    """
    group = ET.Element('Group')
    group.set('id', self.id_)
    if self.prodtype != "all":
        group.set("prodtype", self.prodtype)
    ET.SubElement(group, 'title').text = self.title
    add_sub_element(group, 'description', self.description)
    add_warning_elements(group, self.warnings)
    add_nondata_subelements(group, "requires", "id", self.requires)
    add_nondata_subelements(group, "conflicts", "id", self.conflicts)

    for cpe_name in self.cpe_names:
        ET.SubElement(group, "platform").set("idref", cpe_name)

    for child_value in self.values.values():
        group.append(child_value.to_xml_element())

    # Rules that install or remove packages influence the remediation of
    # other rules.  Putting them first gives a more logical rule order and
    # a natural remediation order: a package is installed before it is
    # configured.  The resulting order is: packages installed, packages
    # removed, services enabled, services disabled, then everything else.
    rule_regex = (r'(package_.*_(installed|removed))|'
                  r'(service_.*_(enabled|disabled))|'
                  r'install_smartcard_packages$')
    rule_priority = ["installed", "install_smartcard_packages", "removed",
                     "enabled", "disabled"]
    ordered_rule_ids = reorder_according_to_ordering(
        list(self.rules.keys()), rule_priority, rule_regex)
    for rule_id in ordered_rule_ids:
        group.append(self.rules[rule_id].to_xml_element())

    # Sub-groups go after the rules of the current level.  Package and
    # service rules usually live in the top level group, so groups that
    # further configure a package or service end up after the rules that
    # install or remove it.
    group_priority = [
        # rpm_verify_(hashes|permissions|ownership) have to run before any
        # other rule.  They conflict with rules configuring stricter
        # settings (e.g. file_permissions_grub2_cfg, sudo_dedicated_group),
        # so rules deviating from the system default are evaluated later:
        # contents, permissions and ownership are reset first and the
        # stricter settings of the profile are applied on top.
        "software", "integrity", "integrity-software", "rpm_verification",
        # The accounts group has to precede the auditing group:
        # package_screen_installed should run before
        # audit_rules_privileged_commands, otherwise the latter does not
        # catch the newly installed screen binary during remediation and
        # reports a failure.
        "accounts", "auditing",
        # The FIPS group should come before Crypto,
        # if we want to set a different (stricter) Crypto Policy than FIPS.
        "fips", "crypto",
        # firewalld_activation must come before ruleset_modifications,
        # otherwise remediations for ruleset_modifications won't work.
        "firewalld_activation", "ruleset_modifications",
        # Rules from disabling_ipv6 must precede rules from
        # configuring_ipv6, otherwise the remediation prints an error
        # although it is successful.
        "disabling_ipv6", "configuring_ipv6"
    ]
    ordered_group_ids = reorder_according_to_ordering(
        list(self.groups.keys()), group_priority)
    for group_id in ordered_group_ids:
        group.append(self.groups[group_id].to_xml_element())

    return group
def to_file(self, file_name):
    """Write the XML form of this group to ``file_name``."""
    ET.ElementTree(self.to_xml_element()).write(file_name)
def add_value(self, value):
if value is None:
return
self.values[value.id_] = value
def add_group(self, group, env_yaml=None):
if group is None:
return
if self.platforms and not group.platforms:
group.platforms = self.platforms
self.groups[group.id_] = group
self._pass_our_properties_on_to(group)