ZBus的创建、启动、生产、回调具体是如何操作的?
- 内容介绍
- 文章标签
- 相关推荐
本文共计1141个文字,预计阅读时间需要5分钟。
java package com.accenture.icc.zbus.config;
import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map;
package com.accenture.icc.zbus.config;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.zbus.broker.Broker;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Consumer;
import org.zbus.mq.Consumer.ConsumerHandler;
import org.zbus.mq.MqAdmin;
import org.zbus.mq.Producer;
import org.zbus.net.http.Message;
import com.accenture.icc.pojo.AnalogInputData;
import com.accenture.icc.pojo.DataUnit;
import com.accenture.icc.pojo.TripleDataWrapper;
@Configuration
@PropertySource("classpath:data.properties")
public class ZbusConfig {
// SLF4J logger shared by every handler defined in this configuration class.
private static final Logger logger = LoggerFactory.getLogger(ZbusConfig.class);

// --- Queue names -----------------------------------------------------------
// Each field is filled from the property placeholder named in its @Value
// annotation (the class declares classpath:data.properties as a property source).

// Value of the "zbus.mq.sub" property.
@Value("${zbus.mq.sub}")
private String subMq;
// Value of the "zbus.mq.recv" property.
@Value("${zbus.mq.recv}")
private String recvMq;
// Value of the "zbus.mq.alarm" property.
@Value("${zbus.mq.alarm}")
private String alarmMq;
// Value of the "zbus.mq.subCommonData" property.
@Value("${zbus.mq.subCommonData}")
private String subIndexDataMq;
// Value of the "zbus.mq.recvCommonData" property.
@Value("${zbus.mq.recvCommonData}")
private String indexDataMq;

// --- Buffer sizing ---------------------------------------------------------

// Value of "csv.power.maxsize". The analog message handler compares list sizes
// against it for the power, transformer, power-factor and unbalance wrappers:
// when a list already holds this many entries, the oldest one is removed
// before the new reading is appended.
@Value("${csv.power.maxsize}")
private int powerMaxListSize;
// Value of "csv.power.period". Not referenced in the visible part of this class.
@Value("${csv.power.period}")
private int powerPeriod;
// Value of "csv.consumption.maxsize". Same role as powerMaxListSize, but
// applied to the lists held by consumptionWrapper.
@Value("${csv.consumption.maxsize}")
private int consumptionMaxListSize;
// Value of "csv.consumption.period". Not referenced in the visible part of this class.
@Value("${csv.consumption.period}")
private int consumptionPeriod;

// --- Data wrappers ---------------------------------------------------------
// Six injected TripleDataWrapper instances. The analog message handler asks
// each one (except stationParamWrapper, which the visible code never touches)
// whether it tracks the incoming record id and, if so, appends the reading to
// that wrapper's list for the record.
// NOTE(review): all six fields share one type, so injection presumably relies
// on matching bean names to these field names — confirm against the bean
// definitions.
@Autowired
private TripleDataWrapper powerWrapper;
@Autowired
private TripleDataWrapper consumptionWrapper;
@Autowired
private TripleDataWrapper transformerWrapper;
@Autowired
private TripleDataWrapper powerFactorWrapper;
@Autowired
private TripleDataWrapper unbalanceWrapper;
@Autowired
private TripleDataWrapper stationParamWrapper;
/**
 * Builds the callback that processes analog-value messages.
 *
 * <p>The message body is split on commas and read positionally: index 0 is the
 * tag, 1 the table id, 2 the record id, 3 the field id, 4 the value and 6 the
 * timestamp (index 5 is not read). A body that is {@code null}, has fewer than
 * seven fields, or carries a non-numeric id/value is logged as a format error
 * and dropped. Values below 0.01 (which includes every negative value) and
 * values exactly equal to 128 are stored as 0.
 *
 * <p>The resulting reading is appended to the per-record list of every wrapper
 * that tracks the record id, after which {@code broadcastAnalog} is invoked
 * for that record id.
 *
 * @param messaging messaging operations handed through to {@code broadcastAnalog}
 * @return the handler instance
 */
@Bean
public ConsumerHandler consumerHandler(SimpMessageSendingOperations messaging) {
    ConsumerHandler consumerHandler = new ConsumerHandler() {
        @Override
        public void handle(Message msg, Consumer consumer) throws IOException {
            String body = msg.getBodyString();
            logger.info("RECEIVING MESSAGE: {}", body);
            if (body == null) {
                logger.info("message format error.");
                return;
            }

            String[] fields = body.split(",");
            // The timestamp lives at index 6, so seven fields are required.
            // The previous guard (length < 5) let 5- and 6-field messages
            // through, which then failed with ArrayIndexOutOfBoundsException.
            if (fields.length < 7) {
                logger.info("message format error.");
                return;
            }

            String tag = fields[0];
            int tableId;
            int recordId;
            int fieldId;
            double value;
            try {
                tableId = Integer.parseInt(fields[1]);
                recordId = Integer.parseInt(fields[2]);
                fieldId = Integer.parseInt(fields[3]);
                value = Double.parseDouble(fields[4]);
            } catch (NumberFormatException e) {
                logger.info("message format error.");
                return;
            }
            // NOTE(review): 128 is presumably a "no reading" sentinel and 0.01 a
            // noise floor — confirm with the data source.
            if (value < 0.01 || value == 128) {
                value = 0;
            }
            String timeStamp = fields[6];

            AnalogInputData analogInputData =
                    new AnalogInputData(tag, tableId, recordId, fieldId, value, timeStamp);

            // Only the consumption lists use their own cap; the others share the power cap.
            appendToBoundedList(powerWrapper, recordId, powerMaxListSize, analogInputData);
            appendToBoundedList(consumptionWrapper, recordId, consumptionMaxListSize, analogInputData);
            appendToBoundedList(transformerWrapper, recordId, powerMaxListSize, analogInputData);
            appendToBoundedList(powerFactorWrapper, recordId, powerMaxListSize, analogInputData);
            appendToBoundedList(unbalanceWrapper, recordId, powerMaxListSize, analogInputData);

            broadcastAnalog(recordId, messaging);
        }
    };
    return consumerHandler;
}

/**
 * Appends {@code data} to the list {@code wrapper} keeps for {@code recordId},
 * first removing the oldest entry when the list already holds exactly
 * {@code maxSize} elements. Does nothing when the wrapper does not track the record.
 *
 * @param wrapper  wrapper to update
 * @param recordId record id taken from the message
 * @param maxSize  list size at which the oldest entry is dropped
 * @param data     reading to append
 */
// NOTE(review): the element type of the list is not visible in this listing
// (presumably List<AnalogInputData>); the raw type is kept until that is confirmed.
@SuppressWarnings({"rawtypes", "unchecked"})
private void appendToBoundedList(TripleDataWrapper wrapper, int recordId, int maxSize,
        AnalogInputData data) {
    if (!wrapper.containsRecord(recordId)) {
        return;
    }
    List datalist = wrapper.getDataListByRecordid(recordId);
    if (datalist.size() == maxSize) {
        datalist.remove(0);
    }
    datalist.add(data);
}

// NOTE(review): the listing breaks off inside consumerHandlerAlarm; the fragment below is kept exactly as found.
@Bean public ConsumerHandler consumerHandlerAlarm(SimpMessageSendingOperations messaging) { ConsumerHandler consumerHandlerAlarm = new ConsumerHandler() { @Override public void handle(Message msg, Consumer consumer2) throws IOException { logger.info("RECEIVING MESSAGE: {}", msg.getBodyString()); String strMsg = msg.getBodyString(); List
本文共计1141个文字,预计阅读时间需要5分钟。
java package com.accenture.icc.zbus.config;
import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map;
package com.accenture.icc.zbus.config;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.zbus.broker.Broker;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Consumer;
import org.zbus.mq.Consumer.ConsumerHandler;
import org.zbus.mq.MqAdmin;
import org.zbus.mq.Producer;
import org.zbus.net.http.Message;
import com.accenture.icc.pojo.AnalogInputData;
import com.accenture.icc.pojo.DataUnit;
import com.accenture.icc.pojo.TripleDataWrapper;
@Configuration
@PropertySource("classpath:data.properties")
public class ZbusConfig {
// SLF4J logger shared by every handler defined in this configuration class.
private static final Logger logger = LoggerFactory.getLogger(ZbusConfig.class);

// --- Queue names -----------------------------------------------------------
// Each field is filled from the property placeholder named in its @Value
// annotation (the class declares classpath:data.properties as a property source).

// Value of the "zbus.mq.sub" property.
@Value("${zbus.mq.sub}")
private String subMq;
// Value of the "zbus.mq.recv" property.
@Value("${zbus.mq.recv}")
private String recvMq;
// Value of the "zbus.mq.alarm" property.
@Value("${zbus.mq.alarm}")
private String alarmMq;
// Value of the "zbus.mq.subCommonData" property.
@Value("${zbus.mq.subCommonData}")
private String subIndexDataMq;
// Value of the "zbus.mq.recvCommonData" property.
@Value("${zbus.mq.recvCommonData}")
private String indexDataMq;

// --- Buffer sizing ---------------------------------------------------------

// Value of "csv.power.maxsize". The analog message handler compares list sizes
// against it for the power, transformer, power-factor and unbalance wrappers:
// when a list already holds this many entries, the oldest one is removed
// before the new reading is appended.
@Value("${csv.power.maxsize}")
private int powerMaxListSize;
// Value of "csv.power.period". Not referenced in the visible part of this class.
@Value("${csv.power.period}")
private int powerPeriod;
// Value of "csv.consumption.maxsize". Same role as powerMaxListSize, but
// applied to the lists held by consumptionWrapper.
@Value("${csv.consumption.maxsize}")
private int consumptionMaxListSize;
// Value of "csv.consumption.period". Not referenced in the visible part of this class.
@Value("${csv.consumption.period}")
private int consumptionPeriod;

// --- Data wrappers ---------------------------------------------------------
// Six injected TripleDataWrapper instances. The analog message handler asks
// each one (except stationParamWrapper, which the visible code never touches)
// whether it tracks the incoming record id and, if so, appends the reading to
// that wrapper's list for the record.
// NOTE(review): all six fields share one type, so injection presumably relies
// on matching bean names to these field names — confirm against the bean
// definitions.
@Autowired
private TripleDataWrapper powerWrapper;
@Autowired
private TripleDataWrapper consumptionWrapper;
@Autowired
private TripleDataWrapper transformerWrapper;
@Autowired
private TripleDataWrapper powerFactorWrapper;
@Autowired
private TripleDataWrapper unbalanceWrapper;
@Autowired
private TripleDataWrapper stationParamWrapper;
/**
 * Builds the callback that processes analog-value messages.
 *
 * <p>The message body is split on commas and read positionally: index 0 is the
 * tag, 1 the table id, 2 the record id, 3 the field id, 4 the value and 6 the
 * timestamp (index 5 is not read). A body that is {@code null}, has fewer than
 * seven fields, or carries a non-numeric id/value is logged as a format error
 * and dropped. Values below 0.01 (which includes every negative value) and
 * values exactly equal to 128 are stored as 0.
 *
 * <p>The resulting reading is appended to the per-record list of every wrapper
 * that tracks the record id, after which {@code broadcastAnalog} is invoked
 * for that record id.
 *
 * @param messaging messaging operations handed through to {@code broadcastAnalog}
 * @return the handler instance
 */
@Bean
public ConsumerHandler consumerHandler(SimpMessageSendingOperations messaging) {
    ConsumerHandler consumerHandler = new ConsumerHandler() {
        @Override
        public void handle(Message msg, Consumer consumer) throws IOException {
            String body = msg.getBodyString();
            logger.info("RECEIVING MESSAGE: {}", body);
            if (body == null) {
                logger.info("message format error.");
                return;
            }

            String[] fields = body.split(",");
            // The timestamp lives at index 6, so seven fields are required.
            // The previous guard (length < 5) let 5- and 6-field messages
            // through, which then failed with ArrayIndexOutOfBoundsException.
            if (fields.length < 7) {
                logger.info("message format error.");
                return;
            }

            String tag = fields[0];
            int tableId;
            int recordId;
            int fieldId;
            double value;
            try {
                tableId = Integer.parseInt(fields[1]);
                recordId = Integer.parseInt(fields[2]);
                fieldId = Integer.parseInt(fields[3]);
                value = Double.parseDouble(fields[4]);
            } catch (NumberFormatException e) {
                logger.info("message format error.");
                return;
            }
            // NOTE(review): 128 is presumably a "no reading" sentinel and 0.01 a
            // noise floor — confirm with the data source.
            if (value < 0.01 || value == 128) {
                value = 0;
            }
            String timeStamp = fields[6];

            AnalogInputData analogInputData =
                    new AnalogInputData(tag, tableId, recordId, fieldId, value, timeStamp);

            // Only the consumption lists use their own cap; the others share the power cap.
            appendToBoundedList(powerWrapper, recordId, powerMaxListSize, analogInputData);
            appendToBoundedList(consumptionWrapper, recordId, consumptionMaxListSize, analogInputData);
            appendToBoundedList(transformerWrapper, recordId, powerMaxListSize, analogInputData);
            appendToBoundedList(powerFactorWrapper, recordId, powerMaxListSize, analogInputData);
            appendToBoundedList(unbalanceWrapper, recordId, powerMaxListSize, analogInputData);

            broadcastAnalog(recordId, messaging);
        }
    };
    return consumerHandler;
}

/**
 * Appends {@code data} to the list {@code wrapper} keeps for {@code recordId},
 * first removing the oldest entry when the list already holds exactly
 * {@code maxSize} elements. Does nothing when the wrapper does not track the record.
 *
 * @param wrapper  wrapper to update
 * @param recordId record id taken from the message
 * @param maxSize  list size at which the oldest entry is dropped
 * @param data     reading to append
 */
// NOTE(review): the element type of the list is not visible in this listing
// (presumably List<AnalogInputData>); the raw type is kept until that is confirmed.
@SuppressWarnings({"rawtypes", "unchecked"})
private void appendToBoundedList(TripleDataWrapper wrapper, int recordId, int maxSize,
        AnalogInputData data) {
    if (!wrapper.containsRecord(recordId)) {
        return;
    }
    List datalist = wrapper.getDataListByRecordid(recordId);
    if (datalist.size() == maxSize) {
        datalist.remove(0);
    }
    datalist.add(data);
}

// NOTE(review): the listing breaks off inside consumerHandlerAlarm; the fragment below is kept exactly as found.
@Bean public ConsumerHandler consumerHandlerAlarm(SimpMessageSendingOperations messaging) { ConsumerHandler consumerHandlerAlarm = new ConsumerHandler() { @Override public void handle(Message msg, Consumer consumer2) throws IOException { logger.info("RECEIVING MESSAGE: {}", msg.getBodyString()); String strMsg = msg.getBodyString(); List

