OPC Unified Architecture (OPC UA) is a cross-platform, open-source standard (IEC 62541), developed by the OPC Foundation, for data exchange from sensors to cloud applications.
We can use VMware Workstation to run the CS Simulator and test it. Refer to the CS Simulator Installation Guide for further details.
After installing VMware Workstation, we must install the OPC UA Server plugin in the simulator before the connection can be tested. Follow these steps to install the plugin:
Step1
Open the simulator for the model you wish to test. Once program loading is finished, click on the logo located in the upper right corner. Select "Settings" from the menu.
Step2
Click ELITECOs on the left, then click the + sign to add the plugin.
Step3
Double-click the ELITECOs folder, then the fieldbus folder, then the OpcuaServer folder. Select the EliOPCUAServer-1.2.6.0.elico file and open it.
Step4
Click the Inactivation tab, then select the restart button and confirm the activation of the plugin.
Step5
Open the simulator on your desktop, click on the "Config" button, select the OPC UA Server from the Plugin menu located at the bottom, and then click on the "Start" button to start the OPC UA Server.
Step6
Wait for the state to change to Running, indicating that the OPC UA Server has been launched successfully.
First, connect to the OPC UA Server using UaExpert to test the connection and view relevant node information. Detailed instructions for installation and usage can be found in the OPCUA Manual.
To obtain the IP address of the OPC UA Server, run "ifconfig" in a terminal on the virtual machine that hosts the simulator.
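Before opening UaExpert or running a script, it can help to confirm that the server port is reachable from the host machine. The following is a minimal sketch using only the Python standard library. The function name is_port_open is our own, and the default port 4840 is taken from the connection URL used in the scripts in this guide.

```python
import socket


def is_port_open(host: str, port: int = 4840, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        # create_connection raises OSError (refused, unreachable, timed out)
        # when no connection can be made within the timeout.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Example call (replace the address with the one reported by ifconfig):
#   is_port_open("192.168.142.128")
```

A successful TCP connection only shows that something is listening on the port; it does not check the user name, password, or OPC UA endpoint itself.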
After a successful connection, we can drag the node to the Data Access View for information viewing. The NodeId can be found in the Attributes window.
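The NodeId shown in the Attributes window is written as a string such as ns=1;i=54275, where ns is the namespace index and i marks a numeric identifier. As an illustration of that format only, here is a small standard-library sketch that splits such a string into its parts. The names ParsedNodeId and parse_node_id are hypothetical helpers, not part of any library; asyncua parses these strings itself when they are passed to client.get_node().

```python
from typing import NamedTuple, Union


class ParsedNodeId(NamedTuple):
    namespace: int
    id_type: str  # "i" numeric, "s" string, "g" GUID, "b" opaque
    identifier: Union[int, str]


def parse_node_id(text: str) -> ParsedNodeId:
    """Split a NodeId string such as 'ns=1;i=54275' into its parts."""
    namespace = 0  # an omitted "ns=" part means namespace 0
    body = text
    if text.startswith("ns="):
        ns_part, _, body = text.partition(";")
        namespace = int(ns_part[3:])
    id_type, sep, value = body.partition("=")
    if not sep or id_type not in ("i", "s", "g", "b"):
        raise ValueError(f"not a NodeId string: {text!r}")
    # Numeric identifiers become int; all other kinds are kept as text.
    identifier = int(value) if id_type == "i" else value
    return ParsedNodeId(namespace, id_type, identifier)


print(parse_node_id("ns=1;i=54275"))
```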
So how can we read node information and call methods in Python?
FreeOpcUa (official website: http://freeopcua.github.io/) provides open-source tools for making OPC UA connections from Python scripts. This guide uses asyncua (https://github.com/FreeOpcUa/opcua-asyncio):
Setting up the Python environment is not covered here. We assume that Python 3.8 or higher is installed; the scripts may fail on Python 3.7 or lower. If your environment does not meet this requirement, download and install a suitable version of Python first.
Step1
The opcua-asyncio package is available on PyPI as the asyncua package. To install it, run:
pip install asyncua
Once the installation has completed, the package is ready to use. To verify the installation, or to report the version when filing a bug report, run the following code in a Python interpreter:
import asyncua
print(asyncua.__version__)
If the version number is returned correctly, asyncua is successfully installed, and we can learn how to utilize it from the official documentation (https://opcua-asyncio.readthedocs.io/).
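The two prerequisites mentioned above (Python 3.8 or higher and an installed asyncua package) can also be checked from a script. The sketch below is one possible way to do this with the standard library; the function names python_is_supported and installed_version are our own. It reads the installed distribution's metadata rather than importing the package.

```python
import sys
from typing import Optional


def python_is_supported(minimum=(3, 8)) -> bool:
    """Return True if the running interpreter is at least the given version."""
    return sys.version_info >= minimum


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if absent."""
    # importlib.metadata exists from Python 3.8 on, so it is imported here
    # rather than at module level, keeping the version check usable on 3.7.
    from importlib import metadata

    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if python_is_supported():
    print("asyncua version:", installed_version("asyncua"))
else:
    print("Python 3.8 or higher is required")
```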
Step2
Now we can create scripts for communicating with OPC UA. Below is a basic template that we can use to retrieve information about nodes and execute method calls.
import asyncio
import logging
from asyncua import Client

logging.basicConfig(level=logging.ERROR)
_logger = logging.getLogger('asyncua')


async def main():
    # create connection, please modify the ip of the url before connection
    client = Client(url="opc.tcp://192.168.142.128:4840")
    client.set_user('elite')
    client.set_password('elibot')
    await client.connect()
    print("Client connected")
    try:
        # IsPowerON
        is_power_on = client.get_node("ns=1;i=54275")
        name = (await is_power_on.read_browse_name()).Name
        value = (await is_power_on.read_value())
        print(f"Before:{name} = {value}")
        # PowerON
        object_node = client.get_node("ns=1;i=54267")
        method_node = client.get_node("ns=6;i=7001")
        input_arguments = []
        output_arguments = await object_node.call_method(method_node, *input_arguments)
        # Wait 3 seconds
        await asyncio.sleep(3)
        # IsPowerON
        is_power_on = client.get_node("ns=1;i=54275")
        name = (await is_power_on.read_browse_name()).Name
        value = (await is_power_on.read_value())
        print(f"After:{name} = {value}")
    except Exception:
        _logger.exception('error')
    finally:
        await client.disconnect()


asyncio.run(main())
You only need to change the IP address in client = Client(url="opc.tcp://192.168.142.128:4840") to the IP address of the virtual machine, and then run the script. The script queries the value of the IsPowerOn node, calls the PowerON method to power on the robot in the simulator, waits 3 seconds, and queries the value of the IsPowerOn node again. In the virtual machine, the power status shown by the simulator changes from "Power off" to "Idle". At the same time, the Python terminal prints the following:
Client connected
Before:IsPowerOn = False
After:IsPowerOn = True
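The script above waits a fixed 3 seconds before reading IsPowerOn again. As an alternative sketch, the node could be polled until it reports the expected value or a timeout expires. The helper wait_for_value below is hypothetical (it is not part of asyncua) and only assumes that the node object offers the awaitable read_value() method used in the script.

```python
import asyncio
import time


async def wait_for_value(node, expected, timeout: float = 10.0,
                         interval: float = 0.2) -> bool:
    """Poll node.read_value() until it equals expected or timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        if await node.read_value() == expected:
            return True
        if time.monotonic() >= deadline:
            return False  # gave up: the value never matched in time
        await asyncio.sleep(interval)


# Inside main(), after calling the PowerON method, one could write:
#   powered = await wait_for_value(client.get_node("ns=1;i=54275"), True)
```

Polling returns as soon as the state changes and reports a timeout explicitly, at the cost of a few extra read requests.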
The following describes how to write node queries and method calls.
First, observe the following statements:
#IsPowerON
is_power_on = client.get_node("ns=1;i=54275")
name = (await is_power_on.read_browse_name()).Name
value = (await is_power_on.read_value())
print(f"Before:{name} = {value}")
To query a different node, we simply replace the NodeId passed to client.get_node() and read its name and value in the same way. The NodeId can be looked up in UaExpert, as described earlier.
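Because only the NodeId changes from one query to the next, the three statements can be wrapped in a small reusable coroutine. This is a sketch under the assumption that client is an already connected asyncua Client; the function names read_name_and_value and print_nodes are our own.

```python
async def read_name_and_value(client, node_id: str):
    """Return (browse name, current value) for the node with this NodeId."""
    node = client.get_node(node_id)
    name = (await node.read_browse_name()).Name
    value = await node.read_value()
    return name, value


async def print_nodes(client, node_ids):
    """Print name and value for each NodeId string in node_ids."""
    for node_id in node_ids:
        name, value = await read_name_and_value(client, node_id)
        print(f"{node_id}: {name} = {value}")


# Inside main(), after client.connect():
#   await print_nodes(client, ["ns=1;i=54275"])
```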
Exercise: Query the NodeId for IsProgramRunning and write a Python program to determine if the robot in the simulator is currently running a program.
Next, observe the following statements:
#PowerON
object_node = client.get_node("ns=1;i=54267")
method_node = client.get_node("ns=6;i=7001")
input_arguments = []
output_arguments = await object_node.call_method(method_node, *input_arguments)
First, we get the object node using the NodeId of the object to which the method belongs, then get the method node using the NodeId of the method. We then fill in the input arguments (or leave the list empty if the method takes none) and call the method.
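The same pattern applies to any method: look up the object, look up the method, pass the arguments. As a sketch, it can be collected into one coroutine. The name call_method_by_id is hypothetical, and client is assumed to be a connected asyncua Client as in the script above.

```python
async def call_method_by_id(client, object_id: str, method_id: str, *args):
    """Call the method method_id on the object object_id and return its result."""
    object_node = client.get_node(object_id)
    method_node = client.get_node(method_id)
    # With no extra arguments, *args unpacks to nothing, which matches
    # the empty input_arguments list used for PowerON.
    return await object_node.call_method(method_node, *args)


# Inside main(), after client.connect():
#   await call_method_by_id(client, "ns=1;i=54267", "ns=6;i=7001")
```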
Exercise: Query the NodeId of the PowerOFF method and write a Python program to turn off the power of the robot in the simulator.
Next, let's look at a program that obtains a NodeId from the node's browse path.
import asyncio
import logging
from asyncua import Client

logging.basicConfig(level=logging.ERROR)
_logger = logging.getLogger('asyncua')


async def main():
    # create connection, please modify the ip of the url before connection
    client = Client(url="opc.tcp://192.168.142.128:4840")
    client.set_user('elite')
    client.set_password('elibot')
    await client.connect()
    print("Client connected")
    try:
        # get NodeId
        nodeid = await client.nodes.root.get_child(["0:Objects", "1:EliteROBOTS", "6:Controller", "6:ControllerOperatingInfo", "6:Status", "6:IsPowerOn"])
        print(f"IsPowerON:{nodeid}")
    except Exception:
        _logger.exception('error')
    finally:
        await client.disconnect()


asyncio.run(main())
Observe this part of the program.
#get NodeId
nodeid = await client.nodes.root.get_child(["0:Objects","1:EliteROBOTS","6:Controller","6:ControllerOperatingInfo","6:Status","6:IsPowerOn"])
print(f"IsPowerON:{nodeid}")
By running the program, this result can be obtained:
Client connected
IsPowerON:ns=1;i=54275
The result shows that the NodeId of a node can be obtained through client.nodes.root.get_child(); to look up other nodes, only the BrowsePath argument needs to change. The BrowsePath can be read in UaExpert: starting from the Objects level, take the BrowseName attribute of each node along the path, join its namespace index and name with ":", and list the resulting strings in order.
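The joining rule can be illustrated with a few lines of plain Python. The helper build_browse_path is our own and not part of asyncua; it only formats (namespace index, name) pairs, as read from the BrowseName attribute in UaExpert, into the list that get_child() expects.

```python
def build_browse_path(*steps):
    """Turn (namespace_index, name) pairs into 'index:name' strings."""
    return [f"{ns}:{name}" for ns, name in steps]


path = build_browse_path(
    (0, "Objects"),
    (1, "EliteROBOTS"),
    (6, "Controller"),
    (6, "ControllerOperatingInfo"),
    (6, "Status"),
    (6, "IsPowerOn"),
)
print(path)
# ['0:Objects', '1:EliteROBOTS', '6:Controller',
#  '6:ControllerOperatingInfo', '6:Status', '6:IsPowerOn']
```

The resulting list can be passed directly to client.nodes.root.get_child(path).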
Exercise: Write a Python program to retrieve the NodeId for IsProgramRunning via get_child().
Once we have become proficient in obtaining the NodeId through this method, we can proceed to retrieve the node tree.
import asyncio
import logging
from asyncua import Client, Node, ua

logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger('asyncua')


async def browse_nodes(node: Node):
    """
    Build a nested node tree dict by recursion (filtered by OPC UA objects and variables).
    """
    node_class = await node.read_node_class()
    children = []
    for child in await node.get_children():
        if await child.read_node_class() in [ua.NodeClass.Object, ua.NodeClass.Variable]:
            children.append(
                await browse_nodes(child)
            )
    if node_class != ua.NodeClass.Variable:
        var_type = None
    else:
        try:
            var_type = (await node.read_data_type_as_variant_type()).value
        except ua.UaError:
            _logger.warning('Node Variable Type could not be determined for %r', node)
            var_type = None
    return {
        'id': node.nodeid.to_string(),
        'name': (await node.read_display_name()).Text,
        'cls': node_class.value,
        'children': children,
        'type': var_type,
    }


async def task(loop):
    url = "opc.tcp://192.168.142.128:4840"
    # url = "opc.tcp://localhost:4840/freeopcua/server/"
    try:
        client = Client(url=url)
        client.set_user('elite')
        client.set_password('elibot')
        # client.set_security_string()
        await client.connect()
        # Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
        root = client.nodes.root
        _logger.info("Root node is: %r", root)
        # Node objects have methods to read and write node attributes as well as browse or populate address space
        _logger.info("Children of root are: %r", await root.get_children())
        tree = await browse_nodes(client.nodes.objects)
        _logger.info('Node tree: %r', tree)
    except Exception:
        _logger.exception('error')
    finally:
        await client.disconnect()


def main():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.set_debug(True)
    loop.run_until_complete(task(loop))
    loop.close()


if __name__ == "__main__":
    main()
Exercise: Modify and run the program, then view the results. Consider how the output is generated.
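The program logs the whole tree as one nested dict, which is hard to read for a large address space. As a sketch, the dict returned by browse_nodes() could be printed as an indented outline instead. The helper format_tree is our own, and the sample dict below is hand-written and shortened for illustration (including its cls and type values); it is not real server output, and the real tree has more levels.

```python
def format_tree(node: dict, depth: int = 0) -> list:
    """Return one indented text line per node of a browse_nodes() result."""
    lines = ["  " * depth + f"{node['name']} [{node['id']}]"]
    for child in node["children"]:
        lines.extend(format_tree(child, depth + 1))
    return lines


# Hand-written sample with the same keys that browse_nodes() produces.
sample = {
    "id": "i=85", "name": "Objects", "cls": 1, "type": None,
    "children": [
        {"id": "ns=1;i=54275", "name": "IsPowerOn", "cls": 2, "type": 1,
         "children": []},
    ],
}
print("\n".join(format_tree(sample)))
```

In task(), the same function could be applied to the tree variable, for example print("\n".join(format_tree(tree))).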