Friday, October 19, 2012

Stream Definitions

In this post I would like to explain more about WSO2 BAM 2.0. It focuses mainly on the concept of Data Streams, which is used in both WSO2 BAM and WSO2 CEP.

Introduction to Data Streams

As mentioned in the previous post, the Data Bridge is the generalized mechanism for receiving data from an external data source and storing it to secondary storage. Earlier versions of BAM tried to send data from external data sources to the Data Receiver over several different protocols, but none of them achieved sufficient throughput. BAM was expected to operate under heavy load, receiving data from high-traffic Enterprise Service Buses and web services. This data qualified as Big Data due to its volume and unstructured nature, so it needed a data transfer protocol that could scale in throughput.
The earlier techniques were mainly based on protocols that sent data as key-value pairs, and the overhead of this container format was a main concern. The designers realized that if the types of the transferred data need to be known only at the beginning of the transmission, there is no need to repeat them every time a value is sent; in other words, key-value pairs carry redundant information. They also found that if data sent very frequently in small chunks is aggregated into a sequence of larger packets transmitted periodically, the overhead can be reduced significantly. This line of thought led to the concept of Data Streams.
Data Streams are implemented on top of Apache Thrift, a binary protocol that fulfills the above requirements. Performance tests showed that Thrift gave the highest throughput among the technologies available for this requirement. Some of the implementation details of Data Streams in Data Agents can be found here; they will be discussed in detail in future posts.

Stream Concept

In the Stream concept, the data sender has to agree on the set of data types it wishes to send in each data event. When the first message is sent to the Data Receiver, this set of types is sent along with it, defining the Stream; this is known as the Stream Definition. Each Stream Definition is uniquely identified by the pair of Stream Name and Stream Version. The Stream Name corresponds to the Cassandra column family in which the stream of data is stored, so when different stream definitions are required to store several data streams in the same column family, different stream versions should be used with the same stream name. After the stream definition has been sent once per stream, the data types are not mentioned in later messages; only the data values are sent, in chunks, to the Data Receiver, where they are read according to the stream definition. Unlike protocols such as HTTP, where every value is sent as a string, Thrift allocates only the required number of bits for each field in each message, which also contributes to the high throughput.
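
To make the define-once, publish-many pattern concrete, the following is a minimal Java sketch based on the Thrift Data Publisher API used in the BAM 2.0 samples. The endpoint, credentials, and the trivial one-field stream are assumptions for illustration, not a definitive setup.

import org.wso2.carbon.databridge.agent.thrift.DataPublisher;

public class StreamSketch {
    public static void main(String[] args) throws Exception {
        // Trust store configuration for the secure Thrift login is omitted here.
        // Host, port, and credentials are assumptions for a default BAM setup.
        DataPublisher publisher = new DataPublisher("tcp://localhost:7611", "admin", "admin");

        // The stream definition, including the data types, is sent exactly once.
        String streamId = publisher.defineStream(
                "{'name':'stream.name','version':'1.0.0'," +
                " 'payloadData':[{'name':'payload_data_2','type':'LONG'}]}");

        // Every later message carries only values; the receiver matches them
        // to the types in the definition using the returned stream ID.
        publisher.publish(streamId, null, null, new Object[]{System.currentTimeMillis()});
        publisher.stop();
    }
}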

Stream Definition Example

Although a Data Stream can be defined in Java code, a Stream can also be defined as a Java string containing a JSON object. To make the concept easier to understand, here is a sample Stream Definition.


{
  'name': 'stream.name',
  'version': '1.0.0',
  'nickName': 'stream nick name',
  'description': 'description of the stream',
  'metaData':[
          {'name':'meta_data_1','type':'STRING'},
          {'name':'meta_data_2','type':'INT'}
  ],
  'correlationData':[
          {'name':'correlation_data_1','type':'STRING'},
          {'name':'correlation_data_2','type':'DOUBLE'}
  ],
  'payloadData':[
          {'name':'payload_data_1','type':'BOOL'},
          {'name':'payload_data_2','type':'LONG'}
  ]
}

The column family created for this stream will be "stream_name" in the "EVENT_KS" keyspace, with the dot (".") replaced by an underscore ("_"). The default version is "1.0.0", and it can be incremented when another stream needs to be added to the same Cassandra column family or when the existing stream has to be changed. The important thing to note is that a stream cannot be deleted or edited at the moment; when a change is required, another stream should be created with the same name but a different version.
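For example, to change the fields stored in the same column family, one could define a second stream with the same name and an incremented version. This is a sketch under the same assumptions as the earlier one; the field name "payload_data_3" is hypothetical.

// A changed definition that still maps to the column family 'stream_name':
String newStreamId = publisher.defineStream(
        "{'name':'stream.name','version':'2.0.0'," +
        " 'payloadData':[{'name':'payload_data_3','type':'STRING'}]}");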
Here "metadata" corresponds to the meta information related to the stream. e.g.: character set encoding and message type. "correlationData" corresponds to the data required to correlate between different monitoring points such as the "activity ID" of a message flow. All other content related to the payload of the message such as SOAP header of the message, SOAP body of the message and properties intercepted from the message should be specified as "payloadData". Their type should be specified as the "type" in each field. Valid types are as follows.
  1. String
  2. Int
  3. Long
  4. Double
  5. Float
  6. Bool
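
Putting the example definition and these types together, publishing one event could look like the following sketch, assuming a DataPublisher set up as in the earlier sketch and a streamId obtained by passing the JSON definition above to defineStream. The values are illustrative.

// Values must appear in the same order and with the same types
// as declared in the corresponding section of the stream definition.
publisher.publish(streamId,
        new Object[]{"UTF-8", 42},             // metaData: STRING, INT
        new Object[]{"activity-001", 0.75},    // correlationData: STRING, DOUBLE
        new Object[]{true, 1024L});            // payloadData: BOOL, LONG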

This post is intended to give only the theoretical background of Stream Definitions used in WSO2 BAM 2.0. I hope to discuss the coding aspects in future posts.
