tracker.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. import logging
  2. import matplotlib.pyplot as plt
  3. import numpy as np
  4. import scipy.stats as st
  5. from ProbMap import ProbMap, ProbMapData
  6. from Robot import Robot
  7. class Sensor:
  8. def __init__(self, tracker, coverage_radius=150) -> None:
  9. self.type = 'Cam'
  10. # host tracker
  11. self.tracker = tracker
  12. self.coverage_radius = coverage_radius
  13. # the sensor's condidence will multiplied by this factor
  14. self.health = 1.0
  15. def get_detection(self):
  16. all_targets = self.tracker.simulator.targets
  17. detections = []
  18. for target in all_targets:
  19. if np.linalg.norm(target.position - self.tracker.position) < self.coverage_radius:
  20. detection = (target.position-self.tracker.position)
  21. # logging.debug(
  22. # f"Ture {self.tracker.name}{self.tracker.id} detection:\t" + str(detection))
  23. # make up some noise and calculate the confidence of the detection
  24. std_dev = 1
  25. noise = np.random.normal(loc=0, scale=std_dev, size=2)
  26. confidence = sum(st.norm.pdf(
  27. noise, loc=0, scale=std_dev)*2)/2*std_dev
  28. if confidence <= 0.55:
  29. confidence = 0.55
  30. detection = detection + noise
  31. distance = np.linalg.norm(detection)
  32. if distance > self.coverage_radius:
  33. detection = detection * (self.coverage_radius/distance)
  34. detection = np.append(detection, confidence)
  35. detections.append(detection)
  36. logging.debug(
  37. f"Noisy {self.tracker.log_head} Detection: {detection}")
  38. return detections
  39. class Tracker(Robot):
  40. def __init__(self, simulator, name: str, id: int, position: np.array, coverage_radius) -> None:
  41. super().__init__(simulator, name, id, position)
  42. self.sensor = Sensor(self, coverage_radius)
  43. self.neighbor = set()
  44. self.area_width = 2000 # meter
  45. self.area_height = 2000 # meter
  46. self.resolution = 1 # meter
  47. self.prob_map = ProbMap(self.area_width, self.area_height, self.resolution,
  48. center_x=0.0, center_y=0.0, init_val=0.6,
  49. false_alarm_prob=0.05)
  50. self.observations = dict() # type: dict[tuple]
  51. self.shareable_v = ProbMapData()
  52. self.shareable_Q = ProbMapData()
  53. self.neighbors_v = dict()
  54. self.neighbors_Q = dict()
  55. def build_shareable_info(self, shareable_info, info_type):
  56. """Generate shareable information from local
  57. Args:
  58. shareable_info (dict): Stores all local infomation. Format: {(x, y) : value}
  59. """
  60. local_meas_info = ProbMapData()
  61. local_meas_info.tracker_id = self.id
  62. local_meas_info.type = info_type
  63. for k, v in shareable_info.items():
  64. local_meas_info.grid_ind += k
  65. local_meas_info.values.append(v)
  66. self.shareable_v = local_meas_info
  67. def get_info_from_neighbors(self, req_type):
  68. neighbors_info = dict()
  69. # Send requests and get responses from all neighbors' services
  70. # Collect info from neighbors
  71. if req_type == 'v':
  72. for e in self.neighbor:
  73. self.neighbors_v[e] = self.simulator.trackers[e].shareable_v
  74. for _id, res in self.neighbors_v.items():
  75. for i in range(len(res.values)):
  76. cell_ind = tuple([res.grid_ind[i*2], res.grid_ind[i*2+1]])
  77. # sum up all neighbors' measurement values
  78. value = res.values[i]
  79. try:
  80. neighbors_info[cell_ind] += value
  81. except KeyError:
  82. neighbors_info[cell_ind] = value
  83. elif req_type == 'Q':
  84. for e in self.neighbor:
  85. self.neighbors_v[e] = self.simulator.trackers[e].shareable_Q
  86. for _id, res in self.neighbors_Q.items():
  87. for i in range(len(res.values)):
  88. cell_ind = tuple([res.grid_ind[i*2], res.grid_ind[i*2+1]])
  89. # sum up all neighbors' values and counting, need to calculate average value
  90. value = res.values[i]
  91. try:
  92. neighbors_info[cell_ind][0] += value
  93. neighbors_info[cell_ind][1] += 1.
  94. except KeyError:
  95. neighbors_info[cell_ind] = [value, 1.]
  96. return neighbors_info
  97. def sensing(self):
  98. detections = self.sensor.get_detection()
  99. output_detection = dict()
  100. id_counter = 0
  101. for det in detections:
  102. transformed_detection = det+np.append(self.position, 0)
  103. output_detection[id_counter] = transformed_detection
  104. id_counter += 1
  105. self.observations = output_detection
  106. def random_moving(self):
  107. # if already reached the previous waypoint
  108. if self.waypoint_ctrl():
  109. self.ang = (self.ang + np.deg2rad(np.random.randint(-360, 360))) % 360
  110. rot_mat = np.asarray([[np.cos(self.ang), -np.sin(self.ang)],
  111. [np.sin(self.ang), np.cos(self.ang)]])
  112. self.waypoint_ctrl(speed=20, desired_pos=(self.position
  113. - np.dot([0, 10], rot_mat)))
  114. def job(self):
  115. self.random_moving()
  116. self.sensing()
  117. # logging.debug(f"{self.log_head} OBSERVATION: {self.observations}")
  118. shareable_v = self.prob_map.generate_shareable_v(
  119. self.observations)
  120. # build shareable_v and publish it
  121. self.build_shareable_info(shareable_v, 'v')
  122. # logging.debug(f"{self.name}_{self.id}: {self.shareable_v.grid_ind}")
  123. # get all neighbors' detections
  124. neighbors_meas = self.get_info_from_neighbors('v')
  125. # logging.debug("{}{} got neighbor {} info: {}".format(
  126. # self.name, self.id, self.neighbor, neighbors_meas))
  127. # # Update the local map by all detections (local and neighbors')
  128. self.prob_map.map_update(shareable_v, neighbors_meas,
  129. len(self.simulator.trackers), len(self.neighbor))
  130. # Convert prob map to a shareable information and publish it
  131. self.build_shareable_info(
  132. self.prob_map.non_empty_cell, 'Q')
  133. # Collect neighbors' map (Q) for consensus
  134. neighbors_map = self.get_info_from_neighbors('Q')
  135. # # rospy.loginfo("{} got neighbors' map: {}".format(
  136. # # self.name, neighbors_map))
  137. # Make consensus, merge neighbors' map
  138. self.prob_map.consensus(neighbors_map)
  139. self.target_estimates = self.prob_map.get_target_est(
  140. 0.5, normalization=True)
  141. logging.debug(
  142. f"{self.name}_{self.id} ProbMap: {self.prob_map.prob_map}")
  143. logging.debug(
  144. f"{self.name}_{self.id} NonEmp: {self.prob_map.non_empty_cell}")
  145. # print(target_estimates)