0

工具使用-ROS在RVIZ中关于InteractivaMarker的实例(python 版 pong)

 2 years ago
source link: https://zongweizhou1.github.io/2019/06/26/rviz-pong-python/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

工具使用-ROS在RVIZ中关于InteractivaMarker的实例(python 版 pong)

发布 : 2019-06-26 浏览 :
#!/usr/bin/env python
import rospy # python script必须导入该包
import numpy as np # python 环境的包,用于数学运算
from visualization_msgs.msg import * # 一般的消息类型, 包括Marker也属于此包
from interactive_markers.interactive_marker_server import * # 可交互的消息类型,
# 包括 InteractiveMarker
# InteractiveMarkerServer 这是个构造函数,具体见上一篇blog
# InteractiveMarkerControl
# InteractiveMarkerFeedback
from geometry_msgs.msg import Pose # 用于修改marker的position
# const 
FIELD_HEIGHT = 8.0 # 游戏桌面的高度
FIELD_WIDTH = 12.0 # 游戏桌面的宽度
BORDER_SIZE = 0.5 # 边缘的宽度
PADDLE_SIZE = 2.0 # Paddle的长度
UPDATE_RATE = 1.0/30 # 刷新频率
PLAYER_X = FIELD_WIDTH * 0.5 + BORDER_SIZE # 双方paddle的位置
AI_SPEED_LIMIT = 0.25 # ball的初始运动速度

Marker封装

因为该项目中多次创建marker,设置各种属性,代码重复量较大,所以我把marker的创建设置单独提了出来。值得注意的是:C++中pushback操作是进行了copy, 而python中list的append是存储的指针,后面修改了该量,前面已经append的也会发生改变,所以不同的marker在python中需要新创建,而不能像C++那样直接原marker上修改。

class WrapMarker:
def __init__(self):
self.marker = Marker()

def set_color(self, r=1.0, g=1.0, b=1.0, a=1.0): # 设置颜色
self.marker.color.r = r
self.marker.color.g = g
self.marker.color.b = b
self.marker.color.a = a

def set_type(self, type_name): # 设置类型, Marker.CUBE
self.marker.type = type_name

def set_scale(self, x=0.0, y=0.0, z=0.0): # 设置尺度
self.marker.scale.x = x
self.marker.scale.y = y
self.marker.scale.z = z

def set_position(self, x=0.0, y=0.0, z=0.0): # 设置位置
self.marker.pose.position.x = x
self.marker.pose.position.y = y
self.marker.pose.position.z = z

def get_marker(self): # 得到marker实例
return self.marker

当然, marker还有许多其它的属性可以设置,比如pose的方向等,这里仅定义了能用到的函数。

每一个选手都要存储一些自己的状态量,比如当前位置,自己的分数,自己状态,所以定义一个结构体

class PlayerContext:
def __init__(self, pos=0, active=False, score=0):
self.pos=pos
self.active = active
self.score = score

游戏主代码

我们先来介绍下游戏主要的部分,也是最顶层的东西,然后再介绍每一块的功能实现。

class PongGame:
def __init__(self, ns='pong'):
self.server_ = InteractiveMarkerServer(ns) # 创建一个交互的后台进程,命名空间ns
self.player_contexts_ = [PlayerContext() for _ in range(2)] # 两名选手
# 下面几个量定义球的状态
self.last_ball_pos_x_ = 0 # 球在上一时刻的位置x
self.last_ball_pos_y_ = 0 # 球在上一时刻的位置y
self.ball_pos_x_ = 0 # 球当前的位置x
self.ball_pos_y_ = 0 # 球当前的位置y
self.ball_dir_x_ = 0 # dir两个量是归一化后的球的运动方向
self.ball_dir_y_ = 0
self.speed_ = 0 # 球的运动速度

self.makeFieldMarker() # 初始化运动场地
self.makePaddleMarkers() # 初始化选手的paddle
self.makeBallMarker() # 初始化球的位置

self.reset() # 重置
self.updateScore() # 每个选手的分数更新
rospy.Timer(rospy.Duration(UPDATE_RATE), self.spinOnce)
# 这是一个时钟循环函数,每隔固定的间隔调用函数spinOnce, 所以刷新频率也是这里设置的

def spinOnce(self, event): # 注意这里有两个输入参数, event是TimerEvent类型
if self.player_contexts_[0].active or self.player_contexts_[1].active:
ball_dx, ball_dy = self.speed_ * self.ball_dir_x_, self.speed_ * self.ball_dir_y_ # 下一步球的绝对位移
self.ball_pos_x_ += ball_dx # 下一步球的位置
self.ball_pos_y_ += ball_dy

