Stream源码导出

Spring Cloud Stream(全称SCS)提供更多了一连串事先表述的注释来新闻稿输入型和输入型Channel,业务控制系统如前所述那些Channel与最新NSS展开通讯,而不是间接与具体内容的最新NSS展开通讯。追踪SCS的源码就会发现,Stream有很多外部倚赖,最主要的就是Messaging和Integration两个项目,所以在传授SCS源码前,有必要性先如是说一下Messaging和Integration与SCS管理体系的关系。

SCS的最终目标是建立两套标准化的如前所述注释的最新消息推送监督机制,过滤器开发者间接与下层最新消息控制系统展开技术细节可视化,而Messaging组件正是Spring架构中用以做标准化最新消息程式设计数学模型的,在Messaging中最关键性的计算机程序是Message,标识符如下表所示:

在Messaging组件中最新消息地下通道MessageChannel是两个USB类,用于推送Message最新消息,能认知为Messaging组件中的标准USB,近似于J2EE中的ServletUSB,具体内容同时实现类能同时实现具体内容最新消息地下通道。上面是MessageChannel的标识符:

在Messaging组件中,最新消息地下通道的子USBSubscribableChannel承继了MessageHandler最新消息CPU:

由MessageHandler或者说地消费需求/处置最新消息:

Integration如前所述Spring架构能同时实现轻量的最新输入输入,也是对Messaging的扩充同时实现,支持透过新闻稿转接器与SCS软件控制系统。它同时实现了最新消息 过 滤 、 消 息 转 换 、 消 息 聚 合 和 消 息 分 割 等 功 能 , 提 供 了 对MessageChannel 和 MessageHandler 的 实 现 , 包 括 DirectChannel 、ExecutorChannel、PublishSubscribeChannel,以及MessageFilter、ServiceActivatingHandler、MethodInvokingSplitter等。上面如是说Integration 中 的 两 种 消 息 分 发 器 :DirectChannel 和PublishSubscribeChannel。

从标识符由此可知,DirectChannel外部的UnicastingDispatcher类别递送器寄送相关联最新消息地下通道的MessageChannel中,从英文名字也能看出来,UnicastingDispatcher是两个多播的递送器,只能选择两个最新消息地下通道。而PublishSubscribeChannel使用BroadcastingDispatcher作为该台最新消息递送器,会把最新消息递送给所有的MessageHandler。

SCS在Integration的软件控制系统上展开了PCB,透过注释的方式和标准化的API展开最新消息的推送和消费需求,下层最新NSS的同时实现技术细节由各最新NSS的Binder完成,同时,透过与Spring Boot的ExternalizedConfiguration资源整合,SCS提供更多了BindingProperties等外部化实用性类,那些具体内容的实用性信息将存取到具体内容的最新NSS的实用性类中。

SCS的构架时序

上面是SCS的构架时序,我们会从几个层级分别传授其中密切相关的源码和它们之间的可视化关系。

应用层

SCS为用户提供更多了三个存取最新消息地下通道的默认同时实现。

● Sink:透过指定消费需求最新消息的最终目标来标识最新消息消费需求者。

● Source:与Sink相反,用于标识最新消息生产者。

● Processor:软件控制系统了Sink和Source的机能,用于标识最新消息生产者和消费需求者。

对 应 用 而 言 , 想 要 启 动 SCS 的 功 能 , 需 要 先 启 动 注 解 。

@EnableBinding注释是Stream架构运转的起点,透过这个注释能同时实现动态注册BeanDefinition,它会将最新消息地下通道存取到自己修饰的最终目标实例上,从而让那些实例具备与最新消息队列展开可视化的能力。

BindingServiceConfiguration的 作 用 是 完 成BindingService、InputBindingLifecycle、OutputBindingLifecycle等重要Bean的初始化及相关实用性文件加载。

● BindingBeansRegistrar的作用是注册新闻稿地下通道的USB类的BeanDefinition,从而获取那些USB类的实例,并使用那些实例展开最新消息的推送和接收,

registerBindingTargetBeanDefinitions方法会调用ReflectionUtils类完成扫描所有被注释@Input和@Output标注了的方法,然后注册BeanDefinition。

