黑猴子的家KafkaStreams案例,如何打造长尾词营销策略?
- 内容介绍
- 文章标签
- 相关推荐
本文共计435个文字,预计阅读时间需要2分钟。
Code → GitHub https://github.com/liufengji/kafka_api.git
1.需求分析:实时处理,单词带有
2.Code → GitHub https://github.com/liufengji/kafka_api.git
1.需求分析 + 实时处理,单词语义分析
Code-amp;amp;gt;GitHubgithub.com/liufengji/kafka_api.git1、需求分析
实时处理单词带有”>>>”前缀的内容。例如输入”victor>>>mayy”,最终处理成“mayy”
2、创建主类 (创建一个工程,并添加jar包)
import java.util.Properties;import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.processor.Processor;import org.apache.kafka.streams.processor.ProcessorSupplier;import org.apache.kafka.streams.processor.TopologyBuilder;public class Application { public static void main(String[] args) { // 定义输入的topic String from = "first"; // 定义输出的topic String to = "second"; // 设置参数 Properties settings = new Properties(); //给应用程序设计一个名字 settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter"); //连接哪台主机 settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092"); StreamsConfig cOnfig= new StreamsConfig(settings); // 构建拓扑 TopologyBuilder builder = new TopologyBuilder(); builder.addSource("SOURCE", from) .addProcessor("PROCESS", new ProcessorSupplier() { @Override public Processor get() { // 具体分析处理 return new LogProcessor(); } }, "SOURCE") .addSink("SINK", to, "PROCESS"); // 创建kafka stream KafkaStreams streams = new KafkaStreams(builder, config); streams.start(); }}3、具体业务处理
import org.apache.kafka.streams.processor.Processor;import org.apache.kafka.streams.processor.ProcessorContext;public class LogProcessor implements Processor { private ProcessorContext context;台 @Override public void init(ProcessorContext context) { this.cOntext= context; } @Override public void process(byte[] key, byte[] value) { String input = new String(value); // 如果包含“>>>”则只保留该标记后面的内容 if (input.contains(">>>")) { input = input.split(">>>")[1].trim(); // 输出到下一个topic context.forward("logProcessor".getBytes(), input.getBytes()); }else{ context.forward("logProcessor".getBytes(), input.getBytes()); } } @Override public void punctuate(long timestamp) { } @Override public void close() { }}4、运行程序,在node3上启动生产者
[[email protected] kafka]$ bin/kafka-console-producer.sh --broker-list node1:9092 \--topic first> hello>>>world> hei>>>victor> h>>>victor> z>>>victor5、在node2上启动消费者
[[email protected] kafka]$ bin/kafka-console-consumer.sh \--zookeeper node1:2181 \--from-beginning \--topic secondworldvictorvictorvictor本文共计435个文字,预计阅读时间需要2分钟。
Code → GitHub https://github.com/liufengji/kafka_api.git
1.需求分析:实时处理,单词带有
2.Code → GitHub https://github.com/liufengji/kafka_api.git
1.需求分析 + 实时处理,单词语义分析
Code-amp;amp;gt;GitHubgithub.com/liufengji/kafka_api.git1、需求分析
实时处理单词带有”>>>”前缀的内容。例如输入”victor>>>mayy”,最终处理成“mayy”