out_border, t = self.reflect(self.ball_pos_y_, self.last_ball_pos_y_, FIELD_HEIGHT*0.5) # 判断球是否超出范围,超出的话超出了多少,这个t就是超出比例,用于reflect时退回超出部分
if out_border:# 超出范围
self.ball_pos_x_ -= t * ball_dx # 退回
self.ball_pos_y_ -= t * ball_dy
self.ball_dir_y_ *= -1.0 # 这部分判断是超出y方向的边界,所以换方向应该是y分量取反
ball_dx, ball_dy = self.speed_ * self.ball_dir_x_, self.speed_*self.ball_dir_y_ # 重新计算x,y轴运动量
self.ball_pos_x_ += t * ball_dx # 新的位置
self.ball_pos_y_ += t * ball_dy

player = 1 if self.ball_pos_x_ > 0 else 0 # 判断当前球在哪一侧,右边运动员为1
# reflect on paddles
if abs(self.last_ball_pos_x_) < FIELD_WIDTH * 0.5 and abs(self.ball_pos_x_) >= FIELD_WIDTH * 0.5: # 上一时刻在paddle的内测,当前时刻在paddle的外侧,这时候需要判断是否打在paddle上
if self.ball_pos_y_ > self.player_contexts_[player].pos - (PADDLE_SIZE + BORDER_SIZE) *0.5 and \
self.ball_pos_y_ < self.player_contexts_[player].pos + (PADDLE_SIZE + BORDER_SIZE)*0.5: # 在paddle区间上,认为打在paddle上,进行反弹,反弹和打到上下边缘一样
_, t = self.reflect(self.ball_pos_x_, self.last_ball_pos_x_, FIELD_WIDTH*0.5)
self.ball_pos_x_ -= t* ball_dx
self.ball_pos_y_ -= t* ball_dy
# change direction based on distance to paddle center, 一定规律修改反射方向
offset = (self.ball_pos_y_ - self.player_contexts_[player].pos)/PADDLE_SIZE
self.ball_dir_x_ *= -1.0
self.ball_dir_y_ += offset * 2.0
self.normalizeVel() # 速度归一化
if abs(self.ball_dir_y_) > np.sqrt(2)/2.0:
# 处理速度异常,如果速度过大,就重置45度角
self.ball_dir_x_ = 1.0 if self.ball_dir_x_ > 0 else -1.0
self.ball_dir_y_ = 1.0 if self.ball_dir_y_ > 0 else -1.0
self.normalizeVel()
ball_dx, ball_dy = self.speed_*self.ball_dir_x_, self.speed_*self.ball_dir_y_
self.ball_pos_x_ += t * ball_dx
self.ball_pos_y_ += t * ball_dy
# not hit the paddles, 球没有被paddle接到,这个时候认为一个回合结束
if abs(self.ball_pos_x_) >= FIELD_WIDTH*0.5 + 1.5 * BORDER_SIZE:
_, t = self.reflect(self.ball_pos_x_, self.last_ball_pos_x_, FIELD_WIDTH*0.5+1.5*BORDER_SIZE)
self.ball_pos_x_ -= t * ball_dx
self.ball_pos_y_ -= t * ball_dy
self.updateBall() # 更新求得位置
self.player_contexts_[1-player].score += 1 # 对方得分
self.updateScore() # 更新分数
self.server_.applyChanges() # 让交互后台开始动起来
self.reset() # 重置桌面,paddle位置并没变化
rospy.sleep(rospy.Duration(1.0))
else:
self.updateBall() # ball在运动中,更新位置即可

self.last_ball_pos_x_ = self.ball_pos_x_ # 更新上一时刻球的位置
self.last_ball_pos_y_ = self.ball_pos_y_

# control computer player 控制AI选手
if not self.player_contexts_[0].active or not self.player_contexts_[1].active:
player = 1 if self.player_contexts_[0].active else 0
delta = self.ball_pos_y_ - self.player_contexts_[player].pos
delta = max(min(delta, AI_SPEED_LIMIT), -AI_SPEED_LIMIT)
# paddle的速度是有上限的,所以只要求得速度够快,AI就会失败,这个其实应该加些扰动
# AI 选手自动更新paddle位置,此时只有某些特殊情形下AI才会输
self.setPaddlePos(player, self.player_contexts_[player].pos + delta)

self.speed_ += 0.0003
self.server_.applyChanges()

TimerEvent的属性:

last_expected: 上一次调用的期望时间

last_real: 上一次调用的真实时间

current_expected: 当前调用的期望时间

current_real: 当前调用的真实时间

last_duration: 两次调用的时间间隔

静态场景包括桌面的搭建,paddle和ball的位置。

def makeFieldMarker(self):
init_marker = InteractiveMarker()
init_marker.header.frame_id = "base_link"
init_marker.name = "field"

