# 介绍
抽象MQ配置及SDK,底层适配内存队列、redis、kafka、rocketmq以及阿里云、腾讯云等云产商的MQ产品,实现一套代码配置驱动适配不同的MQ的底层实现。
# 使用说明
# 添加依赖
<dependency>
<groupId>com.mendmix</groupId>
<artifactId>mendmix-amqp-adapter</artifactId>
<version>[最新版本]</version>
</dependency>
<!--根据provider按需依赖即可,如:-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
</dependency>
# 配置说明
配置项 | 必填 | 说明 |
---|---|---|
mendmix.amqp.provider | 是 | 可选:kafka,rocketmq,rabbitmq,redis,eventbus,qcloud-cmq,aliyun-ons... |
mendmix.amqp.namespace | 否 | 命名空间,配置了命名空间则自动拼接在topic上 |
mendmix.amqp.groupName | 否 | 分组,默认是应用serviceId |
mendmix.amqp.producer.enabled | 否 | 是否启用生产者客户端 |
mendmix.amqp.consumer.enabled | 否 | 是否启用消费者客户端 |
mendmix.amqp.consumer.processThreads | 否 | 异步处理线程数 |
mendmix.amqp.consume.maxRetryTimes | 否 | 最大重试次数 |
mendmix.amqp.consume.maxInterval.ms | 否 | 忽略处理最大时间间隔 |
mendmix.amqp.loghandler.enabled | 否 | 是否启用日志 |
mendmix.amqp.loghandler.threads | 否 | 处理日志线程 |
mendmix.amqp.loghandler.queueSize | 是 | 日志缓冲队列 |
mendmix.amqp.loghandler.ignoreTopics | 否 | 忽略日志的topics,多个',' 隔开 |
# 配置
基于redis(依赖redisTemplate)
mendmix.amqp.provider=redis
mendmix.amqp.producer.enabled=true
mendmix.amqp.consumer.enabled=true
kafka
mendmix.amqp.provider=kafka
mendmix.amqp.producer.enabled=true
mendmix.amqp.consumer.enabled=true
mendmix.amqp.kafka[bootstrap.servers]=127.0.0.1:9092
mendmix.amqp.kafka[enable.auto.commit]=true
mendmix.amqp.kafka[auto.offset.reset]=latest
# 用法
发送
UserInfoParam userInfoParam = BeanUtils.copy(param, UserInfoParam.class);
userInfoParam.setId(accountId);
userInfoParam.setName(staffNo);
MQInstanceDelegate.send(new MQMessage(MqTopicNames.CREATE_USER, userInfoParam));
消费
@Service
@MQTopicRef(MqTopicNames.CREATE_USER)
public class CreateUserMessageHandler implements MessageHandler {
private @Autowired UserInfoService userService;
@Override
public void process(MQMessage message) throws Exception {
UserInfoParam param = message.toObject(UserInfoParam.class);
userService.addUser(param);
}
}