registerBindingTargetsQualifiedBeanDefinitions 是 在 注 册registerBindingTargetBeanDefinitions 时 使 用 的 工 厂 类BeanDefinition,这个工厂类用以生成registerBindingTargetBeanDefinition注册的Bean实例,

Stream层

Stream 层 的 BindableProxyFactory 被 初 始 化 为 一 个rootBeanDefinition,并注册为两个FactoryBean,这样Spring容器就可 以 获 得

registerBindingTargetBeanDefinitions 方 法 中 所 注 册 的Bean实例(MessageChannel对象实例)。BindableProxyFactory能说是SCS同时实现地下通道USB类新闻稿及相关类别的核心类,

afterPropertiesSet方法会处置所有被@Input和@Output注释的函数 , 并 将 生 成 函 数 返 回 类 型 实 例 存 储 在 BoundTargetHolder 中 ,getBindingTargetName方法会返回

SubscribableChannelBindingTargetFactory 实 例 , 它 会 在createOutput方法中返回两个DirectChannel实例,该实例会被存储起来供BindableProxyFactory使用。

名称为output的BeanDefinition将BindableProxyFactory设置成其实例工厂类,并将outputMessagefunction方法设置成其实例的工厂函数(BeanFactoryMethod)。当Spring容器创建该实例时,会调用BindableProxyFactory 的 outputMessagefunction 方 法 , 由 于BindableProxyFactory同时实现了MethodlnterceptorUSB,所以就调用了其invoke方法。invoke方法会从BindableProxyFactory缓存的Channel实例中匹配符合的实例方法,并反射调用。

BindingService是Stream层获取存取器和执行存取任务的两个重要类,首先我们看BindingService的bindProducer方法,

在 BindingService 实 现 中 , getBinder 方 法 最 终 会 调 用DefaultBinderFactory中的getBinder方法同时实现,我们能看到,DefaultBinderFactory的作用就是获取具体内容的Binder同时实现并提供更多给相应的MessageChannel实例。DefaultBinderFactory的初始化倚赖于BinderTypeRegistry获得的BinderType列表。DefaultBinderFactory的getBinder同时实现中会调用BinderConfiguration获取相关联的Binder实例 , 通 过 跟 踪 BinderConfiguration 的 初 始 化 过 程 , 可 以 发 现BinderConfiguration 是 在

BinderFactoryConfiguration 执 行getBinderConfiguration方法时将bindingServiceProperties变量中的BinderProperties与BinderTypeRegistry中的BinderType结合,PCB成BinderConfiguration对象。BinderPropertiesPCB了Stream从application.yml文件中读取的关于Binder的实用性信息,而BinderType则 是 具 体 Binder 的 实 现 类 信 息 。DefaultBinderFactory 的getBinderInstance

这 里 的 getBinderInstance 方 法 中 会 生 成 一 个

ConfigurableApplicationContext 来 创 建 Binder 实 例 , 在 创 建ConfigurableApplicationContext实例时,它会将BinderConfiguration设置到SpringApplicationBuilder中。

ConfigurableApplicationContext调用getBinder方法时,会使用BinderConfiguration的属性和实用性生成BinderConfiguration中设置的具体内容类别的Binder同时实现。如果你使用的Binder是RabbitMQ,那么相关联 的 RabbitServiceAutoConfiguration 会 自 动 初 始 化 并 加 载RabbitMessageChannelBinder实例。

在 Stream 层 对 Binder 实 例 的 初 始 化 工 作 都 完 成 后 , 再 回 到BindingService 的 bindProducer 方 法 实 现 , 它 会 调 用

AbstractMessagChannlBinder 的 doBindProducer 方 法 ,

从源码由此可知,ProvisioningProvider是两个USB,不同的Binder实 现 可 以 根 据 接 口 实 现 各 自 不 同 的 ProducerDestination 和ConsumerDestination,标识符如下表所示:

doBindProducer会调用

createProducerMessageHandler方法创建MessageHandler实例,MessageChannel会使用SendingHandlerPCB后的MessageHandler实例,当有output最新消息时,将最新消息推送给最终的Binder实例。

透过上面的步骤,基本上在Stream层就完成了对生产者的存取操作,消费需求者的存取就是将SubscribableChannel与具体内容的最新消息队列同时实现连接,doBindConsumer与doBindProducer流程类似。

