This commit is contained in:
2026-03-16 14:43:38 +03:00
parent 37eff0ed9b
commit 8aeb9a4244
17 changed files with 1451 additions and 28 deletions

518
lab3/confaccess.py Normal file
View File

@@ -0,0 +1,518 @@
#!/usr/bin/env python3
"""Access to confidential data with DAC and MAC."""
import argparse
import getpass
import hashlib
import shutil
import signal
import sys
from datetime import datetime
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from config import (
ACL_FILE,
ACCESS_MODE_FILE,
CONFDATA_DIR,
LOG_DIR,
OBJECT_LABELS_FILE,
PASSWD_FILE,
SUBJECT_LABELS_FILE,
)
HELP_TEXT = """\
Commands:
create <file> create new empty file
read <file> print file contents
append <file> <text> append text line to file
copy <src> <dst> copy file into confdata
remove <file> delete file from confdata
grant <user> <path> <perms> grant access (owner only)
help show this help
exit exit"""
def hash_password(password: str) -> str:
return hashlib.sha256(password.encode("ascii")).hexdigest()
def log_action(login: str, action: str) -> None:
LOG_DIR.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().isoformat(sep=" ", timespec="seconds")
with open(LOG_DIR / "access.log", "a") as f:
f.write(f"{timestamp} [{login}] {action}\n")
def read_users() -> dict[str, dict]:
users: dict[str, dict] = {}
if not PASSWD_FILE.exists():
return users
with open(PASSWD_FILE) as f:
for line in f:
line = line.strip()
if not line:
continue
parts = line.split(":", 4)
if len(parts) != 5:
continue
users[parts[0]] = {
"password_hash": parts[1],
"id": parts[2],
"permissions": parts[3],
"full_name": parts[4],
}
return users
def read_access_mode() -> str:
if not ACCESS_MODE_FILE.exists():
return "BOTH"
raw = ACCESS_MODE_FILE.read_text().strip().upper()
if raw in ("BOTH", "DAC_ONLY", "MAC_ONLY"):
return raw
return "BOTH"
def normalize_path(path: str) -> str:
"""Normalize path relative to confdata (no ./, no trailing /)."""
p = path.strip().lstrip("./")
parts = [x for x in p.split("/") if x]
return "/".join(parts) if parts else ""
def read_acl() -> dict[str, dict]:
"""Read ACL: path -> {owner, perms: {user: perms_str}}."""
acl: dict[str, dict] = {}
if not ACL_FILE.exists():
return acl
for line in ACL_FILE.read_text().splitlines():
line = line.strip()
if not line:
continue
parts = line.split(":", 2)
if len(parts) < 3:
continue
path, owner, assignments = parts[0], parts[1], parts[2]
path = normalize_path(path)
perms: dict[str, str] = {}
for ass in assignments.split(","):
if ":" in ass:
u, p = ass.split(":", 1)
perms[u.strip()] = p.strip()
acl[path] = {"owner": owner, "perms": perms}
return acl
def write_acl(acl: dict[str, dict]) -> None:
ACL_FILE.parent.mkdir(parents=True, exist_ok=True)
lines = []
for path in sorted(acl.keys()):
entry = acl[path]
owner = entry["owner"]
perms = entry["perms"]
ass = ",".join(f"{u}:{p}" for u, p in sorted(perms.items()))
lines.append(f"{path}:{owner}:{ass}")
ACL_FILE.write_text("\n".join(lines) + "\n")
def read_subject_labels() -> dict[str, int]:
labels: dict[str, int] = {}
if not SUBJECT_LABELS_FILE.exists():
return labels
for line in SUBJECT_LABELS_FILE.read_text().splitlines():
line = line.strip()
if not line:
continue
parts = line.split(":", 1)
if len(parts) == 2 and parts[1] in ("0", "1", "2"):
labels[parts[0]] = int(parts[1])
return labels
def read_object_labels() -> dict[str, int]:
labels: dict[str, int] = {}
if not OBJECT_LABELS_FILE.exists():
return labels
for line in OBJECT_LABELS_FILE.read_text().splitlines():
line = line.strip()
if not line:
continue
parts = line.split(":", 1)
if len(parts) == 2 and parts[1] in ("0", "1", "2"):
labels[parts[0]] = int(parts[1])
return labels
def write_object_labels(labels: dict[str, int]) -> None:
OBJECT_LABELS_FILE.parent.mkdir(parents=True, exist_ok=True)
lines = [f"{path}:{level}" for path, level in sorted(labels.items())]
OBJECT_LABELS_FILE.write_text("\n".join(lines) + "\n")
def dac_allows(login: str, action: str, rel_path: str) -> bool:
"""Check DAC: does login have required permission on object?"""
rel_path = normalize_path(rel_path)
acl = read_acl()
if action in ("create", "create_dst"):
# New object: not in ACL yet, any authenticated user can create
if rel_path not in acl:
return True
entry = acl.get(rel_path)
if not entry:
return False
perms = entry["perms"].get(login, "")
if action == "read":
return "r" in perms
if action in ("write", "append", "create_dst"):
return "w" in perms
if action == "remove":
return "d" in perms
if action == "grant":
return entry["owner"] == login
return False
def dac_owner(rel_path: str) -> str | None:
"""Return owner of path or None."""
rel_path = normalize_path(rel_path)
acl = read_acl()
entry = acl.get(rel_path)
return entry["owner"] if entry else None
def mac_allows(login: str, action: str, rel_path: str, obj_level: int | None = None) -> bool:
"""Check MAC (Bell-LaPadula)."""
labels = read_subject_labels()
subj_level = labels.get(login, 0)
obj_labels = read_object_labels()
norm_path = normalize_path(rel_path)
if obj_level is None:
obj_level = obj_labels.get(norm_path)
if action == "read":
# No read up: subject_level >= object_level
obj_lev = obj_level if obj_level is not None else 0
return subj_level >= obj_lev
if action in ("write", "append", "remove"):
obj_lev = obj_level if obj_level is not None else 0
return subj_level <= obj_lev
if action == "create_dst":
# New object gets subject's label; subject can write to same level
return True
return True
def check_access(login: str, action: str, rel_path: str, mode: str) -> bool:
if mode in ("BOTH", "DAC_ONLY"):
if not dac_allows(login, action, rel_path):
return False
if mode in ("BOTH", "MAC_ONLY"):
if not mac_allows(login, action, rel_path):
return False
return True
def authenticate() -> tuple[str, dict]:
users = read_users()
while True:
try:
login = input("Login: ").strip()
password = getpass.getpass("Password: ")
except (EOFError, KeyboardInterrupt):
print("\nBye.")
sys.exit(0)
user = users.get(login)
if user and user["password_hash"] == hash_password(password):
return login, user
log_action(login if login else "-", "LOGIN_FAILED")
print("Invalid credentials. Try again.")
def confdata_path(arg: str) -> Path:
p = Path(arg)
if not p.is_absolute():
p = CONFDATA_DIR / p
return p.resolve()
def rel_path_from_confdata(path: Path) -> str:
try:
rel = path.relative_to(CONFDATA_DIR.resolve())
return "/".join(rel.parts)
except ValueError:
return ""
def is_in_confdata(path: Path) -> bool:
try:
path.relative_to(CONFDATA_DIR.resolve())
return True
except ValueError:
return False
def cmd_read(args: list[str], login: str, mode: str) -> None:
if len(args) != 1:
print("Usage: read <file>")
return
path = confdata_path(args[0])
if not is_in_confdata(path):
print("Access denied: file must be inside confdata")
return
if not path.exists():
print(f"File not found: {path.name}")
return
rel = rel_path_from_confdata(path)
if not check_access(login, "read", rel, mode):
print("Permission denied")
return
print(path.read_text(), end="")
log_action(login, f"READ {path}")
def cmd_create(args: list[str], login: str, mode: str) -> None:
if len(args) != 1:
print("Usage: create <file>")
return
path = confdata_path(args[0])
if not is_in_confdata(path):
print("Access denied: file must be inside confdata")
return
if path.exists():
print(f"File already exists: {path.name}")
return
rel = rel_path_from_confdata(path)
# DAC: new object, add to ACL with owner=login
# MAC: new object gets subject's label
if mode in ("BOTH", "DAC_ONLY"):
acl = read_acl()
acl[rel] = {"owner": login, "perms": {login: "rwd"}}
write_acl(acl)
if mode in ("BOTH", "MAC_ONLY"):
labels = read_subject_labels()
subj_level = labels.get(login, 0)
obj_labels = read_object_labels()
obj_labels[rel] = subj_level
write_object_labels(obj_labels)
path.parent.mkdir(parents=True, exist_ok=True)
path.touch()
log_action(login, f"CREATE {path}")
print("Done.")
def cmd_append(args: list[str], login: str, mode: str) -> None:
if len(args) < 2:
print("Usage: append <file> <text>")
return
path = confdata_path(args[0])
if not is_in_confdata(path):
print("Access denied: file must be inside confdata")
return
if not path.exists():
print(f"File not found: {path.name}. Use 'create' first.")
return
rel = rel_path_from_confdata(path)
if not check_access(login, "append", rel, mode):
print("Permission denied")
return
text = " ".join(args[1:])
with open(path, "a") as f:
f.write(text + "\n")
log_action(login, f"APPEND {path}")
print("Done.")
def cmd_copy(args: list[str], login: str, mode: str) -> None:
if len(args) != 2:
print("Usage: copy <src> <dst>")
return
src = Path(args[0]).resolve()
dst = confdata_path(args[1])
if not is_in_confdata(dst):
print("Access denied: destination must be inside confdata")
return
if dst.is_dir():
dst = dst / src.name
if dst.exists():
print(f"Destination already exists: {dst.name}")
return
if not src.exists():
print(f"Source not found: {args[0]}")
return
if src.is_dir():
print("Copying directories is not supported")
return
dst_rel = rel_path_from_confdata(dst)
src_rel = rel_path_from_confdata(src) if is_in_confdata(src) else None
if src_rel is not None:
if not check_access(login, "read", src_rel, mode):
print("Permission denied (read source)")
return
if not check_access(login, "create_dst", dst_rel, mode):
print("Permission denied (create destination)")
return
shutil.copy2(src, dst)
if mode in ("BOTH", "DAC_ONLY"):
acl = read_acl()
acl[dst_rel] = {"owner": login, "perms": {login: "rwd"}}
write_acl(acl)
if mode in ("BOTH", "MAC_ONLY"):
labels = read_subject_labels()
subj_level = labels.get(login, 0)
obj_labels = read_object_labels()
obj_labels[dst_rel] = subj_level
write_object_labels(obj_labels)
log_action(login, f"COPY {src} -> {dst}")
print("Done.")
def cmd_remove(args: list[str], login: str, mode: str) -> None:
if len(args) != 1:
print("Usage: remove <file>")
return
path = confdata_path(args[0])
if not is_in_confdata(path):
print("Access denied: file must be inside confdata")
return
if not path.exists():
print(f"File not found: {path.name}")
return
rel = rel_path_from_confdata(path)
if not check_access(login, "remove", rel, mode):
print("Permission denied")
return
path.unlink()
if mode in ("BOTH", "DAC_ONLY"):
acl = read_acl()
acl.pop(rel, None)
write_acl(acl)
if mode in ("BOTH", "MAC_ONLY"):
obj_labels = read_object_labels()
obj_labels.pop(rel, None)
write_object_labels(obj_labels)
log_action(login, f"REMOVE {path}")
print("Done.")
def cmd_grant(args: list[str], login: str, mode: str) -> None:
if mode not in ("BOTH", "DAC_ONLY"):
print("grant is only available in DAC or BOTH mode")
return
if len(args) != 3:
print("Usage: grant <user> <path> <perms>")
return
target_user, path_arg, perms = args[0], args[1], args[2]
for ch in perms:
if ch not in "rwd":
print(f"Invalid permission {ch!r}; allowed: r, w, d")
return
path = confdata_path(path_arg)
if not is_in_confdata(path):
print("Access denied: path must be inside confdata")
return
if not path.exists():
print(f"File not found: {path_arg}")
return
rel = rel_path_from_confdata(path)
if not dac_allows(login, "grant", rel):
print("Permission denied (only owner can grant)")
return
acl = read_acl()
entry = acl.get(rel)
if not entry:
print("ACL entry not found")
return
entry["perms"][target_user] = perms
write_acl(acl)
log_action(login, f"GRANT {target_user} {rel} {perms}")
print("Done.")
def check_credentials(login: str, password: str) -> bool:
"""Check login+password against passwd. Used by --check mode."""
users = read_users()
user = users.get(login)
if user and user["password_hash"] == hash_password(password):
return True
return False
def main() -> None:
parser = argparse.ArgumentParser(description="Access confidential data")
parser.add_argument(
"--check",
metavar="LOGIN",
help="Batch mode: read passwords line-by-line from stdin, output 0 or 1 per line; exit 0 on first match",
)
args, _ = parser.parse_known_args()
if args.check is not None:
users = read_users()
user = users.get(args.check)
target_hash = user["password_hash"] if user else None
for line in sys.stdin:
password = line.rstrip("\n")
if target_hash and hash_password(password) == target_hash:
sys.stdout.write("1\n")
sys.stdout.flush()
sys.exit(0)
sys.stdout.write("0\n")
sys.stdout.flush()
sys.exit(1)
signal.signal(signal.SIGINT, lambda _s, _f: (print("\nBye."), sys.exit(0)))
login, user = authenticate()
full_name = user["full_name"]
mode = read_access_mode()
log_action(login, "LOGIN")
print(f"\nПривет, {full_name}")
print(HELP_TEXT)
while True:
try:
line = input(f"\n{login}> ").strip()
except (EOFError, KeyboardInterrupt):
log_action(login, "EXIT")
print("\nBye.")
break
if not line:
continue
parts = line.split()
command, cmd_args = parts[0], parts[1:]
if command == "exit":
log_action(login, "EXIT")
print("Bye.")
break
elif command == "help":
print(HELP_TEXT)
elif command == "create":
cmd_create(cmd_args, login, mode)
elif command == "read":
cmd_read(cmd_args, login, mode)
elif command == "append":
cmd_append(cmd_args, login, mode)
elif command == "copy":
cmd_copy(cmd_args, login, mode)
elif command == "remove":
cmd_remove(cmd_args, login, mode)
elif command == "grant":
cmd_grant(cmd_args, login, mode)
else:
print(f"Unknown command: {command!r}. Type 'help' for available commands.")
if __name__ == "__main__":
main()