# --- Cell 1: imports (stdlib -> third-party) ---
import os
import zipfile
import io
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, regularizers

# --- Cell 2: joint layout and model input/output dimensions ---
JOINTS = ["head", "left_shoulder", "left_elbow", "right_shoulder", "right_elbow",
          "left_hand", "right_hand", "left_hip", "right_hip", "left_knee", "right_knee",
          "left_foot", "right_foot",
]
N_JOINTS = len(JOINTS)     # total number of body joints
N_INPUT = N_JOINTS * 2     # input per joint: (x, y)
N_OUTPUT = N_JOINTS * 1    # output/target: z coordinate per joint
print(f'Input: {N_INPUT} \tOutput:{N_OUTPUT}')
print(f'Joints: {N_JOINTS}')

# --- Cell 3: per-file CSV loader ---
def load_single_csv(filepath_or_bytes):
    """Load one Kinect CSV and split it into input and target arrays.

    Parameters
    ----------
    filepath_or_bytes : str | os.PathLike | bytes
        Either a path to a CSV file or the raw CSV bytes.

    Returns
    -------
    (X, y) : tuple of np.ndarray (float32)
        X has shape (n_frames, 2 * N_JOINTS) with columns interleaved as
        [head_x, head_y, left_shoulder_x, ...]; y has shape
        (n_frames, N_JOINTS) holding the z coordinate of each joint.
    """
    if isinstance(filepath_or_bytes, (str, os.PathLike)):
        df = pd.read_csv(filepath_or_bytes)
    else:
        df = pd.read_csv(io.BytesIO(filepath_or_bytes))
    # Guard against whitespace-padded column headers in the source files.
    df.columns = df.columns.str.strip()

    # Bug fix: the original also built x_cols / y_cols lists that were never
    # used; only the interleaved xy order and the z columns are needed.
    xy_cols = [f"{j}_{axis}" for j in JOINTS for axis in ("x", "y")]
    z_cols = [f"{j}_z" for j in JOINTS]

    X = df[xy_cols].values.astype(np.float32)  # input
    y = df[z_cols].values.astype(np.float32)   # target
    return X, y

# --- Cell 4: folder-level loader ---
def load_all_sequences(folder_path):
    """Load every CSV in `folder_path` (sorted by filename).

    Returns
    -------
    sequences : list of (X, y) tuples, one per file (see load_single_csv).
    file_names : list of str, the CSV filenames in the same order.
    """
    sequences, file_names = [], []

    csv_files = sorted(f for f in os.listdir(folder_path) if f.endswith('.csv'))
    print(f"Found {len(csv_files)} CSV files in folder.")

    for name in csv_files:
        # Fix: pass the path straight through instead of reading the raw
        # bytes and round-tripping them via BytesIO inside load_single_csv.
        X, y = load_single_csv(os.path.join(folder_path, name))
        sequences.append((X, y))  # stored as (input, target)
        file_names.append(name)

    return sequences, file_names

# --- Cell 5: frame-level view for Dense MLP (each frame independent) ---
def flatten_sequences(sequences):
    """Concatenate all per-file frames into flat (X, y) arrays."""
    X_flat = np.concatenate([s[0] for s in sequences], axis=0)
    y_flat = np.concatenate([s[1] for s in sequences], axis=0)
    return X_flat, y_flat

# --- Cell 6: fixed-length windows for Conv1D / LSTM / GRU ---
def make_windowed_sequences(sequences, window_size=30, stride=1):
    """Slice each session into overlapping windows of consecutive frames.

    Parameters
    ----------
    sequences : list of (X, y) tuples as produced by load_all_sequences.
    window_size : int, number of consecutive frames per window.
    stride : int, step between window start frames.

    Returns
    -------
    X_seq : np.ndarray (N, window_size, 2 * N_JOINTS), float32.
    y_seq : np.ndarray (N, window_size, N_JOINTS), float32.
    """
    X_list, y_list = [], []
    for X, y in sequences:
        n = len(X)
        # Sessions shorter than window_size contribute no windows.
        for start in range(0, n - window_size + 1, stride):
            X_list.append(X[start : start + window_size])
            y_list.append(y[start : start + window_size])

    X_seq = np.array(X_list, dtype=np.float32)  # (N, window, 26)
    y_seq = np.array(y_list, dtype=np.float32)  # (N, window, 13)
    return X_seq, y_seq
# --- Cell 7: load dataset and build flat / windowed views ---
REPO_ROOT = os.path.abspath(os.path.join(os.getcwd(), '..'))
DATA_DIR = os.path.join(REPO_ROOT, 'Datasets_all')
KINECT_DATA_PATH = os.path.join(DATA_DIR, 'kinect_good_preprocessed')

# sequences is a list of (X, y) tuples, one per recorded session
sequences, file_names = load_all_sequences(KINECT_DATA_PATH)

# Frame-level flat data (for Dense models)
X_flat, y_flat = flatten_sequences(sequences)
print(f"\nFlat dataset: X={X_flat.shape} y={y_flat.shape}")

# Windowed sequences (for Conv1D / LSTM / GRU models)
WINDOW_SIZE = 30
X_seq, y_seq = make_windowed_sequences(sequences, window_size=WINDOW_SIZE, stride=5)
y_seq_last = y_seq[:, -1, :]  # (N, 13): target z for the final frame of each window
print(f"Windowed dataset: X={X_seq.shape} y_last={y_seq_last.shape}")

