Hi SMEs,
I have a requirement to read a zip file from the source directory, unzip the files using a JavaCompute node (JCN), and write them into the destination directory.
Flow design :
FileInput ---> JCN ---> FileOutput
JCN :
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import com.ibm.broker.javacompute.MbJavaComputeNode;
import com.ibm.broker.plugin.MbBLOB;
import com.ibm.broker.plugin.MbElement;
import com.ibm.broker.plugin.MbException;
import com.ibm.broker.plugin.MbMessage;
import com.ibm.broker.plugin.MbMessageAssembly;
import com.ibm.broker.plugin.MbOutputTerminal;
import com.ibm.broker.plugin.MbUserException;
public class ZipCsvExtractorJCN_JavaCompute extends MbJavaComputeNode {
public void evaluate(MbMessageAssembly inAssembly) throws MbException {
MbMessage inMessage = inAssembly.getMessage();
try {
MbMessage outMessage = new MbMessage();
copyMessageHeaders(inMessage, outMessage);
// 1. Extract BLOB from input message
MbElement blobElement = inMessage.getRootElement().getLastChild().getFirstElementByPath("BLOB");
byte[] zipBytes = (byte[]) blobElement.getValue();
// 2. Create ByteArrayInputStream and ZipInputStream
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(zipBytes);
ZipInputStream zipInputStream = new ZipInputStream(byteArrayInputStream);
ZipEntry entry;
// 3. Loop through entries in the ZIP
while ((entry = zipInputStream.getNextEntry()) != null) {
String entryName = entry.getName();
String[] fileNameSplit = null;
// 3.2. If the entry is a CSV file
if (entryName != null && entryName.toLowerCase().endsWith(".csv")) {
// 3.2.1. Create ByteArrayOutputStream and buffer
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
byte[] buffer = new byte[4096];
int bytesRead;
// 3.2.2. Loop to read from ZipInputStream into buffer
while ((bytesRead = zipInputStream.read(buffer)) != -1) {
// 3.2.2.2. Write to ByteArrayOutputStream
byteArrayOutputStream.write(buffer, 0, bytesRead);
}
// 3.2.3. Get CSV content as byte array
byte[] csvBytes = byteArrayOutputStream.toByteArray();
// 3.2.4. Create OutputRoot for BLOB
MbElement outRoot = outMessage.getRootElement();
// Create BLOB parser and set CSV content
MbElement blobRoot = outRoot.createElementAsLastChild(MbBLOB.PARSER_NAME);
blobRoot.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "BLOB", csvBytes);
// 3.2.5. Propagate
MbMessageAssembly outAssembly = new MbMessageAssembly(inAssembly, outMessage);
MbMessage localEnv = outAssembly.getLocalEnvironment();
MbElement root = localEnv.getRootElement();
MbElement fileElem = root
.createElementAsLastChild(MbElement.TYPE_NAME, "Destination", null)
.createElementAsLastChild(MbElement.TYPE_NAME, "File", null);
fileNameSplit = entryName.split("/");
fileElem.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "Name", fileNameSplit[1]);
fileElem.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "Directory", "C:\\MY_FILES\\FileOutput");
fileElem.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "Action", "create");
fileElem.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "CreateMissingDirectories", "true");
getOutputTerminal("out").propagate(outAssembly);
// Close output stream
byteArrayOutputStream.close();
}
// 3.3. Close current zip entry
zipInputStream.closeEntry();
Thread.sleep(300);
}
// 3.5. Close streams
zipInputStream.close();
byteArrayInputStream.close();
}catch (IOException e) {
throw new MbUserException(this, "evaluate()", "", "", "IOException occurred: " + e.getMessage(), null);
}
catch (MbException e) {
// Re-throw to allow Broker handling of MbException
throw e;
} catch (RuntimeException e) {
// Re-throw to allow Broker handling of RuntimeException
throw e;
} catch (Exception e) {
// Consider replacing Exception with type(s) thrown by user code
// Example handling ensures all exceptions are re-thrown to be handled in the flow
throw new MbUserException(this, "evaluate()", "", "", e.toString(), null);
}
}
public void copyMessageHeaders(MbMessage inMessage, MbMessage outMessage) throws MbException {
MbElement outRoot = outMessage.getRootElement();
// iterate though the headers starting with the first child of the root
// element, stopping before the last child (body)
MbElement header = inMessage.getRootElement().getFirstChild();
while (header != null && header.getNextSibling() != null) {
// copy the header and add it to the out message
MbElement newHeader = outRoot.createElementAsLastChild(header.getParserClassName());
newHeader.setName(header.getName());
newHeader.copyElementTree(header);
// move along to next header
header = header.getNextSibling();
}
}
/**
* onPreSetupValidation() is called during the construction of the node
* to allow the node configuration to be validated. Updating the node
* configuration or connecting to external resources should be avoided.
*
* @throws MbException
*/
@Override
public void onPreSetupValidation() throws MbException {
}
/**
* onSetup() is called during the start of the message flow allowing
* configuration to be read/cached, and endpoints to be registered.
*
* Calling getPolicy() within this method to retrieve a policy links this
* node to the policy. If the policy is subsequently redeployed the message
* flow will be torn down and reinitialized to it's state prior to the policy
* redeploy.
*
* @throws MbException
*/
@Override
public void onSetup() throws MbException {
}
/**
* onStart() is called as the message flow is started. The thread pool for
* the message flow is running when this method is invoked.
*
* @throws MbException
*/
@Override
public void onStart() throws MbException {
}
/**
* onStop() is called as the message flow is stopped.
*
* The onStop method is called twice as a message flow is stopped. Initially
* with a 'wait' value of false and subsequently with a 'wait' value of true.
* Blocking operations should be avoided during the initial call. All thread
* pools and external connections should be stopped by the completion of the
* second call.
*
* @throws MbException
*/
@Override
public void onStop(boolean wait) throws MbException {
}
/**
* onTearDown() is called to allow any cached data to be released and any
* endpoints to be deregistered.
*
* @throws MbException
*/
@Override
public void onTearDown() throws MbException {
}
}
I am able to read the zip file and unzip it with the JCN. However, when writing into the destination directory, the output is appended to an existing file: a single DEPT.csv is created and the EMP.csv records are appended to it as well.
I want a new file to be created in the destination directory for each entry, with the same names as the files in the zip.
Please review and let me know exactly where my code went wrong.
Regards,
Anil
------------------------------
Mekala AnilPrasadReddy
------------------------------