App Connect

App Connect

Join this online user group to communicate across IBM product users and experts by sharing advice and best practices with peers and staying up to date regarding product enhancements.


#Applicationintegration
#App Connect
#AppConnect
 View Only

Unzip multiple files and write them into the destination directory using a JCN and FileOutput node

  • 1.  Unzip multiple files and write them into the destination directory using a JCN and FileOutput node

    Posted 3 days ago

    Hi SMEs,

    I have a requirement to read a zip file from the source directory, unzip its files using a JCN (JavaCompute node), and write them into the destination directory.

    Flow design :

    FileInput  ---> JCN ---> FileOutput

    JCN :

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.zip.ZipEntry;
    import java.util.zip.ZipInputStream;
     
    import com.ibm.broker.javacompute.MbJavaComputeNode;
    import com.ibm.broker.plugin.MbBLOB;
    import com.ibm.broker.plugin.MbElement;
    import com.ibm.broker.plugin.MbException;
    import com.ibm.broker.plugin.MbMessage;
    import com.ibm.broker.plugin.MbMessageAssembly;
    import com.ibm.broker.plugin.MbOutputTerminal;
    import com.ibm.broker.plugin.MbUserException;
     
    public class ZipCsvExtractorJCN_JavaCompute extends MbJavaComputeNode {
     
    public void evaluate(MbMessageAssembly inAssembly) throws MbException {
    MbMessage inMessage = inAssembly.getMessage();
     
     
    try {
    MbMessage outMessage = new MbMessage();
    copyMessageHeaders(inMessage, outMessage);
    // 1. Extract BLOB from input message
                MbElement blobElement = inMessage.getRootElement().getLastChild().getFirstElementByPath("BLOB");
                byte[] zipBytes = (byte[]) blobElement.getValue();
     
                // 2. Create ByteArrayInputStream and ZipInputStream
                ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(zipBytes);
                ZipInputStream zipInputStream = new ZipInputStream(byteArrayInputStream);
     
                ZipEntry entry;
                
                // 3. Loop through entries in the ZIP
                while ((entry = zipInputStream.getNextEntry()) != null) {
                    String entryName = entry.getName();
                    String[] fileNameSplit = null;
     
                    // 3.2. If the entry is a CSV file
                    if (entryName != null && entryName.toLowerCase().endsWith(".csv")) {
     
                        // 3.2.1. Create ByteArrayOutputStream and buffer
                        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                        byte[] buffer = new byte[4096];
                        int bytesRead;
     
                        // 3.2.2. Loop to read from ZipInputStream into buffer
                        while ((bytesRead = zipInputStream.read(buffer)) != -1) {
                            // 3.2.2.2. Write to ByteArrayOutputStream
                            byteArrayOutputStream.write(buffer, 0, bytesRead);
                        }
     
                        // 3.2.3. Get CSV content as byte array
                        byte[] csvBytes = byteArrayOutputStream.toByteArray();
     
                        // 3.2.4. Create OutputRoot for BLOB
                        
                        MbElement outRoot = outMessage.getRootElement();
     
                        // Create BLOB parser and set CSV content
                        MbElement blobRoot = outRoot.createElementAsLastChild(MbBLOB.PARSER_NAME);
                        blobRoot.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "BLOB", csvBytes);
     
                        
                        // 3.2.5. Propagate
                        MbMessageAssembly outAssembly = new MbMessageAssembly(inAssembly, outMessage);
                        MbMessage localEnv = outAssembly.getLocalEnvironment();
                        MbElement root = localEnv.getRootElement();
                        MbElement fileElem = root
                        .createElementAsLastChild(MbElement.TYPE_NAME, "Destination", null)
                        .createElementAsLastChild(MbElement.TYPE_NAME, "File", null);
                        fileNameSplit = entryName.split("/");
                        fileElem.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "Name", fileNameSplit[1]);
                        fileElem.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "Directory", "C:\\MY_FILES\\FileOutput");
                        fileElem.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "Action", "create");
                        fileElem.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "CreateMissingDirectories", "true");
                        getOutputTerminal("out").propagate(outAssembly);
     
                        // Close output stream
                        byteArrayOutputStream.close();
                    }
     
                    // 3.3. Close current zip entry
                    zipInputStream.closeEntry();
                    Thread.sleep(300);
                }
     
                // 3.5. Close streams
                zipInputStream.close();
                byteArrayInputStream.close();
    }catch (IOException e) {
                throw new MbUserException(this, "evaluate()", "", "", "IOException occurred: " + e.getMessage(), null);
    }
    catch (MbException e) {
    // Re-throw to allow Broker handling of MbException
    throw e;
    } catch (RuntimeException e) {
    // Re-throw to allow Broker handling of RuntimeException
    throw e;
    } catch (Exception e) {
    // Consider replacing Exception with type(s) thrown by user code
    // Example handling ensures all exceptions are re-thrown to be handled in the flow
    throw new MbUserException(this, "evaluate()", "", "", e.toString(), null);
    }
    }
     
    public void copyMessageHeaders(MbMessage inMessage, MbMessage outMessage) throws MbException {
    MbElement outRoot = outMessage.getRootElement();
     
    // iterate though the headers starting with the first child of the root
    // element, stopping before the last child (body)
    MbElement header = inMessage.getRootElement().getFirstChild();
    while (header != null && header.getNextSibling() != null) {
    // copy the header and add it to the out message
    MbElement newHeader = outRoot.createElementAsLastChild(header.getParserClassName());
    newHeader.setName(header.getName());
    newHeader.copyElementTree(header);
    // move along to next header
    header = header.getNextSibling();
    }
    }
     
    /**
    * onPreSetupValidation() is called during the construction of the node
    * to allow the node configuration to be validated.  Updating the node
    * configuration or connecting to external resources should be avoided.
    *
    * @throws MbException
    */
    @Override
    public void onPreSetupValidation() throws MbException {
    }
     
    /**
    * onSetup() is called during the start of the message flow allowing
    * configuration to be read/cached, and endpoints to be registered.
    *
    * Calling getPolicy() within this method to retrieve a policy links this
    * node to the policy. If the policy is subsequently redeployed the message
    * flow will be torn down and reinitialized to it's state prior to the policy
    * redeploy.
    *
    * @throws MbException
    */
    @Override
    public void onSetup() throws MbException {
    }
     
    /**
    * onStart() is called as the message flow is started. The thread pool for
    * the message flow is running when this method is invoked.
    *
    * @throws MbException
    */
    @Override
    public void onStart() throws MbException {
    }
     
    /**
    * onStop() is called as the message flow is stopped. 
    *
    * The onStop method is called twice as a message flow is stopped. Initially
    * with a 'wait' value of false and subsequently with a 'wait' value of true.
    * Blocking operations should be avoided during the initial call. All thread
    * pools and external connections should be stopped by the completion of the
    * second call.
    *
    * @throws MbException
    */
    @Override
    public void onStop(boolean wait) throws MbException {
    }
     
    /**
    * onTearDown() is called to allow any cached data to be released and any
    * endpoints to be deregistered.
    *
    * @throws MbException
    */
    @Override
    public void onTearDown() throws MbException {
    }
     
    }

    I am able to read the zip file and unzip it with the JCN. However, when writing into the destination directory, the output is appended to an existing file: a single DEPT.csv is created, and the EMP.csv records are appended to it as well.

    I want to create a new file in the destination directory for each file in the zip, using the same names that the zip contains.

    Please review and let me know where exactly my code went wrong.

    Regards,

    Anil



    ------------------------------
    Mekala AnilPrasadReddy
    ------------------------------