tracker.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. import logging
  2. import matplotlib.pyplot as plt
  3. import numpy as np
  4. import scipy.stats as st
  5. from ProbMap import ProbMap, ProbMapData
  6. from Robot import Robot
  7. class Sensor:
  8. def __init__(self, tracker, coverage_radius=150) -> None:
  9. self.type = 'Cam'
  10. # host tracker
  11. self.tracker = tracker
  12. self.coverage_radius = coverage_radius
  13. def get_detection(self):
  14. all_targets = self.tracker.simulator.targets
  15. detections = []
  16. for target in all_targets:
  17. if np.linalg.norm(target.position - self.tracker.position) < self.coverage_radius:
  18. detection = (target.position-self.tracker.position)
  19. # logging.debug(
  20. # f"Ture {self.tracker.name}{self.tracker.id} detection:\t" + str(detection))
  21. # make up some noise and calculate the confidence of the detection
  22. std_dev = 1
  23. noise = np.random.normal(loc=0, scale=std_dev, size=2)
  24. confidence = sum(st.norm.pdf(
  25. noise, loc=0, scale=std_dev)*2)/2*std_dev
  26. detection = detection + noise
  27. distance = np.linalg.norm(detection)
  28. if distance > self.coverage_radius:
  29. detection = detection * (self.coverage_radius/distance)
  30. detection = np.append(detection, confidence)
  31. detections.append(detection)
  32. logging.debug(
  33. f"Noisy {self.tracker.name}{self.tracker.id} detection: {detection}")
  34. return detections
  35. class Tracker(Robot):
  36. def __init__(self, simulator, name: str, id: int, position: np.array, coverage_radius, rate: int) -> None:
  37. super().__init__(simulator, name, id, position, rate)
  38. self.sensor = Sensor(self, coverage_radius)
  39. self.neighbor = set()
  40. self.area_width = 2000 # meter
  41. self.area_height = 2000 # meter
  42. self.resolution = 1.0 # meter
  43. self.prob_map = ProbMap(self.area_width, self.area_height, self.resolution,
  44. center_x=0.0, center_y=0.0, init_val=0.01,
  45. false_alarm_prob=0.05)
  46. self.observations = dict() # type: dict[tuple]
  47. self.shareable_v = ProbMapData()
  48. self.shareable_Q = ProbMapData()
  49. self.neighbors_v = dict()
  50. self.neighbors_Q = dict()
  51. def build_shareable_info(self, shareable_info, info_type):
  52. """Generate shareable information from local
  53. Args:
  54. shareable_info (dict): Stores all local infomation. Format: {(x, y) : value}
  55. """
  56. local_meas_info = ProbMapData()
  57. local_meas_info.tracker_id = self.id
  58. local_meas_info.type = info_type
  59. for k, v in shareable_info.items():
  60. local_meas_info.grid_ind += k
  61. local_meas_info.values.append(v)
  62. self.shareable_v = local_meas_info
  63. def get_info_from_neighbors(self, req_type):
  64. neighbors_info = dict()
  65. # Send requests and get responses from all neighbors' services
  66. # Collect info from neighbors
  67. if req_type == 'v':
  68. for e in self.neighbor:
  69. self.neighbors_v[e] = self.simulator.trackers[e].shareable_v
  70. for _id, res in self.neighbors_v.items():
  71. for i in range(len(res.values)):
  72. cell_ind = tuple([res.grid_ind[i*2], res.grid_ind[i*2+1]])
  73. # sum up all neighbors' measurement values
  74. value = res.values[i]
  75. try:
  76. neighbors_info[cell_ind] += value
  77. except KeyError:
  78. neighbors_info[cell_ind] = value
  79. elif req_type == 'Q':
  80. for e in self.neighbor:
  81. self.neighbors_v[e] = self.simulator.trackers[e].shareable_Q
  82. for _id, res in self.neighbors_Q.items():
  83. for i in range(len(res.values)):
  84. cell_ind = tuple([res.grid_ind[i*2], res.grid_ind[i*2+1]])
  85. # sum up all neighbors' values and counting, need to calculate average value
  86. value = res.values[i]
  87. try:
  88. neighbors_info[cell_ind][0] += value
  89. neighbors_info[cell_ind][1] += 1.
  90. except KeyError:
  91. neighbors_info[cell_ind] = [value, 1.]
  92. return neighbors_info
  93. def sensing(self):
  94. detections = self.sensor.get_detection()
  95. output_detection = dict()
  96. id_counter = 0
  97. for det in detections:
  98. output_detection[id_counter] = det
  99. id_counter += 1
  100. self.observations = output_detection
  101. def job(self):
  102. self.sensing()
  103. shareable_v = self.prob_map.generate_shareable_v(
  104. self.observations)
  105. # build shareable_v and publish it
  106. self.build_shareable_info(shareable_v, 'v')
  107. # logging.debug(f"{self.name}_{self.id}: {self.shareable_v.grid_ind}")
  108. # get all neighbors' detections
  109. neighbors_meas = self.get_info_from_neighbors('v')
  110. # logging.debug("{}{} got neighbor {} info: {}".format(
  111. # self.name, self.id, self.neighbor, neighbors_meas))
  112. # # Update the local map by all detections (local and neighbors')
  113. self.prob_map.map_update(shareable_v, neighbors_meas,
  114. len(self.simulator.trackers), len(self.neighbor))
  115. # Convert prob map to a shareable information and publish it
  116. self.build_shareable_info(
  117. self.prob_map.non_empty_cell, 'Q')
  118. # Collect neighbors' map (Q) for consensus
  119. neighbors_map = self.get_info_from_neighbors('Q')
  120. # # rospy.loginfo("{} got neighbors' map: {}".format(
  121. # # self.name, neighbors_map))
  122. # Make consensus, merge neighbors' map
  123. self.prob_map.consensus(neighbors_map)
  124. self.target_estimates = self.prob_map.get_target_est(
  125. 0.8, normalization=False)
  126. print(self.prob_map.prob_map)
  127. # print(target_estimates)