Friday, April 26, 2013

Multiple Profiles and Shared Bundles With Eclipse P2 : Case Study


WSO2 Carbon is an OSGi-based server framework. A number of WSO2 middleware products use Carbon as their base platform. Carbon uses Eclipse Equinox as its OSGi framework implementation and Eclipse P2 as its provisioning framework.

Problem Description

Some of the Carbon products have their own deployment patterns in actual production deployments. However, to give a smooth evaluation experience, the same product is available as a 'ready to go', all-in-one zip distribution. For example, WSO2 Business Activity Monitor (BAM) allows enterprises to batch process their collected enterprise data and present it by means of human-readable graphs, etc. A real production deployment consists of three main parts.

Figure 1

1. A receiver component, responsible for receiving the events.
2. A storage and analyzer component, which analyzes the stored data in batch mode.
3. A presentation component, which presents the processed data by means of dashboard elements.

Even though these components are distinct in their functionality and their deployment, they are all part of one product. The middleware developer's first experience should be seamless, which means the Business Activity Monitor product should include all of these components. However, having all the components in the product results in a larger memory footprint at runtime. In an actual deployment, users have to deploy three separate full BAM instances, even though each instance is needed for only one functionality of the product; they can't deploy only the receiver part of BAM.

All of WSO2's Carbon-based middleware products are available in the cloud as a multi-tenant PaaS offering. This requires at least one product instance of each product type (Enterprise Service Bus, Application Server, etc.). If we take the distribution size per product to be ~100 MB, then ten product instances add up to ~1 GB. However, we also want to make our cloud offering available to users as a downloadable archive, so that they can easily download and set up their own private PaaS and evaluate it. That means the total distribution size matters.

Solution 1 : Handling multiple profiles

Figure 2

There is one important aspect of the BAM scenario described above: all the different deployment entities can share the same configuration. Since they are components of the same BAM server, the configuration requirements of the three components are more or less the same. If we can selectively activate some of these functionalities using a switch mechanism, that would do the trick. Eclipse P2 has a concept called profiles. When you provision your OSGi application using P2, the application gets assigned a profile. During server startup, only the bundles that were provisioned under a particular profile get started. We created separate profiles for each BAM component, namely receiver, analyzer and dashboard. During server startup, the user can select the profile by means of a system property. The default profile contains the functionality of all the BAM components, just like the good old days. A profile is a logical partition that works with the server's provisioning framework. All of these profiles share the same set of bundles in a shared repository, hence there is no increase in the distribution size.
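To make the "logical partition" idea concrete, here is a toy Java model of it. It is not the P2 or Carbon API: the profile and bundle names are hypothetical, and so is the system property name `profile`. Each profile only references bundles from one shared pool, so adding a profile adds nothing to the distribution.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model of the profile idea (not the P2 API): every profile is only a named subset of
 * one shared bundle pool, and a system property picks the profile at startup. The property
 * name "profile" and all bundle/profile names are assumptions for illustration.
 */
class ProfileSelector {
    // The shared pool: each bundle exists once on disk, whatever the number of profiles
    static final List<String> SHARED_POOL =
            Collections.unmodifiableList(Arrays.asList("core", "receiver", "analyzer", "dashboard"));

    // Profiles are logical partitions: they reference bundles, they do not copy them
    static final Map<String, List<String>> PROFILES = new HashMap<>();
    static {
        PROFILES.put("default", SHARED_POOL); // every component, as before
        PROFILES.put("receiver", Arrays.asList("core", "receiver"));
        PROFILES.put("analyzer", Arrays.asList("core", "analyzer"));
        PROFILES.put("dashboard", Arrays.asList("core", "dashboard"));
    }

    /** Bundles to start, chosen by the given system property (falls back to "default"). */
    static List<String> bundlesToStart(String propertyName) {
        String profile = System.getProperty(propertyName, "default");
        List<String> bundles = PROFILES.get(profile);
        if (bundles == null) {
            throw new IllegalArgumentException("Unknown profile: " + profile);
        }
        return bundles;
    }

    public static void main(String[] args) {
        // e.g. java -Dprofile=receiver ProfileSelector  ->  [core, receiver]
        System.out.println(bundlesToStart("profile"));
    }
}
```

Running without the property falls back to the default profile, which starts every component.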

Solution 2 : Shared bundle pool during run-time

Figure 3


The second problem can be addressed using P2 profiles and shared bundle pools as well. However, there is a subtle difference in the approach. Unlike the BAM use case, each of these products has its own configuration area and data/persistence locations. We can tackle this requirement by removing the bundle pool entirely from the product distribution and placing it outside all products. Each product has its own configuration/distribution area, and all the products point to the same bundle location, hence the total distribution size is much smaller. The use case also demands runtime isolation (running two or more products in parallel from the same bundle location), and we achieved that using P2 profiles and bundle pooling.
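A back-of-the-envelope sketch of why moving the pool outside the products shrinks the total: with per-product copies, every bundle is counted once per product, while a shared pool stores each distinct bundle (symbolic name plus version) once. The bundle ids and sizes below are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Rough model of the shared bundle pool. Each product is a map from bundle id
 * (symbolic name + version) to size in MB. All ids and sizes are hypothetical.
 */
class BundlePoolSize {
    /** Total size when every product distribution carries its own copy of each bundle. */
    static long separateCopies(List<Map<String, Long>> products) {
        long total = 0;
        for (Map<String, Long> product : products) {
            for (long size : product.values()) {
                total += size;
            }
        }
        return total;
    }

    /** Total size when all products point at one external pool (same id = same jar, stored once). */
    static long sharedPool(List<Map<String, Long>> products) {
        Map<String, Long> pool = new HashMap<>();
        for (Map<String, Long> product : products) {
            pool.putAll(product);
        }
        long total = 0;
        for (long size : pool.values()) {
            total += size;
        }
        return total;
    }

    /** Two hypothetical products that share the same kernel bundles. */
    static List<Map<String, Long>> sample() {
        Map<String, Long> esb = new HashMap<>();
        esb.put("carbon.kernel_4.0.0", 80L);
        esb.put("esb.mediation_1.0.0", 20L);
        Map<String, Long> appServer = new HashMap<>();
        appServer.put("carbon.kernel_4.0.0", 80L);
        appServer.put("as.webapp_1.0.0", 20L);
        return Arrays.asList(esb, appServer);
    }

    public static void main(String[] args) {
        List<Map<String, Long>> products = sample();
        System.out.println(separateCopies(products) + " MB vs " + sharedPool(products) + " MB");
    }
}
```

With the hypothetical sample, `main` prints `200 MB vs 120 MB`. Separate copies grow with every product added; the shared pool grows only by each product's distinct bundles.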

Pitfalls

P2 bundle pooling combined with roaming (the ability to move a provisioned application away from its original location, which relies on relative paths) seems to be broken. However, with help from the P2 dev list, we figured out a workaround.

Images
http://cdn3.iconfinder.com/data/icons/humano2/128x128/mimetypes/application-x-jar.png
WSO2 BAM product presentation slides


Monday, April 15, 2013

MQTT Transport for Axis2/Synapse

MQTT is an application-layer, broker-based publish/subscribe protocol built on top of TCP/IP. It is very lightweight, and hence ideal for environments constrained in network bandwidth, memory or computing power. Quoting the MQTT spec, the protocol features:
  1. Pub/sub message pattern
  2. Small transport overhead: the fixed-length header is only 2 bytes
  3. Three QoS levels for message delivery: at most once, at least once and exactly once
  4. etc.
The ability to operate in constrained environments makes this protocol an ideal candidate for embedded device communication and pervasive computing. But a smaller footprint is always better, so why not MQTT in enterprise integration? :)
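To show what the 2-byte fixed header looks like, here is a minimal encoder following the MQTT 3.1 layout: the first byte packs the message type, DUP flag, QoS level and RETAIN flag, and it is followed by a "remaining length" field of 1 to 4 bytes. A message with no variable header or payload, such as PINGREQ, therefore needs just 2 bytes. This is an illustrative sketch, not code from the transport described below.

```java
import java.io.ByteArrayOutputStream;

/**
 * Minimal MQTT 3.1 fixed-header encoder (illustrative sketch).
 * Byte 1: message type (bits 7-4), DUP (bit 3), QoS (bits 2-1), RETAIN (bit 0).
 * Then the remaining length: 7 bits per byte, high bit set when another byte follows.
 */
class MqttFixedHeader {
    static final int PUBLISH = 3; // message type codes from the MQTT 3.1 spec
    static final int PINGREQ = 12;

    // QoS 0 = at most once, 1 = at least once, 2 = exactly once
    static byte[] encode(int messageType, boolean dup, int qos, boolean retain, int remainingLength) {
        if (messageType < 1 || messageType > 14) {
            throw new IllegalArgumentException("message type must be 1..14");
        }
        if (qos < 0 || qos > 2) {
            throw new IllegalArgumentException("QoS must be 0, 1 or 2");
        }
        if (remainingLength < 0 || remainingLength > 268_435_455) { // at most 4 length bytes
            throw new IllegalArgumentException("remaining length out of range");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write((messageType << 4) | (dup ? 0x08 : 0) | (qos << 1) | (retain ? 0x01 : 0));
        int x = remainingLength;
        do {
            int digit = x % 128;
            x /= 128;
            if (x > 0) {
                digit |= 0x80; // continuation bit: more length bytes follow
            }
            out.write(digit);
        } while (x > 0);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : encode(PUBLISH, false, 1, false, 321)) {
            hex.append(String.format("0x%02X ", b & 0xFF));
        }
        System.out.println(hex.toString().trim()); // 0x32 0xC1 0x02
    }
}
```

For example, a PINGREQ encodes to `0xC0 0x00`, exactly 2 bytes, while a QoS 1 PUBLISH with 321 bytes of remaining length needs a 3-byte header.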

There are a number of MQTT broker implementations out there (you can find the original spec here); Mosquitto is one of them. Right now, MQTT is undergoing the OASIS standardization process. The Eclipse Paho project provides an MQTT client library in Java.

Apache Synapse is a popular open source enterprise service bus (ESB). The Synapse mediation engine relies on the Axis2 transport framework to receive messages, and Axis2/Synapse supports many transports out of the box. I set out to implement an MQTT transport for Axis2/Synapse, so that MQTT is available for EIP and similar scenarios with Synapse ESB.

The sample message flow:


Setting up the environment and running the code:

1. Download the MQTT broker from Mosquitto.org and install it.
2. Download and install Apache Synapse. Due to API compatibility issues, the code only works with the trunk version of Synapse (1.7.0-SNAPSHOT) as of this writing.
3. Deploy the provided proxy service configuration, 'SampleProxy.xml'.
The provided proxy service reads from a Mosquitto broker topic and writes to a publicly hosted MQTT broker topic.

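Next, configure the MQTT transport in the transports section of axis2.xml. Here we configure the transport listener and the transport sender. In the provided axis2.xml, the MQTT transport points to the broker at localhost, port 1883, and uses the values esb.test.listener and esb.test2 for its remaining parameters.
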
Simply replace the original axis2.xml file in the Synapse distribution with the provided axis2.xml.
Now add the Eclipse Paho MQTT client jar and the axis2-mqtt transport jar to the Synapse classpath, that is, to $synapse_home/lib/.

4. Start Synapse and the Mosquitto broker.
5. Publish a message to topic1 using the provided client. Edit the endpoint addresses and payload accordingly.

Once you publish the message to the topic, the listening ESB transport gets notified and the proxy service writes the message to the target endpoint.

6. Read/observe the output in topic2 using the provided receiver. The online hosted MQTT broker is configured as the target endpoint; its HTTP bridge lets you see the topics currently published to the broker.

I have only demonstrated the MQTT --> MQTT scenario. The Axis2 transport framework allows you to switch messages between transports, hence X --> MQTT and MQTT --> X are possible too (where X is any Axis2 transport). The code is very much in its initial state; the next step would be to improve it and contribute it to the Axis2 project.

Thursday, April 11, 2013

Dealing with Occasional Message Bursts : Message Broker Use Case

Recently I came across the following requirement:

An organization has a message delivery network and an application that consumes (processes) those messages. The delivery network communicates directly with the message processing application. Things work fine on a regular day. However, if the load generated by the delivery network is high, some messages may get lost before they are processed, because the application's task queue overflows.

Message brokers are designed to cater for exactly this scenario. Enterprises rely heavily on their legacy systems, and changing all the existing integration points is rarely a workable solution. In the above scenario, we can place a message broker between the message producer and the message consumer. By introducing a message broker, we:

1. Decouple the producer and consumer in time and space (the two parties need not be running at the same time, and they don't have to know about each other's existence).
2. Gain reliable message processing/delivery by means of persistence and transactions.
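The burst problem and the broker's buffering effect can be sketched with a toy tick-based simulation. The numbers are hypothetical, and the plain counter standing in for the broker queue has none of a real broker's persistence or transactions; it only shows why messages are lost when the producer pushes straight into a bounded task queue.

```java
/**
 * Toy tick-based simulation of the burst problem. Direct delivery: messages are pushed into the
 * application's bounded task queue and whatever does not fit is lost. With a broker in between:
 * the broker accepts everything and the consumer pulls only what it can process per tick.
 * The counter standing in for the broker is NOT persistent or transactional; numbers are hypothetical.
 */
class BurstSimulation {
    /** Messages lost when the producer pushes straight into a task queue of the given capacity. */
    static int lostWithoutBroker(int[] arrivalsPerTick, int taskQueueCapacity, int processedPerTick) {
        int queued = 0;
        int lost = 0;
        for (int arrivals : arrivalsPerTick) {
            int accepted = Math.min(arrivals, taskQueueCapacity - queued); // free slots this tick
            queued += accepted;
            lost += arrivals - accepted;                                   // overflow is dropped
            queued -= Math.min(queued, processedPerTick);                  // application works off the queue
        }
        return lost;
    }

    /** Ticks needed to process every message when a broker buffers the burst (nothing is lost). */
    static int ticksWithBroker(int[] arrivalsPerTick, int processedPerTick) {
        if (processedPerTick <= 0) {
            throw new IllegalArgumentException("consumer must process at least one message per tick");
        }
        int backlog = 0;
        int ticks = 0;
        for (int arrivals : arrivalsPerTick) {
            backlog += arrivals;                            // broker accepts every message
            backlog -= Math.min(backlog, processedPerTick); // consumer pulls at its own pace
            ticks++;
        }
        while (backlog > 0) {                               // burst is over; keep draining
            backlog -= Math.min(backlog, processedPerTick);
            ticks++;
        }
        return ticks;
    }

    public static void main(String[] args) {
        int[] burst = {5, 50, 5, 0};
        System.out.println("lost without broker: " + lostWithoutBroker(burst, 20, 10)); // 30
        System.out.println("ticks with broker: " + ticksWithBroker(burst, 10));          // 7
    }
}
```

With a burst of 5, 50, 5 and 0 messages per tick, a 20-slot task queue and a consumer that handles 10 messages per tick, direct delivery loses 30 messages, while the buffered variant processes all 60 within 7 ticks.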



I implemented the POC scenario using WSO2 ESB and WSO2 MB. Please find the configuration files and source code at this location. The ESB is used for its JMS transport support, so you don't have to change the producer code.

How to use the POC config/code:

1. Replace the original axis2.xml and jndi.properties files in the WSO2 ESB distribution with the provided ones. Here I have enabled the JMS sender in axis2.xml and configured the connection factory parameters in jndi.properties.
2. Create a proxy service in WSO2 ESB using the provided StockQuoteProxy.xml. This proxy accepts HTTP requests and writes them into an MB queue.
3. Create a new queue named 'StockQuotesQueue' in WSO2 MB; this is the queue we write to and read from.
4. Start the servers and send some SOAP messages to the proxy service, so that they get written to MB.
5. Run the Receiver to retrieve messages from the MB queue.

Please configure ports and hosts accordingly, and refer to the WSO2 MB documentation as well.

Application Profiling for Memory and Performance

Inspired by the experiences gathered during recent application profiling sessions at WSO2, we did a webinar on memory and performance tuning. Below are the slides.

Tuesday, February 26, 2013

Understanding Java NIO : My Notes

Why this blog post?

Recently I was working with a team to improve the mediation performance of Apache Synapse ESB. Java NIO was new to me when I took up the challenge, so I spent a few days learning I/O concepts, and NIO in particular. Soon after that background reading, I was pulled out to optimize the Carbon kernel. Tough luck! Still, I learnt a few things, and thought of compiling a blog post based on the initial research.

Basic steps in a request/response system

In a typical request/response system (e.g. a server), messages undergo the following steps:

1. Input read/listening
2. Input decode
3. Perform business logic
4. Output encode
5. Output write/sending

The steps can overlap depending on the implementation details.
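The five steps can be sketched as separate Java methods run over in-memory bytes, so no network is needed. The line-based text protocol and the upper-casing "business logic" are placeholders; in a blocking server, steps 1 and 5 are where the thread would wait on the network.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

/** The five request/response steps as methods over in-memory bytes (placeholder protocol and logic). */
class RequestResponseSteps {
    // 1. Input read: take the raw bytes off the "wire" (here, a copy of a byte array)
    static byte[] read(byte[] wire) {
        return wire.clone();
    }

    // 2. Input decode: bytes -> request (a trimmed UTF-8 line)
    static String decode(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8).trim();
    }

    // 3. Business logic (placeholder)
    static String process(String request) {
        return request.toUpperCase(Locale.ROOT);
    }

    // 4. Output encode: response -> line-terminated UTF-8 bytes
    static byte[] encode(String response) {
        return (response + "\n").getBytes(StandardCharsets.UTF_8);
    }

    // 5. Output write: put the bytes back on the "wire"
    static void write(byte[] bytes, ByteArrayOutputStream wire) {
        wire.write(bytes, 0, bytes.length);
    }

    /** Runs one request through all five steps and returns what was written back. */
    static String serve(String rawRequest) {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        write(encode(process(decode(read(rawRequest.getBytes(StandardCharsets.UTF_8))))), wire);
        return new String(wire.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.print(serve("hello\r\n")); // HELLO
    }
}
```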

Blocking I/O

In a blocking I/O scenario, as the name implies, the processing thread is blocked for the whole message processing cycle, until the output is written back to the wire. Typically, each client connection gets assigned to a separate thread. Since the threads depend on I/O performance, no matter how efficiently you process the message, the system throughput is bound by the I/O behaviour. If the network latency is very high, the number of requests the system can serve won't scale, even at a low concurrency level.



Sample Code for Blocking I/O



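        import java.io.*;
        import java.net.*;

        public class BlockingServer {
            public static void main(String[] args) throws IOException {
                try (ServerSocket serverSocket = new ServerSocket(4444);
                     Socket clientSocket = serverSocket.accept(); // blocks until a client connects
                     PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
                     BufferedReader in = new BufferedReader(
                             new InputStreamReader(clientSocket.getInputStream()))) {
                    CustomProtocol customProtocol = new CustomProtocol();
                    String inputLine;
                    // readLine() blocks this thread until a full line arrives or the client disconnects
                    while ((inputLine = in.readLine()) != null) {
                        out.println(customProtocol.processInput(inputLine));
                    }
                }
            }
        }

        // Minimal stand-in for the application protocol (hypothetical): echoes each line back
        class CustomProtocol {
            String processInput(String input) {
                return input;
            }
        }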



Non-Blocking I/O

In a non-blocking I/O scenario, partial reads of the input are possible, and an event is triggered when input becomes available. Because of this eventing model, the processing thread no longer has to block waiting for input.

For example, in the blocking model a typical client worker thread waits on I/O like this:

while ((inputLine = in.readLine()) != null)

Since input/output readiness is signalled by events, worker threads can go back to the worker thread pool and handle another request/task that is in a ready state. The reactor acts as the event dispatcher; all the other components are handlers that register themselves with the reactor for the events they are interested in. For example, the Acceptor registers itself with the reactor for the accept event, OP_ACCEPT.


Some core semantics


Selectors are responsible for querying available events and keeping track of the objects to call back (listeners). For example, a non-blocking server socket channel registers itself with the selector for the OP_ACCEPT event.

When a connection arrives (that is, an ACCEPT event), the selector picks it up and the appropriate listener gets called.

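        import java.net.InetSocketAddress;
        import java.nio.channels.SelectionKey;
        import java.nio.channels.Selector;
        import java.nio.channels.ServerSocketChannel;

        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false); // must be non-blocking before register()
        SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        // Acceptor: the connection handler (a Runnable) to run when an ACCEPT event fires
        selectionKey.attach(new Acceptor(serverSocketChannel, selector));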



When events fire, the selector returns the relevant selection keys, and we retrieve and run the listener attached to each key.

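        import java.nio.channels.SelectionKey;
        import java.util.Iterator;
        import java.util.Set;

        // Event loop: runs on the reactor thread until it is interrupted
        while (!Thread.interrupted()) {
            selector.select(); // blocks until at least one registered channel is ready
            Set<SelectionKey> selected = selector.selectedKeys();
            Iterator<SelectionKey> it = selected.iterator();
            while (it.hasNext()) {
                SelectionKey selectionKey = it.next();
                it.remove(); // the selector never clears this set itself
                // Our attached object implements Runnable
                Runnable runnable = (Runnable) selectionKey.attachment();
                if (runnable != null) {
                    runnable.run();
                }
            }
        }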
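The two snippets above need a network client to exercise. As a self-contained alternative, the sketch below drives the same select-and-dispatch loop with a `java.nio.channels.Pipe` instead of a socket: the source end is registered for `OP_READ`, a `Runnable` handler is attached to its key, and each `select()` round runs the attached handler. The class and handler names are illustrative.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Single-threaded reactor sketch: the selector reports ready keys and the loop runs each key's attachment. */
class PipeReactorDemo {

    /** Handler registered for OP_READ; reads whatever is available without blocking. */
    static final class ReadHandler implements Runnable {
        private final Pipe.SourceChannel source;
        private final List<String> received;
        private final ByteBuffer buffer = ByteBuffer.allocate(64);

        ReadHandler(Pipe.SourceChannel source, List<String> received) {
            this.source = source;
            this.received = received;
        }

        @Override
        public void run() {
            try {
                buffer.clear();
                int n = source.read(buffer); // non-blocking read: may return 0
                if (n > 0) {
                    buffer.flip();
                    received.add(StandardCharsets.UTF_8.decode(buffer).toString());
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    /** Writes each message into the pipe, then lets one select/dispatch round deliver it. */
    static List<String> runDemo(String... messages) {
        List<String> received = new ArrayList<>();
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
                source.configureBlocking(false); // a channel must be non-blocking to register
                SelectionKey key = source.register(selector, SelectionKey.OP_READ);
                key.attach(new ReadHandler(source, received));

                for (String message : messages) {
                    sink.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));
                    selector.select(); // blocks until the source end is readable
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey ready = it.next();
                        it.remove(); // selected keys stay in the set until removed
                        ((Runnable) ready.attachment()).run(); // dispatch to the attached handler
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runDemo("hello", "reactor"));
    }
}
```

A pipe keeps the example free of ports and clients; with a `ServerSocketChannel` the loop is identical, only the registered channels and interest sets change.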

Scaling
The architecture diagram above depicts only the single-threaded version of the reactor pattern. It is possible to scale the system by means of multiple reactors, thread pools for worker threads, etc. The right configuration is highly dependent on the target hardware platform.
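One scaling variant from Doug Lea's slides is a worker thread pool: the reactor thread keeps doing only the non-blocking reads and writes, and hands decode, business logic and encode to a pool so that a slow request does not stall the event loop. The sketch below shows only the hand-off, with the reactor side simulated by a list of already-read requests; the pool size and the upper-casing logic are placeholders.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hand-off of the non-I/O steps from the reactor thread to a worker pool (sketch). */
class WorkerPoolHandoff {
    /** Decode -> business logic (placeholder) -> encode; safe to run on any pool thread. */
    static byte[] handle(byte[] request) {
        String decoded = new String(request, StandardCharsets.UTF_8);
        String result = decoded.toUpperCase(Locale.ROOT);
        return result.getBytes(StandardCharsets.UTF_8);
    }

    /** Submits every request to a fixed-size pool and collects responses in request order. */
    static List<String> processAll(List<byte[]> requests, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<byte[]>> pending = new ArrayList<>();
            for (byte[] request : requests) {
                pending.add(pool.submit(() -> handle(request)));
            }
            List<String> responses = new ArrayList<>();
            for (Future<byte[]> future : pending) {
                // In a real reactor the result would be queued for the I/O thread to write
                responses.add(new String(future.get(), StandardCharsets.UTF_8));
            }
            return responses;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<byte[]> requests = Arrays.asList(
                "get".getBytes(StandardCharsets.UTF_8), "put".getBytes(StandardCharsets.UTF_8));
        System.out.println(processAll(requests, 2)); // [GET, PUT]
    }
}
```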


Library support for NIO
Even though one can write a complete service platform using the basic NIO constructs found in Java, there are open source libraries that abstract out some of the underlying complexities. For example, the Apache HttpCore project provides an abstraction layer for HTTP-based NIO usage, and Apache Synapse uses HttpCore for its NIO transport.

Main resource
Scalable IO in Java (presentation slides) - by Doug Lea, State University of New York at Oswego


Thursday, February 14, 2013

Fine Grained XACML Based Authorization With FuseSource ESB and WSO2 IS

Authorization - The bad way

In a typical organization that uses Service Oriented Architecture (fully or partially) to drive its IT requirements, you are likely to find tens or hundreds of services scattered across departments. If the legacy services work fine for the organization, changing them is not a wise decision. However, one always has to consider the maintainability of the system in the long run. If the services are coherent, that is, if they do only what they are supposed to do (only the business logic), chances are they will never need to be modified.
However, we don't live in an ideal world. Taking authorization as an example, I have seen people hard-coding authorization logic into their service implementations. This is a bad practice, because:

1. If the authorization mechanism (e.g. verifying against a JDBC user/permission store) changes, you have to change the service implementation or introduce a hack to cover it up.
2. If the authorization policy (e.g. "from now on we allow clients from organization X with security level Y to access the service") changes, then you have to change the actual service.

In simple words, it's not future-proof. If the organization wants to monitor authorization activities, you might end up modifying all your existing services.


What is XACML?

With XACML (eXtensible Access Control Markup Language), we can declaratively define our authorization policy in XML. Once the policy is in place, the XACML engine evaluates each incoming request against the stored XACML policy and returns the final decision (allow/deny). Since XACML is an OASIS standard and the de facto standard supported by many industry leaders, there is little chance of vendor lock-in in the long run.
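The core idea, a policy kept as data and evaluated by one generic engine, can be illustrated with a toy evaluator. This is deliberately not XACML: real XACML policies are XML documents with targets, conditions and combining algorithms, and a real PDP can also return Indeterminate. Here rules are combined "first applicable" (one of XACML's rule-combining algorithms), and the attribute names and the StockQuoteService resource are hypothetical. The sample policy encodes the earlier example: allow clients from organization X with security level Y.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy declarative authorization (NOT XACML): the policy is data and one generic engine evaluates
 * any request against it, so changing access rules means editing the policy, not the service.
 * Attribute names and the resource name are hypothetical.
 */
class ToyPolicyEngine {
    enum Decision { PERMIT, DENY, NOT_APPLICABLE }

    /** A rule applies when every attribute it names has the expected value in the request. */
    static final class Rule {
        final Map<String, String> match;
        final Decision effect;

        Rule(Map<String, String> match, Decision effect) {
            this.match = match;
            this.effect = effect;
        }

        boolean appliesTo(Map<String, String> request) {
            for (Map.Entry<String, String> e : match.entrySet()) {
                if (!e.getValue().equals(request.get(e.getKey()))) {
                    return false;
                }
            }
            return true;
        }
    }

    /** First-applicable: the first matching rule decides; no match means NOT_APPLICABLE. */
    static Decision evaluate(List<Rule> policy, Map<String, String> request) {
        for (Rule rule : policy) {
            if (rule.appliesTo(request)) {
                return rule.effect;
            }
        }
        return Decision.NOT_APPLICABLE;
    }

    /** Builds an attribute map from alternating key/value strings. */
    static Map<String, String> attrs(String... keyValues) {
        Map<String, String> map = new HashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            map.put(keyValues[i], keyValues[i + 1]);
        }
        return map;
    }

    /** "Allow clients from organization X with security level Y"; deny everyone else. */
    static List<Rule> samplePolicy() {
        return Arrays.asList(
                new Rule(attrs("resource", "StockQuoteService", "organization", "X", "securityLevel", "Y"),
                        Decision.PERMIT),
                new Rule(attrs("resource", "StockQuoteService"), Decision.DENY));
    }
}
```

Changing who may call the service now means swapping `samplePolicy()` for a different rule list; the service code and the engine stay untouched.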

Sample Scenario

   
1. The client sends a request to access a back-end service/resource. Here the request is directed to a Fuse ESB proxy service.
2. The proxy service intercepts the message (acting as the Policy Enforcement Point, PEP), and the entitlement bean extracts message properties and creates the XACML request, which is then forwarded to the XACML engine (here we use the XACML engine provided by WSO2 Identity Server).
3. Identity Server (acting as the Policy Decision Point, PDP) evaluates the request against its stored XACML policies and responds with the decision.
4. Based on the received decision, the proxy service either forwards the original client request to the actual back-end service or sends a not-authorized fault message to the client.
5. In the former case of step 4, the client receives the response from the actual back-end.
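The enforcement side of steps 2 to 4 can be sketched as follows. The PDP and the back-end are plain Java functions standing in for WSO2 Identity Server and the real service, the attribute map stands in for the XACML request, and the attribute keys and fault text are hypothetical. Note that on a deny, the back-end is never invoked.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

/**
 * PEP sketch: build an attribute request from the incoming message, ask a PDP for a decision,
 * then forward to the back-end or return a fault. PDP and back-end are stand-in functions;
 * attribute keys and the fault text are hypothetical.
 */
class EntitlementProxy {
    static final String FAULT = "FAULT: not authorized";

    private final Predicate<Map<String, String>> pdp; // true = permit
    private final UnaryOperator<String> backend;

    EntitlementProxy(Predicate<Map<String, String>> pdp, UnaryOperator<String> backend) {
        this.pdp = pdp;
        this.backend = backend;
    }

    /** Steps 2 to 4: extract attributes, ask the PDP, then enforce the decision. */
    String handle(String subject, String action, String payload) {
        Map<String, String> request = new HashMap<>(); // stands in for the XACML request
        request.put("subject", subject);
        request.put("action", action);
        if (!pdp.test(request)) {
            return FAULT; // the back-end is never called
        }
        return backend.apply(payload);
    }

    public static void main(String[] args) {
        EntitlementProxy proxy = new EntitlementProxy(
                req -> "alice".equals(req.get("subject")) && "getQuote".equals(req.get("action")),
                payload -> "quote for " + payload);
        System.out.println(proxy.handle("alice", "getQuote", "IBM")); // quote for IBM
        System.out.println(proxy.handle("bob", "getQuote", "IBM"));   // FAULT: not authorized
    }
}
```

Keeping the decision behind a single `Predicate` is what lets the policy (and the PDP behind it) change without touching the proxy or the back-end service.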

Note: The actual implementation (code) setup and the execution will be explained in a later post.