如何使用ROS1的rosbag进行详细操作并利用Python脚本合并多个bag文件?

2026-04-19 21:542阅读0评论SEO资源
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计2408个文字,预计阅读时间需要10分钟。

如何使用ROS1的rosbag进行详细操作并利用Python脚本合并多个bag文件?

目录 + rosbag 命令列表 + record 命令使用 + info 指令使用 + play 指令使用 + fix 指令使用 + filter 指令使用 + compress 指令使用

2.使用Python合并包代码 + 在使用ros时,经常会用到rosbag来录制或回放数据

目录
  • rosbag 命令列表
    • record 命令使用
    • info 指令使用
    • play 指令使用
    • fix 指令使用
    • filter 指令使用
    • compress 指令使用
  • 2. 使用Python合并包代码

    在使用ros的时候经常会用到rosbag来录制或者回放算法,是个非常有用的工具。

    rosbag 命令列表

    命令作用record录制一个包,并且指定topicinfo总结一个包的详细信息play回放一个或者多个包,并且可以指定topiccheck确定一个包是否可以在当前系统中播放,或者是否可以迁移fix修复一个包使得它能在当前系统中播放filter通过python脚本转换包内容compress压缩包decompress解压缩包reindex重新索引一个或者多个损坏的包

    record 命令使用

    record <topic-names> # 指定一个或者多个topic来录制

    $ rosbag record body_status gnss_imu

    record -a, --all # 录制所有topic

    $ rosbag record -a

    record -e, --regex # 录制使用正则表达式匹配的topic

    $ rosbag record -e "/(.*)_log/point"

    rocord -p, -publish # (Melodic 新特性)当开始录制包时,发布一个topic=”begin_write“

    $ rosbag record -p

    record -x, --exclude # 录制时排除正则表达式匹配的topic

    $ rosbag record -e "/(.*)_log/point" -x "(.*)/point"

    record -q, --quiet # 录制时抑制指定topic的log输出

    $ rosbag record -q /body_status

    record -d, --duration # 指定topic录制最大时常

    $ rosbag record -d 30 /body_status $ rosbag record -d 3m /body_status $ rosbag record -d 3h /body_status

    record -o, --output-prefix # 输出包名前加上前缀

    $ rosbag record -o output /topic_name /topic_name2

    record -O, --output-name # 指定输出包名

    $ rosbag record -O output.bag /topic_name /topic_name2

    record --split # 每隔设定的时常或者大小分割包录制

    $ rosbag record --split --size=1024 /topic_name $ rosbag record --split --duration=30 /topic_name $ rosbag record --split --duration=5m /topic_name $ rosbag record --split --duration=2h /topic_name

    record --max-splits # (Kinetic 新特性)分割包的最大个数

    $ rosbag record --split --max-splits 3 --duration=30 /topic_name

    record -b, --buffsize # 使用内部缓冲区(默认值:256MB,无限为0)

    $ rosbag record -b 1024 /topic_name

    record --chunksize # 开辟缓冲区,比buffsize更为先进(默认值: 768KB,无限为0)

    $ rosbag record --chunksize=1024 /topic_name

    record -l, --limit # 在topic中只录制指定数量的消息

    $ rosbag record -l 1000 /topic_name

    record --node # 记录指定节点订阅的所有消息

    $ rosbag record --node=/node

    record -j, --bz2 # 设置录制以bz2格式压缩

    $ rosbag record -j /topic_name

    record --lz4 # 设置录制以lz4格式压缩

    $ rosbag record --lz4 /topic_name

    • record --tcpnodelay # 订阅主题时使用TCP_NODELAY传输 参考资料
    • record --udp # 订阅主题时使用UDP传输

    info 指令使用

    info <bag-files> # 查看一个或者多个包的详细信息

    $ rosbag info test.bag

    info -y, --yaml # 以yaml格式输出一个或者多个包的详细信息

    $ rosbag info -y test.bag

    info -k, --key # 查看以yaml格式输出,包内key值对应的数据

    $ rosbag info -y -k topics test.bag

    play 指令使用

    play <bag_name> # 播放一个或者多个数据包

    $ rosbag play test.bag

    play -p, --prefix # 为所有输出主题添加前缀

    $ rosbag play -p test test.bag

    play -q, --quiet # 抑制控制台输出

    $ rosbag play -q test.bag

    play -i, --immediate # 播放包不等待

    $ rosbag play -i test.bag

    play --pause # 开始播放包时暂停

    $ rosbag play --pause test.bag

    play --queue # 播放包时以设置的队列大小播放 默认值为100

    $ rosbag play --queue=1000 test.bag

    play --clock # 自动发布一个clocktopic

    $ rosbag play --clock test.bag

    play --hz # 以指定赫兹发布clock topic

    $ rosbag play --hz=1 --clock test.bag

    play -d, --delay # 指定延迟播放的时间

    $ rosbag -d 5 test.bag

    play -r, --rate # 指定播放速度

    $ rosbag -r 10 test.bag

    play -s, --start # 指定开始播放时间

    $ rosbag play -s 5 test.bag

    play -u,--duration # 设定播放时间

    $ rosbag play -u 240 test.bag

    play --skip-empty # 设置跳过没有消息的数据的时间

    $ rosbag play --skip-empty=1 test.bag

    play -l, --loop # 循环播放

    $ rosbag play -l test.bag

    play -k, --keep-alive # 播放时保持活跃状态

    $ rosbag play -k test.bag

    play --try-future-version # 即使不知道版本号也可以打开bag

    $ rosbag play --try-future-version test.bag

    play --topics # 指定对应topics进行播放

    $ rosbag play test.bag --topics /topic1 /topic2

    play --pause-topics # 当回放数据时,暂停指定topics

    $ rosbag play test.bag --topics /topic1 /topic2 --pause-topics

    play --bags # 指定多个bags进行回放

    $ rosbag play --bags=test.bag

    play --wait-for-subscribers # 再发布之前,等待每个主题至少有一个订阅者play --rate-control-topic= # 观看给定的主题,如果上一次发布时间超过了<速率控制最大延迟>,请等待该主题再次发布后再继续播放play --rate-control-max-delay= # 暂停前与<速率控制主题>的最大时间差 check 指令使用 check <file> # 确定一个包在当前系统中是否可以播放。

    $ rosbag check test.bag

    check -g, --genrules # 生成一个名为RULEFILE的迁移规则文件

    $ rosbag check -g diagnostics.bmr test.bag

    check -a, --append # 加载后附加到现有数据迁移规则文件的末尾。

    $ rosbag check -a -g diagnostics.bmr test.bag

    check -n, --noplugins # 不加载规则文件

    $ rosbag check -n test.bag

    fix 指令使用

    fix <in-bag> <out-bag> [rules.bmr] # 使用规则文件修复数据包

    $ rosbag fix old.bag repaired.bag myrules.bmr

    fix -n, --noplugins # 修复数据包不带规则文件

    如何使用ROS1的rosbag进行详细操作并利用Python脚本合并多个bag文件?

    $ rosbag fix -n old.bag repaired.bag

    filter 指令使用

    filter <in-bag> <out-bag> <expression> # 使用给定的Python表达式转换数据文件。

    $ rosbag filter my.bag only-tf.bag "topic == '/tf'"

    compress 指令使用

    compress <bag-files> # 使用BZ2格式压缩包

    $ rosbag compress *.bag

    compress --output-dir=DIR # 指定路径输出文件

    $ rosbag compress *.bag --output-dir=compressed

    compress -f, --force # 如果已经存在包,则强制覆盖

    $ rosbag compress -f *.bag

    compress -q, --quiet # 抑制非关键信息

    $ rosbag compress -q *.bag

    compress -j, --bz2 # 使用bz2格式压缩包

    $ rosbag compress -j *.bag

    compress --lz4 # 使用lz4格式压缩包

    $ rosbag compress --lz4 *.bag

    decompress 指令使用 decompress <bag-files> # 解压缩包

    $ rosbag decompress *.bag

    decompress --output-dir=DIR # 指定解压缩的路径

    $ rosbag decompress --output-dir=uncompressed *.bag

    decompress -f, --force # 如果已经存在包,则强制覆盖

    $ rosbag decompress -f *.bag

    decompress -q, --quiet # 抑制非关键信息

    $ rosbag decompress -q *.bag

    reindex 指令使用 reindex <bag-files> # 修复坏包

    $ rosbag reindex *.bag

    reindex --output-dir=DIR # 指定路径

    $ rosbag reindex --output-dir=reindexed *.bag

    reindex -f, --force # 如果已经存在包,则强制覆盖

    $ rosbag reindex -f *.bag

    reindex -q, --quiet # 抑制非关键信息

    $ rosbag reindex -q *.bag

    2. 使用Python合并包代码

    #! /usr/bin/env python3 import os import time import argparse from rosbag import Bag, Compression def parse_compression(compression): if compression == "none" or compression == "NONE": compression = Compression.NONE elif compression == "bz2": compression = Compression.BZ2 elif compression == "lz4": compression = Compression.LZ4 return compression def get_files_list(arg_input, arg_select): files = None if " " in arg_input: files = arg_input.split(" ") elif "," in arg_input: files = arg_input.split(",") elif os.path.exists(args.input) or "*" in args.input: rfind_index = arg_input.rfind("/") dir_path = arg_input[:rfind_index] files = os.listdir(dir_path) files.sort() # 名称排序 files = [dir_path + "/" + file for file in files] if arg_select: files = select_bags(files) print("bags list:") for i in range(len(files)): print(str(i) + "." + files[i]) return files def select_bags(files): for i in range(len(files)): print(str(i) + "." + files[i]) print("Please input bag numbers to merge. split by , or space", end=":") s_b = input() s_b_list = [] if "," in s_b: s_b_list = s_b.split(",") elif " " in s_b: s_b_list = s_b.split(" ") files = [files[int(x)] for x in s_b_list] return files def show_process_bar(total, i, start): a = "*" * i b = "." * (total - i) c = (i / total) * 100 dur = time.perf_counter() - start print("\r{:^3.0f}%[{}->{}]{:.2f}s".format(c,a,b,dur), end="") def merge_bags(args): print("start merge bags.") compression = parse_compression(args.compression) print(f"bag's compression mode is {compression}.") files = get_files_list(args.input, args.select) with Bag(args.output, "w", compression=compression) as o: start = time.perf_counter() for i in range(len(files)): show_process_bar(len(files), i+1, start) with Bag(files[i], "r") as ib: for topic, msg, t in ib: o.write(topic, msg, t) show_process_bar(len(files), i+1, start) if __name__ == "__main__": parser = argparse.ArgumentParser(description="Merge one or more bag files to one file.") parser.add_argument("-o", "--output", help="Output bag's file path.", default="output.bag", required=True) parser.add_argument("-i", "--input", help="Input files bags name or path, split by , .", required=True) parser.add_argument("-c", "--compression", help="Compress the bag by bz2 or lz4", default="lz4", choices=["none", "lz4", "bz2"]) parser.add_argument("-s", "--select", help="Select bags to merge.", default=False) parser.add_argument("-v", "--verbose", help="Show the verbose msg.") args = parser.parse_args() merge_bags(args)

    Reference:

    wiki.ros.org/rosbag/Commandline

    到此这篇关于ROS1rosbag的详细使用,并且使用python来合并bag包的文章就介绍到这了,更多相关ROS1rosbag使用内容请搜索自由互联以前的文章或继续浏览下面的相关文章希望大家以后多多支持自由互联!

    本文共计2408个文字,预计阅读时间需要10分钟。

    如何使用ROS1的rosbag进行详细操作并利用Python脚本合并多个bag文件?

    目录 + rosbag 命令列表 + record 命令使用 + info 指令使用 + play 指令使用 + fix 指令使用 + filter 指令使用 + compress 指令使用

    2.使用Python合并包代码 + 在使用ros时,经常会用到rosbag来录制或回放数据

    目录
    • rosbag 命令列表
      • record 命令使用
      • info 指令使用
      • play 指令使用
      • fix 指令使用
      • filter 指令使用
      • compress 指令使用
    • 2. 使用Python合并包代码

      在使用ros的时候经常会用到rosbag来录制或者回放算法,是个非常有用的工具。

      rosbag 命令列表

      命令作用record录制一个包,并且指定topicinfo总结一个包的详细信息play回放一个或者多个包,并且可以指定topiccheck确定一个包是否可以在当前系统中播放,或者是否可以迁移fix修复一个包使得它能在当前系统中播放filter通过python脚本转换包内容compress压缩包decompress解压缩包reindex重新索引一个或者多个损坏的包

      record 命令使用

      record <topic-names> # 指定一个或者多个topic来录制

      $ rosbag record body_status gnss_imu

      record -a, --all # 录制所有topic

      $ rosbag record -a

      record -e, --regex # 录制使用正则表达式匹配的topic

      $ rosbag record -e "/(.*)_log/point"

      rocord -p, -publish # (Melodic 新特性)当开始录制包时,发布一个topic=”begin_write“

      $ rosbag record -p

      record -x, --exclude # 录制时排除正则表达式匹配的topic

      $ rosbag record -e "/(.*)_log/point" -x "(.*)/point"

      record -q, --quiet # 录制时抑制指定topic的log输出

      $ rosbag record -q /body_status

      record -d, --duration # 指定topic录制最大时常

      $ rosbag record -d 30 /body_status $ rosbag record -d 3m /body_status $ rosbag record -d 3h /body_status

      record -o, --output-prefix # 输出包名前加上前缀

      $ rosbag record -o output /topic_name /topic_name2

      record -O, --output-name # 指定输出包名

      $ rosbag record -O output.bag /topic_name /topic_name2

      record --split # 每隔设定的时常或者大小分割包录制

      $ rosbag record --split --size=1024 /topic_name $ rosbag record --split --duration=30 /topic_name $ rosbag record --split --duration=5m /topic_name $ rosbag record --split --duration=2h /topic_name

      record --max-splits # (Kinetic 新特性)分割包的最大个数

      $ rosbag record --split --max-splits 3 --duration=30 /topic_name

      record -b, --buffsize # 使用内部缓冲区(默认值:256MB,无限为0)

      $ rosbag record -b 1024 /topic_name

      record --chunksize # 开辟缓冲区,比buffsize更为先进(默认值: 768KB,无限为0)

      $ rosbag record --chunksize=1024 /topic_name

      record -l, --limit # 在topic中只录制指定数量的消息

      $ rosbag record -l 1000 /topic_name

      record --node # 记录指定节点订阅的所有消息

      $ rosbag record --node=/node

      record -j, --bz2 # 设置录制以bz2格式压缩

      $ rosbag record -j /topic_name

      record --lz4 # 设置录制以lz4格式压缩

      $ rosbag record --lz4 /topic_name

      • record --tcpnodelay # 订阅主题时使用TCP_NODELAY传输 参考资料
      • record --udp # 订阅主题时使用UDP传输

      info 指令使用

      info <bag-files> # 查看一个或者多个包的详细信息

      $ rosbag info test.bag

      info -y, --yaml # 以yaml格式输出一个或者多个包的详细信息

      $ rosbag info -y test.bag

      info -k, --key # 查看以yaml格式输出,包内key值对应的数据

      $ rosbag info -y -k topics test.bag

      play 指令使用

      play <bag_name> # 播放一个或者多个数据包

      $ rosbag play test.bag

      play -p, --prefix # 为所有输出主题添加前缀

      $ rosbag play -p test test.bag

      play -q, --quiet # 抑制控制台输出

      $ rosbag play -q test.bag

      play -i, --immediate # 播放包不等待

      $ rosbag play -i test.bag

      play --pause # 开始播放包时暂停

      $ rosbag play --pause test.bag

      play --queue # 播放包时以设置的队列大小播放 默认值为100

      $ rosbag play --queue=1000 test.bag

      play --clock # 自动发布一个clocktopic

      $ rosbag play --clock test.bag

      play --hz # 以指定赫兹发布clock topic

      $ rosbag play --hz=1 --clock test.bag

      play -d, --delay # 指定延迟播放的时间

      $ rosbag -d 5 test.bag

      play -r, --rate # 指定播放速度

      $ rosbag -r 10 test.bag

      play -s, --start # 指定开始播放时间

      $ rosbag play -s 5 test.bag

      play -u,--duration # 设定播放时间

      $ rosbag play -u 240 test.bag

      play --skip-empty # 设置跳过没有消息的数据的时间

      $ rosbag play --skip-empty=1 test.bag

      play -l, --loop # 循环播放

      $ rosbag play -l test.bag

      play -k, --keep-alive # 播放时保持活跃状态

      $ rosbag play -k test.bag

      play --try-future-version # 即使不知道版本号也可以打开bag

      $ rosbag play --try-future-version test.bag

      play --topics # 指定对应topics进行播放

      $ rosbag play test.bag --topics /topic1 /topic2

      play --pause-topics # 当回放数据时,暂停指定topics

      $ rosbag play test.bag --topics /topic1 /topic2 --pause-topics

      play --bags # 指定多个bags进行回放

      $ rosbag play --bags=test.bag

      play --wait-for-subscribers # 再发布之前,等待每个主题至少有一个订阅者play --rate-control-topic= # 观看给定的主题,如果上一次发布时间超过了<速率控制最大延迟>,请等待该主题再次发布后再继续播放play --rate-control-max-delay= # 暂停前与<速率控制主题>的最大时间差 check 指令使用 check <file> # 确定一个包在当前系统中是否可以播放。

      $ rosbag check test.bag

      check -g, --genrules # 生成一个名为RULEFILE的迁移规则文件

      $ rosbag check -g diagnostics.bmr test.bag

      check -a, --append # 加载后附加到现有数据迁移规则文件的末尾。

      $ rosbag check -a -g diagnostics.bmr test.bag

      check -n, --noplugins # 不加载规则文件

      $ rosbag check -n test.bag

      fix 指令使用

      fix <in-bag> <out-bag> [rules.bmr] # 使用规则文件修复数据包

      $ rosbag fix old.bag repaired.bag myrules.bmr

      fix -n, --noplugins # 修复数据包不带规则文件

      如何使用ROS1的rosbag进行详细操作并利用Python脚本合并多个bag文件?

      $ rosbag fix -n old.bag repaired.bag

      filter 指令使用

      filter <in-bag> <out-bag> <expression> # 使用给定的Python表达式转换数据文件。

      $ rosbag filter my.bag only-tf.bag "topic == '/tf'"

      compress 指令使用

      compress <bag-files> # 使用BZ2格式压缩包

      $ rosbag compress *.bag

      compress --output-dir=DIR # 指定路径输出文件

      $ rosbag compress *.bag --output-dir=compressed

      compress -f, --force # 如果已经存在包,则强制覆盖

      $ rosbag compress -f *.bag

      compress -q, --quiet # 抑制非关键信息

      $ rosbag compress -q *.bag

      compress -j, --bz2 # 使用bz2格式压缩包

      $ rosbag compress -j *.bag

      compress --lz4 # 使用lz4格式压缩包

      $ rosbag compress --lz4 *.bag

      decompress 指令使用 decompress <bag-files> # 解压缩包

      $ rosbag decompress *.bag

      decompress --output-dir=DIR # 指定解压缩的路径

      $ rosbag decompress --output-dir=uncompressed *.bag

      decompress -f, --force # 如果已经存在包,则强制覆盖

      $ rosbag decompress -f *.bag

      decompress -q, --quiet # 抑制非关键信息

      $ rosbag decompress -q *.bag

      reindex 指令使用 reindex <bag-files> # 修复坏包

      $ rosbag reindex *.bag

      reindex --output-dir=DIR # 指定路径

      $ rosbag reindex --output-dir=reindexed *.bag

      reindex -f, --force # 如果已经存在包,则强制覆盖

      $ rosbag reindex -f *.bag

      reindex -q, --quiet # 抑制非关键信息

      $ rosbag reindex -q *.bag

      2. 使用Python合并包代码

      #! /usr/bin/env python3 import os import time import argparse from rosbag import Bag, Compression def parse_compression(compression): if compression == "none" or compression == "NONE": compression = Compression.NONE elif compression == "bz2": compression = Compression.BZ2 elif compression == "lz4": compression = Compression.LZ4 return compression def get_files_list(arg_input, arg_select): files = None if " " in arg_input: files = arg_input.split(" ") elif "," in arg_input: files = arg_input.split(",") elif os.path.exists(args.input) or "*" in args.input: rfind_index = arg_input.rfind("/") dir_path = arg_input[:rfind_index] files = os.listdir(dir_path) files.sort() # 名称排序 files = [dir_path + "/" + file for file in files] if arg_select: files = select_bags(files) print("bags list:") for i in range(len(files)): print(str(i) + "." + files[i]) return files def select_bags(files): for i in range(len(files)): print(str(i) + "." + files[i]) print("Please input bag numbers to merge. split by , or space", end=":") s_b = input() s_b_list = [] if "," in s_b: s_b_list = s_b.split(",") elif " " in s_b: s_b_list = s_b.split(" ") files = [files[int(x)] for x in s_b_list] return files def show_process_bar(total, i, start): a = "*" * i b = "." * (total - i) c = (i / total) * 100 dur = time.perf_counter() - start print("\r{:^3.0f}%[{}->{}]{:.2f}s".format(c,a,b,dur), end="") def merge_bags(args): print("start merge bags.") compression = parse_compression(args.compression) print(f"bag's compression mode is {compression}.") files = get_files_list(args.input, args.select) with Bag(args.output, "w", compression=compression) as o: start = time.perf_counter() for i in range(len(files)): show_process_bar(len(files), i+1, start) with Bag(files[i], "r") as ib: for topic, msg, t in ib: o.write(topic, msg, t) show_process_bar(len(files), i+1, start) if __name__ == "__main__": parser = argparse.ArgumentParser(description="Merge one or more bag files to one file.") parser.add_argument("-o", "--output", help="Output bag's file path.", default="output.bag", required=True) parser.add_argument("-i", "--input", help="Input files bags name or path, split by , .", required=True) parser.add_argument("-c", "--compression", help="Compress the bag by bz2 or lz4", default="lz4", choices=["none", "lz4", "bz2"]) parser.add_argument("-s", "--select", help="Select bags to merge.", default=False) parser.add_argument("-v", "--verbose", help="Show the verbose msg.") args = parser.parse_args() merge_bags(args)

      Reference:

      wiki.ros.org/rosbag/Commandline

      到此这篇关于ROS1rosbag的详细使用,并且使用python来合并bag包的文章就介绍到这了,更多相关ROS1rosbag使用内容请搜索自由互联以前的文章或继续浏览下面的相关文章希望大家以后多多支持自由互联!