tracker.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. import logging
  2. import matplotlib.pyplot as plt
  3. import numpy as np
  4. import scipy.stats as st
  5. from ProbMap import ProbMap, ProbMapData
  6. from Robot import Robot
  7. class Sensor:
  8. def __init__(self, tracker, coverage_radius=150) -> None:
  9. self.type = 'Cam'
  10. # host tracker
  11. self.tracker = tracker
  12. self.coverage_radius = coverage_radius
  13. def get_detection(self):
  14. all_targets = self.tracker.simulator.targets
  15. detections = []
  16. for target in all_targets:
  17. if np.linalg.norm(target.position - self.tracker.position) < self.coverage_radius:
  18. detection = (target.position-self.tracker.position)
  19. # logging.debug(
  20. # f"Ture {self.tracker.name}{self.tracker.id} detection:\t" + str(detection))
  21. # make up some noise and calculate the confidence of the detection
  22. std_dev = 1
  23. noise = np.random.normal(loc=0, scale=std_dev, size=2)
  24. confidence = sum(st.norm.pdf(
  25. noise, loc=0, scale=std_dev)*2)/2*std_dev
  26. if confidence <= 0.55:
  27. confidence = 0.55
  28. detection = detection + noise
  29. distance = np.linalg.norm(detection)
  30. if distance > self.coverage_radius:
  31. detection = detection * (self.coverage_radius/distance)
  32. detection = np.append(detection, confidence)
  33. detections.append(detection)
  34. logging.debug(
  35. f"Noisy {self.tracker.log_head} Detection: {detection}")
  36. return detections
  37. class Tracker(Robot):
  38. def __init__(self, simulator, name: str, id: int, position: np.array, coverage_radius) -> None:
  39. super().__init__(simulator, name, id, position)
  40. self.sensor = Sensor(self, coverage_radius)
  41. self.neighbor = set()
  42. self.area_width = 2000 # meter
  43. self.area_height = 2000 # meter
  44. self.resolution = 0.5 # meter
  45. self.prob_map = ProbMap(self.area_width, self.area_height, self.resolution,
  46. center_x=0.0, center_y=0.0, init_val=0.05,
  47. false_alarm_prob=0.01)
  48. self.observations = dict() # type: dict[tuple]
  49. self.shareable_v = ProbMapData()
  50. self.shareable_Q = ProbMapData()
  51. self.neighbors_v = dict()
  52. self.neighbors_Q = dict()
  53. def build_shareable_info(self, shareable_info, info_type):
  54. """Generate shareable information from local
  55. Args:
  56. shareable_info (dict): Stores all local infomation. Format: {(x, y) : value}
  57. """
  58. local_meas_info = ProbMapData()
  59. local_meas_info.tracker_id = self.id
  60. local_meas_info.type = info_type
  61. for k, v in shareable_info.items():
  62. local_meas_info.grid_ind += k
  63. local_meas_info.values.append(v)
  64. self.shareable_v = local_meas_info
  65. def get_info_from_neighbors(self, req_type):
  66. neighbors_info = dict()
  67. # Send requests and get responses from all neighbors' services
  68. # Collect info from neighbors
  69. if req_type == 'v':
  70. for e in self.neighbor:
  71. self.neighbors_v[e] = self.simulator.trackers[e].shareable_v
  72. for _id, res in self.neighbors_v.items():
  73. for i in range(len(res.values)):
  74. cell_ind = tuple([res.grid_ind[i*2], res.grid_ind[i*2+1]])
  75. # sum up all neighbors' measurement values
  76. value = res.values[i]
  77. try:
  78. neighbors_info[cell_ind] += value
  79. except KeyError:
  80. neighbors_info[cell_ind] = value
  81. elif req_type == 'Q':
  82. for e in self.neighbor:
  83. self.neighbors_v[e] = self.simulator.trackers[e].shareable_Q
  84. for _id, res in self.neighbors_Q.items():
  85. for i in range(len(res.values)):
  86. cell_ind = tuple([res.grid_ind[i*2], res.grid_ind[i*2+1]])
  87. # sum up all neighbors' values and counting, need to calculate average value
  88. value = res.values[i]
  89. try:
  90. neighbors_info[cell_ind][0] += value
  91. neighbors_info[cell_ind][1] += 1.
  92. except KeyError:
  93. neighbors_info[cell_ind] = [value, 1.]
  94. return neighbors_info
  95. def sensing(self):
  96. detections = self.sensor.get_detection()
  97. output_detection = dict()
  98. id_counter = 0
  99. for det in detections:
  100. transformed_detection = det+np.append(self.position, 0)
  101. output_detection[id_counter] = transformed_detection
  102. id_counter += 1
  103. self.observations = output_detection
  104. def job(self):
  105. self.sensing()
  106. logging.debug(f"{self.log_head} OBSERVATION: {self.observations}")
  107. shareable_v = self.prob_map.generate_shareable_v(
  108. self.observations)
  109. # build shareable_v and publish it
  110. self.build_shareable_info(shareable_v, 'v')
  111. # logging.debug(f"{self.name}_{self.id}: {self.shareable_v.grid_ind}")
  112. # get all neighbors' detections
  113. neighbors_meas = self.get_info_from_neighbors('v')
  114. # logging.debug("{}{} got neighbor {} info: {}".format(
  115. # self.name, self.id, self.neighbor, neighbors_meas))
  116. # # Update the local map by all detections (local and neighbors')
  117. self.prob_map.map_update(shareable_v, neighbors_meas,
  118. len(self.simulator.trackers), len(self.neighbor))
  119. # Convert prob map to a shareable information and publish it
  120. self.build_shareable_info(
  121. self.prob_map.non_empty_cell, 'Q')
  122. # Collect neighbors' map (Q) for consensus
  123. neighbors_map = self.get_info_from_neighbors('Q')
  124. # # rospy.loginfo("{} got neighbors' map: {}".format(
  125. # # self.name, neighbors_map))
  126. # Make consensus, merge neighbors' map
  127. self.prob_map.consensus(neighbors_map)
  128. self.target_estimates = self.prob_map.get_target_est(
  129. 0.6, normalization=True)
  130. logging.debug(
  131. f"{self.name}_{self.id} ProbMap: {self.prob_map.prob_map}")
  132. # print(target_estimates)