| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- import logging
- import matplotlib.pyplot as plt
- import numpy as np
- import scipy.stats as st
- from ProbMap import ProbMap, ProbMapData
- from Robot import Robot
- class Sensor:
- def __init__(self, tracker, coverage_radius=150) -> None:
- self.type = 'Cam'
- # host tracker
- self.tracker = tracker
- self.coverage_radius = coverage_radius
- # the sensor's condidence will multiplied by this factor
- self.health = 1.0
- def get_detection(self):
- all_targets = self.tracker.simulator.targets
- detections = []
- for target in all_targets:
- if np.linalg.norm(target.position - self.tracker.position) < self.coverage_radius:
- detection = (target.position-self.tracker.position)
- # logging.debug(
- # f"Ture {self.tracker.name}{self.tracker.id} detection:\t" + str(detection))
- # make up some noise and calculate the confidence of the detection
- std_dev = 1
- noise = np.random.normal(loc=0, scale=std_dev, size=2)
- confidence = sum(st.norm.pdf(
- noise, loc=0, scale=std_dev)*2)/2*std_dev
- if confidence <= 0.55:
- confidence = 0.55
- detection = detection + noise
- distance = np.linalg.norm(detection)
- if distance > self.coverage_radius:
- detection = detection * (self.coverage_radius/distance)
- detection = np.append(detection, confidence)
- detections.append(detection)
- logging.debug(
- f"Noisy {self.tracker.log_head} Detection: {detection}")
- return detections
- class Tracker(Robot):
- def __init__(self, simulator, name: str, id: int, position: np.array, coverage_radius) -> None:
- super().__init__(simulator, name, id, position)
- self.sensor = Sensor(self, coverage_radius)
- self.neighbor = set()
- self.area_width = 2000 # meter
- self.area_height = 2000 # meter
- self.resolution = 1 # meter
- self.prob_map = ProbMap(self.area_width, self.area_height, self.resolution,
- center_x=0.0, center_y=0.0, init_val=0.6,
- false_alarm_prob=0.05)
- self.observations = dict() # type: dict[tuple]
- self.shareable_v = ProbMapData()
- self.shareable_Q = ProbMapData()
- self.neighbors_v = dict()
- self.neighbors_Q = dict()
- def build_shareable_info(self, shareable_info, info_type):
- """Generate shareable information from local
- Args:
- shareable_info (dict): Stores all local infomation. Format: {(x, y) : value}
- """
- local_meas_info = ProbMapData()
- local_meas_info.tracker_id = self.id
- local_meas_info.type = info_type
- for k, v in shareable_info.items():
- local_meas_info.grid_ind += k
- local_meas_info.values.append(v)
- self.shareable_v = local_meas_info
- def get_info_from_neighbors(self, req_type):
- neighbors_info = dict()
- # Send requests and get responses from all neighbors' services
- # Collect info from neighbors
- if req_type == 'v':
- for e in self.neighbor:
- self.neighbors_v[e] = self.simulator.trackers[e].shareable_v
- for _id, res in self.neighbors_v.items():
- for i in range(len(res.values)):
- cell_ind = tuple([res.grid_ind[i*2], res.grid_ind[i*2+1]])
- # sum up all neighbors' measurement values
- value = res.values[i]
- try:
- neighbors_info[cell_ind] += value
- except KeyError:
- neighbors_info[cell_ind] = value
- elif req_type == 'Q':
- for e in self.neighbor:
- self.neighbors_v[e] = self.simulator.trackers[e].shareable_Q
- for _id, res in self.neighbors_Q.items():
- for i in range(len(res.values)):
- cell_ind = tuple([res.grid_ind[i*2], res.grid_ind[i*2+1]])
- # sum up all neighbors' values and counting, need to calculate average value
- value = res.values[i]
- try:
- neighbors_info[cell_ind][0] += value
- neighbors_info[cell_ind][1] += 1.
- except KeyError:
- neighbors_info[cell_ind] = [value, 1.]
- return neighbors_info
- def sensing(self):
- detections = self.sensor.get_detection()
- output_detection = dict()
- id_counter = 0
- for det in detections:
- transformed_detection = det+np.append(self.position, 0)
- output_detection[id_counter] = transformed_detection
- id_counter += 1
- self.observations = output_detection
-
- def random_moving(self):
- # if already reached the previous waypoint
- if self.waypoint_ctrl():
- self.ang = (self.ang + np.deg2rad(np.random.randint(-360, 360))) % 360
- rot_mat = np.asarray([[np.cos(self.ang), -np.sin(self.ang)],
- [np.sin(self.ang), np.cos(self.ang)]])
- self.waypoint_ctrl(speed=20, desired_pos=(self.position
- - np.dot([0, 10], rot_mat)))
- def job(self):
- self.random_moving()
- self.sensing()
- # logging.debug(f"{self.log_head} OBSERVATION: {self.observations}")
- shareable_v = self.prob_map.generate_shareable_v(
- self.observations)
- # build shareable_v and publish it
- self.build_shareable_info(shareable_v, 'v')
- # logging.debug(f"{self.name}_{self.id}: {self.shareable_v.grid_ind}")
- # get all neighbors' detections
- neighbors_meas = self.get_info_from_neighbors('v')
- # logging.debug("{}{} got neighbor {} info: {}".format(
- # self.name, self.id, self.neighbor, neighbors_meas))
- # # Update the local map by all detections (local and neighbors')
- self.prob_map.map_update(shareable_v, neighbors_meas,
- len(self.simulator.trackers), len(self.neighbor))
- # Convert prob map to a shareable information and publish it
- self.build_shareable_info(
- self.prob_map.non_empty_cell, 'Q')
- # Collect neighbors' map (Q) for consensus
- neighbors_map = self.get_info_from_neighbors('Q')
- # # rospy.loginfo("{} got neighbors' map: {}".format(
- # # self.name, neighbors_map))
- # Make consensus, merge neighbors' map
- self.prob_map.consensus(neighbors_map)
- self.target_estimates = self.prob_map.get_target_est(
- 0.5, normalization=True)
- logging.debug(
- f"{self.name}_{self.id} ProbMap: {self.prob_map.prob_map}")
- logging.debug(
- f"{self.name}_{self.id} NonEmp: {self.prob_map.non_empty_cell}")
- # print(target_estimates)
|