Simsim.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. import matplotlib.pyplot as plt
  2. import numpy as np
  3. from target import Target
  4. from tracker import Tracker
  5. import logging
  6. from collections import deque
  7. class Simsim:
  8. class Topics:
  9. def __init__(self) -> None:
  10. self.topics = dict()
  11. def add_topic(self, topic_name):
  12. self.topics[topic_name] = deque(maxlen=20)
  13. def del_topic(self, topic_name):
  14. try:
  15. del self.topics[topic_name]
  16. except:
  17. pass
  18. def publish(self, topic_name, msg):
  19. self.topics[topic_name].append(msg)
  20. def subs(self, topic_name):
  21. return self.topics[topic_name]
  22. def __init__(self) -> None:
  23. self.trackers = []
  24. self.edges = []
  25. self.targets = []
  26. self.map_size = [1000, 1000]
  27. self.rate = 30
  28. self.topics = self.Topics()
  29. plt.ion()
  30. self.fig, (self.plt_sim, self.plt_pm) = plt.subplots(
  31. 1, 2, figsize=(10, 5), dpi=160)
  32. plt.axis('equal')
  33. def add_tracker(self, name, position, sensor_rad):
  34. tracker = Tracker(self, name, len(self.trackers),
  35. position, sensor_rad)
  36. self.trackers.append(tracker)
  37. def add_edges(self, edges):
  38. self.edges.extend(edges)
  39. for e in edges:
  40. self.trackers[e[0]].neighbor.add(e[1])
  41. self.trackers[e[1]].neighbor.add(e[0])
  42. def add_target(self, name, position):
  43. target = Target(self, name, len(self.targets), position)
  44. self.targets.append(target)
  45. def _update_all(self):
  46. """Simulate once, update all trackers and targets
  47. """
  48. for tracker in self.trackers:
  49. tracker.job()
  50. for target in self.targets:
  51. target.job()
  52. def run(self, log_lvl=logging.WARN, ground_truth=False):
  53. logging.basicConfig(format='%(asctime)s.%(msecs)03d %(levelname)s: %(message)s',
  54. datefmt='%m/%d/%Y %H:%M:%S', level=log_lvl)
  55. while 1:
  56. self._update_all()
  57. self.plt_sim.cla()
  58. self.plt_pm.cla()
  59. self.plt_sim.set_xlim(0, self.map_size[0])
  60. self.plt_sim.set_ylim(0, self.map_size[1])
  61. self.plt_pm.set_xlim(10000, 20000)
  62. self.plt_pm.set_ylim(10000, 20000)
  63. if ground_truth:
  64. for target in self.targets:
  65. self.plt_sim.scatter(
  66. target.position[0], target.position[1], marker='x', s=2, color='r')
  67. for e in self.edges:
  68. t0 = self.trackers[e[0]]
  69. t1 = self.trackers[e[1]]
  70. self.plt_sim.plot([t0.position[0], t1.position[0]],
  71. [t0.position[1], t1.position[1]],
  72. linewidth=1, color='g', alpha=0.5)
  73. for t in self.trackers:
  74. # draw the trackers
  75. self.plt_sim.scatter(t.position[0], t.position[1],
  76. marker='s', s=20, c='b')
  77. self.plt_sim.annotate(t.id, (t.position[0], t.position[1]+10))
  78. # draw the camera coverage
  79. circle = plt.Circle(
  80. (t.position), t.sensor.coverage_radius, fill=False, color='grey', alpha=0.3)
  81. self.plt_sim.add_patch(circle)
  82. for est in t.target_estimates:
  83. det_pos = est[0:2]
  84. # det_abs_pos = det_pos+t.position
  85. self.plt_sim.scatter(det_pos[0], det_pos[1],
  86. marker='^', s=2)
  87. for ind in t.prob_map.prob_map:
  88. new_ind = np.array(ind) # - self.map_size
  89. self.plt_pm.scatter(
  90. new_ind[0], new_ind[1], marker='s', s=1, c='r', alpha=t.prob_map.prob_map[ind])
  91. self.plt_pm.annotate(f"{t.prob_map.prob_map[ind]:.3e}", (new_ind[0], new_ind[1]+5),
  92. fontsize=2)
  93. plt.pause(1/self.rate)