JXTA, P2P编程技术例程(2)

类别:Java 点击:0 评论:0 推荐:
 

创建与发布通告

就如我们早先说的那样,JXTA虚拟网络依靠JXTA ID 去鉴别网络资源。而这些资源的发现是通过通告。net.jxta.id包 包含了ID类, 同事包含了创建各种ID的工厂(factory)。

在JXTA中, net.jxta.document.Document是一个数据通用容器。一个在JXTA中的文档被MIME媒体类型的内容所定义。这样,文档就类似于HTTP流(stream。 JXTA并不尝试去解释文档的内容:这个内容是一个应用层协议的一部分)。 一个通告是由一个类似于XML结构化文档的可嵌套的多层元素组成的StructuredDocument,它可以使一个文档在它的数据没有物理表示(physical representation)的时候被操作。

就如和其他任何StructureDocument一样,一个通告可以被XML或者简单文本格式表所表示。一个通告包括它所要通告的资源的ID, 通告的类型, 以及过期的时间绝对值。 JXTA API提供一个方便的工厂, AdvertisementFactory ,来创建各种通告类型。 Listing 16.3 说明了一个ModuleClassAdvertisement通过这个工厂的创建。注意ModuleClassID被加入到通告中的方式)

Listing 16.3 Creating and Advertising a Module Class

private void doAdvertise() {

        ModuleClassAdvertisement classAd =

               (ModuleClassAdvertisement)AdvertisementFactory.newAdvertisement(

                       ModuleClassAdvertisement.getAdvertisementType());

        ModuleClassID classID = IDFactory.newModuleClassID();

        classAd.setModuleClassID(classID);

        classAd.setName(ServiceConstants.CLASS_NAME);

        classAd.setDescription("A prime number crunching service.");

        try {

               discoSvc.publish(classAd, DiscoveryService.ADV);

               discoSvc.remotePublish(classAd, DiscoveryService.ADV);

               System.out.println("Published module class adv.");

        } catch (IOException e) {

               System.out.println("Trouble publishing module class adv: " +

                      e.getMessage());

        }

JXTA net.jxta.discovery.DiscoveryService是一个Net Peer Group 提供的组服务(group service)。 它提供发布发现本科与远程2种模式。本地模式在Peer本地的缓冲中去发现通告, 本地发布就是让通告放入本地的缓冲中。远程就是在整个peer组中去发现与发布。因此,请求消息通过JXTA虚拟网络在我们早先描述的协议的情况下进行传播,在它们到底的时候就对这些请求的进行回应。 因此,远程发现是一个异步的过程,找到在网络上所要的通告类型也需要一定时间。Listing16.3说明ModuleClassAdvertisement的远程与本地2种发布方式。

类型前面的过程,我们通过IDFactory类创建一个ModuleSpec ID, 并从AdvertisementFactory获得与它相应的通告。(见listing 16.4)

Listing 16.4 Creating a New ModuleSpecAdvertisement

        ModuleSpecAdvertisement specAd =

               (ModuleSpecAdvertisement)AdvertisementFactory.newAdvertisement(

                       ModuleSpecAdvertisement.getAdvertisementType());

        ModuleSpecID specID = IDFactory.newModuleSpecID(classID);

        specAd.setModuleSpecID(specID);

        specAd.setName(ServiceConstants.SPEC_NAME);

        specAd.setDescription("Specification for a prime number crunching service");

        specAd.setCreator("Sams Publishing");

        specAd.setSpecURI("http://www.samspulishing.com/p2p/primecruncher");

specAd.setVersion("Version 1.0");

我们应该记得ModuleSpecAdvertisement定义了一个电报协议,或者说是一个网络行为,来访问一个服务,因此我们需要提供一个PipeAdvertisement 作为一个到ModuleSpecAdvertisement的参数。 因为module的通告将被放在网络上peer的缓冲之中, 那么我们必须确认每一个ModuleSpecAdvertisement 在同一个PIPE中。 这样,我们必须将PIPE的通告到永久存储器并且在创建新管道的时候一直从这个存储器中读取数据。(如果这个通告没有被存储到磁盘, 那么重新创建一个新的。)

Listing 16.5 Creating a Pipe Advertisement

        PipeAdvertisement pipeAd = null;

        try {

               FileInputStream is = new FileInputStream(PIPE_ADV_FILE);

               pipeAd = (PipeAdvertisement)AdvertisementFactory. newAdvertisement(

                    new MimeMediaType("text/xml"), is);

               is.close();

        } catch (IOException e) {

               pipeAd = (PipeAdvertisement)AdvertisementFactory. newAdvertisement(

                    PipeAdvertisement.getAdvertisementType());

               PipeID pid = IDFactory.newPipeID(group.getPeerGroupID());

               pipeAd.setPipeID(pid);

               //save pipeAd in file

               Document pipeAdDoc = pipeAd.getDocument(new MimeMediaType ("text/xml"));

               try {

                     FileOutputStream os = new FileOutputStream(PIPE_ADV_FILE);

                     pipeAdDoc.sendToStream(os);

                     os.flush();

                     os.close();

                     System.out.println("Wrote pipe advertisement to disk.");

               } catch (IOException ex) {

                     System.out.println("Can't save pipe advertisement to file " +

                            PIPE_ADV_FILE);

                     System.exit(-1);

               }

         }

下面的代码段在磁盘上保存一个管道广告为XML格式, 例如,运行这个代码得到后面的XML文档。

<?xml version="1.0"?>

<!DOCTYPE jxta:PipeAdvertisement>

<jxta:PipeAdvertisement xmlns:jxta="http://jxta.org">

        <Id>

urn:jxta:uuid-59616261646162614E5047205032503382CCB236202640F5A242ACE15A8F9D7C04

        </Id>

        <Type>

              JxtaUnicast

        </Type>

</jxta:PipeAdvertisement>

随后我们将PipeAdvertisement 作为一个参数传到ModuleSpecAdvertisement,如Listing 16.6

Listing 16.6 Adding the PipeAdvertisement as a Parameter to the ModuleSpecAdvertisement

specAd.setPipeAdvertisement(pipeAdv);

这时,我们已经准备好将ModuleSpecAdvertisement发布到本地和远程了。

Listing 16.7 Local and Remote Publishing of a ModuleSpecAdvertisement

        try {

               discoSvc.publish(specAd, DiscoveryService.ADV);

               discoSvc.remotePublish(specAd, DiscoveryService.ADV);

               System.out.println("Published module spec adv");

        } catch (IOException e) {

               System.out.println("Trouble publishing module spec adv: " +

                       e.getMessage());

}

(在listing 16.8中,我们最终在这个管道通告上建立了一个InputPipe。

Listing 16.8 InputPipe Creation from a PipeAdvertisement

        //create an input pipe based on the advertisement

        try {

               inputPipe = pipeSvc.createInputPipe(pipeAd);

               System.out.println("Created input pipe");

        } catch (IOException e) {

               System.out.println("Can't create input pipe. " + e.getMessage());

        }

}

这些是发布一个新的JXTA服务的需要的所有步骤。 我们知道一个module的类通告说明了module在peer组中的功能:它是一个非常抽象的概念,有点类似于JAVA的定义API的接口,但是不提供实现。 一个module的说明通告在另一方面又阐明了一个电报协议来访问一个服务。在这种情况下,这个电报协议包括了一个能让其他peer能给他发送消息的InputPipe, 正是这些消息包含了需要的2个边界数值)

