Lab 7 Details for MPCS 51050

Each lab will consist of a small problem and details of how to proceed. You need to submit labs to the TAs for grading; see the submission instructions below. Generally, unless otherwise specified, you will have one week to complete each assigned lab.

See the syllabus for information on grading.  Turning in lab assignments is required.  Submit your assignments to the subversion repository according to the directions on the syllabus page.

You must write these solutions in Java leveraging both Camel and ActiveMQ.

Lab 7   Due: 5:00 pm, Friday, May 29, 2015

Problem (Producing Messages to a Queue, Consuming Messages from a Queue and publishing to a Topic, and Subscribing to Topics):

In this lab, you will use Camel's DSL to create a Producer program that consumes data from files in an input directory (similarly to Lab 6) and for each file writes that data onto a Point-To-Point Message Queue.  You will also write a Consumer Program that reads those messages from that Point-To-Point Queue and publishes those messages out to a Pub/Sub Topic using a Content-Based Router.  Finally, you will create a Subscriber Program that will read the messages off the Topics and will forward the messages on to a combined output queue.

What you need to implement:

We are assuming that you have thoroughly read chapters 1-3 and 7-8 of Ibsen & Anstey, Camel in Action, before working on this lab. You will also find Appendix A, on the Simple expression language, helpful.

Copy your previous Lab 6 project named "MPCS-Lab6-Producer" in Eclipse to a new project named "MPCS-Lab7-Producer" (as you did in Lab 6). You can do this by selecting your Lab 6 project "MPCS-Lab6-Producer", pressing "Ctrl-C" (or right-clicking and choosing "Copy"), then right-clicking in the Package Explorer, choosing "Paste", and renaming the copy to "MPCS-Lab7-Producer" in the Copy Project dialog.

Now do the same thing again, this time copying the MPCS-Lab7-Producer project you just pasted and naming the new copy "MPCS-Lab7-Consumer". Then repeat once more, again copying MPCS-Lab7-Producer, and name this copy "MPCS-Lab7-Subscriber". You now have three new projects in the Package Explorer: MPCS-Lab7-Producer, MPCS-Lab7-Consumer, and MPCS-Lab7-Subscriber. (Of course, all three are identical copies for the moment.)

The data files you will be working with are CSV files that each contain trade data: a Ticker, a Bid Price, a Bid Quantity, an Ask Price, and an Ask Quantity. Very simply, in stock trading the bid is the price offered by a buyer, while the ask is the price at which a seller is interested in selling the stock. The bid price and the ask price are never the same: the ask price is always a little higher than the bid price (which makes sense...if you have a concert ticket for which you paid $10.00, you'd love to sell it for $20.00 so you can make a profit). Each line in the CSV files looks something like this:

MSFT,22.81,118,22.82,68

So in this example, the ticker would be "MSFT", the Bid Price would be 22.81, the Bid Quantity would be 118 (shares), the Ask Price would be 22.82, and the Ask Quantity would be 68 (shares).
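To make the field layout concrete, here is a small plain-Java parser for one such line. It is only a sketch: the class QuoteLine and its methods are hypothetical (not part of the lab, Camel, or ActiveMQ), and it assumes exactly five comma-separated fields per line. Prices are held as BigDecimal so the bid/ask spread comes out exact.

```java
import java.math.BigDecimal;

// Hypothetical value class for one line of the lab's CSV data:
// ticker, bid price, bid quantity, ask price, ask quantity.
final class QuoteLine {
    final String ticker;
    final BigDecimal bidPrice;   // BigDecimal avoids binary floating-point rounding on prices
    final int bidQuantity;
    final BigDecimal askPrice;
    final int askQuantity;

    private QuoteLine(String ticker, BigDecimal bidPrice, int bidQuantity,
                      BigDecimal askPrice, int askQuantity) {
        this.ticker = ticker;
        this.bidPrice = bidPrice;
        this.bidQuantity = bidQuantity;
        this.askPrice = askPrice;
        this.askQuantity = askQuantity;
    }

    // Parses a line such as "MSFT,22.81,118,22.82,68" (assumes exactly five fields).
    static QuoteLine parse(String line) {
        String[] f = line.trim().split(",");
        if (f.length != 5) {
            throw new IllegalArgumentException("expected 5 fields, got " + f.length + ": " + line);
        }
        return new QuoteLine(f[0].trim(),
                new BigDecimal(f[1].trim()), Integer.parseInt(f[2].trim()),
                new BigDecimal(f[3].trim()), Integer.parseInt(f[4].trim()));
    }

    // The bid/ask spread: how much higher the ask price is than the bid price.
    BigDecimal spread() {
        return askPrice.subtract(bidPrice);
    }

    public static void main(String[] args) {
        QuoteLine q = parse("MSFT,22.81,118,22.82,68");
        System.out.println(q.ticker + " spread = " + q.spread());
    }
}
```

Running main prints `MSFT spread = 0.01`.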

Now, in a terminal, navigate to your MPCS-Lab7-Producer directory under your Eclipse workspace, and change to the data/inbox subdirectory. Mine looks like this:

~/workspace/MPCS-Lab7-Producer/data/inbox

Remove any files that are there. 

You can download a tarball of IBM, MSFT, and ORCL tick data files in CSV format here: lab7.csv.files.tgz. Untar these files into the data/inbox directory of your MPCS-Lab7-Producer project.

Once you have untarred the files, do an "ls" in your ~/workspace/MPCS-Lab7-Producer/data/inbox directory.  You should see 30 files that look something like this:

$ ls
17-05-14_14-24-08.csv 17-05-14_14-24-13.csv 17-05-14_14-24-26.csv 17-05-14_14-24-31.csv 17-05-14_14-24-43.csv 17-05-14_14-24-48.csv
17-05-14_14-24-09.csv 17-05-14_14-24-14.csv 17-05-14_14-24-27.csv 17-05-14_14-24-32.csv 17-05-14_14-24-44.csv 17-05-14_14-24-49.csv
17-05-14_14-24-10.csv 17-05-14_14-24-15.csv 17-05-14_14-24-28.csv 17-05-14_14-24-33.csv 17-05-14_14-24-45.csv 17-05-14_14-24-50.csv
17-05-14_14-24-11.csv 17-05-14_14-24-16.csv 17-05-14_14-24-29.csv 17-05-14_14-24-34.csv 17-05-14_14-24-46.csv 17-05-14_14-24-51.csv
17-05-14_14-24-12.csv 17-05-14_14-24-17.csv 17-05-14_14-24-30.csv 17-05-14_14-24-35.csv 17-05-14_14-24-47.csv 17-05-14_14-24-52.csv

Your exact filenames will be slightly different, but you should have 30 files there.  Now, cat out one of the files, and you will see something like this:

MSFT,22.81,118,22.82,68

Each of the 30 files will contain a single line of data similar to the above.  These 30 data files will constitute your Producer's "input".  Your Producer will roll through these 30 files, and load the content of each file onto an ActiveMQ message queue.

In your MPCS-Lab7-Producer, you are to establish a Camel Route that [References are to sections of Camel in Action in brackets]:

1.  Consumes the input directory "data/inbox" [CIA 2.3 & 7.1-7.2]
2.  Logs a string of this format: "RECEIVED: ${file:name}" [CIA Appendix A]
3.  Unmarshals the data read [CIA 3.4]
4.  Runs the CSV translator on the data [CIA 3.4.2]
5.  Splits the body (so that the individual lines go on the queue as individual messages) [CIA 3.4.2]
6.  Creates a new Processor() that, during its processing, prints out the header's CamelFileName along with the stock in the file, in this format [CIA 2.5.2]:
    "MESSAGE FROM FILE: 17-05-14_14-24-13.csv is heading to MPCS_51050_LAB7 Queue for Stock: IBM"
    Note that in your processor you may use the syntax "e.getIn().getBody(String.class).split("\t")" (assuming your new Processor accepts a parameter "Exchange e") to break the actual content of each message body into a String[]. Then just pull out the stock name using standard Java code.
7.  Sends the output to the destination endpoint: jms:queue:MPCS_51050_LAB7 [CIA 3.4.2]
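The Processor logic in step 6 can be sketched in plain Java, independent of Camel. The helper name ProducerMessages is hypothetical, and the body format is an assumption: the lab's split("\t") hint suggests tab-separated fields, but if your split body converts to a string like "[MSFT, 22.81, 118, 22.82, 68]", the helper copes by also splitting on commas and stripping the brackets. The route in the comment is only a rough outline of steps 1-7, assuming the "jms" component is configured as in Lab 6, not a required solution.

```java
// Hypothetical helper for the Producer's Processor (step 6).
// The surrounding route might look roughly like this in Camel's Java DSL:
//
//   from("file:data/inbox?noop=true")
//       .log("RECEIVED: ${file:name}")
//       .unmarshal().csv()          // steps 3-4
//       .split(body())              // step 5
//       .process(...)               // step 6: calls describe(...) below
//       .to("jms:queue:MPCS_51050_LAB7");
final class ProducerMessages {
    // Assumption: fields are tab-separated (per the lab's split("\t") hint)
    // or comma-separated with optional spaces, as List.toString() produces.
    static final String FIELD_SEPARATOR = "\t|,\\s*";

    // Returns the individual fields, with any surrounding [ ] removed.
    static String[] fields(String body) {
        String s = body.trim();
        if (s.startsWith("[") && s.endsWith("]")) {
            s = s.substring(1, s.length() - 1);
        }
        return s.split(FIELD_SEPARATOR);
    }

    // Builds the line the Producer's Processor should print.
    static String describe(String camelFileName, String body) {
        String stock = fields(body)[0].trim();
        return "MESSAGE FROM FILE: " + camelFileName
                + " is heading to MPCS_51050_LAB7 Queue for Stock: " + stock;
    }

    public static void main(String[] args) {
        // Sample input built from the lab's example line; a real body comes from the exchange.
        System.out.println(describe("17-05-14_14-24-13.csv", "[MSFT, 22.81, 118, 22.82, 68]"));
    }
}
```

Inside the route, the Processor could then be as small as `e -> System.out.println(ProducerMessages.describe(e.getIn().getHeader("CamelFileName", String.class), e.getIn().getBody(String.class)))`, or the equivalent anonymous `new Processor() { ... }` the lab describes.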

Review the File Component sections on configuration options in CIA 7.2.1, especially the "noop" and "fileName" options. You may find that setting "noop=true" makes your testing a bit easier, as Camel will not remove the input files as it processes them (which saves you from having to recreate them).

Once you have developed your code, test it out (this may require several iterations to figure it all out), and you should see that you have published 30 messages to your MPCS_51050_LAB7 queue. Once you see that you are doing that, congratulate yourself and take a break. This exercise should be fairly easy, as it's similar to the producer you wrote in Lab 6, with the exception of the new Processor you are inserting. Here is what your Producer output should look like:

A message on the MPCS_51050_LAB7 queue should look something like this:


Next, you will begin work on your Consumer, in the project you created called MPCS-Lab7-Consumer. Your Consumer will consume the messages off your MPCS_51050_LAB7 queue. As it takes off each message, the message will be removed from the queue (automatically). Your Consumer will then publish each message out to an appropriate destination Pub/Sub Topic. Your route will be fairly simple. In your Consumer route you will need to do the following [References to sections of Camel in Action in brackets]:

1.  Consume all 30 messages from the MPCS_51050_LAB7 queue [CIA 7.3]
2.  Log the string "RECEIVED:  jms queue: ${body} from file: ${header.CamelFileNameOnly}" [CIA Appendix A]
3.  Create a Content-Based Router to send MSFT messages to an MSFT topic named "jms:topic:MPCS_51050_LAB7_TOPIC_MSFT", ORCL messages to an ORACLE topic named "jms:topic:MPCS_51050_LAB7_TOPIC_ORCL", and IBM messages to an IBM topic named "jms:topic:MPCS_51050_LAB7_TOPIC_IBM". [CIA 2.5, especially 2.5.1]
    Note:  You can use the predicate ".when(body().regex(".*MSFT.*"))" in your CBR to select just the MSFT messages (and similarly for ORCL and IBM).
4.  For each stock branch of the CBR, direct the output to a topic. So MSFT messages would be routed to the "jms:topic:MPCS_51050_LAB7_TOPIC_MSFT" topic, ORCL messages would be routed to the "jms:topic:MPCS_51050_LAB7_TOPIC_ORCL" topic, etc.

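The router's decision can be modeled in plain Java, which is a quick way to check the regular expressions before wiring them into Camel. StockRouter is a hypothetical name; the Camel DSL in the comment is a rough sketch of how the same rules might look with choice()/when(), assuming the "jms" component is configured as in Lab 6.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java model of the Consumer's content-based router. In Camel's Java DSL
// the same decision might be written roughly as:
//
//   from("jms:queue:MPCS_51050_LAB7")
//       .log("RECEIVED:  jms queue: ${body} from file: ${header.CamelFileNameOnly}")
//       .choice()
//           .when(body().regex(".*MSFT.*")).to("jms:topic:MPCS_51050_LAB7_TOPIC_MSFT")
//           .when(body().regex(".*ORCL.*")).to("jms:topic:MPCS_51050_LAB7_TOPIC_ORCL")
//           .when(body().regex(".*IBM.*")).to("jms:topic:MPCS_51050_LAB7_TOPIC_IBM")
//       .end();
final class StockRouter {
    // Insertion order matters: as with choice(), the first matching rule wins.
    private static final Map<String, String> RULES = new LinkedHashMap<>();
    static {
        RULES.put(".*MSFT.*", "jms:topic:MPCS_51050_LAB7_TOPIC_MSFT");
        RULES.put(".*ORCL.*", "jms:topic:MPCS_51050_LAB7_TOPIC_ORCL");
        RULES.put(".*IBM.*", "jms:topic:MPCS_51050_LAB7_TOPIC_IBM");
    }

    // Returns the topic URI for a message body, or empty if no rule matches.
    static Optional<String> topicFor(String body) {
        for (Map.Entry<String, String> rule : RULES.entrySet()) {
            // String.matches() must match the whole body, hence the leading/trailing ".*".
            if (body.matches(rule.getKey())) {
                return Optional.of(rule.getValue());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(topicFor("[MSFT, 22.81, 118, 22.82, 68]").orElse("no match"));
    }
}
```

A body that matches none of the rules yields an empty Optional; in Camel you might handle that case with `.otherwise()`, for example by logging it.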
Finally, you are to modify your MPCS-Lab7-Subscriber program so that it reads off the various Topics (you will use your Subscriber to actually "see" the messages on the Topics). We are going to assume that the Subscriber needs to transform each message into the following format for some (undetermined) downstream processing (those are "pipe" symbols separating the fields):

    Stock: MSFT|BidPrice: 22.81|BidQuantity: 118|AskPrice: 22.82|AskQuantity: 68

Your Subscriber program will read from each of the topics your Consumer publishes to and write the modified results to a combined output queue called "jms:queue:MPCS_51050_LAB7_ALL". To do so, the Subscriber will define 3 routes (one per stock), each of which will:

1.  Read from one of the topics the Consumer publishes to, e.g., "MPCS_51050_LAB7_TOPIC_MSFT", "MPCS_51050_LAB7_TOPIC_ORCL", etc.
2.  Log activity (for example, for the IBM topic):  .log("SUBSCRIBER RECEIVED: jms IBM queue: ${body} from file: ${header.CamelFileNameOnly}")
3.  In each route that reads from a stock topic, write a new Processor that gets the message body from the exchange and splits it by tabs into a String[] array, e.g.:
    String[] array = exchange.getIn().getBody(String.class).split("\t") [CIA 3.2.1]
4.  Pull out Strings for the stock (e.g., array[0].substring(1) will give you the String stockName (e.g., "MSFT"), etc.), the bidPrice (e.g., String bidPrice = array[1];), bidQuantity, askPrice, askQuantity
5.  Modify the exchange so that the new message contains data in the following output format:
        "Stock: MSFT|BidPrice: 22.81|BidQuantity: 118|AskPrice: 22.82|AskQuantity: 68"
    Note:  You can use a StringBuilder and its append() method to build the output body in this format, as demonstrated in CIA Listing 3.1.
6.  In your Processor, replace the body of the incoming message with the StringBuilder's output, using syntax as demonstrated in CIA Listing 3.1, e.g.:
    exchange.getIn().setBody(myStringBuilder.toString());
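Steps 3-6 can be sketched as a plain-Java transformation. SubscriberFormat.reformat is a hypothetical helper, and like the Producer hint it assumes tab-separated fields (per split("\t")), possibly wrapped in [ ], which would explain the lab's array[0].substring(1) hint. Here the brackets are stripped up front so the last field (askQuantity) does not keep a trailing "]"; comma-separated fields are accepted as a fallback.

```java
// Sketch of the Subscriber Processor's transformation (steps 3-6).
// Assumption: fields separated by tabs or by ", ", optionally wrapped in [ ].
final class SubscriberFormat {
    static String reformat(String body) {
        String s = body.trim();
        // Drop the enclosing brackets that List.toString() adds, if present.
        if (s.startsWith("[") && s.endsWith("]")) {
            s = s.substring(1, s.length() - 1);
        }
        String[] array = s.split("\t|,\\s*");
        if (array.length < 5) {
            throw new IllegalArgumentException("expected 5 fields: " + body);
        }
        // Build "Stock: ...|BidPrice: ...|BidQuantity: ...|AskPrice: ...|AskQuantity: ..."
        StringBuilder sb = new StringBuilder();
        sb.append("Stock: ").append(array[0].trim())
          .append("|BidPrice: ").append(array[1].trim())
          .append("|BidQuantity: ").append(array[2].trim())
          .append("|AskPrice: ").append(array[3].trim())
          .append("|AskQuantity: ").append(array[4].trim());
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(reformat("[MSFT\t22.81\t118\t22.82\t68]"));
    }
}
```

In an IBM route this might be wired up roughly as `from("jms:topic:MPCS_51050_LAB7_TOPIC_IBM").log(...).process(exchange -> exchange.getIn().setBody(SubscriberFormat.reformat(exchange.getIn().getBody(String.class)))).to("jms:queue:MPCS_51050_LAB7_ALL")`, with the MSFT and ORCL routes differing only in their topic names.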

When you are all done, your Queues should look like this (MPCS_51050_LAB7 all consumed and MPCS_51050_LAB7_ALL with 30 messages on it):



And a message in the MPCS_51050_LAB7_ALL queue would look like:


Notes and Hints:

VERY IMPORTANT:  When you test-run your Subscriber, start it before running the Consumer. So the order of execution of your projects should be:

1.  Producer
2.  Subscriber
3.  Consumer

If you don't start the Subscriber before the Consumer, your Consumer will blast through the messages before your Subscriber gets a chance to subscribe to the topics. Therefore, we suggest the following sleep times after context.start(); in each project:

Producer:  Thread.sleep(2000);
Subscriber:  Thread.sleep(15000);
Consumer:  Thread.sleep(10000);
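Why the start order matters can be seen with a toy model of a non-durable topic. ToyTopic is a hypothetical in-memory class, not ActiveMQ, but it follows the same rule JMS applies to non-durable topic subscribers: a message is delivered only to subscribers that are registered when it is published, and it is not kept for subscribers that arrive later.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of a non-durable pub/sub topic (not ActiveMQ).
final class ToyTopic {
    private final List<List<String>> inboxes = new ArrayList<>();

    // Registers a subscriber and returns the inbox it will receive into.
    List<String> subscribe() {
        List<String> inbox = new ArrayList<>();
        inboxes.add(inbox);
        return inbox;
    }

    // Delivers only to current subscribers; with none registered, the message is lost.
    void publish(String message) {
        for (List<String> inbox : inboxes) {
            inbox.add(message);
        }
    }

    // Publishes `count` messages and reports how many one subscriber received,
    // depending on whether it subscribed before or after the publishing.
    static int received(boolean subscribeFirst, int count) {
        ToyTopic topic = new ToyTopic();
        List<String> inbox = subscribeFirst ? topic.subscribe() : null;
        for (int i = 0; i < count; i++) {
            topic.publish("msg " + i);
        }
        if (!subscribeFirst) {
            inbox = topic.subscribe();
        }
        return inbox.size();
    }

    public static void main(String[] args) {
        System.out.println("Subscriber started after Consumer: " + received(false, 30));
        System.out.println("Subscriber started before Consumer: " + received(true, 30));
    }
}
```

main prints 0 for the late subscriber and 30 for the early one, which is the Consumer-before-Subscriber problem in miniature.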

Submitting:
Submit your assignments to the subversion repository in the pre-existing folder named "labN" (where N is the lab number). Please include a README text file that contains any instructions for the TAs to assist with grading; design notes are often the most useful thing you can provide. We do not usually need any info on how to compile your code unless your code layout is arcane.