首先透过ProvisioningProvider的

provisionConsumerDestination方法创建ConsumerDestination,然后调用createConsumerEndpoint方法创建MessageProducer实例,最后生成DefaultBinding实例,

Message/Integrate/最新NSSBinder层

从@Output注释能看到,Stream架构会使用MessageChannel推送消 息 。通 过 BindingService 的 doBindProducer 方 法 创 建 并 绑 定SendingHandler对象,然后调用handleMessageInternal方法,它会将最新消息再推送给delegate对象处置。上面是SendingHandler对象的handleMessageInternal方法的标识符同时实现:

delegate是之前在BindingServer中抽象类

AbstractMessageChannelBinder执行的createProducerMessageHandler方法返回的生产者MessageHandler实例。对于RabbitMQ Binder来说,就是rmqpOutboundEndpoint对象,该实 例 将 最 终 调 用 其 handlerMessage 方 法 , 该 方 法 进 一 步 调 用RabbitTemplate的send方法。

最新消息的接收过程

最新消息的接收过程能分为两个阶段:第两个阶段是从RabbitMQ到SubscribableChannel的过程。我们从@Input注释能看到,Stream架构 会 使 用 SubscribableChannel 接 收 消 息 。第 二 个 阶 段 是 注 解@StreamListener告诉SubscribableChannel如何将最新消息推送给相关联的Sink接收端相关联的回调方法。

Spring的RabbitMQ使用InternalConsumer作为默认的最新消息消费需求方,当接收到相关联最新消息后,会调用handleDelivery方法将RabbitMQ最新消息推送给BlockingQueueConsumer中的队列。上面是handleDelivery的源码同时实现。

AsyncMessageProcessingConsumer类是Runnable类别的,它会消费需求 阻 塞 队 列 , 并 将 消 息 传 给 AmqpInboundChannelAdapter 。

AmqpInboundChannelAdapter 实 例 是 在 BindingService 构 造createConsumerEndpoint时创建的consumerEndpoint,并将它与相关联的Channel存取。上面是AmqpInboundChannelAdapter的关键性标识符,即processMessage方法,它会调用MessagingTemplate对象的send方法将最新消息推送给SubscribableChannel组件。

上面就是最新消息处置的第二个阶段,就是将SubscribableChannel中的 消 息 发 送 给 指 定 的 方 法 , 主 要 靠 @StreamListener 注 解 实 现 。

@StreamListener是注释在消费需求方法上的注释,用以接收输入型地下通道的消 息 , Stream 定 义 了

StreamListenerAnnotationBeanPostProcessor类,用以处置项目中的@SteamListener注释。

StreamListenerAnnotationBeanPostProcessor同时实现了BeanPostProcessorUSB,用以在Bean初始化之前和之后两个时间点对Bean实例展开处置。

postProcessAfterlnitialization是在Bean实例初始化之后被调用 的 方 法 , 它 会 遍 历 Bean 实 例 中 的 所 有 函 数 , 处 理 那 些 被@StreamListener注释修饰的函数。

afterSingletonsInstantiated方法会遍历mappedListenerMethods 对 应 的 所 有 Entry 对 象 , 为 每 一 个StreamListenerHandlerMethodMapping 创 建 一 个 MessageHandler 实例。然后根据条件生成DispatchingStreamListenerMessageHandler并注册给SubscribableChannel。

下 面 是

StreamListenerAnnotationBeanPostProcessor 的 代 码 同时实现:

当 SubscribableChannel 接 收 到 消 息 后 , 会 调 用

DispatchingStreamListenerMessageHandler类的handleRequestMessage方法,该方法会调用ConditionalStreamListenerHandler的handleMessage方法。

findMatchingHandlers方法根据

ConditionalStreamListenerHandler 的 Expression 实 例 来 判 断ConditionalStreamListenerHandler是否适合处置当前这个最新消息,最终最新消息经过InvocableHandlerMethod传递给相关联的函数。SCS消费需求最新消息的整体流程如下表所示图所示。

1.本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2.分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3.不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4.本站提供的源码、模板、插件等其他资源,都不包含技术服务请大家谅解!
5.如有链接无法下载或失效,请联系管理员处理!
6.本站资源售价只是赞助,收取费用仅维持本站的日常运营所需!