Processing Messages from an InputPipe(从InputPipe里处理消息)

下一步是实现让质数查询peer处理接受到的消息, 我们将操作进来的消息,算出需要的质数序列,并且送出响应。

Listing 16.9 Processing Messages on an InputPipe

private void startService() {

        while (true) {

               Message msg = null;

               try {

                       msg = inputPipe.waitForMessage();

               } catch (InterruptedException ex) {

                       inputPipe.close();

                       return;

               }

               String highInt = msg.getString(ServiceConstants.HIGH_INT);

               String lowInt = msg.getString(ServiceConstants.LOW_INT);

               if (highInt != null || lowInt != null) {

                      processInput(highInt, lowInt);

               }

        }

}

就如以前说的,net.jxta.endpoint.Message对象被EndpointService送到2个peer之间)  (一个消息包括了一套MessageElements, 说明了一个目的地使它的路径通过JXTA网络更加方便。一个消息元素可以是任何字节数组, 消息也包括以字符串方式提取元素的能力。 当一个新的消息元素被指明的时候, 它可以与一个MIME类型相关联,就如一个作为元素值的字符串。在这个方法的实现中,我们参考ServiceConstants.HIGH_INT 和 ServiceConstants.LOW_INT的键值提取消息元素, 如果这2个元素都是有效的字符串,我们将他们传入一个私有方法中:processInput()

processInput()对执行这个算法起作用, 产生一个包括所有质数的列表(在LOW_INT 和HIGH_INT 之间), 为了节约空间,我们不会将这个部分的代码写在这里。

本文地址:http://com.8s8s.com/it/it16680.htm