IBM Security QRadar SOAR

Create a generic enrichment playbook and apply it to the TOR enrichment integration

By BENOIT ROSTAGNI posted 27 days ago

  

The purpose of this blog is to show how, starting from any integration on the QRadar SOAR App Exchange that performs artifact enrichment, we can create a framework that publishes the right information where the analyst will see it.

Enrichment can be visible in multiple places:

  • Artifact Properties
  • Artifact Kanban Hit Card
  • Artifact Tag
  • Automated Task with Detail in the note
  • Risk Calculation

How do we do that from an out-of-the-box integration?

  • Create an "Enrichment: TOR Look-up" playbook
    • Set the activation type to Manual
    • Set the conditions to
      • Artifact Type | has one of | "IP Address"
        AND
      • artifact.description | does not contain | TOR
    • Add the TOR function
      • Set the input to: inputs.tor_search_data = artifact.value
      • Set the output to: tor

  • Create a global script
    • Name: Enrichment: TOR Look-up Post Process
    • Description: Prepare the Values for Post Process Enrichment
    • Object Type: Artifact
    • Language: Python 3

  • With content that prepares:
  1. A simple, direct, short enrichment in ASCII text, mouse-over style
  2. A complete, long HTML enrichment with all the details analysts need
    And adds:
  3. A tag for the artifact, if you need to tag it
  4. A keyword for the artifact description, used to identify your short enrichment
  5. Content for the Kanban hit card: 1-4 lines with name, type, and value information
    Then consider what decision the analyst would make after looking at the integration results, and build:
  6. The risk score of this integration
  7. The risk weight of this integration
  8. An update of the global risk score / risk weight values at each enrichment
  9. The global result, stored in a re-usable playbook property
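
Items 6 to 8 amount to a running weighted average kept on the incident. The sketch below reproduces, on plain numbers, the arithmetic used by the publication script later in this post. update_risk is a hypothetical helper name, and the second enrichment in the usage example (score 1, weight 3) is invented for illustration.

```python
def update_risk(score_sum, weight_sum, risk_score, risk_weight):
    """Return (new_score_sum, new_weight_sum, incident_score).

    incident_score is the weighted average of all scores so far, scaled by 10,
    or None while no weight has been accumulated.
    """
    # Unset incident fields arrive as None and are treated as 0
    score_sum = (score_sum or 0) + risk_score * risk_weight
    weight_sum = (weight_sum or 0) + risk_weight
    incident_score = None
    if weight_sum > 0:
        incident_score = round(score_sum / weight_sum * 10)
    return score_sum, weight_sum, incident_score


# First enrichment: a TOR hit (score 5, weight 1)
first = update_risk(None, None, 5, 1)
# Second, hypothetical enrichment: low score from a heavier-weighted source
second = update_risk(first[0], first[1], 1, 3)
print(first, second)
```

A high weight lets a trusted integration pull the incident score toward its own finding, which is why the score and the weight are stored as two separate sums.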
import json 

# Re-use an existing post-process script by pointing "results" at "playbook.functions.results.xxxxxxx"
results = playbook.functions.results.tor

# Sample positive result, to substitute for the function output when testing
# results = {'status': 'success', 'value': True, 'data': '{"version":"8.0",\n"build_revision":"5693a6d",\n"relays_published":"2022-03-09 08:00:00",\n"relays":[\n{"or_addresses":["93.95.225.149:443"],"exit_addresses":["93.95.225.149"]}\n],\n"bridges_published":"2022-03-09 08:01:28",\n"bridges":[\n]}\n'}

# Get TOR exit node IP to test
# https://udger.com/resources/ip-list/tor_exit_node

# Validation of results integration / debug control:
# incident.addNote(str(results))

########################### Rich text and Low Text ########################### 
# Complete, long enrichment text in HTML, easy for the analyst to read in an HTML note:
rich_text = "<h4><u><b>TOR Network Search on {}</b></u></h4> <br><b>Search Status</b>&emsp;&emsp;&emsp;&emsp;&ensp;: {}<br><b>Search boolean value</b> &ensp;: {}<br><b>Search Data</b>&emsp;&emsp;&emsp;&emsp;&emsp;&ensp;: {}<br>".format(artifact.value,results['status'],results['value'],results['data'])
# Short abstract in ASCII, suited to mouse-over reading by the analyst:
low_text = "TOR Network Search Status: {}".format(results['status'])

########################### Artifact Tag ########################### 
tag = []
if results['status'] == 'success':
  tag = [u'TOR']
  
########################### Artifact Description Keyword ########################### 
keyword = u'TOR'

########################### Kanban Hits Card ########################### 
# Create the Kanban hit card information - could be 1 to 5 sections or more!
# guide1 https://community.ibm.com/community/user/security/blogs/sam-wang/2021/12/06/decorate-artifacts-using-soar-functions-in-v43
# guide2 https://github.com/ibmresilient/resilient-scripts/blob/master/python3/artifact/add-artifact-hit.py
if results['status'] == 'success':
  publish_hit = 'Yes'
  publish_name = 'Tor Network Status'
  data = json.loads(results['data'])  # results['data'] is a JSON string; parse it into a dict
  relays = data.get('relays') or []
  # Take the first exit address of the first relay, or '' if the relay list is empty
  exit_addresses = relays[0]['exit_addresses'][0] if relays and relays[0].get('exit_addresses') else ''
  hit = [
            {
                "name": "Status",
                "type": "string",
                "value": results['status']
            },
            {
                "name": "Relays Published",
                "type": "string",
                "value": data.get('relays_published', '')
            },
            {
                "name": "Relay Exit Address",
                "type": "string",
                "value": exit_addresses
            }
    ]
else:
  publish_hit = 'No'  # same 'Yes'/'No' convention as the publication settings
  publish_name = ''
  hit = []

########################### Calculating Risk Score ########################### 
risk_score = 0  # Risk of the finding, 0-5: a higher number means a higher risk
risk_weight = 0 # Weight of the integration, 0-5: a higher weight multiplies the risk value
if results['status'] == 'success':
  risk_score = 5
  risk_weight = 1
  
########################### Publication ########################### 
# Select where you want to publish the enrichment
default_publication = { "artifact_description": "Yes", "artifact_hit": "Yes", "artifact_tag": "Yes", "task_note": "Yes", "note": "No", "risk_score": "Yes" }

########################### Playbook Properties ########################### 
# Create a playbook property to pass this information to the publishing script
# The values can then be read as, for example, playbook.properties.enrichment.rich_text to access the rich text
# Note: the original results are dumped to a JSON string because quotes and double quotes in them conflicted during transmission; json.loads may be needed when re-using them
playbook.addProperty("enrichment", {"publication": default_publication, "rich_text": str(rich_text), "low_text": str(low_text), "task_name": "TOR Network Search Status on {}".format(artifact.value),'tag': tag, "keyword": keyword, "publish_hit": publish_hit, "publish_name": publish_name, "hit": hit, "risk_score": risk_score, "risk_weight": risk_weight, "results": json.dumps(results)})
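
The hit-card part of this script can be checked outside SOAR by isolating it in a function and feeding it the sample result shown in the comment near the top of the script. This is a minimal sketch under stated assumptions: build_tor_hit is a hypothetical name, it runs in plain Python 3 without the playbook, artifact, or incident objects, and the "success with no relays" case is assumed for illustration rather than observed from the integration.

```python
import json


def build_tor_hit(results):
    """Return (publish_hit, publish_name, hit) from a TOR function result dict."""
    if results.get("status") != "success":
        return "No", "", []

    # The function returns its data as a JSON string, so parse it first
    data = json.loads(results.get("data") or "{}")
    relays = data.get("relays") or []

    # Guard against an empty relay list instead of indexing [0] blindly
    exit_address = ""
    if relays:
        addresses = relays[0].get("exit_addresses") or []
        if addresses:
            exit_address = addresses[0]

    hit = [
        {"name": "Status", "type": "string", "value": results["status"]},
        {"name": "Relays Published", "type": "string",
         "value": data.get("relays_published", "")},
        {"name": "Relay Exit Address", "type": "string", "value": exit_address},
    ]
    return "Yes", "Tor Network Status", hit


sample = {
    "status": "success",
    "value": True,
    "data": '{"relays_published":"2022-03-09 08:00:00",\n'
            '"relays":[\n{"or_addresses":["93.95.225.149:443"],'
            '"exit_addresses":["93.95.225.149"]}\n]}\n',
}
publish_hit, publish_name, hit = build_tor_hit(sample)
print(publish_hit, publish_name, hit[2]["value"])
```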


  • Create a new global script that publishes the result of the previous post-process script. This script is re-usable for ALL enrichments.
    • Name: Artifact Enrichment Global publication
    • Description: using playbook.addProperty("enrichment", {"publication": default_publication, "rich_text": str(rich_text), "low_text": str(low_text), "task_name": "TOR Network Search Status on {}".format(artifact.value),'tag': tag, "keyword": keyword, "publish_hit": publish_hit, "publish_name": publish_name, "hit": hit, "risk_score": risk_score, "risk_weight": risk_weight, "results": json.dumps(results)})
    • Object Type: Artifact
    • Language: Python 3
  • With the content:
# Function to insert or replace a keyword-delimited string in the artifact description
def artifact_description(keyword, text):
  key_start = u"[{}: ".format(keyword)
  key_end = u" :{}]\n".format(keyword)
  desc = u""
  d = artifact.description
  log.info(d)
  if d is not None:
    original = d.content
    if key_start not in original:
      original += u"{}{}".format(key_start,key_end)
  else:
    original = u"{}{}".format(key_start,key_end)
  desc += key_start
  desc += text
  desc += key_end
  start_cut = original.find(key_start)
  end_cut = original.find(key_end) + len(key_end)
  artifact.description = original.replace(original[start_cut:end_cut], desc)
  return

# Function to publish a Kanban hit card
def artifact_hit(publish_hit, publish_name, hit):
  if publish_hit == 'Yes':
    log.info("************* original *********")
    log.info(hit)
    log.info(type(hit))
    log.info("************* in function *********")
    log.info(type(list(hit)))
    artifact.addHit(publish_name, list(hit))
  return

log.info("################## Starting Artifact Enrichment Global publication ##################")
# sample playbook properties
# enrichment = {'publication': {'artifact_description': 'Yes', 'artifact_hit': 'Yes', 'artifact_tag': 'Yes', 'task_note': 'Yes', 'note': 'No', 'risk_score': 'Yes'}, 'rich_text': 'TOR Network Search on 91.90.123.20 \n Search Status : success \n Search boolean value : True \n Search Data :   {"version":"8.0",\n"build_revision":"5693a6d",\n"relays_published":"2022-03-09 13:00:00",\n"relays":[\n{"or_addresses":["91.90.123.20:32356"],"exit_addresses":["91.90.123.20"]}\n],\n"bridges_published":"2022-03-09 13:31:28",\n"bridges":[\n]}\n',  'low_text': 'TOR Network Search Status: success', 'task_name': 'TOR Network Search Status on 91.90.123.20', 'tag': ['TOR'], 'keyword': 'TOR', 'publish_hit': 'Yes', 'publish_name': 'Tor Network Status', 'hit': [{'name': 'Status', 'type': 'string', 'value': 'success'}, {'name': 'Replay Publish', 'type': 'string', 'value': '2022-03-09 13:00:00'}, {'name': 'Relays', 'type': 'string', 'value': "[{'or_addresses': ['91.90.123.20:32356'], 'exit_addresses': ['91.90.123.20']}]"}], 'risk_score': 5, 'risk_weight': 1, 'results': '{"status": "success", "value": true, "data": "{\\"version\\":\\"8.0\\",\\n\\"build_revision\\":\\"5693a6d\\",\\n\\"relays_published\\":\\"2022-03-09 13:00:00\\",\\n\\"relays\\":[\\n{\\"or_addresses\\":[\\"91.90.123.20:32356\\"],\\"exit_addresses\\":[\\"91.90.123.20\\"]}\\n],\\n\\"bridges_published\\":\\"2022-03-09 13:31:28\\",\\n\\"bridges\\":[\\n]}\\n"}'}
enrichment = playbook.properties.enrichment

# write artifact description
if (enrichment["publication"]["artifact_description"]) == "Yes":
  log.info(" ============= artifact_description ============= ")
  log.info(enrichment["keyword"])
  artifact_description(enrichment["keyword"], enrichment["low_text"])
  
# write artifact hit
if (enrichment["publication"]["artifact_hit"]) == "Yes":
  log.info(" ============= artifact_hit ============= ")
  log.info(enrichment["publish_hit"])
  log.info(enrichment["publish_name"])
  log.info(enrichment["hit"])
  artifact_hit(enrichment["publish_hit"], enrichment["publish_name"], enrichment["hit"])
  
# write artifact tag
if (enrichment["publication"]["artifact_tag"]) == "Yes":
  log.info(" ============= TAG ============ ")
  log.info(enrichment["publication"]["artifact_tag"])
  log.info(enrichment["tag"])
  if enrichment["tag"]:  # only add tags when the list is not empty ("is not []" is always True)
    artifact.addTags(enrichment["tag"])
    a = "List all tags: " + str(artifact.getAllTags())
    log.info(a)
    
# write incident note
if (enrichment["publication"]["note"]) == "Yes":
  log.info(" ============= note ============ ")
  log.info(enrichment["rich_text"])
  incident.addNote(helper.createRichText(enrichment["rich_text"]))
  
# Risk Calculation
if (enrichment["publication"]["risk_score"]) == "Yes":
  log.info(" ============= risk_score ============= ")
  log.info(enrichment["risk_score"])
  log.info(enrichment["risk_weight"])
  if incident.properties.risk_score_sum is None:
    incident.properties.risk_score_sum = 0
  if incident.properties.risk_weight_sum is None:
    incident.properties.risk_weight_sum = 0
  if (enrichment["risk_score"] is not None) and (enrichment["risk_weight"] is not None):
    incident.properties.risk_score_sum = incident.properties.risk_score_sum + enrichment["risk_score"] * enrichment["risk_weight"]
    incident.properties.risk_weight_sum = incident.properties.risk_weight_sum + enrichment["risk_weight"]
    if incident.properties.risk_weight_sum > 0:
      log.info(incident.properties.risk_score_sum)
      log.info(incident.properties.risk_weight_sum)
      incident.properties.risk_score = round((incident.properties.risk_score_sum / incident.properties.risk_weight_sum) * 10)
      log.info((incident.properties.risk_score / 10))
    artifact_description("RISK", "Score: {} Weight {}".format(enrichment["risk_score"],enrichment["risk_weight"]))

# write Task Note: this is done later using task util functions 
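
The keyword markers are what let several enrichments share one artifact description without overwriting each other: each script only touches the text between its own "[KEYWORD: " and " :KEYWORD]" markers. The sketch below shows the same idea on plain strings so it can be tested outside SOAR. upsert_keyword_block is a hypothetical name, it slices between the markers rather than calling replace as the script above does, and the "failed" status in the usage example is illustrative.

```python
def upsert_keyword_block(original, keyword, text):
    """Insert or replace the "[KEYWORD: ... :KEYWORD]" block in a description."""
    key_start = "[{}: ".format(keyword)
    key_end = " :{}]\n".format(keyword)
    block = key_start + text + key_end
    original = original or ""  # a missing description behaves like an empty one

    start = original.find(key_start)
    end = original.find(key_end, start) if start != -1 else -1
    if start == -1 or end == -1:
        # No complete block yet: append one and keep the existing text
        return original + block
    # Replace only the span between the two markers, markers included
    return original[:start] + block + original[end + len(key_end):]


desc = upsert_keyword_block(None, "TOR", "TOR Network Search Status: success")
desc = upsert_keyword_block(desc, "RISK", "Score: 5 Weight 1")
desc = upsert_keyword_block(desc, "TOR", "TOR Network Search Status: failed")
print(desc)
```

After the third call the TOR block has been rewritten in place and the RISK block is untouched.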



Almost everything is done. To create a specific task for each artifact analyzed by an enrichment method, we are going to use the Task Helper Functions integration.


  • Create a condition point:
    • Name: publication.task_note
    • Condition Settings: First true condition
    • Condition 1:
      • Name: playbook.properties.enrichment.publication.task_note = 'Yes'
      • Condition type: Script Builder
      • Condition:
log.info("*************** task note  *************")
if playbook.properties.enrichment.publication.task_note == 'Yes':
  result = True
else:
  result = False
    • Condition 2:
      • Else (by default) to an End Point
    • Create all conditions on the condition point, and assign them when creating the link to the target object.

  • Add the Create Custom Task function:

    • with the Python 2 input script below, and Function output: create_custom_task
# inputs.incident_id
# inputs.task_name (optional)
# inputs.task_utils_payload (optional)

# Values to change for your use case; normally everything is set in the post-process script of the enrichment function
log.info("*************** we are in preprocess of task creation *************")
log.info(playbook.properties.enrichment.publication.task_note)
#log.info(playbook.properties.enrichment)
#log.info(playbook.properties.enrichment.task_name)

task_phase = "Automation"
task_custom_name = playbook.properties.enrichment.task_name
task_instructions = "Results: {}".format(playbook.properties.enrichment.low_text)

log.info("*************** task_phase ***************")
log.info(task_phase)
log.info("*************** task_custom_name")
log.info(task_custom_name)
log.info("*************** task_instructions")
log.info(task_instructions)

# No Change to be made below

#######################################
### Define pre-processing functions ###
#######################################
payload = {
"required": False,
"instr_text": task_instructions,
"phase_id": task_phase
}

log.info("*************** payload")
log.info(payload)

def dict_to_json_str(d):
  """Function that converts a dictionary into a JSON string.
     Supports basestring, bool and int.
     If the value is None, it sets it to False"""

  json_str = '"{ {0} }"'
  json_entry = '"{0}":{1}'
  json_entry_str = '"{0}":"{1}"'
  entries = [] 
  
  for entry in d:
    key = entry
    value = d[entry]
    
      
    if value is None:
      value = False
      
    
    if isinstance(value, basestring):
      entries.append(json_entry_str.format(key, value))
    
    elif isinstance(value, bool):
      value = 'true' if value == True else 'false'
      entries.append(json_entry.format(key, value))
    
    else:
      entries.append(json_entry.format(key, value))
  
  return '{' + ','.join(entries) + '}'

# prepare a JSON payload using above code; 
inputs.task_utils_payload = dict_to_json_str(payload)

# Take the incident id from this incident
inputs.incident_id = incident.id

# Use the task name prepared by the enrichment post-process script
inputs.task_name = task_custom_name
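
The dict_to_json_str helper above relies on basestring, which exists only in Python 2, and it does not escape quotes inside string values. If this input script is switched to Python 3, the standard json module (already imported by the post-process script earlier in this post) can build the same payload. This is a minimal sketch, assuming the Create Custom Task function accepts any valid JSON object string; build_task_payload is a hypothetical name.

```python
import json


def build_task_payload(instructions, phase, required=False):
    """Return the task payload as a JSON string."""
    payload = {
        "required": required,
        "instr_text": instructions,
        "phase_id": phase,
    }
    # Keep the original helper's behaviour: None values become False
    payload = {key: (False if value is None else value)
               for key, value in payload.items()}
    return json.dumps(payload)


task_utils_payload = build_task_payload(
    'Results: TOR Network Search Status: "success"', "Automation")
print(task_utils_payload)
```

In the input script, the result would be assigned to inputs.task_utils_payload in place of the dict_to_json_str(payload) call.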
    • Add the Add Note function:

    • with the Python 3 input script below, and Function output: add_task_note
inputs.incident_id = incident.id 
inputs.task_utils_note_type = "html"
inputs.task_utils_note_body = playbook.properties.enrichment.rich_text
inputs.task_name = playbook.properties.enrichment.task_name

    • Add the Close Task function:
    • with the Python 3 input script below, and Function output: close_task
inputs.task_name = playbook.properties.enrichment.task_name
inputs.incident_id = incident.id


Full Playbook:


This playbook can be downloaded here

Tips

  • Create a local script to check your work as you build
  • Use log.info in those scripts
  • Use incident.addNote as well, to trace the full JSON
  • To see the log on the SOAR server, run: sudo tail -f /var/log/resilient-scripting/resilient-scripting.log


Tips (until v45)

Create a fake output on a decision point to embed elements that exist only in scripts and allow a full package export with all dependencies. (not needed after v45)


Download

Download these playbooks and the presentation at:
https://ibm.box.com/v/SOARfromWFtoPLAYBOOKS
https://ibm.biz/vMSUPlaybook
