import argparse import sys import os import serial import time import paramiko import io from pathlib import Path from datetime import datetime def create_ssh_connection(): """Create SSH connection to remote server""" ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: ssh.connect('topos.exypno.tech', port=420, username='albert') return ssh except Exception as e: print(f"Failed to connect to remote server: {e}") return None def set_gain(ser, gain=8): """Set 2x gain on all channels (1-16) for Cyton+Daisy""" print(f"Setting {gain}x gain on all channels...") # Stop any streaming first ser.write(b's') time.sleep(0.5) gain_mapping = [1, 2, 4, 6, 8, 12, 24] gain_val = gain_mapping.index(gain) # Main board channels (1-8) main_channels = ['1', '2', '3', '4', '5', '6', '7', '8'] # Daisy board channels (9-16 represented as Q-I) daisy_channels = ['Q', 'W', 'E', 'R', 'T', 'Y', 'U', 'I'] # Combine all channel commands into one string commands = '' for ch in main_channels + daisy_channels: commands += f'x{ch}0{gain_val}0000X' # Send all commands at once ser.write(commands.encode()) time.sleep(0.5) # Clear any response from the serial buffer ser.reset_input_buffer() print("Gain settings updated") def set_sample_rate(ser, freq): """Set sample rate using the '~' command""" # Sample rate mapping according to documentation freq_mapping = { 16000: '0', 8000: '1', 4000: '2', 2000: '3', 1000: '4', 500: '5', 250: '6' } if freq not in freq_mapping: raise ValueError(f"Unsupported frequency {freq}Hz. Supported rates: {list(freq_mapping.keys())}") # Stop any streaming first ser.write(b's') time.sleep(0.5) # Set sample rate command = f"~{freq_mapping[freq]}" ser.write(command.encode()) time.sleep(0.5) # Clear response from buffer ser.reset_input_buffer() print(f"Sample rate set to {freq}Hz") def init_board(ser): """Initialize the OpenBCI board for 16 channels""" print("Initializing board...") # Stop any previous streaming ser.write(b's') time.sleep(1) # Soft reset ser.write(b'v') time.sleep(2) # Clear buffers ser.reset_input_buffer() ser.reset_output_buffer() # Enable 16 channel mode ser.write(b'C') time.sleep(1) # Enable all channels (1-16) # First 8 channels commands = [b'!', b'@', b'#', b'$', b'%', b'^', b'&', b'*'] # Next 8 channels (Daisy module) commands.extend([b'Q', b'W', b'E', b'R', b'T', b'Y', b'U', b'I']) for cmd in commands: ser.write(cmd) time.sleep(0.1) # Set high-speed mode ser.write(b'\xF0\x06') # Set baud rate to 230400 time.sleep(1) ser.baudrate = 230400 set_gain(ser, gain=2) print("Board initialized") def find_packet_start(ser): """Find the start of a packet by looking for 0xA0 header""" while True: byte = ser.read() if byte[0] == 0xA0: # Header byte return byte return None def read_complete_packet(ser): """Read a complete packet ensuring proper alignment""" # Find the start of packet start_byte = find_packet_start(ser) if not start_byte: return None # Read remaining 32 bytes remaining_bytes = ser.read(32) if len(remaining_bytes) != 32: return None # Verify footer byte (0xCx) if (remaining_bytes[31] & 0xF0) != 0xC0: return None return start_byte + remaining_bytes def process_packet(packet): """Process a 33-byte packet and extract channel data according to documentation""" if len(packet) != 33: return None channels = [] for i in range(8): start_idx = 2 + (i * 3) # Start at byte 2 (after header and sample number) channel_data = packet[start_idx:start_idx + 3] # Convert 24-bit to 32-bit signed int according to documentation if (channel_data[0] & 0x80): # If negative number value = -1 * ((((~channel_data[0] & 0xFF) << 16) | ((~channel_data[1] & 0xFF) << 8) | (~channel_data[2] & 0xFF)) + 1) else: # If positive number value = (channel_data[0] << 16) | (channel_data[1] << 8) | channel_data[2] # Convert to microvolts: 4.5V / gain / (2^23 - 1) scale_factor = 4.5 / (24.0 * 8388607.0) * 1000000 # Using gain of 24 channels.append(value * scale_factor) return channels def start_sd_recording(ser, duration='G'): """Start recording to SD card with specified duration Duration codes: A = 5MIN S = 15MIN F = 30MIN G = 1HR (default) H = 2HR J = 4HR K = 12HR L = 24HR a = ~14sec (test) """ valid_durations = {'A', 'S', 'F', 'G', 'H', 'J', 'K', 'L', 'a'} if duration not in valid_durations: raise ValueError(f"Invalid duration code. Valid codes: {valid_durations}") print(f"Starting SD card recording with duration code {duration}") ser.write(duration.encode()) time.sleep(0.5) ser.write(b'b') time.sleep(0.5) def stop_sd_recording(ser): """Stop SD card recording""" print("Stopping SD card recording") ser.write(b's') time.sleep(0.5) ser.write(b'j') time.sleep(0.5) def sd_record(port, duration='G', sample_rate=1000): """Record data to SD card""" duration_map = { 'A': 5*60, # 5 minutes 'S': 15*60, # 15 minutes 'F': 30*60, # 30 minutes 'G': 60*60, # 1 hour 'H': 2*60*60, # 2 hours 'J': 4*60*60, # 4 hours 'K': 12*60*60, # 12 hours 'L': 24*60*60, # 24 hours 'a': 14 # ~14 seconds (test) } # Open serial port ser = serial.Serial(port, 115200) time.sleep(2) try: # Initialize board init_board(ser) # Set sample rate set_sample_rate(ser, sample_rate) # Start recording start_sd_recording(ser, duration) # Calculate wait time wait_time = duration_map[duration] start_time = time.time() print(f"Recording to SD card for {wait_time} seconds...") try: while (time.time() - start_time) < wait_time: remaining = wait_time - (time.time() - start_time) print(f"\rRecording... {remaining:.1f} seconds remaining ", end='') time.sleep(0.1) except KeyboardInterrupt: print("\nRecording interrupted by user") finally: # Always stop recording stop_sd_recording(ser) print("\nRecording complete") finally: ser.close() def main(): parser = argparse.ArgumentParser(description='OpenBCI EEG Recording Tool') parser.add_argument('--port', '-p', type=str, default='/dev/ttyUSB0', help='Serial port to use (default: /dev/ttyUSB0)') parser.add_argument('--filename', '-o', type=str, help='Output filename (default: openbci_.txt)') parser.add_argument('--sd', action='store_true', help='Record to SD card instead of streaming to PC') parser.add_argument('--duration', type=str, default='G', help='SD card recording duration code (default: G = 1 hour)') parser.add_argument('--sample-rate', type=int, default=1000, help='Sample rate in Hz (default: 1000)') parser.add_argument('--remote', action='store_true', help='Write to remote server instead of local file') args = parser.parse_args() if args.sd: sd_record(args.port, args.duration, args.sample_rate) return if args.filename is None: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") args.filename = f"openbci_{timestamp}.txt" # Open serial port ser = serial.Serial(args.port, 115200) time.sleep(2) init_board(ser) filename = args.filename if args.remote: ssh = create_ssh_connection() if not ssh: print("Failed to establish SSH connection. Exiting.") return sftp = ssh.open_sftp() remote_file = sftp.open(filename, 'w') # Write header header = "Timestamp," header += ",".join([f"Channel{i+1}" for i in range(16)]) header += "\n" remote_file.write(header) else: # Original local file writing with open(filename, 'w') as f: header = "Timestamp," header += ",".join([f"Channel{i+1}" for i in range(16)]) header += "\n" f.write(header) # Start streaming ser.write(b'b') time.sleep(0.5) ser.reset_input_buffer() print(f"Started recording to {filename}") packet_count = 0 start_time = time.time() buffer = io.StringIO() last_write = time.time() try: while True: # Read two properly aligned packets packet1 = read_complete_packet(ser) if packet1: packet2 = read_complete_packet(ser) if packet2: # Process both packets data1 = process_packet(packet1) data2 = process_packet(packet2) if data1 and data2: packet_count += 1 timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") all_channels = data1 + data2 # Combine all 16 channels data_str = [f"{x:.6f}" for x in all_channels] line = timestamp + "," + ",".join(data_str) + "\n" if args.remote: buffer.write(line) # Write buffer every 100ms if time.time() - last_write >= 0.1: remote_file.write(buffer.getvalue()) buffer = io.StringIO() last_write = time.time() else: with open(filename, 'a') as f: f.write(line) # Print status every second if packet_count % 125 == 0: elapsed_time = time.time() - start_time rate = packet_count / elapsed_time print(f"\rRecording... {rate:.1f} Hz, {packet_count} packets", end='') # Check for buffer overflow if ser.in_waiting > 1000: print(f"\nWarning: Buffer building up ({ser.in_waiting} bytes)") ser.reset_input_buffer() except KeyboardInterrupt: # Stop streaming ser.write(b's') ser.close() if args.remote: # Write any remaining data in buffer if buffer.getvalue(): remote_file.write(buffer.getvalue()) remote_file.close() sftp.close() ssh.close() # Print final statistics elapsed_time = time.time() - start_time rate = packet_count / elapsed_time print(f"\n\nRecording stopped") print(f"Duration: {elapsed_time:.1f} seconds") print(f"Packets recorded: {packet_count}") print(f"Average sample rate: {rate:.1f} Hz") print(f"Data saved to: {filename}") if __name__ == "__main__": main()