# morphism / cyton.py
# acb's picture
# Upload 5 files
# 5e284bb verified
import argparse
import sys
import os
import serial
import time
import paramiko
import io
from pathlib import Path
from datetime import datetime
def create_ssh_connection(host='topos.exypno.tech', port=420, username='albert'):
    """Open an SSH connection to the remote recording server.

    Args:
        host: Hostname of the server to connect to.
        port: TCP port of the SSH service.
        username: Login name. No password or key is passed explicitly here,
            so authentication relies on whatever paramiko picks up by default.

    Returns:
        A connected ``paramiko.SSHClient``, or ``None`` when the connection
        attempt fails (the error is printed rather than raised).
    """
    ssh = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy trusts any unknown host key without
    # verification. Kept for compatibility; consider pinning the host key.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(host, port=port, username=username)
    except Exception as e:
        print(f"Failed to connect to remote server: {e}")
        # Release the half-open client so a failed attempt does not leak
        # its socket/transport.
        ssh.close()
        return None
    return ssh
def set_gain(ser, gain=8):
    """Set the same amplifier gain on all 16 channels (Cyton + Daisy).

    Args:
        ser: Open serial connection to the board (needs ``write`` and
            ``reset_input_buffer``).
        gain: Gain multiplier; one of 1, 2, 4, 6, 8, 12 or 24.

    Raises:
        ValueError: If ``gain`` is not a supported value. This is checked
            before anything is written to the board.
    """
    gain_mapping = [1, 2, 4, 6, 8, 12, 24]
    if gain not in gain_mapping:
        raise ValueError(
            f"Unsupported gain {gain}. Supported gains: {gain_mapping}")
    # The board expects the position of the gain in the table, not the gain itself.
    gain_val = gain_mapping.index(gain)

    print(f"Setting {gain}x gain on all channels...")

    # Stop any streaming first
    ser.write(b's')
    time.sleep(0.5)

    # Main board channels are addressed as 1-8, Daisy channels (9-16) as Q-I.
    channel_codes = '12345678' + 'QWERTYUI'

    # One settings command per channel, sent as a single write.
    commands = ''.join(f'x{ch}0{gain_val}0000X' for ch in channel_codes)
    ser.write(commands.encode())
    time.sleep(0.5)

    # Discard whatever the board echoed back.
    ser.reset_input_buffer()
    print("Gain settings updated")
def set_sample_rate(ser, freq):
    """Configure the board's sample rate through the '~' command.

    Args:
        ser: Open serial connection to the board.
        freq: Requested rate in Hz; must be a key of the rate table below.

    Raises:
        ValueError: If ``freq`` is not one of the supported rates.
    """
    # Rate in Hz -> single-character code that follows '~'
    rate_codes = {
        16000: '0',
        8000: '1',
        4000: '2',
        2000: '3',
        1000: '4',
        500: '5',
        250: '6',
    }
    code = rate_codes.get(freq)
    if code is None:
        raise ValueError(f"Unsupported frequency {freq}Hz. Supported rates: {list(rate_codes)}")

    # First halt streaming, then send the rate command; pause after each.
    for payload in (b's', f"~{code}".encode()):
        ser.write(payload)
        time.sleep(0.5)

    # Drop the board's reply so it is not mistaken for sample data.
    ser.reset_input_buffer()
    print(f"Sample rate set to {freq}Hz")
def init_board(ser):
    """Bring the OpenBCI board into a known 16-channel state.

    Sequence: stop streaming, soft reset, clear the serial buffers, switch to
    16-channel mode, enable every channel, raise the baud rate to 230400 and
    finally apply 2x gain to all channels.

    Args:
        ser: Open serial connection to the board; its ``baudrate`` attribute
            is changed to 230400 by this call.
    """
    print("Initializing board...")

    # 's' halts any previous stream, 'v' performs the soft reset; each
    # command is followed by its own settling delay (seconds).
    for command, pause in ((b's', 1), (b'v', 2)):
        ser.write(command)
        time.sleep(pause)

    # Throw away anything buffered in either direction.
    ser.reset_input_buffer()
    ser.reset_output_buffer()

    # Switch to 16 channel mode.
    ser.write(b'C')
    time.sleep(1)

    # Enable commands: channels 1-8 first, then the Daisy module's 9-16.
    enable_commands = (
        b'!', b'@', b'#', b'$', b'%', b'^', b'&', b'*',
        b'Q', b'W', b'E', b'R', b'T', b'Y', b'U', b'I',
    )
    for command in enable_commands:
        ser.write(command)
        time.sleep(0.1)

    # High-speed mode: request 230400 baud, then follow on the host side.
    ser.write(b'\xF0\x06')
    time.sleep(1)
    ser.baudrate = 230400

    set_gain(ser, gain=2)
    print("Board initialized")
