| from multiprocessing import Value |
| import airsim |
| import time |
| import math |
| import keyboard |
| import json |
|
|
| class DroneInfo(): |
| gps_pos_altitude = 0 |
| gps_pos_latitude = 0 |
| gps_pos_longitude = 0 |
|
|
| ori_w = 0 |
| ori_x = 0 |
| ori_y = 0 |
| ori_z = 0 |
|
|
| |
| l_v_x = 0 |
| l_v_y = 0 |
| l_v_z = 0 |
|
|
| |
| a_v_x = 0 |
| a_v_y = 0 |
| a_v_z = 0 |
|
|
| roll = 0 |
| yaw = 0 |
| pitch = 0 |
|
|
| linear_speed = 0 |
|
|
|
|
| class Drone(): |
| def __init__(self): |
| |
| self.client = airsim.MultirotorClient() |
| self.client.confirmConnection() |
| self.client.enableApiControl(True) |
| self.client.armDisarm(True) |
|
|
| def TakeOff(self): |
| if airsim.LandedState.Landed == 0: |
| self.client.takeoffAsync().join() |
| print("take off") |
| else: |
| self.client.hoverAsync().join() |
| print("already flying") |
|
|
| def Landed(self): |
| if airsim.LandedState.Landed == 1: |
| print("already landed") |
| else: |
| self.client.landAsync().join() |
| print("now landing") |
|
|
| def DroneMoveByTime(self, vx, vy, vz, duration=3): |
| print("now speed", vx, vy, vz) |
| self.client.moveByVelocityAsync(vx, vy, vz, duration) |
| time.sleep(duration) |
| print("End Moving") |
|
|
| def GetState(self): |
| |
| State = DroneInfo() |
| gps_pos = self.client.getMultirotorState().gps_location |
| State.gps_pos_altitude = gps_pos.altitude |
| State.gps_pos_longitude = gps_pos.longitude |
| State.gps_pos_latitude = gps_pos.latitude |
|
|
| orientation = self.client.getMultirotorState().kinematics_estimated.orientation |
| State.ori_w = orientation.w_val |
| State.ori_x = orientation.x_val |
| State.ori_y = orientation.y_val |
| State.ori_z = orientation.z_val |
|
|
| a_velocity = self.client.getMultirotorState().kinematics_estimated.angular_velocity |
| State.a_v_x = a_velocity.x_val |
| State.a_v_y = a_velocity.y_val |
| State.a_v_z = a_velocity.z_val |
|
|
| l_velocity = self.client.getMultirotorState().kinematics_estimated.linear_velocity |
| State.l_v_x = l_velocity.x_val |
| State.l_v_y = l_velocity.y_val |
| State.l_v_z = l_velocity.z_val |
|
|
| State.roll = self.client.getMultirotorState().rc_data.roll |
| State.yaw = self.client.getMultirotorState().rc_data.yaw |
| State.pitch = self.client.getMultirotorState().rc_data.pitch |
|
|
| State.linear_speed = math.sqrt(pow(State.l_v_x,2)+pow(State.l_v_y,2)+pow(State.l_v_z,2)) |
|
|
| return State |
|
|
| def Hover(self): |
| landed = self.client.getMultirotorState() |
|
|
| def MoveToPosition(self,x,y,z,duration=3): |
| self.client.moveToPositionAsync(x, y, z, duration).join() |
|
|
| def Reset(self): |
| self.client.reset() |
|
|
| def SnowTeleport(self): |
| position = airsim.Vector3r(9.87236677 , -279.41862836, -22.81082173) |
| heading = airsim.utils.to_quaternion(0, 0, 0) |
| pose = airsim.Pose(position, heading) |
| self.client.simSetVehiclePose(pose, True) |
|
|
| def OvercastTeleport(self): |
| position = airsim.Vector3r(213.24547002 , -429.66393833, 19.260) |
| heading = airsim.utils.to_quaternion(0, 0, 0) |
| pose = airsim.Pose(position, heading) |
| self.client.simSetVehiclePose(pose, True) |
|
|
| def RainTeleport(self): |
| position = airsim.Vector3r(-315.200 , -444.920, 12.520) |
| heading = airsim.utils.to_quaternion(0, 0, 0) |
| pose = airsim.Pose(position, heading) |
| self.client.simSetVehiclePose(pose, True) |
| |
| def SandTeleport(self): |
| position = airsim.Vector3r(-335.50020795 , 282.16650494, 19.50313758) |
| heading = airsim.utils.to_quaternion(0, 0, 0) |
| pose = airsim.Pose(position, heading) |
| self.client.simSetVehiclePose(pose, True) |
|
|
| def ClearTeleport(self): |
| position = airsim.Vector3r(-152.28332052 , 236.00371837, 66.40114393) |
| heading = airsim.utils.to_quaternion(0, 0, 0) |
| pose = airsim.Pose(position, heading) |
| self.client.simSetVehiclePose(pose, True) |
|
|
| def FogTeleport(self): |
| position = airsim.Vector3r(-430.39138328 , -91.7420777, 13.86624983) |
| heading = airsim.utils.to_quaternion(0, 0, 0) |
| pose = airsim.Pose(position, heading) |
| self.client.simSetVehiclePose(pose, True) |
|
|
|
|
| def CloseAPI(self): |
| self.client.enableApiControl(False) |
|
|
| def KeyboardControl(self,x): |
| w = keyboard.KeyboardEvent('down', 150, 'w') |
| s = keyboard.KeyboardEvent('down', 150, 's') |
| a = keyboard.KeyboardEvent('down', 150, 'a') |
| d = keyboard.KeyboardEvent('down', 150, 'd') |
| up = keyboard.KeyboardEvent('down', 150, 'up') |
| down = keyboard.KeyboardEvent('down', 150, 'down') |
| left = keyboard.KeyboardEvent('down', 150, 'left') |
| right = keyboard.KeyboardEvent('down', 150, 'right') |
| k = keyboard.KeyboardEvent('down', 28, 'k') |
| l = keyboard.KeyboardEvent('down', 28, 'l') |
| if x.event_type == 'down' and x.name == w.name: |
| self.client.moveByVelocityBodyFrameAsync(3, 0, 0, 0.5) |
| print("forward") |
| elif x.event_type == 'down' and x.name == s.name: |
| self.client.moveByVelocityBodyFrameAsync(-3, 0, 0, 0.5) |
| print("back") |
| elif x.event_type == 'down' and x.name == a.name: |
| self.client.moveByVelocityBodyFrameAsync(0, -2, 0, 0.5) |
| print("left") |
| elif x.event_type == 'down' and x.name == d.name: |
| self.client.moveByVelocityBodyFrameAsync(0, 2, 0, 0.5) |
| print("right") |
| elif x.event_type == 'down' and x.name == up.name: |
| self.client.moveByVelocityBodyFrameAsync(0, 0, -0.5, 0.5) |
| print("up") |
| elif x.event_type == 'down' and x.name == down.name: |
| self.client.moveByVelocityBodyFrameAsync(0, 0, 0.5, 0.5) |
| print("down") |
| elif x.event_type == 'down' and x.name == left.name: |
| self.client.rotateByYawRateAsync(-20, 0.5) |
| print("turn left") |
| elif x.event_type == 'down' and x.name == right.name: |
| self.client.rotateByYawRateAsync(20, 0.5) |
| print("turn right") |
| elif x.event_type == 'down' and x.name == k.name: |
| |
| self.client.enableApiControl(True) |
| print("get control") |
| |
| self.client.armDisarm(True) |
| print("unlock") |
| |
| self.client.takeoffAsync().join() |
| print("takeoff") |
| elif x.event_type == 'down' and x.name == l.name: |
| keyboard.wait("l") |
| keyboard.unhook_all() |
| else: |
| self.client.moveByVelocityBodyFrameAsync(0, 0, 0, 0.5).join() |
| self.client.hoverAsync().join() |
| print("hovering") |
|
|
| def ReplayRoute(self): |
| with open('Route.json', 'r', encoding='utf-8') as file: |
| data = json.load(file) |
|
|
| location_x = data.get("Location.X", []) |
| location_y = data.get("Location.Y", []) |
| location_z = data.get("Location.Z", []) |
|
|
| |
| all_frames = set() |
| for arr in [location_x, location_y, location_z]: |
| all_frames.update([item["frame"] for item in arr]) |
| all_frames = sorted(all_frames) |
|
|
| interp_x = interpolate_axis(location_x, all_frames) |
| interp_y = interpolate_axis(location_y, all_frames) |
| interp_z = interpolate_axis(location_z, all_frames) |
|
|
| for i in range(len(all_frames) - 1): |
| position = airsim.Vector3r(interp_x[i]/100, interp_y[i]/100, -interp_z[i]/100) |
| position_next = airsim.Vector3r(interp_x[i+1]/100, interp_y[i+1]/100, -interp_z[i+1]/100) |
| time = (all_frames[i+1] - all_frames[i]) / 30 |
| speed = Calc_distance(position, position_next) / time if time > 0 else 1 |
| print(position) |
| self.client.moveToPositionAsync(position.x_val, position.y_val, position.z_val, speed).join() |
|
|
|
|
| def interpolate_axis(axis_list, target_frames): |
| if not axis_list: |
| return [0.0] * len(target_frames) |
| axis_list = sorted(axis_list, key=lambda x: x["frame"]) |
| frames = [item["frame"] for item in axis_list] |
| values = [item["value"] for item in axis_list] |
| result = [] |
| for f in target_frames: |
| if f <= frames[0]: |
| result.append(values[0]) |
| elif f >= frames[-1]: |
| result.append(values[-1]) |
| else: |
| for i in range(len(frames) - 1): |
| if frames[i] <= f <= frames[i+1]: |
| t = (f - frames[i]) / (frames[i+1] - frames[i]) |
| v = values[i] + t * (values[i+1] - values[i]) |
| result.append(v) |
| break |
| return result |
|
|
|
|
| def Calc_distance(pos1, pos2): |
| return math.sqrt((pos1.x_val - pos2.x_val) ** 2 + (pos1.y_val - pos2.y_val) ** 2 + (pos1.z_val - pos2.z_val) ** 2) |
|
|
| if __name__ == '__main__': |
| dronne = Drone() |
| |
| dronne.ReplayRoute() |
|
|
| dronecontrol = Drone() |
|
|
|
|