Apache Pulsar的Adaptor适配器有哪些应用场景?
- 内容介绍
- 文章标签
- 相关推荐
本文共计1121个文字,预计阅读时间需要5分钟。
Pulsar Adaptor for Kafka,适配器将Pulsar作为Apache Kafka使用Java客户端API编写的应用程序程序提供了一种简单解决方案。在生产者端,若想保持原有kafka的代码架构不变,则可直接切换到Pulsar平台。
一、Pulsar Adaptor on Kafka 适配器
Pulsar 为使用 Apache Kafka Java 客户端 API 编写的应用程序提供了一个简单的解决方案。
在生产者中,如果想不改变原有kafka的代码架构,就切换到Pulsar的平台中,那么Pulsar adaptor on kafka就变的非常的有用了,它可以帮助我们在不改变原有kafka的代码基础上,即可接入pulsar,但是需要注意,相关配置信息需要进行一些调整,例如:地址与topic。
1.1 需要导入Pulsar兼容kafka的依赖包
<dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client-kafka</artifactId> <version>2.8.0</version> </dependency>1.2 编写生产者
public class KafkaAdaptorProducer { public static void main(String[] args) throws Exception { //1. 创建kafka生产者的核心类对象: KafkaProducer //1.1: 创建生产者配置对象: 设置相关配置 Properties props = new Properties(); props.put("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650"); props.put("acks", "all"); // 消息的确认方案 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // key序列化类型 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value 序列化类型Producer<String, String> producer = new KafkaProducer<>(props); Producer<String, String> producer = new KafkaProducer<>(props); //2. 发送数据 for (int i = 0; i < 10; i++) { //2.1: 创建 生产者数据承载对象 一个对象代表是一条消息数据 ProducerRecord<String, String> producerRecord = new ProducerRecord<>("persistent://public/default/txn_t1",Integer.toString(i), Integer.toString(i)); producer.send(producerRecord).get(); } //3. 释放资源 producer.close(); } }1.3 编写消费者
public class KafkaAdaptorConsumer { public static void main(String[] args) { //1. 创建kafka的消费者的核心对象: KafkaConsumer //1.1: 创建消费者配置对象, 并设置相关的参数: Properties props = new Properties(); props.setProperty("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650"); props.setProperty("group.id", "test"); // 消费者组的 id props.setProperty("enable.auto.commit", "true"); // 是否启动消费者自动提交消费偏移量 props.setProperty("auto.commit.interval.ms","1000");//每间隔多长时间提交一次偏移量:单位 毫秒 props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key 反序列化 props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // val 发序列化 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //2. 给消费者设置订阅topic: consumer.subscribe(Arrays.asList("persistent://public/default/txn_t1")); //3. 循环获取相关的消息数据 while (true) { //3.1: 从kafka中获取消息数据: 参数表示等待超时时间 // 注意: 如果没有获取到数据, 返回一个空集合对象, 如果数据集合中有多个 ConsumerRecord 对象 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // 3.2 遍历ConsumerRecords 获取每一个 ConsumerRecord 对象 : ConsumerRecord 消费者数据承载对象, 一个对象就是一条消息 for (ConsumerRecord<String, String> record : records) { String massage = record.value(); System.out.println("消息数据为:"+massage); } } } }二、Pulsar Adaptor on Spark 适配器
Pulsar 的 Spark Streaming 接收器是一个自定义的接收器,它使用 Apache Spark Streaming 能够从Pulsar接收原始数据。
应用程序可以通过 Spark Streaming receiver 接收 Resilient Distributed Dataset (RDD) 格式的数据,并可以通过多种方式对其进行处理
2.1 导入相关的依赖包
<dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-spark</artifactId> <version>2.8.0</version> </dependency>2.2 编写spark的流式代码
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.spark.SparkStreamingPulsarReceiver; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.Function; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import java.util.HashSet; import java.util.Set; /** * @Author: huangyibo * @Date: 2022/6/6 22:40 * @Description: */ public class SparkStreamingAdaptor { public static void main(String[] args) throws InterruptedException { String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650"; String topic = "persistent://public/default/test_src"; String subs = "test_sub"; //1. 创建Java Spark Streaming 对象 SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Adaptor"); JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(10)); //2. 设置数据源: 从Pulsar中读取数据 ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData<>(); Set<String> set = new HashSet<>(); set.add(topic); pulsarConf.setTopicNames(set); pulsarConf.setSubscriptionName(subs); SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(serviceUrl, pulsarConf, new AuthenticationDisabled()); JavaReceiverInputDStream<byte[]> lineStream = streamingContext.receiverStream(pulsarReceiver); //3. 对接收到数据进行处理 JavaDStream<String> stream = lineStream.map((Function<byte[], String>) String::new); //4. 输出操作 stream.print(); //5. 启动 streamingContext.start(); streamingContext.awaitTermination(); } }本文共计1121个文字,预计阅读时间需要5分钟。
Pulsar Adaptor for Kafka,适配器将Pulsar作为Apache Kafka使用Java客户端API编写的应用程序程序提供了一种简单解决方案。在生产者端,若想保持原有kafka的代码架构不变,则可直接切换到Pulsar平台。
一、Pulsar Adaptor on Kafka 适配器
Pulsar 为使用 Apache Kafka Java 客户端 API 编写的应用程序提供了一个简单的解决方案。
在生产者中,如果想不改变原有kafka的代码架构,就切换到Pulsar的平台中,那么Pulsar adaptor on kafka就变的非常的有用了,它可以帮助我们在不改变原有kafka的代码基础上,即可接入pulsar,但是需要注意,相关配置信息需要进行一些调整,例如:地址与topic。
1.1 需要导入Pulsar兼容kafka的依赖包
<dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client-kafka</artifactId> <version>2.8.0</version> </dependency>1.2 编写生产者
public class KafkaAdaptorProducer { public static void main(String[] args) throws Exception { //1. 创建kafka生产者的核心类对象: KafkaProducer //1.1: 创建生产者配置对象: 设置相关配置 Properties props = new Properties(); props.put("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650"); props.put("acks", "all"); // 消息的确认方案 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // key序列化类型 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value 序列化类型Producer<String, String> producer = new KafkaProducer<>(props); Producer<String, String> producer = new KafkaProducer<>(props); //2. 发送数据 for (int i = 0; i < 10; i++) { //2.1: 创建 生产者数据承载对象 一个对象代表是一条消息数据 ProducerRecord<String, String> producerRecord = new ProducerRecord<>("persistent://public/default/txn_t1",Integer.toString(i), Integer.toString(i)); producer.send(producerRecord).get(); } //3. 释放资源 producer.close(); } }1.3 编写消费者
public class KafkaAdaptorConsumer { public static void main(String[] args) { //1. 创建kafka的消费者的核心对象: KafkaConsumer //1.1: 创建消费者配置对象, 并设置相关的参数: Properties props = new Properties(); props.setProperty("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650"); props.setProperty("group.id", "test"); // 消费者组的 id props.setProperty("enable.auto.commit", "true"); // 是否启动消费者自动提交消费偏移量 props.setProperty("auto.commit.interval.ms","1000");//每间隔多长时间提交一次偏移量:单位 毫秒 props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key 反序列化 props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // val 发序列化 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //2. 给消费者设置订阅topic: consumer.subscribe(Arrays.asList("persistent://public/default/txn_t1")); //3. 循环获取相关的消息数据 while (true) { //3.1: 从kafka中获取消息数据: 参数表示等待超时时间 // 注意: 如果没有获取到数据, 返回一个空集合对象, 如果数据集合中有多个 ConsumerRecord 对象 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // 3.2 遍历ConsumerRecords 获取每一个 ConsumerRecord 对象 : ConsumerRecord 消费者数据承载对象, 一个对象就是一条消息 for (ConsumerRecord<String, String> record : records) { String massage = record.value(); System.out.println("消息数据为:"+massage); } } } }二、Pulsar Adaptor on Spark 适配器
Pulsar 的 Spark Streaming 接收器是一个自定义的接收器,它使用 Apache Spark Streaming 能够从Pulsar接收原始数据。
应用程序可以通过 Spark Streaming receiver 接收 Resilient Distributed Dataset (RDD) 格式的数据,并可以通过多种方式对其进行处理

