519 lines
16 KiB
Python
519 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""Access to confidential data with DAC and MAC."""
|
|
|
|
import argparse
|
|
import getpass
|
|
import hashlib
|
|
import shutil
|
|
import signal
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent))
|
|
|
|
from config import (
|
|
ACL_FILE,
|
|
ACCESS_MODE_FILE,
|
|
CONFDATA_DIR,
|
|
LOG_DIR,
|
|
OBJECT_LABELS_FILE,
|
|
PASSWD_FILE,
|
|
SUBJECT_LABELS_FILE,
|
|
)
|
|
|
|
# Command reference printed after login and by the `help` command.
# NOTE(review): spacing between the command and its description is reproduced
# exactly as found; confirm against the original file if columns were aligned.
HELP_TEXT = "\n".join(
    (
        "Commands:",
        "create <file> create new empty file",
        "read <file> print file contents",
        "append <file> <text> append text line to file",
        "copy <src> <dst> copy file into confdata",
        "remove <file> delete file from confdata",
        "grant <user> <path> <perms> grant access (owner only)",
        "help show this help",
        "exit exit",
    )
)
|
|
|
|
|
|
def hash_password(password: str) -> str:
    """Return the hex-encoded SHA-256 digest of *password*.

    The password is encoded as UTF-8, so non-ASCII passwords (for example
    Cyrillic ones) are hashed instead of raising ``UnicodeEncodeError``.
    ASCII-only passwords produce exactly the same digest as with an ASCII
    encoding, so existing stored hashes stay valid.

    Args:
        password: The clear-text password.

    Returns:
        A 64-character lowercase hexadecimal digest.
    """
    # "surrogateescape" lets undecodable bytes that Python mapped to lone
    # surrogates on input round-trip back to their original byte values.
    encoded = password.encode("utf-8", errors="surrogateescape")
    return hashlib.sha256(encoded).hexdigest()
|
|
|
|
|
|
def log_action(login: str, action: str) -> None:
    """Append one timestamped record to ``access.log`` in the log directory.

    The log directory is created on demand. Each record has the form
    ``YYYY-MM-DD HH:MM:SS [login] action``.

    Args:
        login: Name written between the square brackets.
        action: Free-form description of the event.
    """
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().isoformat(sep=" ", timespec="seconds")
    # Explicit UTF-8: with the locale default encoding a non-ASCII login or
    # path could raise UnicodeEncodeError and abort the caller.
    with open(LOG_DIR / "access.log", "a", encoding="utf-8") as log_file:
        log_file.write(f"{timestamp} [{login}] {action}\n")
|
|
|
|
|
|
def read_users() -> dict[str, dict]:
    """Load user records from the passwd file, keyed by login.

    Every non-blank line is split into at most five colon-separated fields:
    login, password hash, id, permissions and full name (the last field may
    itself contain colons). Lines that do not yield exactly five fields are
    skipped. A missing passwd file yields an empty mapping.
    """
    if not PASSWD_FILE.exists():
        return {}

    field_names = ("password_hash", "id", "permissions", "full_name")
    records: dict[str, dict] = {}
    with open(PASSWD_FILE) as passwd:
        for raw_line in passwd:
            fields = raw_line.strip().split(":", 4)
            # Blank lines split into a single empty field and are dropped here.
            if len(fields) != 5:
                continue
            records[fields[0]] = dict(zip(field_names, fields[1:]))
    return records
|
|
|
|
|
|
def read_access_mode() -> str:
    """Return the configured access mode: BOTH, DAC_ONLY or MAC_ONLY.

    The mode file content is stripped and upper-cased before comparison.
    A missing file or an unrecognized value falls back to "BOTH".
    """
    fallback = "BOTH"
    if not ACCESS_MODE_FILE.exists():
        return fallback
    configured = ACCESS_MODE_FILE.read_text().strip().upper()
    return configured if configured in {"BOTH", "DAC_ONLY", "MAC_ONLY"} else fallback
|
|
|
|
|
|
def normalize_path(path: str) -> str:
    """Normalize a '/'-separated path relative to confdata.

    Surrounding whitespace, empty components (leading, trailing or doubled
    slashes) and "." components are removed. Names that merely *start* with a
    dot, such as ".hidden", are kept intact, so dot-files do not share a key
    with their dot-less namesakes.

    Args:
        path: Path text, e.g. "./a//b/".

    Returns:
        The cleaned path, e.g. "a/b"; "" when nothing remains.
    """
    # Filter whole components instead of using str.lstrip("./"), which strips
    # any run of '.' and '/' characters and therefore mangles ".hidden".
    components = [part for part in path.strip().split("/") if part and part != "."]
    return "/".join(components)
|
|
|
|
|
|
def read_acl() -> dict[str, dict]:
    """Parse the ACL file into ``{path: {"owner": str, "perms": {user: perms}}}``.

    Each non-blank line has the form ``path:owner:user:perms,user:perms,...``.
    Lines with fewer than three colon-separated fields are skipped, as are
    assignments without a colon. Paths are passed through normalize_path; a
    later line for the same normalized path replaces an earlier one. A missing
    ACL file yields an empty mapping.
    """
    table: dict[str, dict] = {}
    if not ACL_FILE.exists():
        return table

    for raw_line in ACL_FILE.read_text().splitlines():
        fields = raw_line.strip().split(":", 2)
        if len(fields) < 3:
            continue
        raw_path, owner, assignments = fields

        grants: dict[str, str] = {}
        for item in assignments.split(","):
            user, colon, granted = item.partition(":")
            if colon:
                grants[user.strip()] = granted.strip()

        table[normalize_path(raw_path)] = {"owner": owner, "perms": grants}
    return table
|
|
|
|
|
|
def write_acl(acl: dict[str, dict]) -> None:
    """Serialize *acl* to the ACL file, one ``path:owner:assignments`` line per path.

    Paths are written in sorted order and, within a line, ``user:perms``
    assignments are sorted by user and joined with commas. The parent
    directory is created if needed and the file is overwritten.
    """
    ACL_FILE.parent.mkdir(parents=True, exist_ok=True)

    rendered = []
    for path in sorted(acl):
        record = acl[path]
        owner = record["owner"]
        pairs = [f"{user}:{granted}" for user, granted in sorted(record["perms"].items())]
        rendered.append(f"{path}:{owner}:" + ",".join(pairs))

    ACL_FILE.write_text("\n".join(rendered) + "\n")
|
|
|
|
|
|
def read_subject_labels() -> dict[str, int]:
    """Read subject security levels as ``{login: level}``.

    Each line is ``login:level`` with level being exactly "0", "1" or "2";
    any other line is ignored. A missing file yields an empty mapping.
    """
    if not SUBJECT_LABELS_FILE.exists():
        return {}

    levels: dict[str, int] = {}
    for raw_line in SUBJECT_LABELS_FILE.read_text().splitlines():
        subject, colon, level = raw_line.strip().partition(":")
        if colon and level in {"0", "1", "2"}:
            levels[subject] = int(level)
    return levels
|
|
|
|
|
|
def read_object_labels() -> dict[str, int]:
    """Read object security levels as ``{path: level}``.

    Each line is ``path:level`` with level being exactly "0", "1" or "2";
    any other line is ignored. A missing file yields an empty mapping.

    The line is split at the *last* colon, because the level is always the
    final field. A path that itself contains ':' therefore keeps its label
    instead of being dropped, which would silently fall back to level 0 in
    the MAC check.
    """
    labels: dict[str, int] = {}
    if not OBJECT_LABELS_FILE.exists():
        return labels

    for raw_line in OBJECT_LABELS_FILE.read_text().splitlines():
        path, colon, level = raw_line.strip().rpartition(":")
        if colon and level in ("0", "1", "2"):
            labels[path] = int(level)
    return labels
|
|
|
|
|
|
def write_object_labels(labels: dict[str, int]) -> None:
    """Write *labels* to the object label file as sorted ``path:level`` lines.

    The parent directory is created if needed and the file is overwritten.
    """
    OBJECT_LABELS_FILE.parent.mkdir(parents=True, exist_ok=True)
    rendered = []
    for path in sorted(labels):
        rendered.append(f"{path}:{labels[path]}")
    OBJECT_LABELS_FILE.write_text("\n".join(rendered) + "\n")
|
|
|
|
|
|
def dac_allows(login: str, action: str, rel_path: str) -> bool:
    """Decide whether the ACL lets *login* perform *action* on *rel_path*.

    Rules, as implemented:
      * A path without an ACL entry may only be created ("create" or
        "create_dst"); every other action on it is denied.
      * "read" needs "r"; "write", "append" and "create_dst" need "w";
        "remove" needs "d" in the login's permission string.
      * "grant" is allowed only to the entry's owner.
      * Any other action on an existing entry is denied.
    """
    key = normalize_path(rel_path)
    entries = read_acl()

    if key not in entries:
        # Object is unknown to the ACL: only creation is permitted.
        return action in ("create", "create_dst")

    entry = entries[key]
    if not entry:
        return False

    if action == "grant":
        return entry["owner"] == login

    required = {
        "read": "r",
        "write": "w",
        "append": "w",
        "create_dst": "w",
        "remove": "d",
    }.get(action)
    if required is None:
        return False
    return required in entry["perms"].get(login, "")
|
|
|
|
|
|
def dac_owner(rel_path: str) -> str | None:
    """Look up the ACL owner of *rel_path*; None when the path has no entry."""
    key = normalize_path(rel_path)
    entry = read_acl().get(key)
    if entry:
        return entry["owner"]
    return None
|
|
|
|
|
|
def mac_allows(login: str, action: str, rel_path: str, obj_level: int | None = None) -> bool:
    """Apply the mandatory (Bell-LaPadula style) check for *action*.

    Levels come from the subject and object label files; a subject or object
    without a label is treated as level 0. When *obj_level* is given it is
    used instead of the stored label of *rel_path*.

      * "read" is allowed when subject level >= object level (no read up).
      * "write", "append" and "remove" are allowed when
        subject level <= object level (no write down).
      * Every other action, including "create_dst", is allowed.
    """
    subject_level = read_subject_labels().get(login, 0)
    stored_levels = read_object_labels()
    key = normalize_path(rel_path)

    if obj_level is None:
        obj_level = stored_levels.get(key)
    effective_level = 0 if obj_level is None else obj_level

    if action == "read":
        return subject_level >= effective_level
    if action in ("write", "append", "remove"):
        return subject_level <= effective_level
    # "create_dst" and anything else: the new object takes the subject's label.
    return True
|
|
|
|
|
|
def check_access(login: str, action: str, rel_path: str, mode: str) -> bool:
    """Combine the DAC and MAC checks according to *mode*.

    DAC is consulted for "BOTH" and "DAC_ONLY", MAC for "BOTH" and
    "MAC_ONLY". Access is granted only if every consulted check allows it;
    MAC is not evaluated once DAC has denied.
    """
    dac_applies = mode in ("BOTH", "DAC_ONLY")
    if dac_applies and not dac_allows(login, action, rel_path):
        return False

    mac_applies = mode in ("BOTH", "MAC_ONLY")
    if mac_applies and not mac_allows(login, action, rel_path):
        return False

    return True
|
|
|
|
|
|
def authenticate() -> tuple[str, dict]:
    """Prompt for login and password until a valid pair is entered.

    The user table is read once, before the first prompt. Each failed attempt
    is logged as LOGIN_FAILED (with "-" for an empty login) and the prompt is
    repeated. EOF or Ctrl-C at a prompt prints a farewell and exits with
    status 0.

    Returns:
        The login and its user record.
    """
    known_users = read_users()
    while True:
        try:
            login = input("Login: ").strip()
            password = getpass.getpass("Password: ")
        except (EOFError, KeyboardInterrupt):
            print("\nBye.")
            sys.exit(0)

        record = known_users.get(login)
        if not record or record["password_hash"] != hash_password(password):
            log_action(login or "-", "LOGIN_FAILED")
            print("Invalid credentials. Try again.")
            continue
        return login, record
|
|
|
|
|
|
def confdata_path(arg: str) -> Path:
    """Turn a user-supplied path into a resolved absolute path.

    Relative paths are interpreted relative to the confdata directory;
    absolute paths are taken as given. The result is resolved, so ".."
    components and symlinks are collapsed before any containment check.
    """
    candidate = Path(arg)
    anchored = candidate if candidate.is_absolute() else CONFDATA_DIR / candidate
    return anchored.resolve()
|
|
|
|
|
|
def rel_path_from_confdata(path: Path) -> str:
    """Express *path* relative to the resolved confdata directory.

    Components are joined with "/". Returns "" when *path* is not located
    under confdata (and also when it is the confdata directory itself).
    """
    try:
        relative = path.relative_to(CONFDATA_DIR.resolve())
    except ValueError:
        return ""
    else:
        return "/".join(relative.parts)
|
|
|
|
|
|
def is_in_confdata(path: Path) -> bool:
    """Tell whether *path* is the resolved confdata directory or lies beneath it."""
    try:
        path.relative_to(CONFDATA_DIR.resolve())
    except ValueError:
        return False
    else:
        return True
|
|
|
|
|
|
def cmd_read(args: list[str], login: str, mode: str) -> None:
    """Handle ``read <file>``: print the contents of a file inside confdata.

    The target must resolve to an existing regular file under confdata and
    pass check_access for the "read" action. A successful read is logged.

    Args:
        args: Command arguments; exactly one file path is expected.
        login: Authenticated user issuing the command.
        mode: Active access mode ("BOTH", "DAC_ONLY" or "MAC_ONLY").
    """
    if len(args) != 1:
        print("Usage: read <file>")
        return
    path = confdata_path(args[0])
    if not is_in_confdata(path):
        print("Access denied: file must be inside confdata")
        return
    if not path.exists():
        print(f"File not found: {path.name}")
        return
    # A directory has no ACL entry or label; when only MAC is consulted it
    # would pass the check and then crash in read_text().
    if not path.is_file():
        print(f"Not a regular file: {path.name}")
        return
    rel = rel_path_from_confdata(path)
    if not check_access(login, "read", rel, mode):
        print("Permission denied")
        return
    try:
        content = path.read_text()
    except (OSError, UnicodeDecodeError) as exc:
        # Report the problem instead of ending the session with a traceback.
        print(f"Cannot read {path.name}: {exc}")
        return
    print(content, end="")
    log_action(login, f"READ {path}")
|
|
|
|
|
|
def cmd_create(args: list[str], login: str, mode: str) -> None:
    """Handle ``create <file>``: create a new empty file inside confdata.

    In DAC modes the creator becomes the owner with "rwd" permissions; in MAC
    modes the new object receives the creator's security level (0 when the
    creator has no label). Missing parent directories are created.

    Args:
        args: Command arguments; exactly one file path is expected.
        login: Authenticated user issuing the command.
        mode: Active access mode ("BOTH", "DAC_ONLY" or "MAC_ONLY").
    """
    if len(args) != 1:
        print("Usage: create <file>")
        return
    path = confdata_path(args[0])
    if not is_in_confdata(path):
        print("Access denied: file must be inside confdata")
        return
    if path.exists():
        print(f"File already exists: {path.name}")
        return
    rel = rel_path_from_confdata(path)

    # The ACL and label files are colon-delimited, line-oriented text. A ':'
    # in the stored path would be parsed back as extra fields, letting the
    # line overwrite the entry of a different object (e.g. "x:alice" is read
    # as path "x" owned by "alice"). Refuse such names outright.
    if any(ch in rel for ch in ":\r\n"):
        print("Invalid file name: ':' and line breaks are not allowed")
        return

    # Create the file before recording metadata so that a failure here does
    # not leave orphan ACL/label entries behind.
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        path.touch(exist_ok=False)
    except OSError as exc:
        print(f"Cannot create {path.name}: {exc}")
        return

    # DAC: new object, add to ACL with owner=login
    if mode in ("BOTH", "DAC_ONLY"):
        acl = read_acl()
        acl[rel] = {"owner": login, "perms": {login: "rwd"}}
        write_acl(acl)
    # MAC: new object gets subject's label
    if mode in ("BOTH", "MAC_ONLY"):
        subj_level = read_subject_labels().get(login, 0)
        obj_labels = read_object_labels()
        obj_labels[rel] = subj_level
        write_object_labels(obj_labels)

    log_action(login, f"CREATE {path}")
    print("Done.")
|
|
|
|
|
|
def cmd_append(args: list[str], login: str, mode: str) -> None:
    """Handle ``append <file> <text>``: append one line of text to a file.

    All arguments after the file name are joined with single spaces and
    written followed by a newline. The target must be an existing regular
    file under confdata and pass check_access for the "append" action.

    Args:
        args: Command arguments; a file path followed by at least one word.
        login: Authenticated user issuing the command.
        mode: Active access mode ("BOTH", "DAC_ONLY" or "MAC_ONLY").
    """
    if len(args) < 2:
        print("Usage: append <file> <text>")
        return
    path = confdata_path(args[0])
    if not is_in_confdata(path):
        print("Access denied: file must be inside confdata")
        return
    if not path.exists():
        print(f"File not found: {path.name}. Use 'create' first.")
        return
    # Opening a directory for append raises; reject it with a clear message.
    if not path.is_file():
        print(f"Not a regular file: {path.name}")
        return
    rel = rel_path_from_confdata(path)
    if not check_access(login, "append", rel, mode):
        print("Permission denied")
        return
    text = " ".join(args[1:])
    try:
        with open(path, "a") as target:
            target.write(text + "\n")
    except (OSError, UnicodeEncodeError) as exc:
        # Report the problem instead of ending the session with a traceback.
        print(f"Cannot append to {path.name}: {exc}")
        return
    log_action(login, f"APPEND {path}")
    print("Done.")
|
|
|
|
|
|
def cmd_copy(args: list[str], login: str, mode: str) -> None:
    """Handle ``copy <src> <dst>``: copy a file to a new file inside confdata.

    The source is resolved as given (relative paths against the current
    working directory); the destination is resolved against confdata. When
    the destination is an existing directory the source's file name is
    appended. A source inside confdata must pass the "read" check; the
    destination must pass the "create_dst" check. On success the copier
    becomes the DAC owner with "rwd" and/or the copy receives the copier's
    MAC level, depending on *mode*.

    Args:
        args: Command arguments; exactly a source and a destination path.
        login: Authenticated user issuing the command.
        mode: Active access mode ("BOTH", "DAC_ONLY" or "MAC_ONLY").
    """
    if len(args) != 2:
        print("Usage: copy <src> <dst>")
        return

    src = Path(args[0]).resolve()
    dst = confdata_path(args[1])

    if not is_in_confdata(dst):
        print("Access denied: destination must be inside confdata")
        return
    if dst.is_dir():
        dst = dst / src.name
    if dst.exists():
        print(f"Destination already exists: {dst.name}")
        return
    if not src.exists():
        print(f"Source not found: {args[0]}")
        return
    if src.is_dir():
        print("Copying directories is not supported")
        return

    dst_rel = rel_path_from_confdata(dst)
    # The ACL and label files are colon-delimited, line-oriented text; a ':'
    # in the stored path would be parsed back as extra fields and could
    # overwrite the entry of another object. Refuse such names.
    if any(ch in dst_rel for ch in ":\r\n"):
        print("Invalid destination name: ':' and line breaks are not allowed")
        return
    src_rel = rel_path_from_confdata(src) if is_in_confdata(src) else None

    if src_rel is not None and not check_access(login, "read", src_rel, mode):
        print("Permission denied (read source)")
        return
    if not check_access(login, "create_dst", dst_rel, mode):
        print("Permission denied (create destination)")
        return

    try:
        # Same convenience as `create`: missing parent directories are made.
        dst.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(src, dst)
    except OSError as exc:
        # Best effort: do not leave a partial copy without ACL entry or label.
        try:
            dst.unlink(missing_ok=True)
        except OSError:
            pass
        print(f"Copy failed: {exc}")
        return

    if mode in ("BOTH", "DAC_ONLY"):
        acl = read_acl()
        acl[dst_rel] = {"owner": login, "perms": {login: "rwd"}}
        write_acl(acl)
    if mode in ("BOTH", "MAC_ONLY"):
        subj_level = read_subject_labels().get(login, 0)
        obj_labels = read_object_labels()
        obj_labels[dst_rel] = subj_level
        write_object_labels(obj_labels)
    log_action(login, f"COPY {src} -> {dst}")
    print("Done.")
|
|
|
|
|
|
def cmd_remove(args: list[str], login: str, mode: str) -> None:
    """Handle ``remove <file>``: delete a file from confdata.

    The target must be an existing regular file under confdata and pass
    check_access for the "remove" action. After deletion the ACL entry is
    dropped in DAC modes and the object label is dropped in MAC modes.

    Args:
        args: Command arguments; exactly one file path is expected.
        login: Authenticated user issuing the command.
        mode: Active access mode ("BOTH", "DAC_ONLY" or "MAC_ONLY").
    """
    if len(args) != 1:
        print("Usage: remove <file>")
        return
    path = confdata_path(args[0])
    if not is_in_confdata(path):
        print("Access denied: file must be inside confdata")
        return
    if not path.exists():
        print(f"File not found: {path.name}")
        return
    # unlink() cannot delete a directory; reject it with a clear message.
    if not path.is_file():
        print(f"Not a regular file: {path.name}")
        return
    rel = rel_path_from_confdata(path)
    if not check_access(login, "remove", rel, mode):
        print("Permission denied")
        return
    try:
        path.unlink()
    except OSError as exc:
        # Keep metadata untouched when the file could not be deleted.
        print(f"Cannot remove {path.name}: {exc}")
        return
    if mode in ("BOTH", "DAC_ONLY"):
        acl = read_acl()
        acl.pop(rel, None)
        write_acl(acl)
    if mode in ("BOTH", "MAC_ONLY"):
        obj_labels = read_object_labels()
        obj_labels.pop(rel, None)
        write_object_labels(obj_labels)
    log_action(login, f"REMOVE {path}")
    print("Done.")
|
|
|
|
|
|
def cmd_grant(args: list[str], login: str, mode: str) -> None:
    """Handle ``grant <user> <path> <perms>``: set a user's ACL permissions.

    Only available when DAC is active, and only to the owner of the object.
    *perms* may contain only the letters r, w and d; the given string
    replaces whatever the target user had on that path before.

    Args:
        args: Command arguments: target user, path and permission letters.
        login: Authenticated user issuing the command.
        mode: Active access mode ("BOTH", "DAC_ONLY" or "MAC_ONLY").
    """
    if mode not in ("BOTH", "DAC_ONLY"):
        print("grant is only available in DAC or BOTH mode")
        return
    if len(args) != 3:
        print("Usage: grant <user> <path> <perms>")
        return
    target_user, path_arg, perms = args

    # ACL lines are "path:owner:user:perms,user:perms"; a ':' or ',' inside
    # the user name would be read back as a different assignment.
    if any(ch in target_user for ch in ":,"):
        print("Invalid user name: ':' and ',' are not allowed")
        return
    for ch in perms:
        if ch not in "rwd":
            print(f"Invalid permission {ch!r}; allowed: r, w, d")
            return

    path = confdata_path(path_arg)
    if not is_in_confdata(path):
        print("Access denied: path must be inside confdata")
        return
    if not path.exists():
        print(f"File not found: {path_arg}")
        return
    rel = rel_path_from_confdata(path)
    if not dac_allows(login, "grant", rel):
        print("Permission denied (only owner can grant)")
        return
    acl = read_acl()
    entry = acl.get(rel)
    if not entry:
        print("ACL entry not found")
        return
    entry["perms"][target_user] = perms
    write_acl(acl)
    log_action(login, f"GRANT {target_user} {rel} {perms}")
    print("Done.")
|
|
|
|
|
|
def check_credentials(login: str, password: str) -> bool:
    """Return True when *password* hashes to the stored hash of *login*.

    Unknown logins yield False. The docstring of the original notes this
    helper is intended for the --check mode.
    """
    record = read_users().get(login)
    if not record:
        return False
    return record["password_hash"] == hash_password(password)
|
|
|
|
|
|
def main() -> None:
    """Entry point: batch password check or interactive command session.

    With ``--check LOGIN`` passwords are read line by line from stdin; for
    each line "0" or "1" is printed, and the process exits with status 0 on
    the first match or 1 when stdin is exhausted without a match.

    Otherwise the user is authenticated and commands are read in a loop until
    "exit", end of input or Ctrl-C. Every way of leaving the session writes
    an EXIT record to the access log.
    """
    parser = argparse.ArgumentParser(description="Access confidential data")
    parser.add_argument(
        "--check",
        metavar="LOGIN",
        help="Batch mode: read passwords line-by-line from stdin, output 0 or 1 per line; exit 0 on first match",
    )
    args, _ = parser.parse_known_args()

    if args.check is not None:
        user = read_users().get(args.check)
        target_hash = user["password_hash"] if user else None

        for line in sys.stdin:
            password = line.rstrip("\n")
            if target_hash and hash_password(password) == target_hash:
                sys.stdout.write("1\n")
                sys.stdout.flush()
                sys.exit(0)
            sys.stdout.write("0\n")
            sys.stdout.flush()
        sys.exit(1)

    # Deliver Ctrl-C as KeyboardInterrupt. A handler that calls sys.exit()
    # directly would bypass the except clauses below, and the EXIT record
    # would never be written.
    signal.signal(signal.SIGINT, signal.default_int_handler)

    login, user = authenticate()
    full_name = user["full_name"]
    mode = read_access_mode()

    log_action(login, "LOGIN")
    print(f"\nПривет, {full_name}")
    print(HELP_TEXT)

    # Command name -> handler; every handler takes (args, login, mode).
    handlers = {
        "create": cmd_create,
        "read": cmd_read,
        "append": cmd_append,
        "copy": cmd_copy,
        "remove": cmd_remove,
        "grant": cmd_grant,
    }

    farewell = "Bye."
    try:
        while True:
            try:
                line = input(f"\n{login}> ").strip()
            except EOFError:
                # Start the farewell on a fresh line after the open prompt.
                farewell = "\nBye."
                break

            if not line:
                continue

            command, *cmd_args = line.split()

            if command == "exit":
                break
            if command == "help":
                print(HELP_TEXT)
                continue

            handler = handlers.get(command)
            if handler is None:
                print(f"Unknown command: {command!r}. Type 'help' for available commands.")
            else:
                handler(cmd_args, login, mode)
    except KeyboardInterrupt:
        # Ctrl-C at the prompt or in the middle of a command ends the session.
        farewell = "\nBye."

    log_action(login, "EXIT")
    print(farewell)
|
|
|
|
|
|
# Start the command-line tool only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|