def find_packet_start(ser):
    """Consume bytes from ``ser`` until the 0xA0 packet header is found.

    Args:
        ser: Object whose ``read()`` returns the next byte as ``bytes``.

    Returns:
        The one-byte header (``b'\\xa0'``), or ``None`` if a read returns no
        data (for example a read timeout) before a header is seen.
    """
    while True:
        byte = ser.read()
        if not byte:
            # Empty read: nothing to index, and no header was found.
            return None
        if byte[0] == 0xA0:  # Header byte
            return byte
def read_complete_packet(ser):
    """Return one aligned 33-byte packet from ``ser``, or ``None``.

    The packet is the header byte located by ``find_packet_start`` followed by
    the next 32 bytes. ``None`` is returned when no header is found, when
    fewer than 32 bytes follow it, or when the final byte is not of the form
    0xCx.
    """
    header = find_packet_start(ser)
    if header:
        body = ser.read(32)
        # Accept only a full-length body whose footer's high nibble is 0xC.
        if len(body) == 32 and (body[31] & 0xF0) == 0xC0:
            return header + body
    return None
def process_packet(packet, gain=2):
    """Decode the eight channel values of a 33-byte packet into microvolts.

    Args:
        packet: The 33-byte packet (header, sample number, eight 3-byte
            channel values, then the remaining trailing bytes).
        gain: Amplifier gain the channels were recorded with. Defaults to 2,
            the gain ``init_board`` configures; pass 24 to get the scaling
            this function previously hard-coded.

    Returns:
        A list of eight floats in microvolts, or ``None`` if ``packet`` is not
        exactly 33 bytes long.
    """
    if len(packet) != 33:
        return None

    # Microvolts per count: 4.5V / gain / (2^23 - 1), computed once per packet.
    scale_factor = 4.5 / (gain * 8388607.0) * 1000000

    channels = []
    for i in range(8):
        start_idx = 2 + (i * 3)  # Skip header and sample number
        # Each channel is a 24-bit big-endian two's-complement integer.
        raw = int.from_bytes(packet[start_idx:start_idx + 3],
                             byteorder='big', signed=True)
        channels.append(raw * scale_factor)
    return channels
def start_sd_recording(ser, duration='G'):
    """Begin an SD card recording of the given length.

    The duration code is sent to the board first, followed by 'b'.

    Duration codes:
        A = 5MIN
        S = 15MIN
        F = 30MIN
        G = 1HR (default)
        H = 2HR
        J = 4HR
        K = 12HR
        L = 24HR
        a = ~14sec (test)

    Raises:
        ValueError: If ``duration`` is not one of the codes above.
    """
    allowed_codes = {'A', 'S', 'F', 'G', 'H', 'J', 'K', 'L', 'a'}
    if duration not in allowed_codes:
        raise ValueError(f"Invalid duration code. Valid codes: {allowed_codes}")

    print(f"Starting SD card recording with duration code {duration}")
    # Duration code, then 'b'; give the board half a second after each.
    for payload in (duration.encode(), b'b'):
        ser.write(payload)
        time.sleep(0.5)
def stop_sd_recording(ser):
    """End an SD card recording by sending 's' and then 'j' to the board."""
    print("Stopping SD card recording")
    # Same half-second pause after each of the two commands.
    for payload in (b's', b'j'):
        ser.write(payload)
        time.sleep(0.5)
def sd_record(port, duration='G', sample_rate=1000):
    """Record to the board's SD card for the length implied by ``duration``.

    Opens the serial port, initializes the board, sets the sample rate, starts
    the SD recording and waits (printing a countdown) until the duration has
    elapsed or the user presses Ctrl+C. The recording is always stopped and
    the port always closed afterwards.

    Args:
        port: Serial port name to open.
        duration: Duration code understood by ``start_sd_recording``.
        sample_rate: Sample rate in Hz passed to ``set_sample_rate``.

    Raises:
        ValueError: If ``duration`` is not a known code (raised before the
            serial port is opened) or the sample rate is unsupported.
    """
    # Duration code -> seconds to wait on the host side.
    duration_map = {
        'A': 5*60,       # 5 minutes
        'S': 15*60,      # 15 minutes
        'F': 30*60,      # 30 minutes
        'G': 60*60,      # 1 hour
        'H': 2*60*60,    # 2 hours
        'J': 4*60*60,    # 4 hours
        'K': 12*60*60,   # 12 hours
        'L': 24*60*60,   # 24 hours
        'a': 14          # ~14 seconds (test)
    }
    # Fail fast: reject a bad code before touching the hardware at all.
    if duration not in duration_map:
        raise ValueError(
            f"Invalid duration code {duration!r}. Valid codes: {sorted(duration_map)}")
    wait_time = duration_map[duration]

    # Open serial port
    ser = serial.Serial(port, 115200)
    try:
        time.sleep(2)
        init_board(ser)
        set_sample_rate(ser, sample_rate)
        start_sd_recording(ser, duration)

        start_time = time.time()
        print(f"Recording to SD card for {wait_time} seconds...")
        try:
            while True:
                remaining = wait_time - (time.time() - start_time)
                if remaining <= 0:
                    break
                print(f"\rRecording... {remaining:.1f} seconds remaining ", end='')
                time.sleep(0.1)
        except KeyboardInterrupt:
            print("\nRecording interrupted by user")
        finally:
            # Always stop recording
            stop_sd_recording(ser)
            print("\nRecording complete")
    finally:
        ser.close()