control = InteractiveMarkerControl()
control.always_visible = True
for i in range(4):
wrap_marker = WrapMarker()
wrap_marker.set_type(Marker.CUBE)
wrap_marker.set_color()
if i<2: # i=0 上边, i=1 下边
wrap_marker.set_scale(FIELD_WIDTH + 6.0 * BORDER_SIZE, BORDER_SIZE, BORDER_SIZE)
y = FIELD_HEIGHT * 0.5 + BORDER_SIZE if i == 0 else -(FIELD_HEIGHT * 0.5 + BORDER_SIZE)
wrap_marker.set_position(y=y)
else: # i=2 右边, i=3 左边
wrap_marker.set_scale(BORDER_SIZE, FIELD_HEIGHT + 3.0 * BORDER_SIZE, BORDER_SIZE)
x = FIELD_WIDTH*0.5 + 2.5 * BORDER_SIZE if i==2 else -(FIELD_WIDTH*0.5 + 2.5 * BORDER_SIZE)
wrap_marker.set_position(x=x)

control.markers.append(wrap_marker.get_marker())
init_marker.controls.append(control)
self.server_.insert(init_marker) # 一个interactiveMarker接收一个消息

def makeBallMarker(self):
int_marker = InteractiveMarker()
int_marker.header.frame_id = "base_link"
int_marker.name = 'ball'

control = InteractiveMarkerControl()
control.always_visible = True
control.interaction_mode = InteractiveMarkerControl.NONE
control.orientation.w = 1
control.orientation.y = 1

wrap_marker = WrapMarker()
wrap_marker.set_color()
wrap_marker.set_type(Marker.CYLINDER)
wrap_marker.set_scale(BORDER_SIZE, BORDER_SIZE, BORDER_SIZE) # 圆柱

control.markers.append(wrap_marker.get_marker())
int_marker.controls.append(control)
self.server_.insert(int_marker)

def makePaddleMarkers(self):
def create_int_marker(name, visible=False, control_mode=0, PPX=PLAYER_X,
r=0.0, g=1.0, b=1.0, a=1.0, sx=0., sy=0., sz=0., px=0, py=0, pz=0):
int_marker = InteractiveMarker()
int_marker.header.frame_id = "base_link"
int_marker.name = name

control = InteractiveMarkerControl()
control.always_visible = visible
control.interaction_mode = control_mode
control.orientation.w = 1
control.orientation.z = 1

wrap_marker = WrapMarker()
wrap_marker.set_type(Marker.CUBE)
wrap_marker.set_color(r=r, g=g, b=b, a=a)
wrap_marker.set_scale(sx, sy, sz)
wrap_marker.set_position(x=px, y=py, z=pz)
control.markers.append(wrap_marker.get_marker())
int_marker.controls.append(control)
int_marker.pose.position.x = PPX
return int_marker

paddle0_marker = create_int_marker("paddle0", visible=False, control_mode=InteractiveMarkerControl.MOVE_AXIS,
r=1., g=1, b=1, a=0, sx=BORDER_SIZE+0.1, sy=PADDLE_SIZE+0.1, sz=BORDER_SIZE+0.1,
PPX=-PLAYER_X, px=0, py=0, pz=0)
self.server_.insert(paddle0_marker, self.processPaddleFeedback)

paddle1_marker = create_int_marker("paddle1", visible=False, control_mode=InteractiveMarkerControl.MOVE_AXIS,
r=1., g=1, b=1, a=0, PPX=PLAYER_X, px=0, py=0, pz=0,
sx = BORDER_SIZE+0.1, sy=PADDLE_SIZE+0.1, sz=BORDER_SIZE+0.1)
self.server_.insert(paddle1_marker, self.processPaddleFeedback)

paddle0_marker_disp = create_int_marker("paddle0_display", visible=True, control_mode=InteractiveMarkerControl.NONE,
r=0.5, g=1, b=0.5, a=1, PPX=-PLAYER_X, px=0, py=0, pz=0,
sx = BORDER_SIZE, sy=PADDLE_SIZE, sz=BORDER_SIZE)
self.server_.insert(paddle0_marker_disp)

paddle1_marker_disp = create_int_marker("paddle1_display", visible=True, control_mode=InteractiveMarkerControl.NONE,
r=0.5, g=0.5, b=1.0, a=1, PPX=PLAYER_X, px=0, py=0, pz=0,
sx = BORDER_SIZE, sy=PADDLE_SIZE, sz=BORDER_SIZE)
self.server_.insert(paddle1_marker_disp)