# --- Cell 8: Dense MLP ---
def build_dense_model(hidden_units=(128, 64), activation="relu", dropout_rate=0.2,
                      l2_reg=1e-4, optimizer="adam", loss="mse",
):
    """Build (and compile) a fully-connected regressor: 26 xy inputs -> 13 z outputs.

    Bug fix: `optimizer` and `loss` were previously accepted but never used —
    the model is now compiled with them before being returned. Callers may
    still re-compile if they want different settings.
    """
    inputs = keras.Input(shape=(N_INPUT,), name="xy_input")
    x = inputs
    for i, units in enumerate(hidden_units):
        x = layers.Dense(
            units,
            activation=activation,
            kernel_regularizer=regularizers.l2(l2_reg) if l2_reg else None,
            name=f"dense_{i+1}",
        )(x)
        if dropout_rate > 0:
            x = layers.Dropout(dropout_rate, name=f"dropout_{i+1}")(x)

    outputs = layers.Dense(N_OUTPUT, activation="linear", name="z_output")(x)
    model = keras.Model(inputs, outputs, name="DenseModel")
    model.compile(optimizer=optimizer, loss=loss)
    return model

# --- Cell 9: Conv1D CNN ---
def build_conv1d_model(filters=(64, 128), kernel_size=3, pool_size=2, dense_units=(64,),
                       activation="relu", dropout_rate=0.2, optimizer="adam", loss="mse",
):
    """Build (and compile) a temporal CNN over (WINDOW_SIZE, 26) windows -> 13 z outputs.

    Bug fix: the model is now compiled with the `optimizer`/`loss` arguments,
    which were previously ignored.
    """
    inputs = keras.Input(shape=(WINDOW_SIZE, N_INPUT), name="xy_seq_input")
    x = inputs
    for i, f in enumerate(filters):
        x = layers.Conv1D(f, kernel_size, activation=activation, padding="same",
                          name=f"conv_{i+1}")(x)
        x = layers.MaxPooling1D(pool_size, padding="same", name=f"pool_{i+1}")(x)
        if dropout_rate > 0:
            x = layers.Dropout(dropout_rate, name=f"drop_conv_{i+1}")(x)

    # Collapse the time axis before the dense head.
    x = layers.GlobalAveragePooling1D(name="gap")(x)

    for i, units in enumerate(dense_units):
        x = layers.Dense(units, activation=activation, name=f"fc_{i+1}")(x)
        if dropout_rate > 0:
            x = layers.Dropout(dropout_rate, name=f"drop_fc_{i+1}")(x)

    outputs = layers.Dense(N_OUTPUT, activation="linear", name="z_output")(x)
    model = keras.Model(inputs, outputs, name="Conv1DModel")
    model.compile(optimizer=optimizer, loss=loss)
    return model

# --- Cell 10: LSTM ---
def build_lstm_model(lstm_units=(64, 32), dense_units=(32,), activation="tanh",
                     dropout_rate=0.2, recurrent_dropout=0.0, optimizer="adam", loss="mse",
):
    """Build (and compile) a stacked-LSTM regressor over (WINDOW_SIZE, 26) windows.

    Only the last LSTM layer returns a single vector (return_sequences=False);
    earlier layers return full sequences so they can be stacked.

    Bug fix: the model is now compiled with the `optimizer`/`loss` arguments,
    which were previously ignored. NOTE(review): the `activation` parameter is
    not forwarded to the LSTM layers (Keras default tanh is used), matching
    the original behavior.
    """
    inputs = keras.Input(shape=(WINDOW_SIZE, N_INPUT), name="xy_seq_input")
    x = inputs
    for i, units in enumerate(lstm_units):
        return_sequences = (i < len(lstm_units) - 1)
        x = layers.LSTM(
            units,
            return_sequences=return_sequences,
            dropout=dropout_rate,
            recurrent_dropout=recurrent_dropout,
            name=f"lstm_{i+1}",
        )(x)

    for i, units in enumerate(dense_units):
        x = layers.Dense(units, activation="relu", name=f"fc_{i+1}")(x)
        if dropout_rate > 0:
            x = layers.Dropout(dropout_rate, name=f"drop_fc_{i+1}")(x)

    outputs = layers.Dense(N_OUTPUT, activation="linear", name="z_output")(x)
    model = keras.Model(inputs, outputs, name="LSTMModel")
    model.compile(optimizer=optimizer, loss=loss)
    return model

# --- Cell 11: GRU ---
def build_gru_model(gru_units=(64, 32), dense_units=(32,), dropout_rate=0.2,
                    recurrent_dropout=0.0, optimizer="adam", loss="mse",):
    """Build (and compile) a stacked-GRU regressor over (WINDOW_SIZE, 26) windows.

    Mirrors build_lstm_model with GRU cells. Bug fix: the model is now
    compiled with the `optimizer`/`loss` arguments, which were previously
    ignored.
    """
    inputs = keras.Input(shape=(WINDOW_SIZE, N_INPUT), name="xy_seq_input")
    x = inputs
    for i, units in enumerate(gru_units):
        return_sequences = (i < len(gru_units) - 1)
        x = layers.GRU(
            units,
            return_sequences=return_sequences,
            dropout=dropout_rate,
            recurrent_dropout=recurrent_dropout,
            name=f"gru_{i+1}",
        )(x)

    for i, units in enumerate(dense_units):
        x = layers.Dense(units, activation="relu", name=f"fc_{i+1}")(x)
        if dropout_rate > 0:
            x = layers.Dropout(dropout_rate, name=f"drop_fc_{i+1}")(x)

    outputs = layers.Dense(N_OUTPUT, activation="linear", name="z_output")(x)
    model = keras.Model(inputs, outputs, name="GRUModel")
    model.compile(optimizer=optimizer, loss=loss)
    return model