Asynchronous Delivery

OGSA-DAI provides several ways of delivering input and output data asynchronously. For example, data can be delivered to or from a URL or file, between a client and a session, or directly between multiple data service resources. This section will show you how to use this functionality.

1. Delivering data to and from a URL

Data can be delivered to and from a URL using the deliverToURL and deliverFromURL activities. Using these activities, the results of a database query could be written to a remote FTP server, or a file containing WebRowSet XML data could be read from a URL and bulk-loaded into a database.

Delivery to a URL uses the deliverToURL activity and its corresponding DeliverToURL client activity class. The activity takes its input from the output of another activity:

DeliverToURL deliver = new DeliverToURL(destinationURL);
deliver.setInput(anotherActivity.getOutput());

Note that the http protocol does not support writing data, whereas the ftp protocol does.
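
Because only some protocols accept writes, a client may want to check the destination URL before building the request. The following is a minimal sketch of such a check. The class DeliveryUrlCheck and its method isWritableScheme are hypothetical helpers and not part of the OGSA-DAI client toolkit; the sketch treats ftp as the only writable scheme because it is the only one the note above names.

```java
import java.net.URI;
import java.util.Locale;

/** Hypothetical pre-flight check; NOT part of the OGSA-DAI client toolkit. */
final class DeliveryUrlCheck {
    private DeliveryUrlCheck() {}

    /**
     * Returns true only for the ftp scheme, the one protocol the text
     * names as supporting writes. Every other scheme (including http)
     * and any URL without a scheme is reported as not writable.
     */
    static boolean isWritableScheme(String url) {
        String scheme = URI.create(url).getScheme();
        return scheme != null && scheme.toLowerCase(Locale.ROOT).equals("ftp");
    }

    public static void main(String[] args) {
        // Example URLs are placeholders.
        System.out.println(isWritableScheme("ftp://example.org/results.xml"));
        System.out.println(isWritableScheme("http://example.org/results.xml"));
    }
}
```

A client could call isWritableScheme(destinationURL) before constructing the DeliverToURL activity and report a clear error instead of waiting for the request to fail.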

For a full example, see OGSA-DAI/examples/src/uk/org/ogsadai/examples/clienttoolkit/XSLTransformWithDelivery.java

2. Delivering data to and from a file

Data can also be delivered to and from a file using the deliverToFile and deliverFromFile activities. In a similar way to the URL activities, these activities can be used to store the result data produced by another activity or to read input data for an activity pipeline.

Delivery to a file works in the same way as delivery to a URL, via the deliverToFile activity and its corresponding DeliverToFile client activity class:

DeliverToFile deliver = new DeliverToFile( "C:/path/to/myfile.txt" );
deliver.setInput(anotherActivity.getOutput());

Note that, by default, these activities are disabled in the activity configuration documents used to configure OGSA-DAI services, because enabling them allows access to any location on the data service file system, which is a security risk.

3. Delivering data to and from a GridFTP location

Data can also be delivered to and from a remote GridFTP server using the deliverToGFTP and deliverFromGFTP activities.

The DeliverToGFTP activity has one input and no output while the DeliverFromGFTP activity has one output and no input.

4. Pulling data from a session stream back to a client

Using the outputStream activity it is possible to pull data from a session stream to a client. When a request containing an outputStream activity is sent to a data service resource, a new session stream will be created. The output of another activity must be connected to the outputStream activity and this data will be written to the session stream. The OGSA-DAI data transport functionality can then be used to stream data from that session stream to the client.

The complexities of the data transport functionality are encapsulated by the client toolkit DTOutputStream class. Streaming results in this way lets a query return a large amount of data without either the server or the client failing with a java.lang.OutOfMemoryError.

  1. Start by setting up a DataService then sending a request to create a new session. This session will later contain the session stream.
    ActivityRequest request = new ActivityRequest();
    request.setSessionRequirements(new JoinNewSession());
    Response response = service.perform(request);
    Session session = response.getSession();
    
  2. Now prepare a second request containing a pipeline of activities ending with a DTOutputStream. Set the session requirements for the request so that it will join the existing session created in step 1.
    SQLQuery sqlQuery = new SQLQuery("select * from littleblackbook");
    WebRowSet rowset = new WebRowSet(sqlQuery.getOutput());
    DTOutputStream outputStream = new DTOutputStream(rowset.getOutput());
    
    ActivityRequest queryRequest = new ActivityRequest();
    queryRequest.add(sqlQuery);
    queryRequest.add(rowset);
    queryRequest.add(outputStream);
    queryRequest.setSessionRequirements(new JoinExistingSession(session));
    
  3. Perform the request. The data service resource will wait for the client to try to pull data from the session stream before initiating the actual request processing.
    service.perform(queryRequest);
    
  4. Use the getResultSet() method on the DTOutputStream to get the results as a JDBC ResultSet object:
    ResultSet rs = outputStream.getResultSet();
    

    This will create a ResultSet object that will be of type ResultSet.TYPE_FORWARD_ONLY. When the next row is requested from the ResultSet object, the data transport operations of the data service will be used to retrieve the data.

For an example, see OGSA-DAI/examples/src/uk/org/ogsadai/examples/clienttoolkit/ProcessingSQLQueryLargeResults.java
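
The reason pull-based streaming avoids memory exhaustion can be illustrated without any OGSA-DAI classes. The sketch below is an analogy only: it models the session stream as a small bounded java.util.concurrent queue, with a producer thread standing in for the server-side pipeline and the calling thread standing in for the client. The class BoundedPullSketch, its method pullAll and the CLOSING_BLOCK marker are all hypothetical names, and nothing here claims to reflect how OGSA-DAI implements session streams internally.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Analogy only: a bounded buffer standing in for a session stream. */
final class BoundedPullSketch {
    /** Hypothetical marker that signals the end of the data. */
    private static final Object CLOSING_BLOCK = new Object();

    /**
     * Produces rowCount rows on one thread and pulls them on the caller's
     * thread. At most 'capacity' rows are buffered at any time, however
     * large rowCount is. Returns the number of rows pulled.
     */
    static int pullAll(int rowCount, int capacity) {
        BlockingQueue<Object> stream = new ArrayBlockingQueue<>(capacity);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < rowCount; i++) {
                    stream.put("row-" + i); // blocks while the buffer is full
                }
                stream.put(CLOSING_BLOCK);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.setDaemon(true);
        producer.start();

        int consumed = 0;
        try {
            // Forward-only consumption: each row is seen once, then dropped.
            for (Object block = stream.take();
                 block != CLOSING_BLOCK;
                 block = stream.take()) {
                consumed++;
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while pulling", e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(pullAll(100_000, 8));
    }
}
```

The producer can never run more than a few rows ahead of the consumer, which is the property that matters when a query result is too large to hold in memory on either side.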

5. Pushing data from a client into a session stream

Using the inputStream activity it is possible to push data from a client into a session stream. The inputStream activity then reads the data and makes it available to a connected activity. When a request containing an inputStream activity is sent to a data service resource, a new session stream will be created. The data transport functionality can then be used to stream data from a client into the session stream.

  1. Start by setting up a DataService then sending a request to create a new session. This session will later contain the session stream.
    ActivityRequest request = new ActivityRequest();
    request.setSessionRequirements(new JoinNewSession());
    Response response = service.perform(request);
    Session session = response.getSession();
    
  2. Prepare a second request containing a DTInputStream connected to an SQLUpdate activity. Data that is transported into the session stream will then be piped into the SQL update. Set the session requirements for the request so that it will be joined to the session created in step 1.
    DTInputStream inputStream = new DTInputStream();
    SQLUpdate update = new SQLUpdate("insert into littleblackbook values (?,?,?,?)");
    update.setParameter(1, inputStream.getOutput());
    update.setParameter(2, inputStream.getOutput());
    update.setParameter(3, inputStream.getOutput());
    update.setParameter(4, inputStream.getOutput());
    
    ActivityRequest updateRequest = new ActivityRequest();
    updateRequest.add(inputStream);
    updateRequest.add(update);
    updateRequest.setSessionRequirements(new JoinExistingSession(session));
    
  3. Perform the request. The data service resource will wait for the client to push data into the session stream before initiating the actual request processing.
    service.perform(updateRequest);
    
  4. Use the getDataTransport() method on the DTInputStream to get a DataTransport object, which allows the client to push data into the session stream.
    DataTransport putter = inputStream.getDataTransport();
    
  5. Use the DataTransport object to push data into the session stream. The inputStream activity will then stream that data into the parameters of the SQL update:
    putter.putBlock(new Integer(10010));
    putter.putBlock("Albert Einstein");
    putter.putBlock("Patent Office, Bern");
    putter.putBlock("0123456789");
    
  6. Finally, use the putClosingBlock() method to tell the DTInputStream that all blocks have been sent:
    putter.putClosingBlock();
    

For an example, see OGSA-DAI/examples/src/uk/org/ogsadai/examples/clienttoolkit/ParameterisedUpdateViaDataTransport.java
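
The relationship between pushed blocks, update parameters and the closing block in the steps above can be sketched in plain Java. This sketch assumes that blocks are consumed in the order in which the parameters were set, so that every four blocks form one set of values for the four-parameter update; that ordering is an assumption made for illustration, not something the text states. ParameterBlockSketch, groupIntoRows and CLOSING_BLOCK are hypothetical names and not OGSA-DAI classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustration only: groups a block stream into parameter sets. */
final class ParameterBlockSketch {
    /** Hypothetical stand-in for the block sent by putClosingBlock(). */
    static final Object CLOSING_BLOCK = new Object();

    /**
     * Reads blocks in order and groups them into rows of parameterCount
     * values each. Stops at the closing block. Fails if the stream ends
     * without a closing block or closes in the middle of a row.
     */
    static List<List<Object>> groupIntoRows(Iterable<Object> blocks,
                                            int parameterCount) {
        List<List<Object>> rows = new ArrayList<>();
        List<Object> current = new ArrayList<>();
        for (Object block : blocks) {
            if (block == CLOSING_BLOCK) {
                if (!current.isEmpty()) {
                    throw new IllegalStateException("closed in mid-row");
                }
                return rows;
            }
            current.add(block);
            if (current.size() == parameterCount) {
                rows.add(current);
                current = new ArrayList<>();
            }
        }
        throw new IllegalStateException("no closing block received");
    }

    public static void main(String[] args) {
        List<Object> blocks = Arrays.<Object>asList(
            10010, "Albert Einstein", "Patent Office, Bern", "0123456789",
            CLOSING_BLOCK);
        System.out.println(groupIntoRows(blocks, 4));
    }
}
```

The sketch also shows why the closing block matters: without it, the consumer cannot distinguish a finished stream from one that is still waiting for more data.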

6. Pulling data between two data service resources

Data can be pulled between two data service resources using the outputStream activity in conjunction with the deliverFromDT activity. In this example we refer to the data service resource that produces the data as the source and the data service resource that consumes the data as the sink.

Figure: Pulling data between two data service resources.

The figure above illustrates the delivery scenario. Three requests are involved:

  1. The first request is sent to the source data service resource. It contains a pipeline connecting an SQL query to a WebRowSet transformation and on to an output stream. The request is joined to a new session within which the output stream activity will create a session stream.
  2. The second request is sent to the sink data service resource. It contains a DeliverFromDT activity which will connect to the session stream at the source data service resource and stream the result data between the data service resources. This activity could then be connected to another activity which could consume this data, but in this example it is left unconnected as an end-point. Hence, the transported data would be returned to the client in the response to the second request.
  3. The third request is sent to the source data service resource. It simply terminates the session that was used to store the session stream.

To realise this scenario using the client toolkit, follow these steps:

  1. First create two client DataService objects for the source and sink data services.
    DataService sourceService =
        GenericServiceFetcher.getInstance().getDataService(
          sourceHandle, sourceResourceID);
    DataService sinkService =
        GenericServiceFetcher.getInstance().getDataService(
          sinkHandle, sinkResourceID);
    
  2. Next assemble the activity request for the source data service. Session requirements must be specified to ensure that a new session is created.
    SQLQuery sqlQuery =
        new SQLQuery("select * from littleblackbook where id<100");
    WebRowSet rowset = new WebRowSet(sqlQuery.getOutput());
    DTOutputStream outputStream = new DTOutputStream();
    outputStream.setInput(rowset.getOutput());
    ActivityRequest sourceRequest = new ActivityRequest();
    sourceRequest.add(sqlQuery);
    sourceRequest.add(rowset);
    sourceRequest.add(outputStream);
    sourceRequest.setSessionRequirements(new JoinNewSession());
    
  3. Now perform the request at the source data service resource and retrieve details of the session that the request has been joined to.
    Response response = sourceService.perform(sourceRequest);
    Session session = response.getSession();
    
  4. At this stage the session stream has been created at the source data service resource. Next a request must be assembled to instruct the sink data service resource to read the result data from the session stream.
    DeliverFromDT deliverFromDT = new DeliverFromDT();
    deliverFromDT.setDataTransportInput(outputStream.getDataTransport());
    deliverFromDT.setDataTransportMode(DataTransportMode.BLOCK);
    ActivityRequest sinkRequest = new ActivityRequest();
    sinkRequest.add(deliverFromDT);
    
  5. We can now send the request to the sink data service resource and display the response, which should contain the transported data.
    response = sinkService.perform(sinkRequest);
    System.out.println(response.getAsString());
    
  6. Finally, terminate the session that was created at the source data service resource.
    sourceRequest = new ActivityRequest();
    sourceRequest.setSessionRequirements(new TerminateSession(session));
    sourceService.perform(sourceRequest);
    

For a full example, see OGSA-DAI/examples/src/uk/org/ogsadai/examples/clienttoolkit/DataTransportExample.java

7. Pushing data between two data service resources

Data can also be pushed between two data service resources. To achieve this, the inputStream activity can be used in conjunction with the deliverToDT activity. In this example we refer to the data service resource that produces the data as the source and the data service resource that consumes the data as the sink. The figure below illustrates such a scenario.

Figure: Pushing data between two data service resources.

In this case, the first request must be sent to the sink data service resource, since it causes the session and session stream to be created. The second request, which contains a DeliverToDT activity, is then sent to the source data service resource; this activity pushes the result data to the session stream at the sink data service resource. This will cause the first request to start processing, reading the pushed data and writing it into a local file.

When working with data delivery between data service resources, care must be taken to consider the synchronous or asynchronous nature of requests. In the above example, the first request is an asynchronous request with no result data to deliver in the response, so when it is performed the client code does not block until the request has finished. If this request were instead a synchronous request with result data to return, the perform invocation would block, and the client would need to use a different thread to send the second request to the source data service resource.
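
The threading concern described above can be sketched with standard Java concurrency classes. In this sketch, performSinkRequest stands in for a synchronous perform call at the sink that returns only once data has arrived, and performSourceRequest stands in for the second request that pushes the data. Both methods, the class BlockingPerformSketch, and the queue used as a session stream are hypothetical stand-ins and not OGSA-DAI client toolkit calls.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Illustration only: a blocking first request needs its own thread. */
final class BlockingPerformSketch {
    /** Stand-in for a synchronous perform at the sink; blocks for data. */
    static String performSinkRequest(BlockingQueue<String> sessionStream)
            throws InterruptedException {
        return "sink received: " + sessionStream.take();
    }

    /** Stand-in for the second request, which pushes data to the sink. */
    static void performSourceRequest(BlockingQueue<String> sessionStream,
                                     String data)
            throws InterruptedException {
        sessionStream.put(data);
    }

    static String run(String data) {
        BlockingQueue<String> sessionStream = new ArrayBlockingQueue<>(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Run the blocking call on a separate thread. Calling it here
            // directly would never return, because the push below would
            // never be reached.
            Future<String> sinkResponse =
                executor.submit(() -> performSinkRequest(sessionStream));
            performSourceRequest(sessionStream, data);
            return sinkResponse.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run("result rows"));
    }
}
```

With an asynchronous first request, as in the scenario described above, the extra thread is unnecessary because the perform call returns immediately.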