본문 바로가기
ros2_python/Ros2 msg,srv,action,parm

ROS2 service blocking 피하는 방법

by 알 수 없는 사용자 2023. 7. 1.

동작 방식 :

1.

main_job

sub_job

service_call_job 이 동시에 돌아감.

 

2.

main_job

sub_job

은 1초마다 프린트함.

 

3.

service_call_job 는 서비스 콜을 동작하는 task

req를 인자로 받고 "로스Client.call_async(req)" 를 쏜 후,

spin_until_future_complete <- 를 통해서 future가 done() 이 될때 까지 대기

done()이 되는 시간이 5초 이상이면 response를 포기하고 "def service_call" Task 종료 처리

 

---------- server는 req만 받고 response 안보내고 무한 슬립하는 코드

---------- client는 0.1초 간격으로 call 요청을 함.

따라서 5 초 후 부터는 Timeout 에러처리가 10개씩 나옴.

 

 

결론 :

call을 받으면 인라인함수(개념상) 에 create_task를 날립니다.

인라인 함수에서는 await 합니다.

service_call에서는 call_async() 함수를 이용해 request를 날린 후,

해당 노드를 executor를 이용해서 spin_until_future_complete 돌립니다. 이 때 쓰레드를 만들어서 돌립니다.

그 후 future.done() 함수를 체킹하는 루프를 돌려 return 값으로 response롤 반환합니다.

5초이네 response를 반환하지 않으면 "Response Timeout 으로 인해 종료처리함."라는 스트링을 반환합니다.

 

import time

import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts

class MyServer(Node):
    def __init__(self):
        super().__init__('my_server')
        self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)

    def add_two_ints_callback(self, request, response):
        response.sum = request.a + request.b
        time.sleep(9999999)
        self.get_logger().info('Received request: {0} + {1}'.format(request.a, request.b))
        self.get_logger().info('Sending response: {0}'.format(response.sum))
        return response

def main(args=None):
    rclpy.init(args=args)
    server = MyServer()

    rclpy.spin(server)
    rclpy.shutdown()

if __name__ == '__main__':
    main()

 

# import contextvars
import functools
import threading
import time
import asyncio

import rclpy
from rclpy.node import Node, Client
from example_interfaces.srv import AddTwoInts


class SimClient:
    def __init__(self):
        self.node:Node
        self.cli:Client
        self.service_queue:asyncio.Queue= asyncio.Queue()
        self.i=0

    def spin_thread_until__future_complete(self, p_node, p_future):
        rclpy.spin_until_future_complete(p_node,p_future)
        print(f"rclpy.spin_until_future_complete 스핀 종료 ! ")
    async def service_reg(self):
        rclpy.init()
        self.node = rclpy.create_node('my_client')
        self.cli = self.node.create_client(AddTwoInts, 'add_two_ints')

        while not self.cli.wait_for_service(timeout_sec=1.0):
            await asyncio.sleep(0.1)
            self.node.get_logger().info('Service not available, waiting...')

    async def service_call(self,param_req):

        # todo 1. rclpy에서 제공하는 Executor를 이용해서 동작하는방법
        _executor = rclpy.executors.MultiThreadedExecutor()
        _executor.add_node(self.node)
        future = self.cli.call_async(param_req)
        _executor_thread = threading.Thread(target=_executor.spin_until_future_complete, args=(future,))
        _executor_thread.start()

        # todo 2. 기존의 threading 방식만 사용하는 방법
        # future = self.cli.call_async(param_req)
        # spin_thread = threading.Thread(target=self.spin_thread_until__future_complete, args=(self.node, future))
        # spin_thread.start()

        return await self.check_future_done(future)


    async def check_future_done(self,_future):
        limit=5.0
        time_out=time.time() + limit

        while rclpy.ok():
            await asyncio.sleep(0.0)
            if time.time() > time_out:
                print(f"{limit}초 동안 response 가 없으므로 타임아웃 ")
                return "Response Timeout 으로 인해 종료처리함."
            if _future.done(): # rclpy.spin() 을 돌리는 상황이면 done() 체킹을 꼭 해줘야함. 왜냐면 done() 상황이 아닌 future는 "future.result()"값을 None 으로 뱉어냄.
                try:
                    response = _future.result()
                except Exception as e:
                    print('Service call failed %r' % (e,))
                else:
                    # print(f'response 값 : {response}')
                    return response
                break

    def main(self):

        _loop = asyncio.get_event_loop()
        _loop.run_until_complete(self.run_task())

    async def run_task(self):

        await asyncio.gather(
            self.main_job(),
            self.sub_job(),
            self.service_call_job(),
            asyncio.sleep(0.0)
        )

    async def main_job(self):
        past=time.time()
        while True:
            await asyncio.sleep(1)
            used_time = time.time() - past

            print(f'main_job role 실행중 걸린시간 {used_time}')

            past = time.time()

    async def sub_job(self):
        past = time.time()
        while True:
            await asyncio.sleep(1)
            used_time = time.time() - past

            print(f'sub_job role 실행중 걸린시간 {used_time}')

            past = time.time()

    async def service_call_job(self):
        past = time.time()
        request = AddTwoInts.Request()
        request.a = 1
        asyncio.create_task(self.service_reg())

        # async def avoid_block(request, used_time): # RESPONSE 가 필요 없는 service call 인경우 이거 사용하면 됨.
        #     _res = await self.service_call(request)
        #     print(f"Service 리턴 값 {_res} 걸린시간 {used_time}")

        while True:
            await asyncio.sleep(0.1)
            used_time = time.time() - past

            request.b = self.i
            self.i += 1
            # asyncio.create_task(avoid_block(request, used_time)) # RESPONSE 가 필요 없는 service call 인경우 이거 사용하면 됨.
            _res = await self.service_call(request)
            print(f"Service 리턴 값 {_res} 걸린시간 {used_time}")

            past = time.time()


if __name__ == '__main__':
    aa = SimClient()

    aa.main()

    # T=100
    # for _ in range(0,T):
    #     __past = time.time()
    #     aa.main()
    #     used_time = time.time() - __past
    #     ave=used_time/T
    # print(f'평균 걸린 시간 {ave}') # <- 같은 Service 네임이라 등록 타이밍이 겹치면 wait 가 좀 걸림

 

 

댓글