|
@@ -1,98 +1,149 @@
|
|
|
-import numpy as np
|
|
|
|
|
-import warnings
|
|
|
|
|
import logging
|
|
import logging
|
|
|
|
|
|
|
|
|
|
+import matplotlib.pyplot as plt
|
|
|
|
|
+import numpy as np
|
|
|
|
|
+import scipy.stats as st
|
|
|
|
|
+
|
|
|
|
|
+from ProbMap import ProbMap, ProbMapData
|
|
|
|
|
+from Robot import Robot
|
|
|
|
|
+
|
|
|
|
|
|
|
|
class Sensor:
|
|
class Sensor:
|
|
|
- def __init__(self, tracker, sensor_rad=150) -> None:
|
|
|
|
|
|
|
+ def __init__(self, tracker, coverage_radius=150) -> None:
|
|
|
self.type = 'Cam'
|
|
self.type = 'Cam'
|
|
|
|
|
+ # host tracker
|
|
|
self.tracker = tracker
|
|
self.tracker = tracker
|
|
|
- self.coverage_radius = sensor_rad
|
|
|
|
|
|
|
+ self.coverage_radius = coverage_radius
|
|
|
|
|
|
|
|
- def get_detection(self, targets):
|
|
|
|
|
|
|
+ def get_detection(self):
|
|
|
|
|
+ all_targets = self.tracker.simulator.targets
|
|
|
detections = []
|
|
detections = []
|
|
|
- for target in targets:
|
|
|
|
|
|
|
+ for target in all_targets:
|
|
|
if np.linalg.norm(target.position - self.tracker.position) < self.coverage_radius:
|
|
if np.linalg.norm(target.position - self.tracker.position) < self.coverage_radius:
|
|
|
detection = (target.position-self.tracker.position)
|
|
detection = (target.position-self.tracker.position)
|
|
|
- logging.info(
|
|
|
|
|
- f"Ture {self.tracker.name}{self.tracker.id} detection:\n\t\t" + str(detection))
|
|
|
|
|
- detection += np.random.normal(loc=0, scale=2, size=2)
|
|
|
|
|
|
|
+ # logging.debug(
|
|
|
|
|
+ # f"Ture {self.tracker.name}{self.tracker.id} detection:\t" + str(detection))
|
|
|
|
|
+ # make up some noise and calculate the confidence of the detection
|
|
|
|
|
+ std_dev = 1
|
|
|
|
|
+ noise = np.random.normal(loc=0, scale=std_dev, size=2)
|
|
|
|
|
+ confidence = sum(st.norm.pdf(
|
|
|
|
|
+ noise, loc=0, scale=std_dev)*2)/2*std_dev
|
|
|
|
|
+ detection = detection + noise
|
|
|
distance = np.linalg.norm(detection)
|
|
distance = np.linalg.norm(detection)
|
|
|
if distance > self.coverage_radius:
|
|
if distance > self.coverage_radius:
|
|
|
detection = detection * (self.coverage_radius/distance)
|
|
detection = detection * (self.coverage_radius/distance)
|
|
|
|
|
+ detection = np.append(detection, confidence)
|
|
|
detections.append(detection)
|
|
detections.append(detection)
|
|
|
- logging.info(
|
|
|
|
|
- f"Noisy {self.tracker.name}{self.tracker.id} detection:\n\t\t"+str(detection))
|
|
|
|
|
|
|
+ logging.debug(
|
|
|
|
|
+ f"Noisy {self.tracker.name}{self.tracker.id} detection: {detection}")
|
|
|
return detections
|
|
return detections
|
|
|
|
|
|
|
|
|
|
|
|
|
-class Robot:
|
|
|
|
|
- def __init__(self, name: str, id: int, position: np.array, rate: int) -> None:
|
|
|
|
|
- """The base Robot class
|
|
|
|
|
-
|
|
|
|
|
- Parameters
|
|
|
|
|
- ----------
|
|
|
|
|
- name : str
|
|
|
|
|
- Robot's name
|
|
|
|
|
- id : int
|
|
|
|
|
- Robot's ID
|
|
|
|
|
- position : np.array
|
|
|
|
|
- Robot's start position
|
|
|
|
|
- rate : int
|
|
|
|
|
- Simulator's update rate
|
|
|
|
|
- """
|
|
|
|
|
- self.name = name
|
|
|
|
|
- self.id = id
|
|
|
|
|
- self.position = position
|
|
|
|
|
- self.desierd_position = self.position
|
|
|
|
|
- self.in_position = True
|
|
|
|
|
- self.speed = 0.0
|
|
|
|
|
- self.rate = rate
|
|
|
|
|
- self.max_speed = 10.0 # m/s
|
|
|
|
|
-
|
|
|
|
|
- def move(self):
|
|
|
|
|
- """Move the robot to a position
|
|
|
|
|
-
|
|
|
|
|
- Parameters
|
|
|
|
|
- ----------
|
|
|
|
|
- f : Function, optional
|
|
|
|
|
- a funcrtion that returns a position, by default None
|
|
|
|
|
|
|
+class Tracker(Robot):
|
|
|
|
|
+ def __init__(self, simulator, name: str, id: int, position: np.array, coverage_radius, rate: int) -> None:
|
|
|
|
|
+ super().__init__(simulator, name, id, position, rate)
|
|
|
|
|
+ self.sensor = Sensor(self, coverage_radius)
|
|
|
|
|
+ self.neighbor = set()
|
|
|
|
|
+
|
|
|
|
|
+ self.area_width = 2000 # meter
|
|
|
|
|
+ self.area_height = 2000 # meter
|
|
|
|
|
+ self.resolution = 1.0 # meter
|
|
|
|
|
+ self.prob_map = ProbMap(self.area_width, self.area_height, self.resolution,
|
|
|
|
|
+ center_x=0.0, center_y=0.0, init_val=0.01,
|
|
|
|
|
+ false_alarm_prob=0.05)
|
|
|
|
|
+
|
|
|
|
|
+ self.observations = dict() # type: dict[tuple]
|
|
|
|
|
+ self.shareable_v = ProbMapData()
|
|
|
|
|
+ self.shareable_Q = ProbMapData()
|
|
|
|
|
+ self.neighbors_v = dict()
|
|
|
|
|
+ self.neighbors_Q = dict()
|
|
|
|
|
+
|
|
|
|
|
+ def build_shareable_info(self, shareable_info, info_type):
|
|
|
|
|
+ """Generate shareable information from local
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ shareable_info (dict): Stores all local infomation. Format: {(x, y) : value}
|
|
|
"""
|
|
"""
|
|
|
- logging.warning("Moving function not impelemented")
|
|
|
|
|
- # warnings.warn("moving function not Impelement")
|
|
|
|
|
- pass
|
|
|
|
|
-
|
|
|
|
|
- def waypoint_ctrl(self, speed, desierd_pos: np.array = None):
|
|
|
|
|
- # if got new waypoint, update
|
|
|
|
|
- if not np.any(desierd_pos == None):
|
|
|
|
|
- self.desierd_position = desierd_pos
|
|
|
|
|
-
|
|
|
|
|
- if speed > self.max_speed:
|
|
|
|
|
- logging.warning("Given speed is over maximum")
|
|
|
|
|
- self.speed = speed
|
|
|
|
|
-
|
|
|
|
|
- if np.any(self.position != self.desierd_position):
|
|
|
|
|
- one_time_step_length = speed/self.rate
|
|
|
|
|
- difference = self.desierd_position - self.position
|
|
|
|
|
- distance = np.linalg.norm(difference)
|
|
|
|
|
- # if it's close enough to the goal position, make it in position
|
|
|
|
|
- if distance <= one_time_step_length:
|
|
|
|
|
- self.position = self.desierd_position
|
|
|
|
|
- else:
|
|
|
|
|
- # move to the goal point at the given speed
|
|
|
|
|
- direction = difference/distance
|
|
|
|
|
- self.position = self.position + direction*self.speed/self.rate
|
|
|
|
|
|
|
+ local_meas_info = ProbMapData()
|
|
|
|
|
+ local_meas_info.tracker_id = self.id
|
|
|
|
|
+ local_meas_info.type = info_type
|
|
|
|
|
+ for k, v in shareable_info.items():
|
|
|
|
|
+ local_meas_info.grid_ind += k
|
|
|
|
|
+ local_meas_info.values.append(v)
|
|
|
|
|
+ self.shareable_v = local_meas_info
|
|
|
|
|
|
|
|
- def job(self):
|
|
|
|
|
- pass
|
|
|
|
|
- # self.position = self.speed*(1/self.rate) + self.position
|
|
|
|
|
|
|
+ def get_info_from_neighbors(self, req_type):
|
|
|
|
|
+ neighbors_info = dict()
|
|
|
|
|
+ # Send requests and get responses from all neighbors' services
|
|
|
|
|
+ # Collect info from neighbors
|
|
|
|
|
+ if req_type == 'v':
|
|
|
|
|
+ for e in self.neighbor:
|
|
|
|
|
+ self.neighbors_v[e] = self.simulator.trackers[e].shareable_v
|
|
|
|
|
|
|
|
|
|
+ for _id, res in self.neighbors_v.items():
|
|
|
|
|
+ for i in range(len(res.values)):
|
|
|
|
|
+ cell_ind = tuple([res.grid_ind[i*2], res.grid_ind[i*2+1]])
|
|
|
|
|
+ # sum up all neighbors' measurement values
|
|
|
|
|
+ value = res.values[i]
|
|
|
|
|
+ try:
|
|
|
|
|
+ neighbors_info[cell_ind] += value
|
|
|
|
|
+ except KeyError:
|
|
|
|
|
+ neighbors_info[cell_ind] = value
|
|
|
|
|
+ elif req_type == 'Q':
|
|
|
|
|
+ for e in self.neighbor:
|
|
|
|
|
+ self.neighbors_v[e] = self.simulator.trackers[e].shareable_Q
|
|
|
|
|
+ for _id, res in self.neighbors_Q.items():
|
|
|
|
|
+ for i in range(len(res.values)):
|
|
|
|
|
+ cell_ind = tuple([res.grid_ind[i*2], res.grid_ind[i*2+1]])
|
|
|
|
|
+ # sum up all neighbors' values and counting, need to calculate average value
|
|
|
|
|
+ value = res.values[i]
|
|
|
|
|
+ try:
|
|
|
|
|
+ neighbors_info[cell_ind][0] += value
|
|
|
|
|
+ neighbors_info[cell_ind][1] += 1.
|
|
|
|
|
+ except KeyError:
|
|
|
|
|
+ neighbors_info[cell_ind] = [value, 1.]
|
|
|
|
|
+ return neighbors_info
|
|
|
|
|
|
|
|
-class Tracker(Robot):
|
|
|
|
|
- def __init__(self, name: str, id: int, position: np.array, sensor_rad, rate: int) -> None:
|
|
|
|
|
- super().__init__(name, id, position, rate)
|
|
|
|
|
- self.sensor = Sensor(self, sensor_rad)
|
|
|
|
|
- self.neighbor = set()
|
|
|
|
|
|
|
+ def sensing(self):
|
|
|
|
|
+ detections = self.sensor.get_detection()
|
|
|
|
|
+ output_detection = dict()
|
|
|
|
|
+ id_counter = 0
|
|
|
|
|
+ for det in detections:
|
|
|
|
|
+ output_detection[id_counter] = det
|
|
|
|
|
+ id_counter += 1
|
|
|
|
|
+ self.observations = output_detection
|
|
|
|
|
|
|
|
def job(self):
|
|
def job(self):
|
|
|
- self.waypoint_ctrl(speed=50, desierd_pos=np.array([500, 500]))
|
|
|
|
|
|
|
+ self.sensing()
|
|
|
|
|
+ shareable_v = self.prob_map.generate_shareable_v(
|
|
|
|
|
+ self.observations)
|
|
|
|
|
+
|
|
|
|
|
+ # build shareable_v and publish it
|
|
|
|
|
+ self.build_shareable_info(shareable_v, 'v')
|
|
|
|
|
+ # logging.debug(f"{self.name}_{self.id}: {self.shareable_v.grid_ind}")
|
|
|
|
|
+
|
|
|
|
|
+ # get all neighbors' detections
|
|
|
|
|
+ neighbors_meas = self.get_info_from_neighbors('v')
|
|
|
|
|
+ # logging.debug("{}{} got neighbor {} info: {}".format(
|
|
|
|
|
+ # self.name, self.id, self.neighbor, neighbors_meas))
|
|
|
|
|
+
|
|
|
|
|
+ # # Update the local map by all detections (local and neighbors')
|
|
|
|
|
+ self.prob_map.map_update(shareable_v, neighbors_meas,
|
|
|
|
|
+ len(self.simulator.trackers), len(self.neighbor))
|
|
|
|
|
+
|
|
|
|
|
+ # Convert prob map to a shareable information and publish it
|
|
|
|
|
+ self.build_shareable_info(
|
|
|
|
|
+ self.prob_map.non_empty_cell, 'Q')
|
|
|
|
|
+
|
|
|
|
|
+ # Collect neighbors' map (Q) for consensus
|
|
|
|
|
+ neighbors_map = self.get_info_from_neighbors('Q')
|
|
|
|
|
+ # # rospy.loginfo("{} got neighbors' map: {}".format(
|
|
|
|
|
+ # # self.name, neighbors_map))
|
|
|
|
|
+
|
|
|
|
|
+ # Make consensus, merge neighbors' map
|
|
|
|
|
+ self.prob_map.consensus(neighbors_map)
|
|
|
|
|
+
|
|
|
|
|
+ self.target_estimates = self.prob_map.get_target_est(
|
|
|
|
|
+ 0.8, normalization=False)
|
|
|
|
|
+ print(self.prob_map.prob_map)
|
|
|
|
|
+ # print(target_estimates)
|