| import argparse |
| import sys |
| import os |
| import serial |
| import time |
| import paramiko |
| import io |
| from pathlib import Path |
| from datetime import datetime |
|
|
| def create_ssh_connection(): |
| """Create SSH connection to remote server""" |
| ssh = paramiko.SSHClient() |
| ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) |
| try: |
| ssh.connect('topos.exypno.tech', port=420, username='albert') |
| return ssh |
| except Exception as e: |
| print(f"Failed to connect to remote server: {e}") |
| return None |
|
|
| def set_gain(ser, gain=8): |
| """Set 2x gain on all channels (1-16) for Cyton+Daisy""" |
| print(f"Setting {gain}x gain on all channels...") |
|
|
| |
| ser.write(b's') |
| time.sleep(0.5) |
|
|
| gain_mapping = [1, 2, 4, 6, 8, 12, 24] |
| gain_val = gain_mapping.index(gain) |
|
|
| |
| main_channels = ['1', '2', '3', '4', '5', '6', '7', '8'] |
| |
| daisy_channels = ['Q', 'W', 'E', 'R', 'T', 'Y', 'U', 'I'] |
|
|
| |
| commands = '' |
| for ch in main_channels + daisy_channels: |
| commands += f'x{ch}0{gain_val}0000X' |
|
|
| |
| ser.write(commands.encode()) |
| time.sleep(0.5) |
|
|
| |
| ser.reset_input_buffer() |
|
|
| print("Gain settings updated") |
|
|
| def set_sample_rate(ser, freq): |
| """Set sample rate using the '~' command""" |
| |
| freq_mapping = { |
| 16000: '0', |
| 8000: '1', |
| 4000: '2', |
| 2000: '3', |
| 1000: '4', |
| 500: '5', |
| 250: '6' |
| } |
|
|
| if freq not in freq_mapping: |
| raise ValueError(f"Unsupported frequency {freq}Hz. Supported rates: {list(freq_mapping.keys())}") |
|
|
| |
| ser.write(b's') |
| time.sleep(0.5) |
|
|
| |
| command = f"~{freq_mapping[freq]}" |
| ser.write(command.encode()) |
| time.sleep(0.5) |
|
|
| |
| ser.reset_input_buffer() |
| print(f"Sample rate set to {freq}Hz") |
|
|
| def init_board(ser): |
| """Initialize the OpenBCI board for 16 channels""" |
| print("Initializing board...") |
| |
| |
| ser.write(b's') |
| time.sleep(1) |
| |
| |
| ser.write(b'v') |
| time.sleep(2) |
| |
| |
| ser.reset_input_buffer() |
| ser.reset_output_buffer() |
| |
| |
| ser.write(b'C') |
| time.sleep(1) |
| |
| |
| |
| commands = [b'!', b'@', b'#', b'$', b'%', b'^', b'&', b'*'] |
| |
| commands.extend([b'Q', b'W', b'E', b'R', b'T', b'Y', b'U', b'I']) |
| |
| for cmd in commands: |
| ser.write(cmd) |
| time.sleep(0.1) |
|
|
| |
| ser.write(b'\xF0\x06') |
| time.sleep(1) |
| ser.baudrate = 230400 |
|
|
| set_gain(ser, gain=2) |
| |
| print("Board initialized") |
|
|
| def find_packet_start(ser): |
| """Find the start of a packet by looking for 0xA0 header""" |
| while True: |
| byte = ser.read() |
| if byte[0] == 0xA0: |
| return byte |
| return None |
|
|
| def read_complete_packet(ser): |
| """Read a complete packet ensuring proper alignment""" |
| |
| start_byte = find_packet_start(ser) |
| if not start_byte: |
| return None |
| |
| |
| remaining_bytes = ser.read(32) |
| if len(remaining_bytes) != 32: |
| return None |
| |
| |
| if (remaining_bytes[31] & 0xF0) != 0xC0: |
| return None |
| |
| return start_byte + remaining_bytes |
|
|
| def process_packet(packet): |
| """Process a 33-byte packet and extract channel data according to documentation""" |
| if len(packet) != 33: |
| return None |
| |
| channels = [] |
| for i in range(8): |
| start_idx = 2 + (i * 3) |
| channel_data = packet[start_idx:start_idx + 3] |
| |
| |
| if (channel_data[0] & 0x80): |
| value = -1 * ((((~channel_data[0] & 0xFF) << 16) | |
| ((~channel_data[1] & 0xFF) << 8) | |
| (~channel_data[2] & 0xFF)) + 1) |
| else: |
| value = (channel_data[0] << 16) | (channel_data[1] << 8) | channel_data[2] |
| |
| |
| scale_factor = 4.5 / (24.0 * 8388607.0) * 1000000 |
| channels.append(value * scale_factor) |
| |
| return channels |
|
|
| def start_sd_recording(ser, duration='G'): |
| """Start recording to SD card with specified duration |
| Duration codes: |
| A = 5MIN |
| S = 15MIN |
| F = 30MIN |
| G = 1HR (default) |
| H = 2HR |
| J = 4HR |
| K = 12HR |
| L = 24HR |
| a = ~14sec (test) |
| """ |
| valid_durations = {'A', 'S', 'F', 'G', 'H', 'J', 'K', 'L', 'a'} |
| if duration not in valid_durations: |
| raise ValueError(f"Invalid duration code. Valid codes: {valid_durations}") |
| |
| print(f"Starting SD card recording with duration code {duration}") |
| ser.write(duration.encode()) |
| time.sleep(0.5) |
| ser.write(b'b') |
| time.sleep(0.5) |
| |
| def stop_sd_recording(ser): |
| """Stop SD card recording""" |
| print("Stopping SD card recording") |
| ser.write(b's') |
| time.sleep(0.5) |
| ser.write(b'j') |
| time.sleep(0.5) |
|
|
| def sd_record(port, duration='G', sample_rate=1000): |
| """Record data to SD card""" |
| duration_map = { |
| 'A': 5*60, |
| 'S': 15*60, |
| 'F': 30*60, |
| 'G': 60*60, |
| 'H': 2*60*60, |
| 'J': 4*60*60, |
| 'K': 12*60*60, |
| 'L': 24*60*60, |
| 'a': 14 |
| } |
|
|
| |
| ser = serial.Serial(port, 115200) |
| time.sleep(2) |
| |
| try: |
| |
| init_board(ser) |
| |
| |
| set_sample_rate(ser, sample_rate) |
| |
| |
| start_sd_recording(ser, duration) |
| |
| |
| wait_time = duration_map[duration] |
| start_time = time.time() |
| |
| print(f"Recording to SD card for {wait_time} seconds...") |
| |
| try: |
| while (time.time() - start_time) < wait_time: |
| remaining = wait_time - (time.time() - start_time) |
| print(f"\rRecording... {remaining:.1f} seconds remaining ", end='') |
| time.sleep(0.1) |
| |
| except KeyboardInterrupt: |
| print("\nRecording interrupted by user") |
| |
| finally: |
| |
| stop_sd_recording(ser) |
| print("\nRecording complete") |
| |
| finally: |
| ser.close() |
|
|
| def main(): |
| parser = argparse.ArgumentParser(description='OpenBCI EEG Recording Tool') |
| parser.add_argument('--port', '-p', type=str, default='/dev/ttyUSB0', |
| help='Serial port to use (default: /dev/ttyUSB0)') |
| parser.add_argument('--filename', '-o', type=str, |
| help='Output filename (default: openbci_<timestamp>.txt)') |
| parser.add_argument('--sd', action='store_true', |
| help='Record to SD card instead of streaming to PC') |
| parser.add_argument('--duration', type=str, default='G', |
| help='SD card recording duration code (default: G = 1 hour)') |
| parser.add_argument('--sample-rate', type=int, default=1000, |
| help='Sample rate in Hz (default: 1000)') |
| parser.add_argument('--remote', action='store_true', |
| help='Write to remote server instead of local file') |
| args = parser.parse_args() |
|
|
| if args.sd: |
| sd_record(args.port, args.duration, args.sample_rate) |
| return |
|
|
| if args.filename is None: |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| args.filename = f"openbci_{timestamp}.txt" |
|
|
| |
| ser = serial.Serial(args.port, 115200) |
| time.sleep(2) |
| init_board(ser) |
|
|
| filename = args.filename |
|
|
| if args.remote: |
| ssh = create_ssh_connection() |
| if not ssh: |
| print("Failed to establish SSH connection. Exiting.") |
| return |
|
|
| sftp = ssh.open_sftp() |
| remote_file = sftp.open(filename, 'w') |
|
|
| |
| header = "Timestamp," |
| header += ",".join([f"Channel{i+1}" for i in range(16)]) |
| header += "\n" |
| remote_file.write(header) |
| else: |
| |
| with open(filename, 'w') as f: |
| header = "Timestamp," |
| header += ",".join([f"Channel{i+1}" for i in range(16)]) |
| header += "\n" |
| f.write(header) |
|
|
| |
| ser.write(b'b') |
| time.sleep(0.5) |
| ser.reset_input_buffer() |
|
|
| print(f"Started recording to {filename}") |
|
|
| packet_count = 0 |
| start_time = time.time() |
| buffer = io.StringIO() |
| last_write = time.time() |
|
|
| try: |
| while True: |
| |
| packet1 = read_complete_packet(ser) |
| if packet1: |
| packet2 = read_complete_packet(ser) |
| if packet2: |
| |
| data1 = process_packet(packet1) |
| data2 = process_packet(packet2) |
|
|
| if data1 and data2: |
| packet_count += 1 |
| timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") |
| all_channels = data1 + data2 |
| data_str = [f"{x:.6f}" for x in all_channels] |
| line = timestamp + "," + ",".join(data_str) + "\n" |
|
|
| if args.remote: |
| buffer.write(line) |
|
|
| |
| if time.time() - last_write >= 0.1: |
| remote_file.write(buffer.getvalue()) |
| buffer = io.StringIO() |
| last_write = time.time() |
| else: |
| with open(filename, 'a') as f: |
| f.write(line) |
|
|
| |
| if packet_count % 125 == 0: |
| elapsed_time = time.time() - start_time |
| rate = packet_count / elapsed_time |
| print(f"\rRecording... {rate:.1f} Hz, {packet_count} packets", end='') |
|
|
| |
| if ser.in_waiting > 1000: |
| print(f"\nWarning: Buffer building up ({ser.in_waiting} bytes)") |
| ser.reset_input_buffer() |
|
|
| except KeyboardInterrupt: |
| |
| ser.write(b's') |
| ser.close() |
|
|
| if args.remote: |
| |
| if buffer.getvalue(): |
| remote_file.write(buffer.getvalue()) |
| remote_file.close() |
| sftp.close() |
| ssh.close() |
|
|
| |
| elapsed_time = time.time() - start_time |
| rate = packet_count / elapsed_time |
| print(f"\n\nRecording stopped") |
| print(f"Duration: {elapsed_time:.1f} seconds") |
| print(f"Packets recorded: {packet_count}") |
| print(f"Average sample rate: {rate:.1f} Hz") |
| print(f"Data saved to: {filename}") |
|
|
| if __name__ == "__main__": |
| main() |
|
|