通信人家园

标题: IBM Streams流事件处理平台  [查看完整版帖子] [打印本页]

时间:  2016-4-22 16:57
作者: lotus302     标题: IBM Streams流事件处理平台

InfoSphere Streams 旨在从一个几分钟到几小时的窗口中的移动信息(数据流)中揭示有意义的模式。该平台能够获取低延迟洞察,并为注重时效的应用程序(比如欺诈检测或网络管理)获取更好的成果,从而提供业务价值。InfoSphere Streams 还可合并多个流,使您能够从多个流中获取新洞察,如图 3 所示。

                       图 3. 合并的流处理
InfoSphere Streams 的主要设计目的是:
InfoSphere Streams 提供了一种编程模型和 IDE 来定义数据来源,还提供了已融合到处理执行单元中的称为运算符的软件分析模块。它还提供了基础架构来支持从这些组件合成可扩展的流处理应用程序。主要平台组件包括:

流处理语言
SPL,InfoSphere Streams 的编程语言,是一种分布式数据流合成语言。它是一种类似 C++ 或 Java™ 的可扩展且全功能的语言,支持用户定义的数据类型。您可以使用 SPL 或原生语言(C++ 或 Java)编写自定义函数。也可以使用 C++ 或 Java 编写用户定义的运算符。
InfoSphere Streams 持续应用程序会描述一个导向图,该图由各个互联且处理多个数据流的运算符组成。数据流可来自系统外部,或者在应用程序内部生成。SPL 程序的基本构建块包括:
图 4 演示了 SPL 程序的 InfoSphere Streams 运行时视图:

                    图 4. InfoSphere 运行时执行

一个运算符表示一个可重用的流转换器,将一些输入流转换为输出流。在 SPL 程序中,运算符调用可实现预算法的特定用途,使用分配的特定的输入和输出流,以及在本地指定的参数和逻辑。每次运算符调用都会对输入和输出流命名。各种内置的 InfoSphere Streams 运算符提供了许多强大的功能:
[size=1.166em]一个流连接到一个运算符的位置称为端口。许多运算符(例如 Functor)有一个输入端口和一个输出端口,但运算符也可以没有输入端口(比如 Source)和没有输出端口(比如 Sink),或者拥有多个输入或输出端口(比如 Split 和 Join)。清单 1 给出了 Sink 的一个 SPL 示例,它有一个输入端口并将输出元组写入到一个磁盘文件中。
清单 1. Sink 示例() as Sink = FileSink(StreamIn) {    param    file : "/tmp/people.dat";    format : csv;    flush : 20u;}
[size=1.166em]在 清单 1 中,file 是一个强制性参数,提供了输出文件的路径。flush 参数用于清除给定数量的元组后的输出。format 参数指定了输出文件的格式。
[size=1.166em]组合运算符是一个运算符集合。它表示对原始(非组合)运算符或组合(嵌套)运算符的一个子图的一种封装。它类似于过程语言中的宏。
[size=1.166em]一个应用程序由一个没有输入或输出端口的主要组合运算符表示。数据可流入和流出,但不会流到一个图表内的流上,而且流可导出到在同一个实例中运行的其他应用程序和从这些应用程序导入。清单 2 中的代码给出了主要组合运算符的框架。
清单 2. 主要组合运算符的结构composite Main {    graph    stream ... {    }    stream ... {    }    ...}
[size=1.166em]作为一个示例,我们来看一个简单的流应用程序 WordCount,它统计一个文件中的行数和字数。该程序由以下流图组成:
[size=1.166em]在介绍 WordCount 的主要组合运算符之前,我将定义一些帮助器。我将为一行的统计数据使用 LineStat 类型。此外,我需要构建一个countWords(rstring line) 函数来统计一行中的字数,需要使用一个 addM(mutable LineStat x, LineStat y) 函数来添加两个LineStat 值并存储结果。清单 3 定义了这些帮助器。
清单 3. WordCount 帮助器定义type LineStat = tuple<int32 lines, int32 words>;     int32 countWords(rstring line) {        return size(tokenize(line, " \t", false));    }     void addM(mutable LineStat x, LineStat y) {        x.lines += y.lines;        x.words += y.words;    }
[size=1.166em]现在可以定义主要组合运算符了,如清单 4 所示。
清单 4. WordCount 的主要组合运算符composite WordCount {     graph    stream<rstring line> Data = FileSource() {        param file : getSubmissionTimeValue("file");        format : line;    }     stream<LineStat> OneLine = Functor(Data) {        output OneLine : lines = 1, words = countWords(line);    }     () as Counter = Custom(OneLine) {         logic state : mutable LineStat sum = { lines = 0, words = 0 };        onTuple OneLine : addM(sum, OneLine);        onPunct OneLine : if (currentPunct() == Sys.FinalMarker)         println(sum);         } }

开发环境
[size=1.166em]InfoSphere Streams 提供了一个敏捷开发环境,该环境由 Eclipse IDE、Streams Live Graph 视图和一个流调试器组成。该平台还包含用于加速和简化特定功能或行业的解决方案开发的工具包:
此外,您可定义您自己的工具包,提供可重用的运算符和函数集,并创建跨领域和特定于领域的加速器。它们可包含原始和组合运算符,也可同时使用原生和 SPL 函数。






通信人家园 (https://www.txrjy.com/) Powered by C114