这个里面其实需要特别注意。我们会发现每一个paddle都定义了两个交互marker,其中绑定回调函数的透明度为0,即看不到,而没绑回调函数的透明度为1.这是因为当使用鼠标控制InteractiveMarker时,marker是随着鼠标运动的,没办法控制她在某个区域内运动,即在这个例子中没法限制该marker在桌面内。

比如我们将paddle0设置代码修改为:

paddle0_marker = create_int_marker("paddle0", visible=False, control_mode=InteractiveMarkerControl.MOVE_3D,
r=1., g=1, b=1, a=1.0, sx=BORDER_SIZE+0.1, sy=PADDLE_SIZE+0.1, sz=BORDER_SIZE+0.1,
PPX=-PLAYER_X, px=0, py=0, pz=0)
self.server_.insert(paddle0_marker, self.processPaddleFeedback)

然后,就会发现paddle0会随着鼠标3D空间跑。

rviz_pong_3d

另外 visible这个参数不是说该marker是否可见,而是说当别的control发生时,该marker是否可见,比如将paddle0的visible=Flase, 那么当拖动paddle1时, paddle0就看不见了。可以试试看。

这部分主要是paddle marker发生变化是对应的paddle的位置如何变化,值得注意的是可见的paddle是有边界限制的,即必须在球台上。

def setPaddlePos(self, player, pos):
if player > 1:
return
pos = min(pos, (FIELD_HEIGHT - PADDLE_SIZE) * 0.5)
pos = max(pos, -(FIELD_HEIGHT - PADDLE_SIZE) * 0.5)
self.player_contexts_[player].pos = pos

pose = Pose()
pose.position.x = -PLAYER_X if player==0 else PLAYER_X
pose.position.y = pos

marker_name = "paddle0" if player==0 else "paddle1"
self.server_.setPose(marker_name, pose) # 影子和对应的paddle都变化
self.server_.setPose(marker_name+"_display", pose)

def processPaddleFeedback(self, feedback):
if feedback.marker_name == 'paddle0':
player=0 # 当前选择的是那个选手
elif feedback.marker_name == 'paddle1':
player = 1
else:
return

control_marker_name = feedback.marker_name
pose = feedback.pose

self.setPaddlePos(player, pose.position.y)
if feedback.event_type == InteractiveMarkerFeedback.MOUSE_DOWN:
self.player_contexts_[player].active = True # 表示paddle是否按下
if feedback.event_type == InteractiveMarkerFeedback.MOUSE_UP:
self.player_contexts_[player].active = False

重新开始是将球放回中间位置,然后求得初始速度重新设定。

def reset(self):
self.speed_ = 6. * UPDATE_RATE
self.ball_pos_x_, self.ball_pos_y_ = 0.0, 0.0
self.ball_dir_x_ = 1.0 if self.ball_pos_x_ > 0 else -1.0
self.ball_dir_y_ = 1.0 if np.random.randint(100)%2==0 else -1.0
self.normalizeVel()

def normalizeVel(self):
l = np.sqrt(self.ball_dir_x_**2 + self.ball_dir_y_**2)
self.ball_dir_x_ /= l
self.ball_dir_y_ /= l

这部分主要更新球的状态,当前分数和判断是否碰到边界。

def reflect(self, pos, last_pos, limit):
if pos > limit or pos < -limit:
t = (pos - np.sign(pos) * limit) / (pos - last_pos)
return True, t
return False, None

def updateBall(self):
pose = Pose()
pose.position.x, pose.position.y = self.ball_pos_x_, self.ball_pos_y_
self.server_.setPose("ball", pose)

def updateScore(self):
int_marker = InteractiveMarker()
int_marker.header.frame_id = "base_link"
int_marker.name = 'score'
control = InteractiveMarkerControl()
control.always_visible = True
for i in range(2):
wrap_marker = WrapMarker()
wrap_marker.set_type(Marker.TEXT_VIEW_FACING)
wrap_marker.set_color()
wrap_marker.set_scale(1.5, 1.5, 1.5)
wrap_marker.set_position(x=(2*i-1)*(FIELD_WIDTH * 0.5 + BORDER_SIZE), y=FIELD_HEIGHT*0.5 + 4* BORDER_SIZE)
marker = wrap_marker.get_marker()
marker.text = str(self.player_contexts_[i].score)
control.markers.append(marker)

int_marker.controls.append(control)
self.server_.insert(int_marker)
if __name__=='__main__':
rospy.init_node('pong')
pong_game = PongGame()
rospy.spin() # 这个必须调用,只有它调用的时候, Timer才会定时启动刷新
rospy.loginfo("Exiting..")

实现起来还是比较简单的。通过将C++代码转成python代码,更好的理解了中间代码执行过程。尤其值得注意的地方就是该代码设置paddle过程中分别配置了影子paddle,用于处理接收响应。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK