Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 185 additions & 0 deletions jsonschema/_evaluated.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
"""
Evaluated item and property index tracking for unevaluated* keywords.

These functions determine which array items and object properties have been
evaluated by a schema, which is needed to implement the unevaluatedItems
and unevaluatedProperties keywords from JSON Schema 2020-12.

Previously lived in _utils.py — extracted here for single responsibility:
one module = one concern (evaluated item/property tracking).
"""
import re


def find_evaluated_item_indexes_by_schema(validator, instance, schema):
"""
Get all indexes of items that get evaluated under the current schema.

Covers all keywords related to unevaluatedItems: items, prefixItems, if,
then, else, contains, unevaluatedItems, allOf, oneOf, anyOf
"""
if validator.is_type(schema, "boolean"):
return []
evaluated_indexes = []

if "items" in schema:
return list(range(len(instance)))

ref = schema.get("$ref")
if ref is not None:
resolved = validator._resolver.lookup(ref)
evaluated_indexes.extend(
find_evaluated_item_indexes_by_schema(
validator.evolve(
schema=resolved.contents,
_resolver=resolved.resolver,
),
instance,
resolved.contents,
),
)

dynamicRef = schema.get("$dynamicRef")
if dynamicRef is not None:
resolved = validator._resolver.lookup(dynamicRef)
evaluated_indexes.extend(
find_evaluated_item_indexes_by_schema(
validator.evolve(
schema=resolved.contents,
_resolver=resolved.resolver,
),
instance,
resolved.contents,
),
)

if "prefixItems" in schema:
evaluated_indexes += list(range(len(schema["prefixItems"])))

if "if" in schema:
if validator.evolve(schema=schema["if"]).is_valid(instance):
evaluated_indexes += find_evaluated_item_indexes_by_schema(
validator, instance, schema["if"],
)
if "then" in schema:
evaluated_indexes += find_evaluated_item_indexes_by_schema(
validator, instance, schema["then"],
)
elif "else" in schema:
evaluated_indexes += find_evaluated_item_indexes_by_schema(
validator, instance, schema["else"],
)

for keyword in ["contains", "unevaluatedItems"]:
if keyword in schema:
for k, v in enumerate(instance):
if validator.evolve(schema=schema[keyword]).is_valid(v):
evaluated_indexes.append(k)

for keyword in ["allOf", "oneOf", "anyOf"]:
if keyword in schema:
for subschema in schema[keyword]:
errs = next(validator.descend(instance, subschema), None)
if errs is None:
evaluated_indexes += find_evaluated_item_indexes_by_schema(
validator, instance, subschema,
)

return evaluated_indexes


def find_evaluated_property_keys_by_schema(validator, instance, schema):
"""
Get all keys of items that get evaluated under the current schema.

Covers all keywords related to unevaluatedProperties: properties,
additionalProperties, unevaluatedProperties, patternProperties,
dependentSchemas, allOf, oneOf, anyOf, if, then, else
"""
if validator.is_type(schema, "boolean"):
return []
evaluated_keys = []

ref = schema.get("$ref")
if ref is not None:
resolved = validator._resolver.lookup(ref)
evaluated_keys.extend(
find_evaluated_property_keys_by_schema(
validator.evolve(
schema=resolved.contents,
_resolver=resolved.resolver,
),
instance,
resolved.contents,
),
)

dynamicRef = schema.get("$dynamicRef")
if dynamicRef is not None:
resolved = validator._resolver.lookup(dynamicRef)
evaluated_keys.extend(
find_evaluated_property_keys_by_schema(
validator.evolve(
schema=resolved.contents,
_resolver=resolved.resolver,
),
instance,
resolved.contents,
),
)

properties = schema.get("properties")
if validator.is_type(properties, "object"):
evaluated_keys += properties.keys() & instance.keys()

for keyword in ["additionalProperties", "unevaluatedProperties"]:
if (subschema := schema.get(keyword)) is None:
continue
evaluated_keys += (
key
for key, value in instance.items()
if is_valid(validator.descend(value, subschema))
)

if "patternProperties" in schema:
for property in instance:
for pattern in schema["patternProperties"]:
if re.search(pattern, property):
evaluated_keys.append(property)

if "dependentSchemas" in schema:
for property, subschema in schema["dependentSchemas"].items():
if property not in instance:
continue
evaluated_keys += find_evaluated_property_keys_by_schema(
validator, instance, subschema,
)

for keyword in ["allOf", "oneOf", "anyOf"]:
for subschema in schema.get(keyword, []):
if not is_valid(validator.descend(instance, subschema)):
continue
evaluated_keys += find_evaluated_property_keys_by_schema(
validator, instance, subschema,
)

if "if" in schema:
if validator.evolve(schema=schema["if"]).is_valid(instance):
evaluated_keys += find_evaluated_property_keys_by_schema(
validator, instance, schema["if"],
)
if "then" in schema:
evaluated_keys += find_evaluated_property_keys_by_schema(
validator, instance, schema["then"],
)
elif "else" in schema:
evaluated_keys += find_evaluated_property_keys_by_schema(
validator, instance, schema["else"],
)

return evaluated_keys


def is_valid(errs_it):
"""Whether there are no errors in the given iterator."""
return next(errs_it, None) is None
Loading
Loading