def main():
    """Command-line entry point for the OpenBCI recording tool.

    With ``--sd`` the recording is delegated to ``sd_record``. Otherwise the
    board is initialized and streamed: consecutive packet pairs are decoded
    into 16 channel values and written as CSV rows, either to a local file or,
    with ``--remote``, to a file on the SSH server via SFTP. Streaming runs
    until Ctrl+C, after which summary statistics are printed.

    The serial port, output file and SSH/SFTP handles are released on every
    exit path, not only on Ctrl+C.
    """
    # Local import: only this function needs it.
    from contextlib import ExitStack

    parser = argparse.ArgumentParser(description='OpenBCI EEG Recording Tool')
    parser.add_argument('--port', '-p', type=str, default='/dev/ttyUSB0',
                        help='Serial port to use (default: /dev/ttyUSB0)')
    parser.add_argument('--filename', '-o', type=str,
                        help='Output filename (default: openbci_<timestamp>.txt)')
    parser.add_argument('--sd', action='store_true',
                        help='Record to SD card instead of streaming to PC')
    parser.add_argument('--duration', type=str, default='G',
                        choices=list('ASFGHJKLa'),
                        help='SD card recording duration code (default: G = 1 hour)')
    parser.add_argument('--sample-rate', type=int, default=1000,
                        help='Sample rate in Hz (default: 1000)')
    parser.add_argument('--remote', action='store_true',
                        help='Write to remote server instead of local file')
    args = parser.parse_args()

    if args.sd:
        sd_record(args.port, args.duration, args.sample_rate)
        return

    filename = args.filename
    if filename is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"openbci_{timestamp}.txt"

    header = "Timestamp," + ",".join(f"Channel{i + 1}" for i in range(16)) + "\n"

    with ExitStack() as stack:
        # Open serial port; registered first so it is closed last.
        ser = serial.Serial(args.port, 115200)
        stack.callback(ser.close)
        time.sleep(2)
        init_board(ser)

        if args.remote:
            ssh = create_ssh_connection()
            if not ssh:
                print("Failed to establish SSH connection. Exiting.")
                return
            stack.callback(ssh.close)
            sftp = ssh.open_sftp()
            stack.callback(sftp.close)
            out_file = sftp.open(filename, 'w')
        else:
            out_file = open(filename, 'w')
        stack.callback(out_file.close)
        out_file.write(header)

        # Start streaming
        ser.write(b'b')
        time.sleep(0.5)
        ser.reset_input_buffer()
        print(f"Started recording to {filename}")

        packet_count = 0
        pending = []  # rows waiting for the next batched write
        start_time = time.time()
        last_write = start_time
        try:
            while True:
                # Read two properly aligned packets; the second is only
                # attempted when the first one was valid.
                # NOTE(review): the pair is assumed to be board-then-Daisy;
                # sample numbers are not checked - confirm against the
                # board's data format.
                packet1 = read_complete_packet(ser)
                packet2 = read_complete_packet(ser) if packet1 else None
                data1 = process_packet(packet1) if packet2 else None
                data2 = process_packet(packet2) if packet2 else None

                if data1 and data2:
                    packet_count += 1
                    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
                    all_channels = data1 + data2  # Combine all 16 channels
                    row = ",".join(f"{x:.6f}" for x in all_channels)
                    pending.append(f"{timestamp},{row}\n")

                    # Write out batched rows every 100ms instead of
                    # reopening the file for every sample.
                    now = time.time()
                    if now - last_write >= 0.1:
                        out_file.write("".join(pending))
                        if not args.remote:
                            out_file.flush()
                        pending.clear()
                        last_write = now

                    # Print status every 125 packets
                    if packet_count % 125 == 0:
                        elapsed_time = time.time() - start_time
                        rate = packet_count / elapsed_time if elapsed_time > 0 else 0.0
                        print(f"\rRecording... {rate:.1f} Hz, {packet_count} packets", end='')

                # Check for buffer overflow
                if ser.in_waiting > 1000:
                    print(f"\nWarning: Buffer building up ({ser.in_waiting} bytes)")
                    ser.reset_input_buffer()
        except KeyboardInterrupt:
            # Stop streaming
            ser.write(b's')
        finally:
            # Write any rows still waiting for the next batch.
            if pending:
                out_file.write("".join(pending))

    # Print final statistics (reached only after a Ctrl+C stop).
    elapsed_time = time.time() - start_time
    rate = packet_count / elapsed_time if elapsed_time > 0 else 0.0
    print("\n\nRecording stopped")
    print(f"Duration: {elapsed_time:.1f} seconds")
    print(f"Packets recorded: {packet_count}")
    print(f"Average sample rate: {rate:.1f} Hz")
    print(f"Data saved to: {filename}")
# Run the command-line tool only when this file is executed as a script.
if __name__ == "__main__":
    main()